Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save marcellustavares/fe6e1afad4012d316b1c90fc82a0ecb9 to your computer and use it in GitHub Desktop.

Select an option

Save marcellustavares/fe6e1afad4012d316b1c90fc82a0ecb9 to your computer and use it in GitHub Desktop.
BatchIndividualEventsReplicationPipeline
/**
* SPDX-FileCopyrightText: (c) 2024 Liferay, Inc. https://liferay.com
* SPDX-License-Identifier: LGPL-2.1-or-later OR LicenseRef-Liferay-DXP-EULA-2.0.0-2023-06
*/
package com.liferay.osb.asah.dataflow.replica;
import com.google.api.services.bigquery.model.TableRow;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.MapValue;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
/**
* @author Marcellus Tavares
*/
public class BatchIndividualEventReplicationPipeline {
public static void main(String[] args) {
run(
PipelineOptionsFactory.fromArgs(
args
).withValidation(
).as(
BatchIndividualReplicationPipelineOptions.class
));
}
private static String createDocumentName(String individualId, String eventId) {
String documentPath =
String.format(
"projects/%s/databases/%s/documents",
"liferaycloud-staging-s2", "(default)");
return documentPath + "/individual/" + individualId + "/events/" + eventId;
}
public static void run(
BatchIndividualReplicationPipelineOptions
batchIndividualReplicationPipelineOptions) {
Pipeline pipeline = Pipeline.create(
batchIndividualReplicationPipelineOptions);
RpcQosOptions rpcQosOptions =
RpcQosOptions.newBuilder()
.withHintMaxNumWorkers(batchIndividualReplicationPipelineOptions.getMaxNumWorkers())
.build();
pipeline.apply(
BigQueryIO
.readTableRows()
.withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
.fromQuery("SELECT e.id, eventId, i.individualId FROM `stg39b8692f04d643f9ab2023968b06fdd4.event` e JOIN `stg39b8692f04d643f9ab2023968b06fdd4.identity` i ON (e.userId = i.id) WHERE i.individualId is not null"
).usingStandardSql().withoutValidation()
)
.apply("write to firestore",
MapElements.via(new SimpleFunction<TableRow, Write>() {
@Override
public Write apply(TableRow tableRow) {
String id = (String)tableRow.get("id");
String eventId = (String)tableRow.get("eventId");
String individualId = (String)tableRow.get("individualId");
Document.Builder builder = Document.newBuilder();
builder.setName(createDocumentName(individualId, id));
builder.putFields("eventId", Value.newBuilder().setStringValue(eventId).build());
return Write.newBuilder().setUpdate(builder).build();
}
})
).apply(
FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()
);
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment