/** * Handle records with a timestamp in their Avro value. * Expects a LONG field named "timestamp". * Any problem makes this extractor return the record's internal timestamp. */ public class InValueTimestampExtractor implements TimestampExtractor { @Override public long extract(ConsumerRecord record) { if (record != null && record.value() != null) { // Is it an Avro record ? if (record.value() instanceof GenericRecord) { GenericRecord value = (GenericRecord) record.value(); // Does it have a timestamp field, and is the field a LONG ? Schema.Field field = value.getSchema().getField("timestamp"); if (field != null && field.schema().getType().equals(Schema.Type.LONG)) { // Get the timestamp from the record value return (long) value.get(field.pos()); } } } return record.timestamp(); } }