Created
January 15, 2025 23:43
-
-
Save marcellustavares/ce3a5b23e1bbb6ab0702a6c565cd8550 to your computer and use it in GitHub Desktop.
Firestore Individual Replication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * SPDX-FileCopyrightText: (c) 2024 Liferay, Inc. https://liferay.com | |
| * SPDX-License-Identifier: LGPL-2.1-or-later OR LicenseRef-Liferay-DXP-EULA-2.0.0-2023-06 | |
| */ | |
| package com.liferay.osb.asah.dataflow.replica; | |
| import com.google.api.services.bigquery.model.TableRow; | |
| import com.google.firestore.v1.Document; | |
| import com.google.firestore.v1.MapValue; | |
| import com.google.firestore.v1.Value; | |
| import com.google.firestore.v1.Write; | |
| import java.util.Arrays; | |
| import java.util.List; | |
| import org.apache.beam.sdk.Pipeline; | |
| import org.apache.beam.sdk.PipelineResult; | |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; | |
| import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; | |
| import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions; | |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
| import org.apache.beam.sdk.transforms.MapElements; | |
| import org.apache.beam.sdk.transforms.Sets; | |
| import org.apache.beam.sdk.transforms.SimpleFunction; | |
| /** | |
| * @author Marcellus Tavares | |
| */ | |
| public class BatchIndividualReplicationPipeline { | |
| public static void main(String[] args) { | |
| run( | |
| PipelineOptionsFactory.fromArgs( | |
| args | |
| ).withValidation( | |
| ).as( | |
| BatchIndividualReplicationPipelineOptions.class | |
| )); | |
| } | |
| private static String createDocumentName(String collectionId, String cityDocId) { | |
| String documentPath = | |
| String.format( | |
| "projects/%s/databases/%s/documents", | |
| "liferaycloud-staging-s2", "(default)"); | |
| return documentPath + "/" + collectionId + "/" + cityDocId; | |
| } | |
| public static void run( | |
| BatchIndividualReplicationPipelineOptions | |
| batchIndividualReplicationPipelineOptions) { | |
| Pipeline pipeline = Pipeline.create( | |
| batchIndividualReplicationPipelineOptions); | |
| RpcQosOptions rpcQosOptions = | |
| RpcQosOptions.newBuilder() | |
| .withHintMaxNumWorkers(batchIndividualReplicationPipelineOptions.getMaxNumWorkers()) | |
| .build(); | |
| pipeline.apply( | |
| BigQueryIO | |
| .readTableRows() | |
| .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) | |
| .from("stg39b8692f04d643f9ab2023968b06fdd4.individual" | |
| ).withSelectedFields(Arrays.asList("id", "fields")) | |
| ).apply( | |
| Sets.intersectDistinct(null) | |
| ) | |
| .apply("write to firestore", | |
| MapElements.via(new SimpleFunction<TableRow, Write>() { | |
| @Override | |
| public Write apply(TableRow tableRow) { | |
| String individualId = (String)tableRow.get("id"); | |
| Document.Builder builder = Document.newBuilder(); | |
| builder.setName(createDocumentName("individual", individualId)); | |
| builder.putFields("suppressed", Value.newBuilder().setBooleanValue(false).build()); | |
| MapValue.Builder mapValueBuilder = MapValue.newBuilder(); | |
| List<TableRow> fields = (List<TableRow>)tableRow.get("fields"); | |
| for (TableRow field : fields) { | |
| String fieldName = (String)field.get("name"); | |
| String fieldValue = (String)field.get("value"); | |
| mapValueBuilder.putFields(fieldName, Value.newBuilder().setStringValue(fieldValue).build()); | |
| } | |
| builder.putFields("demographics", Value.newBuilder().setMapValue(mapValueBuilder).build()); | |
| return Write.newBuilder().setUpdate(builder).build(); | |
| } | |
| }) | |
| ).apply( | |
| FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build() | |
| ); | |
| PipelineResult pipelineResult = pipeline.run(); | |
| pipelineResult.waitUntilFinish(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment