Skip to content

Instantly share code, notes, and snippets.

@leozilla
Created October 7, 2021 16:37
Show Gist options
  • Select an option

  • Save leozilla/2fb36fa3ec98face4fff5d1161c50a3a to your computer and use it in GitHub Desktop.

Select an option

Save leozilla/2fb36fa3ec98face4fff5d1161c50a3a to your computer and use it in GitHub Desktop.
Load Avro into BigQuery and export Avro from BigQuery
1. writeObjectContainerFile
2. loadAvroFromLocal
3. exportToGcs
/** Writes the given records into an Avro object container file stored in a fresh temp file.
  *
  * The temp file is created with prefix `"avro-container"` and an empty suffix. This method
  * never deletes it, so the caller owns the returned file and is responsible for cleanup.
  *
  * @param data   records to append, in iteration order
  * @param schema Avro schema used both for the datum writer and the container header
  * @tparam T     concrete record type; must be a `GenericRecord`
  * @return the temp file containing the serialized records
  */
def writeObjectContainerFile[T <: GenericRecord](data: Iterable[T], schema: Schema): File = {
  import org.apache.avro.file.DataFileWriter
  import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

  val file = File.createTempFile("avro-container", "")
  val datumWriter = new GenericDatumWriter[GenericRecord](schema)
  val fileWriter = new DataFileWriter[GenericRecord](datumWriter)
  // Close the writer even when create/append throws, so the underlying
  // file handle is not leaked on a failed write.
  try {
    fileWriter.create(schema, file)
    data.foreach(fileWriter.append)
  } finally {
    fileWriter.close()
  }
  file
}
/** Uploads a local Avro file into a BigQuery table through a write channel.
  *
  * The write channel is configured for Avro input with Avro logical types enabled, allows
  * field addition and field relaxation as schema updates, and creates the table if needed.
  *
  * The file copy runs synchronously on the calling thread before the `Future` is returned,
  * so a failure while copying is thrown directly rather than delivered through the `Future`.
  *
  * @param client  BigQuery client used to open the write channel
  * @param file    local Avro file whose bytes are streamed to BigQuery
  * @param tableId destination table
  * @param bec     not referenced directly in this body; presumably consumed by
  *                `awaitJobResult` (defined elsewhere) -- TODO confirm
  * @return whatever `awaitJobResult` yields for the load job obtained from the write channel
  */
def loadAvroFromLocal(client: BigQuery, file: File, tableId: TableId)(implicit bec: BlockingExecutionContext): Future[Job] = {
  import scala.collection.JavaConverters._

  val writeChannelConfiguration = WriteChannelConfiguration
    .newBuilder(tableId)
    .setFormatOptions(FormatOptions.avro())
    .setUseAvroLogicalTypes(true)
    .setSchemaUpdateOptions(Seq(SchemaUpdateOption.ALLOW_FIELD_ADDITION, SchemaUpdateOption.ALLOW_FIELD_RELAXATION).asJava)
    .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .build

  val writer = client.writer(writeChannelConfiguration)
  val stream = Channels.newOutputStream(writer)
  // Always release the stream (and the write channel it wraps), even when the
  // copy fails; otherwise the channel stays open on error.
  try {
    Files.copy(file.toPath, stream)
  } finally {
    stream.close()
  }
  // The job is read from the writer only after the stream has been closed.
  awaitJobResult(writer.getJob)
}
/** Submits a BigQuery extract job that exports a table to the given destination URI.
  *
  * @param client          BigQuery client used to create the job
  * @param sourceTableId   table to export
  * @param destinationUri  destination URI handed to the extract job configuration
  * @param format          export format; defaults to the type of `FormatOptions.avro`
  * @param useLogicalTypes value passed to `setUseAvroLogicalTypes`; defaults to `true`
  * @param bec             not referenced directly in this body; presumably consumed by
  *                        `awaitJobResult` (defined elsewhere) -- TODO confirm
  * @return whatever `awaitJobResult` yields for the created extract job
  */
def exportToGcs(client: BigQuery, sourceTableId: TableId, destinationUri: String, format: String = FormatOptions.avro.getType, useLogicalTypes: Boolean = true)
    (implicit bec: BlockingExecutionContext): Future[Job] = {
  val extractConfig: ExtractJobConfiguration =
    ExtractJobConfiguration
      .newBuilder(sourceTableId, destinationUri)
      .setUseAvroLogicalTypes(useLogicalTypes)
      .setFormat(format)
      .build()

  val extractJobInfo = JobInfo.of(extractConfig)
  // Job creation stays inside the argument expression, exactly as before.
  awaitJobResult(client.create(extractJobInfo))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment