@dalelane
Last active March 20, 2025 14:27
Revisions

  1. dalelane revised this gist Feb 25, 2021. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions proposal.md
    @@ -448,7 +448,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -604,7 +604,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -760,7 +760,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -916,7 +916,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -957,7 +957,7 @@ Some common definitions of message headers based on well-known serdes libraries

    ### Apicurio

-    e.g. https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml
+    e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml

    ```yaml
    name: apicurio
    @@ -978,7 +978,7 @@ headers:
    ### IBM Event Streams
-    e.g. https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml
+    e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
    ```yaml
    name: ibm-eventstreams
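
    Piecing together the fragments shown in these hunks, the overall shape of the proposal as of this latest revision is roughly as follows. This is a reconstruction for orientation only: the indentation and the sections the diffs elide with `...` are plausible guesses assembled across hunks, not the gist's verbatim content.

    ```yaml
    asyncapi: "2.0.0"
    servers:
      prod:
        bindings:
          kafka:
            schemaRegistryUrl: "https://my-schema-registry.com"
            schemaRegistryVendor: "apicurio"
            schemaRegistryAvailable: true
    channels:
      my.topic.name:
        subscribe:
          message:
            schemaFormat: "application/vnd.apache.avro;version=1.9.0"
            contentType: "application/octet-stream"
            traits:
              - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
            headers:
              type: object
            payload:
              $ref: "my-avro-schema.avsc"
            bindings:
              kafka:
                schemaIdLocation: "header"
                key:
                  type: string
    ```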
  2. dalelane revised this gist Feb 23, 2021. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions proposal.md
    @@ -448,7 +448,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -604,7 +604,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -760,7 +760,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -916,7 +916,7 @@ channels:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    traits:
-      - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
+      - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml
    headers:
    type: object
    properties:
    @@ -957,7 +957,7 @@ Some common definitions of message headers based on well-known serdes libraries

    ### Apicurio

-    e.g. https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
+    e.g. https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml

    ```yaml
    name: apicurio
    @@ -978,7 +978,7 @@ headers:
    ### IBM Event Streams
-    e.g. https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
+    e.g. https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml
    ```yaml
    name: ibm-eventstreams
  3. dalelane revised this gist Feb 23, 2021. 1 changed file with 10 additions and 10 deletions.
    20 changes: 10 additions & 10 deletions proposal.md
    @@ -197,7 +197,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "confluent"
-      useSchemaRegistry: true
+      schemaRegistryAvailable: true
    ...
    channels:
    my.topic.name:
    @@ -276,7 +276,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "confluent"
-      useSchemaRegistry: false
+      schemaRegistryAvailable: false
    ...
    channels:
    my.topic.name:
    @@ -369,7 +369,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "apicurio"
-      useSchemaRegistry: false
+      schemaRegistryAvailable: false
    ...
    channels:
    my.topic.name:
    @@ -437,7 +437,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "apicurio"
-      useSchemaRegistry: true
+      schemaRegistryAvailable: true
    ...
    channels:
    my.topic.name:
    @@ -525,7 +525,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "apicurio"
-      useSchemaRegistry: false
+      schemaRegistryAvailable: false
    ...
    channels:
    my.topic.name:
    @@ -593,7 +593,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "apicurio"
-      useSchemaRegistry: true
+      schemaRegistryAvailable: true
    ...
    channels:
    my.topic.name:
    @@ -681,7 +681,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "ibm"
-      useSchemaRegistry: false
+      schemaRegistryAvailable: false
    ...
    channels:
    my.topic.name:
    @@ -749,7 +749,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "ibm"
-      useSchemaRegistry: true
+      schemaRegistryAvailable: true
    ...
    channels:
    my.topic.name:
    @@ -837,7 +837,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "ibm"
-      useSchemaRegistry: false
+      schemaRegistryAvailable: false
    ...
    channels:
    my.topic.name:
    @@ -905,7 +905,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    schemaRegistryVendor: "ibm"
-      useSchemaRegistry: true
+      schemaRegistryAvailable: true
    ...
    channels:
    my.topic.name:
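
    The `schemaIdLocation` values that recur throughout these revisions ("none", "payload", "header") read as three distinct delivery styles. A minimal summary sketch; the one-line interpretations are an editorial reading of the surrounding examples, not wording from the proposal:

    ```yaml
    bindings:
      kafka:
        # "none":    no schema ID travels with the message; consumers rely on a local copy of the schema
        # "payload": the schema ID is embedded in the message payload itself
        # "header":  the schema ID is carried in a Kafka message header
        schemaIdLocation: "header"
    ```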
  4. dalelane revised this gist Feb 23, 2021. 1 changed file with 76 additions and 129 deletions.
    205 changes: 76 additions & 129 deletions proposal.md
    @@ -38,14 +38,13 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    bindings:
    kafka:
    key:
    type: string
    payload:
    $ref: "my-avro-schema.avsc"
    ```
    @@ -60,19 +59,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: "my-avro-schema.avsc"
    bindings:
    kafka:
    schemaIdLocation: "none"
    key:
    type: string
    payload:
    $ref: "my-avro-schema.avsc"
    ```
    @@ -119,14 +115,13 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    bindings:
    kafka:
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    @@ -141,19 +136,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "none"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    @@ -212,19 +204,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "payload"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -263,19 +252,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "payload"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    @@ -297,19 +283,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "payload"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    @@ -362,19 +345,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    @@ -396,19 +376,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -467,11 +444,6 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    @@ -484,11 +456,13 @@ channels:
    type: string
    enum:
    - "BINARY"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -527,19 +501,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    @@ -561,19 +532,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -632,11 +600,6 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    @@ -649,11 +612,13 @@ channels:
    type: string
    enum:
    - "JSON"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -692,19 +657,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    @@ -726,19 +688,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -797,11 +756,6 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    @@ -814,11 +768,13 @@ channels:
    type: string
    enum:
    - "BINARY"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -857,19 +813,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    @@ -891,19 +844,16 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
    @@ -962,11 +912,6 @@ channels:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    @@ -979,11 +924,13 @@ channels:
    type: string
    enum:
    - "JSON"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    key:
    type: string
    payload:
    $ref: my-avro-schema.avsc
    ```
    ### How a Java developer could consume messages from this spec
  5. dalelane revised this gist Feb 22, 2021. 1 changed file with 52 additions and 0 deletions.
    52 changes: 52 additions & 0 deletions proposal.md
    @@ -50,6 +50,32 @@ channels:
    $ref: "my-avro-schema.avsc"
    ```
    or (optionally, explicitly specifying that a schema registry was not used)
    ```yaml
    asyncapi: "2.0.0"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: "my-avro-schema.avsc"
    bindings:
    kafka:
    schemaIdLocation: "none"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    @@ -105,6 +131,32 @@ channels:
    $ref: my-avro-schema.avsc
    ```
    or (optionally, explicitly specifying that a schema registry was not used)
    ```yaml
    asyncapi: "2.0.0"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "none"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
  6. dalelane revised this gist Feb 22, 2021. 1 changed file with 10 additions and 10 deletions.
    20 changes: 10 additions & 10 deletions proposal.md
    @@ -152,8 +152,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: true
     schemaRegistryVendor: "confluent"
+    useSchemaRegistry: true
    ...
    channels:
    my.topic.name:
    @@ -237,8 +237,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: false
     schemaRegistryVendor: "confluent"
+    useSchemaRegistry: false
    ...
    channels:
    my.topic.name:
    @@ -336,8 +336,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: false
     schemaRegistryVendor: "apicurio"
+    useSchemaRegistry: false
    ...
    channels:
    my.topic.name:
    @@ -407,8 +407,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: true
     schemaRegistryVendor: "apicurio"
+    useSchemaRegistry: true
    ...
    channels:
    my.topic.name:
    @@ -501,8 +501,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: false
     schemaRegistryVendor: "apicurio"
+    useSchemaRegistry: false
    ...
    channels:
    my.topic.name:
    @@ -572,8 +572,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: true
     schemaRegistryVendor: "apicurio"
+    useSchemaRegistry: true
    ...
    channels:
    my.topic.name:
    @@ -666,8 +666,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: false
     schemaRegistryVendor: "ibm"
+    useSchemaRegistry: false
    ...
    channels:
    my.topic.name:
    @@ -737,8 +737,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: true
     schemaRegistryVendor: "ibm"
+    useSchemaRegistry: true
    ...
    channels:
    my.topic.name:
    @@ -831,8 +831,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: false
     schemaRegistryVendor: "ibm"
+    useSchemaRegistry: false
    ...
    channels:
    my.topic.name:
    @@ -902,8 +902,8 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
-    useSchemaRegistry: true
     schemaRegistryVendor: "ibm"
+    useSchemaRegistry: true
    ...
    channels:
    my.topic.name:
  7. dalelane revised this gist Feb 22, 2021. 1 changed file with 15 additions and 15 deletions.
    30 changes: 15 additions & 15 deletions proposal.md
    @@ -153,7 +153,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "confluent"
    schemaRegistryVendor: "confluent"
    ...
    channels:
    my.topic.name:
    @@ -226,7 +226,7 @@ channels:
    schemaIdLocation: "payload"
    ```
-or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
+or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    @@ -238,7 +238,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "confluent"
    schemaRegistryVendor: "confluent"
    ...
    channels:
    my.topic.name:
    @@ -325,7 +325,7 @@ channels:
    schemaIdLocation: "header"
    ```
-or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
+or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    @@ -337,7 +337,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "apicurio"
    schemaRegistryVendor: "apicurio"
    ...
    channels:
    my.topic.name:
    @@ -408,7 +408,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "apicurio"
    schemaRegistryVendor: "apicurio"
    ...
    channels:
    my.topic.name:
    @@ -490,7 +490,7 @@ channels:
    schemaIdLocation: "header"
    ```
-or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
+or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    @@ -502,7 +502,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "apicurio"
    schemaRegistryVendor: "apicurio"
    ...
    channels:
    my.topic.name:
    @@ -573,7 +573,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "apicurio"
    schemaRegistryVendor: "apicurio"
    ...
    channels:
    my.topic.name:
    @@ -655,7 +655,7 @@ channels:
    schemaIdLocation: "header"
    ```
-or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
+or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    @@ -667,7 +667,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "ibm"
    schemaRegistryVendor: "ibm"
    ...
    channels:
    my.topic.name:
    @@ -738,7 +738,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "ibm"
    schemaRegistryVendor: "ibm"
    ...
    channels:
    my.topic.name:
    @@ -820,7 +820,7 @@ channels:
    schemaIdLocation: "header"
    ```
-or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
+or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    @@ -832,7 +832,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "ibm"
    schemaRegistryVendor: "ibm"
    ...
    channels:
    my.topic.name:
    @@ -903,7 +903,7 @@ servers:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "ibm"
    schemaRegistryVendor: "ibm"
    ...
    channels:
    my.topic.name:
  8. dalelane revised this gist Feb 22, 2021. 1 changed file with 176 additions and 0 deletions.
    176 changes: 176 additions & 0 deletions proposal.md
    @@ -152,6 +152,7 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "confluent"
    ...
    channels:
    @@ -225,6 +226,41 @@ channels:
    schemaIdLocation: "payload"
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    ...
    servers:
    prod:
    ...
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "confluent"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "payload"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    @@ -289,6 +325,40 @@ channels:
    schemaIdLocation: "header"
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    ...
    servers:
    prod:
    ...
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "apicurio"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    @@ -337,6 +407,7 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "apicurio"
    ...
    channels:
    @@ -419,6 +490,40 @@ channels:
    schemaIdLocation: "header"
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    ...
    servers:
    prod:
    ...
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "apicurio"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    @@ -467,6 +572,7 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "apicurio"
    ...
    channels:
    @@ -549,6 +655,40 @@ channels:
    schemaIdLocation: "header"
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    ...
    servers:
    prod:
    ...
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "ibm"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    @@ -597,6 +737,7 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "ibm"
    ...
    channels:
    @@ -679,6 +820,40 @@ channels:
    schemaIdLocation: "header"
    ```
    or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
    ```yaml
    asyncapi: "2.0.0"
    ...
    servers:
    prod:
    ...
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: false
    vendor: "ibm"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    @@ -727,6 +902,7 @@ servers:
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    useSchemaRegistry: true
    vendor: "ibm"
    ...
    channels:
  9. dalelane revised this gist Feb 22, 2021. 1 changed file with 10 additions and 20 deletions.
    30 changes: 10 additions & 20 deletions proposal.md
    @@ -171,8 +171,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
-    avro:
-      schemaIdLocation: "payload"
+    schemaIdLocation: "payload"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -223,8 +222,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "payload"
    schemaIdLocation: "payload"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -288,8 +286,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -368,8 +365,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -420,8 +416,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -500,8 +495,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -552,8 +546,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -632,8 +625,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -684,8 +676,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -764,8 +755,7 @@ channels:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "header"
    schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
  10. dalelane revised this gist Feb 22, 2021. 1 changed file with 13 additions and 23 deletions.
    36 changes: 13 additions & 23 deletions proposal.md
    @@ -2,7 +2,7 @@

    I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic.

    _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2JV0U/p1613385141226400) - see [revisions](https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188/revisions) to see what has changed_
    _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2JV0U/p1613385141226400) and [Github](https://github.com/asyncapi/bindings/issues/41) - see [revisions](https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188/revisions) to see what has changed_

    - [Binary-encoded Avro](#binary-encoded-avro)
    - [JSON-encoded Avro](#json-encoded-avro)
    @@ -45,12 +45,9 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: "my-avro-schema.avsc"
    bindings:
    kafka:
    avro:
    binding: "binary"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -103,12 +100,9 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    binding: "json"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -172,13 +166,13 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "payload"
    binding: "binary"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -224,13 +218,13 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "payload"
    binding: "binary"
    ```
    ### How a Java developer could consume messages from this spec
    @@ -289,12 +283,12 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    binding: "binary"
    schemaIdLocation: "header"
    ```
    @@ -360,6 +354,7 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
    headers:
    @@ -375,8 +370,6 @@ channels:
    kafka:
    avro:
    schemaIdLocation: "header"
    binding: "binary"

    ```
    ### How a Java developer could consume messages from this spec
    @@ -422,12 +415,12 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    binding: "json"
    schemaIdLocation: "header"
    ```
    @@ -493,6 +486,7 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
    headers:
    @@ -508,8 +502,6 @@ channels:
    kafka:
    avro:
    schemaIdLocation: "header"
    binding: "json"

    ```
    ### How a Java developer could consume messages from this spec
    @@ -555,12 +547,12 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    binding: "binary"
    schemaIdLocation: "header"
    ```
    @@ -626,6 +618,7 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/octet-stream"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
    headers:
    @@ -641,8 +634,6 @@ channels:
    kafka:
    avro:
    schemaIdLocation: "header"
    binding: "binary"

    ```
    ### How a Java developer could consume messages from this spec
    @@ -688,12 +679,12 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    binding: "json"
    schemaIdLocation: "header"
    ```
    @@ -759,6 +750,7 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    contentType: "application/json"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
    headers:
    @@ -774,8 +766,6 @@ channels:
    kafka:
    avro:
    schemaIdLocation: "header"
    binding: "json"

    ```
    ### How a Java developer could consume messages from this spec
  11. dalelane revised this gist Feb 15, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion proposal.md
    @@ -796,7 +796,7 @@ for (ConsumerRecord<String, GenericRecord> record : records) {

    ---

    ## Message traits
    ## Message traits

    Some common definitions of message headers based on well-known serdes libraries

  12. dalelane revised this gist Feb 15, 2021. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions proposal.md
    @@ -17,6 +17,9 @@ _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2
    - [Event Streams using JSON encoding](#event-streams-using-json-encoding)
    - [Event Streams using JSON encoding using schema registry](#event-streams-using-json-encoding-using-schema-registry)

    also:
    - [Reusable message traits with common headers used by serdes clients](#message-traits)

    ---

    ## Binary-encoded Avro
  13. dalelane revised this gist Feb 15, 2021. 1 changed file with 54 additions and 16 deletions.
    70 changes: 54 additions & 16 deletions proposal.md
    @@ -357,13 +357,11 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
    headers:
    type: object
    properties:
    apicurio.value.globalId:
    type: string
    apicurio.value.version:
    type: integer
    apicurio.value.encoding:
    type: string
    enum:
    @@ -492,13 +490,11 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1
    headers:
    type: object
    properties:
    apicurio.value.globalId:
    type: string
    apicurio.value.version:
    type: integer
    apicurio.value.encoding:
    type: string
    enum:
    @@ -627,13 +623,11 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
    headers:
    type: object
    properties:
    com.ibm.eventstreams.schemaregistry.schema.id:
    type: string
    com.ibm.eventstreams.schemaregistry.schema.version:
    type: integer
    com.ibm.eventstreams.schemaregistry.encoding:
    type: string
    enum:
    @@ -762,13 +756,11 @@ channels:
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    traits:
    - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
    headers:
    type: object
    properties:
    com.ibm.eventstreams.schemaregistry.schema.id:
    type: string
    com.ibm.eventstreams.schemaregistry.schema.version:
    type: integer
    com.ibm.eventstreams.schemaregistry.encoding:
    type: string
    enum:
    @@ -801,3 +793,49 @@ for (ConsumerRecord<String, GenericRecord> record : records) {

    ---

    ## Message traits

    Some common definitions of message headers based on well-known serdes libraries

    ### Apicurio

    e.g. https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1

    ```yaml
    name: apicurio
    summary: Message headers used by Apicurio serdes library
    headers:
    type: object
    properties:
    apicurio.value.globalId:
    type: string
    apicurio.value.version:
    type: integer
    apicurio.value.encoding:
    type: string
    enum:
    - "BINARY"
    - "JSON"
    ```
    ### IBM Event Streams
    e.g. https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1
    ```yaml
    name: ibm-eventstreams
    summary: Message headers used by IBM Event Streams serdes library
    headers:
    type: object
    properties:
    com.ibm.eventstreams.schemaregistry.schema.id:
    type: string
    com.ibm.eventstreams.schemaregistry.schema.version:
    type: integer
    com.ibm.eventstreams.schemaregistry.encoding:
    type: string
    enum:
    - "BINARY"
    - "JSON"
    ```
    ---
  14. dalelane revised this gist Feb 15, 2021. 1 changed file with 19 additions and 19 deletions.
    38 changes: 19 additions & 19 deletions proposal.md
    @@ -60,7 +60,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -99,7 +99,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -118,7 +118,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -168,7 +168,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -185,7 +185,7 @@ props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeseria
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "https://my-schema-registry.com");

    Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
    Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my.topic.name"));

    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    @@ -220,7 +220,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -240,7 +240,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -285,7 +285,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -305,7 +305,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -356,7 +356,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    headers:
    type: object
    properties:
    @@ -420,7 +420,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -440,7 +440,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -491,7 +491,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    headers:
    type: object
    properties:
    @@ -555,7 +555,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -575,7 +575,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -626,7 +626,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    headers:
    type: object
    properties:
    @@ -690,7 +690,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    @@ -710,7 +710,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    @@ -761,7 +761,7 @@ channels:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    headers:
    type: object
    properties:
  15. dalelane revised this gist Feb 15, 2021. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions proposal.md
    @@ -2,6 +2,8 @@

    I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic.

    _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2JV0U/p1613385141226400) - see [revisions](https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188/revisions) to see what has changed_

    - [Binary-encoded Avro](#binary-encoded-avro)
    - [JSON-encoded Avro](#json-encoded-avro)
    - [Confluent using schema registry](#confluent-using-schema-registry)
  16. dalelane revised this gist Feb 15, 2021. 1 changed file with 8 additions and 0 deletions.
    8 changes: 8 additions & 0 deletions proposal.md
    @@ -328,6 +328,8 @@ Messages on a Kafka topic were produced using an Apicurio serdes library configu

    The Apicurio schema registry is available for use by consuming applications.

    The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

    ### How this could be described in AsyncAPI

    ```yaml
    @@ -461,6 +463,8 @@ Messages on a Kafka topic were produced using an Apicurio serdes library configu

    The Apicurio schema registry is available for use by consuming applications.

    The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

    ### How this could be described in AsyncAPI

    ```yaml
    @@ -594,6 +598,8 @@ Messages on a Kafka topic were produced using an Event Streams serdes library co

    The Event Streams schema registry is available for use by consuming applications.

    The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

    ### How this could be described in AsyncAPI

    ```yaml
    @@ -727,6 +733,8 @@ Messages on a Kafka topic were produced using an Event Streams serdes library co

    The Event Streams schema registry is available for use by consuming applications.

    The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

    ### How this could be described in AsyncAPI

    ```yaml
  17. dalelane revised this gist Feb 15, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion proposal.md
    @@ -777,7 +777,7 @@ channels:

    ```java
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
    props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
    props.put("schema.registry.url", "https://my-schema-registry.com");

    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
  18. dalelane created this gist Feb 15, 2021.
    793 changes: 793 additions & 0 deletions proposal.md
    @@ -0,0 +1,793 @@
    # Describing Kafka schema usage using AsyncAPI

    I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic.

    - [Binary-encoded Avro](#binary-encoded-avro)
    - [JSON-encoded Avro](#json-encoded-avro)
    - [Confluent using schema registry](#confluent-using-schema-registry)
    - [Confluent](#confluent)
    - [Apicurio using binary encoding](#apicurio-using-binary-encoding)
    - [Apicurio using binary encoding using schema registry](#apicurio-using-binary-encoding-using-schema-registry)
    - [Apicurio using JSON encoding](#apicurio-using-json-encoding)
    - [Apicurio using JSON encoding using schema registry](#apicurio-using-json-encoding-using-schema-registry)
    - [Event Streams using binary encoding](#event-streams-using-binary-encoding)
    - [Event Streams using binary encoding using schema registry](#event-streams-using-binary-encoding-using-schema-registry)
    - [Event Streams using JSON encoding](#event-streams-using-json-encoding)
    - [Event Streams using JSON encoding using schema registry](#event-streams-using-json-encoding-using-schema-registry)

    ---

    ## Binary-encoded Avro

    ### Scenario

    Messages on a Kafka topic are being produced using a GenericDatumWriter with binary encoding.

    ### How this could be described in AsyncAPI

    ```yaml
    asyncapi: "2.0.0"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: "application/vnd.apache.avro;version=1.9.0"
    payload:
    $ref: "my-avro-schema.avsc"
    bindings:
    kafka:
    avro:
    binding: "binary"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    Schema.Parser schemaDefinitionParser = new Schema.Parser();
    Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
GenericRecord avroRecord = reader.read(null, decoder);

String somefield = avroRecord.get("field-from-the-schema").toString();
    }
    ```

    ---

    ## JSON-encoded Avro

    ### Scenario

    Messages on a Kafka topic were produced using a GenericDatumWriter with JSON encoding.

    ### How this could be described in AsyncAPI

    ```yaml
    asyncapi: "2.0.0"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    binding: "json"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    Schema.Parser schemaDefinitionParser = new Schema.Parser();
    Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
GenericRecord avroRecord = reader.read(null, decoder);

String somefield = avroRecord.get("field-from-the-schema").toString();
    }

    ```

    ---

    ## Confluent using schema registry

    ### Scenario

    Messages on a Kafka topic were produced using a Confluent serdes library.

    The Confluent schema registry is available for use by consuming applications.

    ### How this could be described in AsyncAPI

    ```yaml
    asyncapi: "2.0.0"
    ...
    servers:
    prod:
    ...
    bindings:
    kafka:
    schemaRegistryUrl: "https://my-schema-registry.com"
    vendor: "confluent"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "payload"
    binding: "binary"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "https://my-schema-registry.com");

    Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
    consumer.subscribe(Arrays.asList("my.topic.name"));

    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    for (ConsumerRecord<String, GenericRecord> record : records) {
String somefield = record.value().get("field-from-your-schema").toString();
    }
    ```

    ---

    ## Confluent

    ### Scenario

    Messages on a Kafka topic were produced using a Confluent serdes library.

    The Confluent schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

    ### How this could be described in AsyncAPI

    ```yaml
    asyncapi: "2.0.0"
    ...
    channels:
    my.topic.name:
    ...
    subscribe:
    ...
    bindings:
    kafka:
    key:
    type: string
    ...
    message:
    schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
    payload:
    $ref: my-avro-schema.avsc
    bindings:
    kafka:
    avro:
    schemaIdLocation: "payload"
    binding: "binary"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
    Schema.Parser schemaDefinitionParser = new Schema.Parser();
    Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Collections.singletonList("my.topic.name"));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

// skip the first five bytes that contain the "magic" byte and the 4-byte schema ID
bais.skip(5);

Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
GenericRecord avroRecord = reader.read(null, decoder);

String somefield = avroRecord.get("field-from-the-schema").toString();
    }
    ```
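
The five bytes skipped above are the Confluent wire-format prefix: a zero "magic" byte followed by the 4-byte big-endian schema ID. A minimal sketch of parsing that prefix with only the JDK (the `ConfluentWireFormat` class and `schemaId` method names here are illustrative, not part of any Confluent library):

```java
import java.nio.ByteBuffer;

// Sketch of parsing the Confluent wire-format prefix: byte 0 is the
// "magic" byte (0 for this format), bytes 1-4 are the big-endian schema ID,
// and everything after that is the Avro-encoded payload.
public class ConfluentWireFormat {
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] message = { 0, 0, 0, 0, 42, 1, 2, 3 }; // magic byte, schema ID 42, then payload
        System.out.println(schemaId(message)); // prints 42
    }
}
```

A consumer without registry access could still log or sanity-check the schema ID this way before handing the remaining bytes to the Avro decoder.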

    ---

    ## Apicurio using binary encoding

    ### Scenario

    Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.

    The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              binding: "binary"
              schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}
    ```
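
Because `schemaIdLocation` is `"header"` here, the schema's global ID travels in the `apicurio.value.globalId` record header rather than in the payload bytes. A hedged sketch of reading it (the helper name is hypothetical, and an 8-byte big-endian long is assumed for the header encoding — verify against the Apicurio serdes version in use):

```java
import java.nio.ByteBuffer;

public class GlobalIdHeader {
    // Decode the schema's global ID from the raw bytes of the
    // "apicurio.value.globalId" record header, assuming the serdes
    // wrote it as an 8-byte big-endian long.
    static long decodeGlobalId(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getLong();
    }

    public static void main(String[] args) {
        // e.g. from record.headers().lastHeader("apicurio.value.globalId").value()
        byte[] headerValue = {0, 0, 0, 0, 0, 0, 0, 7};
        System.out.println(decodeGlobalId(headerValue));  // 7
    }
}
```

Even without registry access, this lets a consumer log which registered schema a producer claimed to use, and flag messages whose ID doesn't match the schema shipped with the AsyncAPI document.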

    ---

## Apicurio using binary encoding with schema registry

    ### Scenario

    Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.

    The Apicurio schema registry is available for use by consuming applications.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        vendor: "apicurio"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        headers:
          type: object
          properties:
            apicurio.value.globalId:
              type: string
            apicurio.value.version:
              type: integer
            apicurio.value.encoding:
              type: string
              enum:
                - "BINARY"
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              schemaIdLocation: "header"
              binding: "binary"
    ```

    ### How a Java developer could consume messages from this spec

    ```java
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("apicurio.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
    ```

    ---

    ## Apicurio using JSON encoding

    ### Scenario

    Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.

    The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              binding: "json"
              schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}
    ```

    ---

## Apicurio using JSON encoding with schema registry

    ### Scenario

    Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.

    The Apicurio schema registry is available for use by consuming applications.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        vendor: "apicurio"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        headers:
          type: object
          properties:
            apicurio.value.globalId:
              type: string
            apicurio.value.version:
              type: integer
            apicurio.value.encoding:
              type: string
              enum:
                - "JSON"
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              schemaIdLocation: "header"
              binding: "json"
    ```

    ### How a Java developer could consume messages from this spec

    ```java
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("apicurio.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
    ```

    ---

    ## Event Streams using binary encoding

    ### Scenario

    Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.

    The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              binding: "binary"
              schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}
    ```

    ---

## Event Streams using binary encoding with schema registry

    ### Scenario

    Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.

    The Event Streams schema registry is available for use by consuming applications.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        vendor: "ibm"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.schema.id:
              type: string
            com.ibm.eventstreams.schemaregistry.schema.version:
              type: integer
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
                - "BINARY"
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              schemaIdLocation: "header"
              binding: "binary"
    ```

    ### How a Java developer could consume messages from this spec

    ```java
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
    ```
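
The spec above declares the Event Streams headers as strings, so a consumer that wants to log which registered schema and version produced a record could decode the header bytes as UTF-8 text. A minimal sketch (the helper name is hypothetical; verify the header encoding against the serdes library in use):

```java
import java.nio.charset.StandardCharsets;

public class EventStreamsHeaders {
    // Decode a string-typed record header, e.g.
    // "com.ibm.eventstreams.schemaregistry.schema.id", assuming UTF-8 bytes.
    static String decodeHeader(byte[] headerValue) {
        return new String(headerValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // e.g. from record.headers().lastHeader("com.ibm.eventstreams.schemaregistry.schema.id").value()
        byte[] headerValue = "my-schema".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeHeader(headerValue));  // my-schema
    }
}
```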

    ---

    ## Event Streams using JSON encoding

    ### Scenario

    Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.

    The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              binding: "json"
              schemaIdLocation: "header"
    ```
    ### How a Java developer could consume messages from this spec
    ```java
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord avroRecord = reader.read(null, decoder);

    String somefield = avroRecord.get("field-from-the-schema").toString();
}
    ```

    ---

## Event Streams using JSON encoding with schema registry

    ### Scenario

    Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.

    The Event Streams schema registry is available for use by consuming applications.

    ### How this could be described in AsyncAPI

    ```yaml
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        vendor: "ibm"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      bindings:
        kafka:
          key:
            type: string
      ...
      message:
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.schema.id:
              type: string
            com.ibm.eventstreams.schemaregistry.schema.version:
              type: integer
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
                - "JSON"
        payload:
          $ref: my-avro-schema.avsc
        bindings:
          kafka:
            avro:
              schemaIdLocation: "header"
              binding: "json"
    ```

    ### How a Java developer could consume messages from this spec

    ```java
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
    ```

    ---