package net.gutefrage.connector.transforms

import java.util

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements.{requireMap, requireStruct}
import org.apache.kafka.connect.transforms.util.SimpleConfig

import scala.annotation.tailrec
import scala.collection.JavaConverters._

/** Configuration definition and the concrete key/value variants of [[ExtractFields]]. */
object ExtractFields {

  private val FIELDS_CONFIG = "fields"
  private val STRUCT_NAME_CONFIG = "structName"
  private val PURPOSE = "fields extraction"

  // A plain "..." literal cannot span lines; <p/> is the paragraph break Kafka uses in overview docs.
  val OVERVIEW_DOC: String =
    "Extract the specified fields from a Struct when schema present, or a Map in the case of schemaless data. " +
      "Nested fields are addressed with dot-separated paths; each extracted field is named after the last path segment. " +
      "Null values are passed through unmodified." +
      "<p/>Use the concrete transformation type designed for the record key (" +
      classOf[ExtractFields.Key[_ <: Nothing]].getName + ") " +
      "or value (" + classOf[ExtractFields.Value[_ <: Nothing]].getName + ")."

  val CONFIG_DEF: ConfigDef = new ConfigDef()
    .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
      "Field names to extract. Nested fields may be given as dot-separated paths such as parent.child.")
    .define(STRUCT_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
      "Name for the extracted struct")

  /** Applies the extraction to the record key; value and value schema are copied unchanged. */
  class Key[R <: ConnectRecord[R]] extends ExtractFields[R] {
    override protected def operatingSchema(record: R): Schema = record.keySchema

    override protected def operatingValue(record: R): AnyRef = record.key

    override protected def newRecord(record: R, updatedSchema: Schema, updatedValue: AnyRef): R =
      record.newRecord(record.topic, record.kafkaPartition, updatedSchema, updatedValue,
        record.valueSchema, record.value, record.timestamp)
  }

  /** Applies the extraction to the record value; key and key schema are copied unchanged. */
  class Value[R <: ConnectRecord[R]] extends ExtractFields[R] {
    override protected def operatingSchema(record: R): Schema = record.valueSchema

    override protected def operatingValue(record: R): AnyRef = record.value

    override protected def newRecord(record: R, updatedSchema: Schema, updatedValue: AnyRef): R =
      record.newRecord(record.topic, record.kafkaPartition, record.keySchema, record.key,
        updatedSchema, updatedValue, record.timestamp)
  }
}

/**
 * Replaces the key or value of a record with a new Struct (or Map, for schemaless data) that
 * contains only the configured fields. A configured name such as `a.b.c` selects a nested
 * field; the extracted field is named after its last segment (`c`).
 *
 * @tparam R the concrete record type handled by the Connect runtime
 */
abstract class ExtractFields[R <: ConnectRecord[R]] extends Transformation[R] {

  /** Configured field paths, already split on '.', e.g. "a.b" becomes List("a", "b"). */
  @transient private var fieldPaths: List[List[String]] = null
  /** Name given to the schema of the extracted struct. */
  @transient private var structName: String = null

  /**
   * Reads `fields` and `structName`. The field paths are split once here rather than per record.
   *
   * @throws IllegalArgumentException if a configured field name yields an empty path (e.g. ".")
   */
  def configure(props: util.Map[String, _]): Unit = {
    val config = new SimpleConfig(ExtractFields.CONFIG_DEF, props)
    fieldPaths = config.getList(ExtractFields.FIELDS_CONFIG).asScala.toList.map { name =>
      val path = name.split('.').toList
      require(path.nonEmpty, s"Invalid field path '$name' in '${ExtractFields.FIELDS_CONFIG}'")
      path
    }
    structName = config.getString(ExtractFields.STRUCT_NAME_CONFIG)
  }

  /**
   * Resolves the schema of the (possibly nested) field at `remainingFieldsInTree`, starting from
   * the schema of `value`. The walk follows the schema, not the data, so null nested structs are fine.
   *
   * @return the schema of the addressed field, or `value.schema()` for an empty path
   * @throws IllegalArgumentException if a path segment does not exist
   */
  def schemaFromNestedField(remainingFieldsInTree: List[String], value: Struct): Schema =
    schemaAtPath(remainingFieldsInTree, value.schema())

  /** Walks `schema` along `path`; fails with a descriptive error on an unknown segment. */
  @tailrec
  private def schemaAtPath(path: List[String], schema: Schema): Schema = path match {
    case Nil => schema
    case fieldName :: rest =>
      val field = schema.field(fieldName)
      if (field == null) throw new IllegalArgumentException(s"Unknown field: $fieldName")
      schemaAtPath(rest, field.schema())
  }

  /**
   * Reads the (possibly nested) field at `remainingFieldsInTree` from `value`.
   *
   * @return the field value (may be null), or `value` itself for an empty path
   * @throws IllegalArgumentException if a struct on the path is null, so the field cannot be read
   */
  def valueFromNestedField(remainingFieldsInTree: List[String], value: Struct): Any =
    remainingFieldsInTree match {
      case Nil => value
      case fieldName :: _ if value == null =>
        throw new IllegalArgumentException(s"Unable to fetch field $fieldName because its parent struct is null")
      case fieldName :: Nil => value.get(fieldName)
      case fieldName :: rest => valueFromNestedField(rest, value.getStruct(fieldName))
    }

  /** Schemaless counterpart of [[valueFromNestedField]]: yields null if a key is missing or a step is not a Map. */
  @tailrec
  private def valueFromNestedMap(path: List[String], value: Any): Any = path match {
    case Nil => value
    case fieldName :: rest =>
      value match {
        case map: util.Map[_, _] => valueFromNestedMap(rest, map.get(fieldName))
        case _ => null
      }
  }

  /**
   * Extracts the configured fields from the operating key/value.
   *
   * A null key/value is passed through unmodified (for example, tombstones).
   */
  def apply(record: R): R = {
    val schema = operatingSchema(record)
    val value = operatingValue(record)
    if (value == null) record
    else if (schema == null) applySchemaless(record, value)
    else applyWithSchema(record, value)
  }

  /** Builds a Map of the extracted fields, keyed by the last path segment, in configuration order. */
  private def applySchemaless(record: R, value: AnyRef): R = {
    val source = requireMap(value, ExtractFields.PURPOSE)
    val extracted = new util.LinkedHashMap[String, Any]()
    fieldPaths.foreach(path => extracted.put(path.last, valueFromNestedMap(path, source)))
    newRecord(record, null, extracted)
  }

  /** Builds a new Struct, plus its schema, holding the extracted fields. */
  private def applyWithSchema(record: R, value: AnyRef): R = {
    val source = requireStruct(value, ExtractFields.PURPOSE)
    val newSchema = fieldPaths
      .foldLeft(SchemaBuilder.struct().name(structName)) { (builder, path) =>
        builder.field(path.last, schemaFromNestedField(path, source))
      }
      .build()
    val newStruct = new Struct(newSchema)
    fieldPaths.foreach(path => newStruct.put(path.last, valueFromNestedField(path, source)))
    newRecord(record, newSchema, newStruct)
  }

  def close(): Unit = {}

  def config: ConfigDef = ExtractFields.CONFIG_DEF

  /** Schema of the part of the record (key or value) this transform operates on. */
  protected def operatingSchema(record: R): Schema

  /** The part of the record (key or value) this transform operates on. */
  protected def operatingValue(record: R): AnyRef

  /** Copies `record`, replacing the operated-on part with `updatedSchema` / `updatedValue`. */
  protected def newRecord(record: R, updatedSchema: Schema, updatedValue: AnyRef): R
}