Created January 16, 2025 00:44
-
-
Save marcellustavares/fe6e1afad4012d316b1c90fc82a0ecb9 to your computer and use it in GitHub Desktop.
BatchIndividualEventsReplicationPipeline
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * SPDX-FileCopyrightText: (c) 2024 Liferay, Inc. https://liferay.com | |
| * SPDX-License-Identifier: LGPL-2.1-or-later OR LicenseRef-Liferay-DXP-EULA-2.0.0-2023-06 | |
| */ | |
| package com.liferay.osb.asah.dataflow.replica; | |
| import com.google.api.services.bigquery.model.TableRow; | |
| import com.google.firestore.v1.Document; | |
| import com.google.firestore.v1.MapValue; | |
| import com.google.firestore.v1.Value; | |
| import com.google.firestore.v1.Write; | |
| import java.util.Arrays; | |
| import java.util.List; | |
| import org.apache.beam.sdk.Pipeline; | |
| import org.apache.beam.sdk.PipelineResult; | |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; | |
| import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; | |
| import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions; | |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
| import org.apache.beam.sdk.transforms.MapElements; | |
| import org.apache.beam.sdk.transforms.SimpleFunction; | |
| /** | |
| * @author Marcellus Tavares | |
| */ | |
| public class BatchIndividualEventReplicationPipeline { | |
| public static void main(String[] args) { | |
| run( | |
| PipelineOptionsFactory.fromArgs( | |
| args | |
| ).withValidation( | |
| ).as( | |
| BatchIndividualReplicationPipelineOptions.class | |
| )); | |
| } | |
| private static String createDocumentName(String individualId, String eventId) { | |
| String documentPath = | |
| String.format( | |
| "projects/%s/databases/%s/documents", | |
| "liferaycloud-staging-s2", "(default)"); | |
| return documentPath + "/individual/" + individualId + "/events/" + eventId; | |
| } | |
| public static void run( | |
| BatchIndividualReplicationPipelineOptions | |
| batchIndividualReplicationPipelineOptions) { | |
| Pipeline pipeline = Pipeline.create( | |
| batchIndividualReplicationPipelineOptions); | |
| RpcQosOptions rpcQosOptions = | |
| RpcQosOptions.newBuilder() | |
| .withHintMaxNumWorkers(batchIndividualReplicationPipelineOptions.getMaxNumWorkers()) | |
| .build(); | |
| pipeline.apply( | |
| BigQueryIO | |
| .readTableRows() | |
| .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) | |
| .fromQuery("SELECT e.id, eventId, i.individualId FROM `stg39b8692f04d643f9ab2023968b06fdd4.event` e JOIN `stg39b8692f04d643f9ab2023968b06fdd4.identity` i ON (e.userId = i.id) WHERE i.individualId is not null" | |
| ).usingStandardSql().withoutValidation() | |
| ) | |
| .apply("write to firestore", | |
| MapElements.via(new SimpleFunction<TableRow, Write>() { | |
| @Override | |
| public Write apply(TableRow tableRow) { | |
| String id = (String)tableRow.get("id"); | |
| String eventId = (String)tableRow.get("eventId"); | |
| String individualId = (String)tableRow.get("individualId"); | |
| Document.Builder builder = Document.newBuilder(); | |
| builder.setName(createDocumentName(individualId, id)); | |
| builder.putFields("eventId", Value.newBuilder().setStringValue(eventId).build()); | |
| return Write.newBuilder().setUpdate(builder).build(); | |
| } | |
| }) | |
| ).apply( | |
| FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build() | |
| ); | |
| PipelineResult pipelineResult = pipeline.run(); | |
| pipelineResult.waitUntilFinish(); | |
| } | |
| } |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.