Gist by @MichaelDrogalis, created Jul 22, 2019 (renamed Jul 30, 2019).
I've been working with Apache Kafka for over 7 years. I inevitably find myself doing the same set of activities while I'm developing or working with someone else's system. Here's a set of Kafka productivity hacks for doing a few things way faster than you're probably doing them now. :fire:

- [Show me all my Kafka topics and their partitions, replicas, and consumers](#show-me-all-my-kafka-topics-and-their-partitions-replicas-and-consumers)
- [Show me the contents of a topic](#show-me-the-contents-of-a-topic)
- [Create a Kafka topic](#create-a-kafka-topic)
- [Produce messages to a Kafka topic](#produce-messages-to-a-kafka-topic)
- [Validate the schema of messages before producing to a topic](#validate-the-schema-of-messages-before-producing-to-a-topic)
- [Do all of this at a distance](#do-all-of-this-at-a-distance)

### Get the tools

Most of these tricks rely on having KSQL, and the easiest way to get that is with Docker. KSQL lets you work with the data in Kafka in a client/server fashion, without modifying or disrupting your existing Kafka setup. Launch this Docker Compose file with `docker-compose up`. Adjust `KSQL_BOOTSTRAP_SERVERS` to point to your Kafka installation if it's not locally available on port 9092.

```yaml
---
version: '2'
services:
  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.0
    hostname: ksql-server
    container_name: ksql-server
    network_mode: host
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: "127.0.0.1:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.0
    container_name: ksql-cli
    network_mode: host
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true
```

Now to the fun part.

---------------------------------------

### Show me all my Kafka topics and their partitions, replicas, and consumers

What's in the cluster? No `kafka-topics.sh` command with a long string of flags needed. No UI needed. It can even approximate how much outbound activity there is on the topics via the consumer count.

```
ksql> show topics;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
 auctions    | false      | 3          | 1                  | 2         | 1
 bids        | false      | 12         | 1                  | 3         | 3
 people      | false      | 6          | 1                  | 3         | 1
-----------------------------------------------------------------------------------------
```

---------------------------------------

### Show me the contents of a topic

What's in this topic? Forget `kafka-console-consumer.sh`. `print` can find out. It will also guess the serialization format of your data. This is handy if you don't know what the data in the topic is or who produced it.

```
ksql> print 'bids' from beginning;
Format:JSON
{"ROWTIME":1562876975855,"ROWKEY":"null","item":"glow stick","price_usd":1,"bid_time":"8:13"}
{"ROWTIME":1562876977966,"ROWKEY":"null","item":"magnet","price_usd":6,"bid_time":"8:17"}
{"ROWTIME":1562876971480,"ROWKEY":"null","item":"tweezers","price_usd":4,"bid_time":"8:05"}
{"ROWTIME":1562876969350,"ROWKEY":"null","item":"sponge","price_usd":3,"bid_time":"8:11"}
{"ROWTIME":1562876967096,"ROWKEY":"null","item":"spoon","price_usd":2,"bid_time":"8:07"}
{"ROWTIME":1562876973673,"ROWKEY":"null","item":"key chain","price_usd":5,"bid_time":"8:09"}
...
```
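The detection works along these lines. Here's a hypothetical sketch of the idea (not KSQL's actual implementation, which also recognizes Avro and delimited formats): try to parse the raw bytes as JSON, and fall back to treating them as a plain string.

```python
import json

def guess_format(raw: bytes) -> str:
    """Crude serialization-format detection: try JSON first,
    fall back to STRING. Illustrative only."""
    try:
        json.loads(raw)
        return "JSON"
    except ValueError:
        return "STRING"
```
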

In this case, it auto-discovered that I have JSON in the topic. `ROWKEY` and `ROWTIME` are KSQL's analogs for the key and timestamp of each record.

---------------------------------------

### Create a Kafka topic

No more fishing around the broker's machine for `kafka-topics.sh` to make a topic. Make one right at the prompt. You can specify partitions and a replication factor as usual.

```
ksql> CREATE STREAM cheese_shipments (shipmentId INT, cheese VARCHAR, shipmentTimestamp VARCHAR)
    > WITH (kafka_topic='cheese_shipments', partitions=3, key = 'shipmentId', value_format='json');

 Message
----------------
 Stream created
----------------
```

This creates a Kafka topic and registers a stream over it, which is basically a schema for data validation. Check it out:

```
ksql> show streams;

 Stream Name      | Kafka Topic      | Format
----------------------------------------------
 CHEESE_SHIPMENTS | cheese_shipments | JSON
----------------------------------------------
```

It's a stream, but underneath it's just a topic. There are our `cheese_shipments`:

```
ksql> show topics;

 Kafka Topic      | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------
 auctions         | false      | 3          | 1                  | 2         | 1
 bids             | false      | 12         | 1                  | 3         | 3
 cheese_shipments | true       | 3          | 1                  | 0         | 0
 people           | false      | 6          | 1                  | 3         | 1
----------------------------------------------------------------------------------------------
```

If the topic is active, you can get some runtime metrics about what it's doing. I don't have any here though, so it's blank at the bottom.

```
ksql> describe extended cheese_shipments;

Name                 : CHEESE_SHIPMENTS
Type                 : STREAM
Key field            : SHIPMENTID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : cheese_shipments (partitions: 3, replication: 1)

 Field             | Type
-----------------------------------------------
 ROWTIME           | BIGINT           (system)
 ROWKEY            | VARCHAR(STRING)  (system)
 SHIPMENTID        | INTEGER
 CHEESE            | VARCHAR(STRING)
 SHIPMENTTIMESTAMP | VARCHAR(STRING)
-----------------------------------------------

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic cheese_shipments)
```

---------------------------------------

### Produce messages to a Kafka topic

Gotta get some data into a topic. I could either write a program that uses the producer API (ugh) or figure out how to use kafkacat (I need to relearn that every time). How about `insert into` instead?

```
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (42, 'provolone', 'june 5th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (45, 'chedar', 'june 8th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (47, 'swiss', 'june 8th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (51, 'cooper', 'june 11th 2019');
```

It's definitely in the topic. Check it out. Note that the topic is keyed by shipment ID and there are 3 partitions, so the order seen here doesn't match the order I inserted them in.

```
ksql> print 'cheese_shipments' from beginning;
Format:JSON
{"ROWTIME":1562882124666,"ROWKEY":"45","SHIPMENTID":45,"CHEESE":"chedar","SHIPMENTTIMESTAMP":"june 8th 2019"}
{"ROWTIME":1562882127795,"ROWKEY":"47","SHIPMENTID":47,"CHEESE":"swiss","SHIPMENTTIMESTAMP":"june 8th 2019"}
{"ROWTIME":1562882078365,"ROWKEY":"42","SHIPMENTID":42,"CHEESE":"provolone","SHIPMENTTIMESTAMP":"june 5th 2019"}
{"ROWTIME":1562882163295,"ROWKEY":"51","SHIPMENTID":51,"CHEESE":"cooper","SHIPMENTTIMESTAMP":"june 11th 2019"}
```
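The reordering happens because each record's key is hashed to choose a partition, and Kafka only guarantees ordering within a single partition. A rough sketch of the principle (Kafka's default partitioner actually uses murmur2; the md5 below is just for illustration):

```python
import hashlib

def assign_partition(key: bytes, num_partitions: int) -> int:
    # Hash the record key and map it onto a partition. The point is that
    # the same key deterministically lands on the same partition, so
    # records sharing a key stay in order relative to each other, while
    # records on different partitions may interleave arbitrarily.
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```
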

---------------------------------------

### Validate the schema of messages before producing to a topic

I often wish I had better client-side validation of my messages before I put them on a topic. `insert into` does that for free because we placed a schema over it. :tada:

```
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values ('bad shipment id', 'american', 'june 12th 2019');
Expected type INT32 for field SHIPMENTID but got bad shipment id(STRING)
```
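Under the hood this is just a type check of each field against the registered schema. A minimal sketch of the same idea in client code (hypothetical, not KSQL's implementation; the field names and types mirror the `cheese_shipments` stream above):

```python
# Hypothetical client-side check mirroring what KSQL does with the
# registered stream schema.
SCHEMA = {"SHIPMENTID": int, "CHEESE": str, "SHIPMENTTIMESTAMP": str}

def validate(record: dict) -> None:
    for field, expected in SCHEMA.items():
        value = record.get(field)
        if not isinstance(value, expected):
            raise TypeError(
                f"Expected type {expected.__name__} for field {field} "
                f"but got {value!r}"
            )
```
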

---------------------------------------

### Do all of this at a distance

Bonus: all of this works over [KSQL's REST API](https://docs.confluent.io/current/ksql/docs/developer-guide/api.html). So if the command line doesn't work for your situation, you can take all of this to the programming language you're using. It's one less container to run, too, so you can reasonably grab the tools you need with a single `docker run` command.
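For example, any statement you'd type at the prompt can be wrapped in a JSON body and POSTed to the server's `/ksql` endpoint, as described in the linked docs. A sketch of building that request (the server URL is an assumption based on the Compose file above):

```python
import json

KSQL_SERVER = "http://localhost:8088"  # assumption: the local server from the Compose file

def ksql_payload(statement: str) -> str:
    # The /ksql endpoint expects a JSON body with the statement and
    # optional streams properties.
    return json.dumps({"ksql": statement, "streamsProperties": {}})

# To actually send it (requires a running KSQL server), e.g. with curl:
#   curl -X POST http://localhost:8088/ksql \
#        -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
#        -d '{"ksql": "SHOW TOPICS;", "streamsProperties": {}}'
print(ksql_payload("SHOW TOPICS;"))
```
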