Last active
January 16, 2020 13:30
-
-
Save speeddragon/6a98805d7f4aacff729f3d60b6a57ff8 to your computer and use it in GitHub Desktop.
Revisions
-
speeddragon revised this gist
Jan 16, 2020 — 1 changed file with 45 additions and 0 deletions. There are no files selected for viewing.
// Revision (Jan 16, 2020): attach a custom trigger so a window fires either
// when the watermark passes its end OR after a processing-time delay equal to
// the window length, whichever comes first.
env
  // ... state backend / checkpointing / source / keyBy as in the original gist ...
  .timeWindow(Time.seconds(settings.windowTime()))
  .allowedLateness(Time.seconds(settings.allowedLateness()))
  .trigger(new DelayEventTimeTrigger())
  .apply(new GenericRecordAggregatorWindowFunction())
  .addSink(writer)

env.execute()

// DelayEventTimeTrigger
/**
 * Adapted from Flink's built-in EventTimeTrigger.
 *
 * Fires the window either when the watermark reaches `window.maxTimestamp()`
 * (event time) or after a processing-time delay equal to the window length,
 * whichever happens first.
 *
 * FIXES over the original revision:
 *  - The boolean "flag" state was never cleared (the comment claimed it was
 *    cleaned on FIRE, but no code path did), leaking per-window keyed state.
 *  - clear() deleted only the event-time timer; the registered
 *    processing-time timer was left behind because its timestamp was not
 *    stored anywhere. We now keep the fire timestamp in state — it doubles as
 *    the "already registered" flag — so clear() can delete both timers and
 *    release the state.
 */
class DelayEventTimeTrigger extends Trigger[Object, TimeWindow] {

  // Processing-time timestamp of the delay timer for this window, or null
  // when no timer has been registered yet (java.lang.Long so null is valid).
  private val timerStateDescriptor =
    new ValueStateDescriptor[java.lang.Long]("delay-timer", classOf[java.lang.Long])

  override def onElement(
      element: Object,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext
  ): TriggerResult = {
    val timerState = ctx.getPartitionedState(timerStateDescriptor)

    // Register the two timers exactly once per window.
    if (timerState.value() == null) {
      val delay = window.getEnd - window.getStart
      val fireAt = ctx.getCurrentProcessingTime + delay
      timerState.update(fireAt)
      ctx.registerProcessingTimeTimer(fireAt)
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
    TriggerResult.CONTINUE
  }

  // The delay elapsed before the watermark arrived: fire the window.
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  // Fire only for the timer we registered at the window's max timestamp;
  // ignore unrelated event-time callbacks.
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    val timerState = ctx.getPartitionedState(timerStateDescriptor)
    val fireAt = timerState.value()
    // Delete the processing-time timer we registered (was leaked before).
    if (fireAt != null) {
      ctx.deleteProcessingTimeTimer(fireAt)
    }
    // Release the per-window state (was never cleared before).
    timerState.clear()
    ctx.deleteEventTimeTimer(window.maxTimestamp())
  }
}

// GenericRecordAggregatorWindowFunction — definition truncated in this
// revision's diff; see the original gist revision below for the full class.
speeddragon created this gist
Jan 10, 2020. There are no files selected for viewing.
// Original gist (Jan 10, 2020): Flink job that reads Avro GenericRecords from
// Kafka, windows them by event time, and writes each window as a Parquet file.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ...

// Kafka source; event time is extracted from the record's "timestamp" field.
// NOTE(review): AscendingTimestampExtractor assumes timestamps are ascending
// per partition — records violating that go down the allowedLateness path.
val source =
  new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[GenericRecord] {
      def extractAscendingTimestamp(element: GenericRecord): Long =
        Instant
          .parse(element.get("timestamp").asInstanceOf[String])
          .toEpochMilli()
    })

val writer =
  WindowParquetGenericRecordListFileSink(settings.s3Path(), GenericRecordSchema.schema.toString())

// ...

val backend = new RocksDBStateBackend("file:///tmp/rocksdb-checkpoint", true)

env
  .setStateBackend(backend)
  .enableCheckpointing(settings.checkpointInterval())
  .addSource(source)
  .keyBy((record: GenericRecord) => record.get("key").asInstanceOf[String])
  .timeWindow(Time.seconds(settings.windowTime()))
  .allowedLateness(Time.seconds(settings.allowedLateness()))
  .apply(new GenericRecordAggregatorWindowFunction())
  .addSink(writer)

env.execute()

// GenericRecordAggregatorWindowFunction
/**
 * Collects every record of a window into one Scala Iterable so the sink can
 * write the whole window as a single Parquet file.
 */
class GenericRecordAggregatorWindowFunction
    extends WindowFunction[GenericRecord, Iterable[GenericRecord], String, TimeWindow] {

  override def apply(
      key: String,
      window: TimeWindow,
      input: lang.Iterable[GenericRecord],
      out: Collector[Iterable[GenericRecord]]
  ): Unit =
    out.collect(input.asScala)
}

// WindowParquetGenericRecordListFileSink
/**
 * Concrete sink: derives the Hive-style bucket path ("account_id=.../
 * partition_date=...") and the file name from fields of the first record.
 */
case class WindowParquetGenericRecordListFileSink(filePath: String, schema: String)
    extends ParquetGenericRecordListFileSink[GenericRecord] {

  def getBucketId(element: GenericRecord): String =
    "account_id=" + element.get(KEY.name).asInstanceOf[String] +
      "/partition_date=" + formatDateString(element.get(TIMESTAMP.name).asInstanceOf[String])

  // "yyyy-MM-dd" -> "yyyyMM". SimpleDateFormat is not thread-safe, so fresh
  // instances are created per call on purpose.
  private def formatDateString(date: String) =
    new SimpleDateFormat("yyyyMM").format(new SimpleDateFormat("yyyy-MM-dd").parse(date))

  // File name from the record's "logger_timestamp", sanitised for path use.
  def getFileName(genericRecord: GenericRecord): String =
    genericRecord
      .get("logger_timestamp")
      .asInstanceOf[String]
      .replace(" ", "_")
      .replace("/", "-")
      .replace(":", "-")
      .concat(".parquet")
}

// ParquetGenericRecordListFileSink
/**
 * Sink that writes one window's worth of records into a single Parquet file
 * under `filePath`/bucket/name, overwriting any existing file.
 *
 * FIXES over the original gist:
 *  - `elements.head` crashed on an empty window; an empty input is now a no-op.
 *  - The AvroParquetWriter leaked if any write threw; it is now closed in a
 *    `finally` block.
 */
trait ParquetGenericRecordListFileSink[IN] extends SinkFunction[Iterable[IN]] with LazyLogging {

  def filePath: String
  def schema: String

  override def invoke(elements: Iterable[IN], context: Context[_]): Unit =
    // Guard against an empty window instead of crashing on elements.head.
    elements.headOption.foreach { first =>
      val fileName = getFileName(first)
      val finalFilePath = s"${filePath}/${getBucketId(first)}/${fileName}"

      val writer = AvroParquetWriter
        .builder[IN](new Path(finalFilePath))
        .withSchema(new Schema.Parser().parse(schema))
        .withDataModel(GenericData.get)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()

      // Close the writer even when a write fails (resource leak in original).
      try elements.foreach(writer.write)
      finally writer.close()

      logger.info(s"Writing to ${finalFilePath}")
    }

  def getBucketId(element: IN): String
  def getFileName(genericRecord: IN): String
}