@livsi
Forked from dalelane/proposal.md
Created September 14, 2022 13:36
Describing Kafka schema usage using AsyncAPI

I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic.

Updated following feedback in Slack and GitHub; see the gist's revision history for what has changed.

Binary-encoded Avro

Scenario

Messages on a Kafka topic are being produced using a GenericDatumWriter with binary encoding.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            key:
              type: string
        payload:
          $ref: "my-avro-schema.avsc"

or (optionally, explicitly specifying that a schema registry was not used)

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "none"
            key:
              type: string
        payload:
          $ref: "my-avro-schema.avsc"

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}

JSON-encoded Avro

Scenario

Messages on a Kafka topic were produced using a GenericDatumWriter with JSON encoding.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, explicitly specifying that a schema registry was not used)

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "none"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}

Confluent using schema registry

Scenario

Messages on a Kafka topic were produced using a Confluent serdes library.

The Confluent schema registry is available for use by consuming applications.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}

Confluent

Scenario

Messages on a Kafka topic were produced using a Confluent serdes library.

The Confluent schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

    // skip the first five bytes that contain the "magic" byte and the 4-byte global ID
    bais.skip(5);

    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}
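Rather than blindly skipping the five-byte prefix, a consumer in any language can parse it and use the schema ID for validation or logging. A minimal sketch (the class and method names are mine; the wire format is Confluent's documented one: a 0x0 magic byte followed by a 4-byte big-endian schema ID):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    // Returns the 4-byte big-endian schema ID that follows the magic byte.
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("not in Confluent wire format");
        }
        return buf.getInt();
    }

    // Returns the Avro-encoded payload that follows the 5-byte prefix.
    public static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }
}
```

A consumer without the Confluent serdes library could feed `payload(bytes)` to a binary decoder, and could use `schemaId(bytes)` to detect when producers switch to a schema version the spec doesn't describe.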

Apicurio using binary encoding

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.

The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}

Apicurio using binary encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.

The Apicurio schema registry is available for use by consuming applications.

The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.
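For developers in those languages, the schema coordinates can be read straight from the record headers. A minimal sketch (the helper class is hypothetical, and it assumes header values are UTF-8 encoded strings, as the trait definitions at the end of this proposal describe):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper: extracts Apicurio schema coordinates from a map of
// Kafka record headers, assuming UTF-8 string header values.
public class ApicurioHeaders {
    public static String globalId(Map<String, byte[]> headers) {
        byte[] raw = headers.get("apicurio.value.globalId");
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}
```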

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
        headers:
          type: object
          properties:
            apicurio.value.encoding:
              type: string
              enum:
              - "BINARY"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}

Apicurio using JSON encoding

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.

The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}

Apicurio using JSON encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.

The Apicurio schema registry is available for use by consuming applications.

The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
        headers:
          type: object
          properties:
            apicurio.value.encoding:
              type: string
              enum:
              - "JSON"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.get("field-from-your-schema");
}

Event Streams using binary encoding

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.

The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}

Event Streams using binary encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.

The Event Streams schema registry is available for use by consuming applications.

The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
              - "BINARY"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}

Event Streams using JSON encoding

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.

The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}

Event Streams using JSON encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.

The Event Streams schema registry is available for use by consuming applications.

The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
              - "JSON"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}

Message traits

Some common definitions of message headers based on well-known serdes libraries

Apicurio

e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml

name: apicurio
summary: Message headers used by Apicurio serdes library
headers:
  type: object
  properties:
    apicurio.value.globalId:
      type: string
    apicurio.value.version:
      type: integer
    apicurio.value.encoding:
      type: string
      enum:
      - "BINARY"
      - "JSON"

IBM Event Streams

e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml

name: ibm-eventstreams
summary: Message headers used by IBM Event Streams serdes library
headers:
  type: object
  properties:
    com.ibm.eventstreams.schemaregistry.schema.id:
      type: string
    com.ibm.eventstreams.schemaregistry.schema.version:
      type: integer
    com.ibm.eventstreams.schemaregistry.encoding:
      type: string
      enum:
      - "BINARY"
      - "JSON"
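A consumer that can't use a vendor serdes library could read the encoding header from either trait to decide whether to decode the payload with Avro's binaryDecoder or jsonDecoder. A minimal sketch (the class name is mine; it assumes the header value is one of the UTF-8 strings enumerated above):

```java
import java.nio.charset.StandardCharsets;

public class EncodingHeader {
    public enum Encoding { BINARY, JSON }

    // Maps the raw header bytes to the decoder a consumer should use:
    // BINARY -> DecoderFactory.binaryDecoder, JSON -> DecoderFactory.jsonDecoder
    public static Encoding parse(byte[] headerValue) {
        String value = new String(headerValue, StandardCharsets.UTF_8);
        switch (value) {
            case "BINARY": return Encoding.BINARY;
            case "JSON":   return Encoding.JSON;
            default:
                throw new IllegalArgumentException("unknown encoding: " + value);
        }
    }
}
```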
