Last active
March 20, 2025 14:27
-
-
Save dalelane/3931c17b14c51fa4a1cf25496237d188 to your computer and use it in GitHub Desktop.
Revisions
-
dalelane revised this gist
Feb 25, 2021 . 1 changed file with 6 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -448,7 +448,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml headers: type: object properties: @@ -604,7 +604,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml headers: type: object properties: @@ -760,7 +760,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml headers: type: object properties: @@ -916,7 +916,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml headers: type: object properties: @@ -957,7 +957,7 @@ Some common definitions of message headers based on well-known serdes libraries ### Apicurio e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml ```yaml name: apicurio @@ -978,7 +978,7 @@ headers: ### IBM Event Streams e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml ```yaml name: ibm-eventstreams -
dalelane revised this gist
Feb 23, 2021 . 1 changed file with 6 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -448,7 +448,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml headers: type: object properties: @@ -604,7 +604,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml headers: type: object properties: @@ -760,7 +760,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml headers: type: object properties: @@ -916,7 +916,7 @@ channels: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml headers: type: object properties: @@ -957,7 +957,7 @@ Some common definitions of message headers based on well-known serdes libraries ### Apicurio e.g. https://github.com/asyncapi/bindings/raw/master/kafka/traits/apicurio-v0.0.1.yaml ```yaml name: apicurio @@ -978,7 +978,7 @@ headers: ### IBM Event Streams e.g. https://github.com/asyncapi/bindings/raw/master/kafka/traits/ibm-v0.0.1.yaml ```yaml name: ibm-eventstreams -
dalelane revised this gist
Feb 23, 2021 . 1 changed file with 10 additions and 10 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -197,7 +197,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "confluent" schemaRegistryAvailable: true ... channels: my.topic.name: @@ -276,7 +276,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "confluent" schemaRegistryAvailable: false ... channels: my.topic.name: @@ -369,7 +369,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" schemaRegistryAvailable: false ... channels: my.topic.name: @@ -437,7 +437,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" schemaRegistryAvailable: true ... channels: my.topic.name: @@ -525,7 +525,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" schemaRegistryAvailable: false ... channels: my.topic.name: @@ -593,7 +593,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" schemaRegistryAvailable: true ... channels: my.topic.name: @@ -681,7 +681,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" schemaRegistryAvailable: false ... channels: my.topic.name: @@ -749,7 +749,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" schemaRegistryAvailable: true ... channels: my.topic.name: @@ -837,7 +837,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" schemaRegistryAvailable: false ... channels: my.topic.name: @@ -905,7 +905,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" schemaRegistryAvailable: true ... channels: my.topic.name: -
dalelane revised this gist
Feb 23, 2021 . 1 changed file with 76 additions and 129 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -38,14 +38,13 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: key: type: string payload: $ref: "my-avro-schema.avsc" ``` @@ -60,19 +59,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "none" key: type: string payload: $ref: "my-avro-schema.avsc" ``` @@ -119,14 +115,13 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" bindings: kafka: key: type: string payload: $ref: my-avro-schema.avsc ``` @@ -141,19 +136,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" bindings: kafka: schemaIdLocation: "none" key: type: string payload: $ref: my-avro-schema.avsc ``` @@ -212,19 +204,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "payload" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -263,19 +252,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "payload" key: type: string payload: $ref: my-avro-schema.avsc ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) @@ -297,19 +283,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "payload" key: type: string payload: $ref: my-avro-schema.avsc ``` @@ -362,19 +345,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) @@ -396,19 +376,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -467,11 +444,6 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" @@ -484,11 +456,13 @@ channels: type: string enum: - "BINARY" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -527,19 +501,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) @@ -561,19 +532,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -632,11 +600,6 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" @@ -649,11 +612,13 @@ channels: type: string enum: - "JSON" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -692,19 +657,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) @@ -726,19 +688,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -797,11 +756,6 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" @@ -814,11 +768,13 @@ channels: type: string enum: - "BINARY" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -857,19 +813,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) @@ -891,19 +844,16 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -962,11 +912,6 @@ channels: ... subscribe: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" @@ -979,11 +924,13 @@ channels: type: string enum: - "JSON" bindings: kafka: schemaIdLocation: "header" key: type: string payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec -
dalelane revised this gist
Feb 22, 2021 . 1 changed file with 52 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -50,6 +50,32 @@ channels: $ref: "my-avro-schema.avsc" ``` or (optionally, explicitly specifying that a schema registry was not used) ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: "my-avro-schema.avsc" bindings: kafka: schemaIdLocation: "none" ``` ### How a Java developer could consume messages from this spec ```java @@ -105,6 +131,32 @@ channels: $ref: my-avro-schema.avsc ``` or (optionally, explicitly specifying that a schema registry was not used) ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" payload: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "none" ``` ### How a Java developer could consume messages from this spec ```java -
dalelane revised this gist
Feb 22, 2021 . 1 changed file with 10 additions and 10 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -152,8 +152,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "confluent" useSchemaRegistry: true ... channels: my.topic.name: @@ -237,8 +237,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "confluent" useSchemaRegistry: false ... channels: my.topic.name: @@ -336,8 +336,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" useSchemaRegistry: false ... channels: my.topic.name: @@ -407,8 +407,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" useSchemaRegistry: true ... channels: my.topic.name: @@ -501,8 +501,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" useSchemaRegistry: false ... channels: my.topic.name: @@ -572,8 +572,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "apicurio" useSchemaRegistry: true ... channels: my.topic.name: @@ -666,8 +666,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" useSchemaRegistry: false ... channels: my.topic.name: @@ -737,8 +737,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" useSchemaRegistry: true ... channels: my.topic.name: @@ -831,8 +831,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" useSchemaRegistry: false ... channels: my.topic.name: @@ -902,8 +902,8 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" schemaRegistryVendor: "ibm" useSchemaRegistry: true ... channels: my.topic.name: -
dalelane revised this gist
Feb 22, 2021 . 1 changed file with 15 additions and 15 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -153,7 +153,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true schemaRegistryVendor: "confluent" ... channels: my.topic.name: @@ -226,7 +226,7 @@ channels: schemaIdLocation: "payload" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" @@ -238,7 +238,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false schemaRegistryVendor: "confluent" ... channels: my.topic.name: @@ -325,7 +325,7 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" @@ -337,7 +337,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false schemaRegistryVendor: "apicurio" ... channels: my.topic.name: @@ -408,7 +408,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true schemaRegistryVendor: "apicurio" ... channels: my.topic.name: @@ -490,7 +490,7 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" @@ -502,7 +502,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false schemaRegistryVendor: "apicurio" ... channels: my.topic.name: @@ -573,7 +573,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true schemaRegistryVendor: "apicurio" ... channels: my.topic.name: @@ -655,7 +655,7 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" @@ -667,7 +667,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false schemaRegistryVendor: "ibm" ... channels: my.topic.name: @@ -738,7 +738,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true schemaRegistryVendor: "ibm" ... channels: my.topic.name: @@ -820,7 +820,7 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" @@ -832,7 +832,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false schemaRegistryVendor: "ibm" ... channels: my.topic.name: @@ -903,7 +903,7 @@ servers: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true schemaRegistryVendor: "ibm" ... channels: my.topic.name: -
dalelane revised this gist
Feb 22, 2021 . 1 changed file with 176 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -152,6 +152,7 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true vendor: "confluent" ... channels: @@ -225,6 +226,41 @@ channels: schemaIdLocation: "payload" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false vendor: "confluent" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "payload" ``` ### How a Java developer could consume messages from this spec ```java @@ -289,6 +325,40 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false vendor: "apicurio" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java @@ -337,6 +407,7 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true vendor: "apicurio" ... channels: @@ -419,6 +490,40 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false vendor: "apicurio" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" payload: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java @@ -467,6 +572,7 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true vendor: "apicurio" ... channels: @@ -549,6 +655,40 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false vendor: "ibm" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java @@ -597,6 +737,7 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true vendor: "ibm" ... channels: @@ -679,6 +820,40 @@ channels: schemaIdLocation: "header" ``` or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers) ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: false vendor: "ibm" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" payload: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java @@ -727,6 +902,7 @@ servers: bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" useSchemaRegistry: true vendor: "ibm" ... channels: -
dalelane revised this gist
Feb 22, 2021 . 1 changed file with 10 additions and 20 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -171,8 +171,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "payload" ``` ### How a Java developer could consume messages from this spec @@ -223,8 +222,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "payload" ``` ### How a Java developer could consume messages from this spec @@ -288,8 +286,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -368,8 +365,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -420,8 +416,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -500,8 +495,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -552,8 +546,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -632,8 +625,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -684,8 +676,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -764,8 +755,7 @@ channels: $ref: my-avro-schema.avsc bindings: kafka: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec -
dalelane revised this gist
Feb 22, 2021 . 1 changed file with 13 additions and 23 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,7 +2,7 @@ I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic. _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2JV0U/p1613385141226400) and [Github](https://github.com/asyncapi/bindings/issues/41) - see [revisions](https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188/revisions) to see what has changed_ - [Binary-encoded Avro](#binary-encoded-avro) - [JSON-encoded Avro](#json-encoded-avro) @@ -45,12 +45,9 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: "my-avro-schema.avsc" ``` ### How a Java developer could consume messages from this spec @@ -103,12 +100,9 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" payload: $ref: my-avro-schema.avsc ``` ### How a Java developer could consume messages from this spec @@ -172,13 +166,13 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "payload" ``` ### How a Java developer could consume messages from this spec @@ -224,13 +218,13 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "payload" ``` ### How a Java developer could consume messages from this spec @@ -289,12 +283,12 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" ``` @@ -360,6 +354,7 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1 headers: @@ -375,8 +370,6 @@ channels: kafka: avro: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -422,12 +415,12 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" ``` @@ -493,6 +486,7 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1 headers: @@ -508,8 +502,6 @@ channels: kafka: avro: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -555,12 +547,12 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" ``` @@ -626,6 +618,7 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/octet-stream" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1 headers: @@ -641,8 +634,6 @@ channels: kafka: avro: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec @@ -688,12 +679,12 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" ``` @@ -759,6 +750,7 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" contentType: "application/json" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1 headers: @@ -774,8 +766,6 @@ channels: kafka: avro: schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -796,7 +796,7 @@ for (ConsumerRecord<String, GenericRecord> record : records) { --- ## Message traits Some common definitions of message headers based on well-known serdes libraries -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -17,6 +17,9 @@ _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2 - [Event Streams using JSON encoding](#event-streams-using-json-encoding) - [Event Streams using JSON encoding using schema registry](#event-streams-using-json-encoding-using-schema-registry) also: - [Reusable message traits with common headers used by serdes clients](#message-traits) --- ## Binary-encoded Avro -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 54 additions and 16 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -357,13 +357,11 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1 headers: type: object properties: apicurio.value.encoding: type: string enum: @@ -492,13 +490,11 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1 headers: type: object properties: apicurio.value.encoding: type: string enum: @@ -627,13 +623,11 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1 headers: type: object properties: com.ibm.eventstreams.schemaregistry.encoding: type: string enum: @@ -762,13 +756,11 @@ channels: ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" traits: - $ref: https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1 headers: type: object properties: com.ibm.eventstreams.schemaregistry.encoding: type: string enum: @@ -801,3 +793,49 @@ for (ConsumerRecord<String, GenericRecord> record : records) { --- ## Message traits Some common definitions of message headers based on well-known serdes libraries ### Apicurio e.g. https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/apicurio-v0.0.1 ```yaml name: apicurio summary: Message headers used by Apicurio serdes library headers: type: object properties: apicurio.value.globalId: type: string apicurio.value.version: type: integer apicurio.value.encoding: type: string enum: - "BINARY" - "JSON" ``` ### IBM Event Streams e.g. https://github.com/asyncapi/bindings/raw/master/traits/kafka/schemas/ibm-v0.0.1 ```yaml name: ibm-eventstreams summary: Message headers used by IBM Event Streams serdes library headers: type: object properties: com.ibm.eventstreams.schemaregistry.schema.id: type: string com.ibm.eventstreams.schemaregistry.schema.version: type: integer com.ibm.eventstreams.schemaregistry.encoding: type: string enum: - "BINARY" - "JSON" ``` --- -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 19 additions and 19 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -60,7 +60,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -99,7 +99,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -118,7 +118,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -168,7 +168,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -185,7 +185,7 @@ props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeseria props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my.topic.name")); ConsumerRecords<String, GenericRecord> records = consumer.poll(100); @@ -220,7 +220,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -240,7 +240,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -285,7 +285,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -305,7 +305,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -356,7 +356,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" headers: type: object properties: @@ -420,7 +420,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -440,7 +440,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -491,7 +491,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" headers: type: object properties: @@ -555,7 +555,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -575,7 +575,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -626,7 +626,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" headers: type: object properties: @@ -690,7 +690,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: my-avro-schema.avsc bindings: @@ -710,7 +710,7 @@ GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord> props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); @@ -761,7 +761,7 @@ channels: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" headers: type: object properties: -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 2 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,6 +2,8 @@ I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic. _Updated following [feedback in Slack](https://asyncapi.slack.com/archives/C34F2JV0U/p1613385141226400) - see [revisions](https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188/revisions) to see what has changed_ - [Binary-encoded Avro](#binary-encoded-avro) - [JSON-encoded Avro](#json-encoded-avro) - [Confluent using schema registry](#confluent-using-schema-registry) -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 8 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -328,6 +328,8 @@ Messages on a Kafka topic were produced using an Apicurio serdes library configu The Apicurio schema registry is available for use by consuming applications. The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile. ### How this could be described in AsyncAPI ```yaml @@ -461,6 +463,8 @@ Messages on a Kafka topic were produced using an Apicurio serdes library configu The Apicurio schema registry is available for use by consuming applications. The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile. ### How this could be described in AsyncAPI ```yaml @@ -594,6 +598,8 @@ Messages on a Kafka topic were produced using an Event Streams serdes library co The Event Streams schema registry is available for use by consuming applications. The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile. ### How this could be described in AsyncAPI ```yaml @@ -727,6 +733,8 @@ Messages on a Kafka topic were produced using an Event Streams serdes library co The Event Streams schema registry is available for use by consuming applications. The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile. ### How this could be described in AsyncAPI ```yaml -
dalelane revised this gist
Feb 15, 2021 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -777,7 +777,7 @@ channels: ```java props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props); -
dalelane created this gist
Feb 15, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,793 @@ # Describing Kafka schema usage using AsyncAPI I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic. - [Binary-encoded Avro](#binary-encoded-avro) - [JSON-encoded Avro](#json-encoded-avro) - [Confluent using schema registry](#confluent-using-schema-registry) - [Confluent](#confluent) - [Apicurio using binary encoding](#apicurio-using-binary-encoding) - [Apicurio using binary encoding using schema registry](#apicurio-using-binary-encoding-using-schema-registry) - [Apicurio using JSON encoding](#apicurio-using-json-encoding) - [Apicurio using JSON encoding using schema registry](#apicurio-using-json-encoding-using-schema-registry) - [Event Streams using binary encoding](#event-streams-using-binary-encoding) - [Event Streams using binary encoding using schema registry](#event-streams-using-binary-encoding-using-schema-registry) - [Event Streams using JSON encoding](#event-streams-using-json-encoding) - [Event Streams using JSON encoding using schema registry](#event-streams-using-json-encoding-using-schema-registry) --- ## Binary-encoded Avro ### Scenario Messages on a Kafka topic are being produced using a GenericDatumWriter with binary encoding. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: "application/vnd.apache.avro;version=1.9.0" payload: $ref: "my-avro-schema.avsc" bindings: kafka: avro: binding: "binary" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## JSON-encoded Avro ### Scenario Messages on a Kafka topic were produced using a GenericDatumWriter with JSON encoding. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: binding: "json" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Decoder decoder = DecoderFactory.get().jsonDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## Confluent using schema registry ### Scenario Messages on a Kafka topic were produced using a Confluent serdes library. The Confluent schema registry is available for use by consuming applications. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" vendor: "confluent" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "payload" binding: "binary" ``` ### How a Java developer could consume messages from this spec ```java props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props); consumer.subscribe(Arrays.asList("my.topic.name")); ConsumerRecords<String, GenericRecord> records = consumer.poll(100); for (ConsumerRecord<String, GenericRecord> record : records) { String somefield = record.get("field-from-your-schema"); } ``` --- ## Confluent ### Scenario Messages on a Kafka topic were produced using a Confluent serdes library. The Confluent schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "payload" binding: "binary" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); // skip the first five bytes that contain the "magic" byte and 4 byte global ID bias.skip(5); Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## Apicurio using binary encoding ### Scenario Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding. The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: binding: "binary" schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## Apicurio using binary encoding using schema registry ### Scenario Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding. The Apicurio schema registry is available for use by consuming applications. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" vendor: "apicurio" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' headers: type: object properties: apicurio.value.globalId: type: string apicurio.value.version: type: integer apicurio.value.encoding: type: string enum: - "BINARY" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" binding: "binary" ``` ### How a Java developer could consume messages from this spec ```java props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, GenericRecord> record : records) { String somefield = record.get("field-from-your-schema"); } ``` --- ## Apicurio using JSON encoding ### Scenario Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding. The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: binding: "json" schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Decoder decoder = DecoderFactory.get().jsonDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## Apicurio using JSON encoding using schema registry ### Scenario Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding. The Apicurio schema registry is available for use by consuming applications. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" vendor: "apicurio" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' headers: type: object properties: apicurio.value.globalId: type: string apicurio.value.version: type: integer apicurio.value.encoding: type: string enum: - "JSON" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" binding: "json" ``` ### How a Java developer could consume messages from this spec ```java props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, GenericRecord> record : records) { String somefield = record.get("field-from-your-schema"); } ``` --- ## Event Streams using binary encoding ### Scenario Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding. The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: binding: "binary" schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## Event Streams using binary encoding using schema registry ### Scenario Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding. The Event Streams schema registry is available for use by consuming applications. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" vendor: "ibm" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' headers: type: object properties: com.ibm.eventstreams.schemaregistry.schema.id: type: string com.ibm.eventstreams.schemaregistry.schema.version: type: integer com.ibm.eventstreams.schemaregistry.encoding: type: string enum: - "BINARY" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" binding: "binary" ``` ### How a Java developer could consume messages from this spec ```java props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, GenericRecord> record : records) { String somefield = record.get("field-from-your-schema"); } ``` --- ## Event Streams using JSON encoding ### Scenario Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding. The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' payload: $ref: my-avro-schema.avsc bindings: kafka: avro: binding: "json" schemaIdLocation: "header" ``` ### How a Java developer could consume messages from this spec ```java Schema.Parser schemaDefinitionParser = new Schema.Parser(); Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc")); GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<String, byte[]>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] bytes = record.value(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Decoder decoder = DecoderFactory.get().jsonDecoder(bais, null); GenericRecord record = reader.read(null, decoder); String somefield = record.get("field-from-the-schema"); } ``` --- ## Event Streams using JSON encoding using schema registry ### Scenario Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding. The Event Streams schema registry is available for use by consuming applications. ### How this could be described in AsyncAPI ```yaml asyncapi: "2.0.0" ... servers: prod: ... bindings: kafka: schemaRegistryUrl: "https://my-schema-registry.com" vendor: "ibm" ... channels: my.topic.name: ... subscribe: ... bindings: kafka: key: type: string ... message: schemaFormat: 'application/vnd.apache.avro;version=1.9.0' headers: type: object properties: com.ibm.eventstreams.schemaregistry.schema.id: type: string com.ibm.eventstreams.schemaregistry.schema.version: type: integer com.ibm.eventstreams.schemaregistry.encoding: type: string enum: - "JSON" payload: $ref: my-avro-schema.avsc bindings: kafka: avro: schemaIdLocation: "header" binding: "json" ``` ### How a Java developer could consume messages from this spec ```java props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer"); props.put("schema.registry.url", "https://my-schema-registry.com"); KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my.topic.name")); ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, GenericRecord> record : records) { String somefield = record.get("field-from-your-schema"); } ``` ---