Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save marcellustavares/ce3a5b23e1bbb6ab0702a6c565cd8550 to your computer and use it in GitHub Desktop.

Select an option

Save marcellustavares/ce3a5b23e1bbb6ab0702a6c565cd8550 to your computer and use it in GitHub Desktop.
Firestore Individual Replication
/**
* SPDX-FileCopyrightText: (c) 2024 Liferay, Inc. https://liferay.com
* SPDX-License-Identifier: LGPL-2.1-or-later OR LicenseRef-Liferay-DXP-EULA-2.0.0-2023-06
*/
package com.liferay.osb.asah.dataflow.replica;
import com.google.api.services.bigquery.model.TableRow;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.MapValue;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sets;
import org.apache.beam.sdk.transforms.SimpleFunction;
/**
 * Batch Apache Beam pipeline that reads the {@code id} and {@code fields}
 * columns of a BigQuery table and emits one Firestore write per row, targeting
 * a document in the {@code individual} collection.
 *
 * <p>
 * Each document carries a {@code suppressed} flag set to {@code false} and a
 * {@code demographics} map built from the row's name/value field pairs.
 * </p>
 *
 * <p>
 * NOTE(review): the source table, Firestore project and database are
 * hard-coded below; consider exposing them through
 * {@code BatchIndividualReplicationPipelineOptions}.
 * </p>
 *
 * @author Marcellus Tavares
 */
public class BatchIndividualReplicationPipeline {

	/**
	 * Command line entry point. Parses and validates {@code args} into
	 * pipeline options and runs the pipeline.
	 *
	 * @param args the command line arguments handed to
	 *        {@code PipelineOptionsFactory}
	 */
	public static void main(String[] args) {
		run(
			PipelineOptionsFactory.fromArgs(
				args
			).withValidation(
			).as(
				BatchIndividualReplicationPipelineOptions.class
			));
	}

	/**
	 * Builds the replication pipeline, runs it, and blocks until it finishes.
	 *
	 * @param batchIndividualReplicationPipelineOptions the options used to
	 *        create the pipeline; its {@code getMaxNumWorkers()} value is also
	 *        passed to the Firestore RPC QoS configuration as the max number
	 *        of workers hint
	 */
	public static void run(
		BatchIndividualReplicationPipelineOptions
			batchIndividualReplicationPipelineOptions) {

		Pipeline pipeline = Pipeline.create(
			batchIndividualReplicationPipelineOptions);

		RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder(
		).withHintMaxNumWorkers(
			batchIndividualReplicationPipelineOptions.getMaxNumWorkers()
		).build();

		// The rows read from BigQuery feed the mapping step directly. A set
		// intersection is only meaningful against a second, non-null
		// PCollection, and this pipeline has no such collection.

		pipeline.apply(
			BigQueryIO.readTableRows(
			).withMethod(
				BigQueryIO.TypedRead.Method.DIRECT_READ
			).from(
				SOURCE_TABLE
			).withSelectedFields(
				Arrays.asList("id", "fields")
			)
		).apply(
			"write to firestore",
			MapElements.via(new TableRowToWriteFunction())
		).apply(
			FirestoreIO.v1(
			).write(
			).batchWrite(
			).withRpcQosOptions(
				rpcQosOptions
			).build()
		);

		PipelineResult pipelineResult = pipeline.run();

		pipelineResult.waitUntilFinish();
	}

	/**
	 * Returns the fully qualified Firestore document name for the given
	 * collection and document ID.
	 *
	 * @param collectionId the ID of the collection holding the document
	 * @param documentId the ID of the document inside the collection
	 * @return a name of the form
	 *         {@code projects/<project>/databases/<database>/documents/<collectionId>/<documentId>}
	 */
	private static String createDocumentName(
		String collectionId, String documentId) {

		return String.format(
			"projects/%s/databases/%s/documents/%s/%s", FIRESTORE_PROJECT_ID,
			FIRESTORE_DATABASE_ID, collectionId, documentId);
	}

	/**
	 * Converts a BigQuery row into a Firestore update write.
	 *
	 * <p>
	 * A missing {@code fields} column is treated as an empty list, and entries
	 * whose {@code name} or {@code value} is {@code null} are left out of the
	 * {@code demographics} map, because the protobuf builders reject
	 * {@code null} keys and values.
	 * </p>
	 *
	 * @param tableRow the row holding the {@code id} and {@code fields}
	 *        columns
	 * @return the write that updates the individual's document
	 */
	private static Write toWrite(TableRow tableRow) {
		String individualId = (String)tableRow.get("id");

		Document.Builder documentBuilder = Document.newBuilder();

		documentBuilder.setName(
			createDocumentName(INDIVIDUAL_COLLECTION_ID, individualId));
		documentBuilder.putFields(
			"suppressed", Value.newBuilder().setBooleanValue(false).build());

		MapValue.Builder demographicsBuilder = MapValue.newBuilder();

		// The selected "fields" column is read as a list of nested rows, each
		// accessed below by its "name" and "value" keys.
		// NOTE(review): assumes the column is a repeated record - confirm
		// against the table schema.

		@SuppressWarnings("unchecked")
		List<TableRow> fields = (List<TableRow>)tableRow.get("fields");

		if (fields != null) {
			for (TableRow field : fields) {
				if (field == null) {
					continue;
				}

				String fieldName = (String)field.get("name");
				String fieldValue = (String)field.get("value");

				if ((fieldName == null) || (fieldValue == null)) {
					continue;
				}

				demographicsBuilder.putFields(
					fieldName,
					Value.newBuilder().setStringValue(fieldValue).build());
			}
		}

		documentBuilder.putFields(
			"demographics",
			Value.newBuilder().setMapValue(demographicsBuilder).build());

		// NOTE(review): no update mask is set on the write; confirm that
		// replacing the whole document is the intended behavior.

		return Write.newBuilder().setUpdate(documentBuilder).build();
	}

	/** ID of the Firestore database the documents are written to. */
	private static final String FIRESTORE_DATABASE_ID = "(default)";

	/** ID of the project owning the target Firestore database. */
	private static final String FIRESTORE_PROJECT_ID =
		"liferaycloud-staging-s2";

	/** ID of the Firestore collection that receives one document per row. */
	private static final String INDIVIDUAL_COLLECTION_ID = "individual";

	/** BigQuery table, as {@code dataset.table}, the rows are read from. */
	private static final String SOURCE_TABLE =
		"stg39b8692f04d643f9ab2023968b06fdd4.individual";

	/**
	 * Element-wise mapping from a BigQuery row to a Firestore write.
	 */
	private static class TableRowToWriteFunction
		extends SimpleFunction<TableRow, Write> {

		@Override
		public Write apply(TableRow tableRow) {
			return toWrite(tableRow);
		}

	}

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment