@speeddragon
Last active January 16, 2020 13:30
Revisions

  1. speeddragon revised this gist Jan 16, 2020. 1 changed file with 45 additions and 0 deletions.
    45 changes: 45 additions & 0 deletions custom_file_sink.scala
    @@ -25,11 +25,56 @@ env
  )
  .timeWindow(Time.seconds(settings.windowTime()))
  .allowedLateness(Time.seconds(settings.allowedLateness()))
  .trigger(new DelayEventTimeTrigger())
  .apply(new GenericRecordAggregatorWindowFunction())
  .addSink(writer)

    env.execute()

    // DelayEventTimeTrigger

/**
 * Copied from Flink's EventTimeTrigger, with one addition: besides the usual
 * event-time timer at window.maxTimestamp(), it registers a processing-time
 * timer one window-length ahead, so the window also fires on wall-clock time
 * when the watermark stalls.
 */
class DelayEventTimeTrigger extends Trigger[Object, TimeWindow] {
  val valueStateDescriptor = new ValueStateDescriptor[Boolean]("flag", classOf[Boolean])

  override def onElement(
      element: Object,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext
  ): TriggerResult = {
    val flag = ctx.getPartitionedState(valueStateDescriptor).value()

    // The flag ensures the two timers are registered only once per window pane.
    if (!flag) {
      val delay = window.getEnd - window.getStart
      ctx.getPartitionedState(valueStateDescriptor).update(true)
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }

    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (time == window.maxTimestamp()) {
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteEventTimeTimer(window.maxTimestamp())
    // Also drop the per-window flag so its state does not outlive the window.
    ctx.getPartitionedState(valueStateDescriptor).clear()
  }
}
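
One limitation: clear() cannot delete the processing-time timer, because its firing timestamp (getCurrentProcessingTime + delay) is never stored. A minimal sketch of a workaround, assuming an extra ValueState; the timerStateDescriptor name is illustrative, not part of the gist:

// Sketch: remember the processing-time timer so clear() can delete it.
val timerStateDescriptor = new ValueStateDescriptor[java.lang.Long]("timer", classOf[java.lang.Long])

// In onElement, instead of registering the timer anonymously:
//   val fireAt = ctx.getCurrentProcessingTime + delay
//   ctx.getPartitionedState(timerStateDescriptor).update(fireAt)
//   ctx.registerProcessingTimeTimer(fireAt)

// In clear(), delete it alongside the event-time timer:
//   val fireAt = ctx.getPartitionedState(timerStateDescriptor).value()
//   if (fireAt != null) ctx.deleteProcessingTimeTimer(fireAt)
//   ctx.getPartitionedState(timerStateDescriptor).clear()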

    // GenericRecordAggregatorWindowFunction

    class GenericRecordAggregatorWindowFunction
  2. speeddragon created this gist Jan 10, 2020.
    96 changes: 96 additions & 0 deletions custom_file_sink.scala
    @@ -0,0 +1,96 @@
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    ...

// Event time is taken from the record's ISO-8601 "timestamp" field;
// AscendingTimestampExtractor expects it to never decrease per Kafka partition.
val source = new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[GenericRecord] {
    def extractAscendingTimestamp(element: GenericRecord): Long =
      Instant
        .parse(element.get("timestamp").asInstanceOf[String])
        .toEpochMilli()
  })

val writer = WindowParquetGenericRecordListFileSink(settings.s3Path(), GenericRecordSchema.schema.toString())

    ...

// RocksDB state backend with incremental checkpointing enabled (the second argument).
val backend = new RocksDBStateBackend("file:///tmp/rocksdb-checkpoint", true)

env
  .setStateBackend(backend)
  .enableCheckpointing(settings.checkpointInterval())
  .addSource(source)
  .keyBy((record: GenericRecord) =>
    record.get("key").asInstanceOf[String]
  )
  .timeWindow(Time.seconds(settings.windowTime()))
  .allowedLateness(Time.seconds(settings.allowedLateness()))
  .apply(new GenericRecordAggregatorWindowFunction())
  .addSink(writer)

env.execute()

    // GenericRecordAggregatorWindowFunction

class GenericRecordAggregatorWindowFunction
    extends WindowFunction[GenericRecord, Iterable[GenericRecord], String, TimeWindow] {

  override def apply(key: String, window: TimeWindow, input: lang.Iterable[GenericRecord],
                     out: Collector[Iterable[GenericRecord]]): Unit = {
    out.collect(input.asScala)
  }
}
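
Each fire emits the window's full contents as a single Iterable[GenericRecord], so the sink below receives one batch per key and window pane and writes it as one Parquet file. (input.asScala comes from scala.collection.JavaConverters, presumably imported in the elided sections.)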

    // WindowParquetGenericRecordListFileSink

case class WindowParquetGenericRecordListFileSink(filePath: String, schema: String)
    extends ParquetGenericRecordListFileSink[GenericRecord] {

  def getBucketId(element: GenericRecord): String =
    "account_id=" + element.get(KEY.name).asInstanceOf[String] +
      "/partition_date=" + formatDateString(element.get(TIMESTAMP.name).asInstanceOf[String])

  private def formatDateString(date: String) =
    new SimpleDateFormat("yyyyMM").format(
      new SimpleDateFormat("yyyy-MM-dd").parse(date))

  def getFileName(genericRecord: GenericRecord): String = {
    genericRecord.get("logger_timestamp").asInstanceOf[String]
      .replace(" ", "_")
      .replace("/", "-")
      .replace(":", "-")
      .concat(".parquet")
  }
}
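
For illustration (these field values are assumed, not taken from the gist): a record whose KEY field is "1234" and whose TIMESTAMP and logger_timestamp fields are "2020-01-10 13:30:00" is written under

account_id=1234/partition_date=202001/2020-01-10_13-30-00.parquet

relative to filePath. The date math works because SimpleDateFormat("yyyy-MM-dd").parse stops after the day and ignores the trailing time.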

    // ParquetGenericRecordListFileSink

trait ParquetGenericRecordListFileSink[IN] extends SinkFunction[Iterable[IN]] with LazyLogging {

  def filePath: String
  def schema: String

  override def invoke(elements: Iterable[IN], context: Context[_]): Unit = {
    val fileName = getFileName(elements.head)
    val finalFilePath = s"${filePath}/${getBucketId(elements.head)}/${fileName}"

    logger.info(s"Writing to ${finalFilePath}")

    val writer = AvroParquetWriter
      .builder[IN](new Path(finalFilePath))
      .withSchema(new Schema.Parser().parse(schema))
      .withDataModel(GenericData.get)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()

    elements.foreach(element => writer.write(element))

    writer.close()
  }

  def getBucketId(element: IN): String

  def getFileName(genericRecord: IN): String
}
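
Two notes. ParquetFileWriter.Mode.OVERWRITE makes re-fires idempotent: when the pipeline's allowedLateness causes a window to fire again, the sink simply rewrites the same bucket/file path. And elements.head assumes a non-empty batch, which holds here because Flink only evaluates window panes that contain at least one element.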