@eshafik
Created January 16, 2024 18:27

FastAPI and Kafka Integration with aiokafka [Consumer and Producer]

Producer:

Create a separate file, for example producer.py -

import json
import ssl

from aiokafka import AIOKafkaProducer

from conf.app_vars import KAFKA_HOST, KAFKA_USERNAME, KAFKA_PASSWORD

__all__ = ['startup_producer', 'shutdown_producer']


def producer_serializer(message):
    return json.dumps(message).encode('utf-8')


async def startup_producer():
    producer = AIOKafkaProducer(
        bootstrap_servers=KAFKA_HOST,
        sasl_mechanism='SCRAM-SHA-256',
        security_protocol='SASL_SSL',
        sasl_plain_username=KAFKA_USERNAME,
        sasl_plain_password=KAFKA_PASSWORD,
        ssl_context=ssl.create_default_context(),
        value_serializer=producer_serializer
    )
    await producer.start()
    return producer


async def shutdown_producer(producer: AIOKafkaProducer):
    await producer.stop()

To start the producer when the server starts, call startup_producer() from a startup handler. Assume we have server.py or main.py -

from fastapi import FastAPI

from conf.producer import startup_producer, shutdown_producer


middleware = []  # add your middleware classes here
app = FastAPI(middleware=middleware)

@app.on_event("startup")
async def startup():
    app.producer = await startup_producer()
    
@app.on_event("shutdown")
async def shutdown():
    await shutdown_producer(app.producer)
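
Recent FastAPI versions deprecate @app.on_event in favor of a single "lifespan" async context manager. The following is a minimal sketch of the same startup/shutdown pairing in that style, not the gist author's code. So that it runs without a Kafka broker, FakeProducer is a hypothetical stand-in for AIOKafkaProducer, this startup_producer() is a simplified copy, and a SimpleNamespace plays the role of the app object -

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


class FakeProducer:
    """Hypothetical stand-in for AIOKafkaProducer so the sketch runs offline."""

    def __init__(self):
        self.running = False

    async def start(self):
        self.running = True

    async def stop(self):
        self.running = False


async def startup_producer():
    # In the real app this would build and start an AIOKafkaProducer.
    producer = FakeProducer()
    await producer.start()
    return producer


@asynccontextmanager
async def lifespan(app):
    # Code before `yield` runs at startup, code after it runs at shutdown.
    app.producer = await startup_producer()
    try:
        yield
    finally:
        await app.producer.stop()


async def demo():
    # SimpleNamespace plays the role of the FastAPI app object here.
    app = SimpleNamespace()
    async with lifespan(app):
        running_inside = app.producer.running
    return running_inside, app.producer.running


print(asyncio.run(demo()))  # (True, False)
```

In a real application the function would be wired in with app = FastAPI(lifespan=lifespan), and startup_producer() would be the one imported from producer.py.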
    

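You can now use app.producer to publish data to Kafka, for example -

from conf.main import app


async def publish(data):
    # `publish` is an illustrative wrapper: `await` only works inside a coroutine.
    # Read app.producer at call time; it is only set once the startup handler has run.
    await app.producer.send(topic='test', value=data)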

Consumer

Next we need a consumer that continuously receives messages from a specific topic. To do that, we create an asyncio task that runs alongside the uvicorn server.

Write a BaseConsumer class in a consumer.py file -

import ssl
import json
from json.decoder import JSONDecodeError

from aiokafka import AIOKafkaConsumer

from conf import KAFKA_HOST, KAFKA_USERNAME, KAFKA_PASSWORD

__all__ = ['BaseConsumer']


class BaseConsumer(AIOKafkaConsumer):
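    def __init__(self, topic: list[str] | str):
        # AIOKafkaConsumer takes topics as separate positional arguments,
        # so wrap a single name in a list and unpack it.
        topics = [topic] if isinstance(topic, str) else list(topic)
        super().__init__(*topics,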
                         bootstrap_servers=KAFKA_HOST,
                         sasl_mechanism='SCRAM-SHA-256',
                         security_protocol='SASL_SSL',
                         sasl_plain_username=KAFKA_USERNAME,
                         sasl_plain_password=KAFKA_PASSWORD,
                         ssl_context=ssl.create_default_context(),
                         value_deserializer=self.deserializer)

    @staticmethod
    def deserializer(message):
        try:
            return json.loads(message.decode('utf-8'))
        except JSONDecodeError:
            if isinstance(message, bytes):
                return message.decode('utf-8')
            else:
                return message
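
The serializer in producer.py and the deserializer above are meant to be inverses for JSON payloads, with a fallback to plain text when the bytes are not valid JSON. As a quick check that needs no Kafka connection, the sketch below copies both functions into one standalone script; the sample payload is made up for illustration -

```python
import json
from json.decoder import JSONDecodeError


def producer_serializer(message):
    # Same logic as producer.py: JSON-encode, then convert to UTF-8 bytes.
    return json.dumps(message).encode('utf-8')


def deserializer(message):
    # Same logic as BaseConsumer.deserializer.
    try:
        return json.loads(message.decode('utf-8'))
    except JSONDecodeError:
        if isinstance(message, bytes):
            return message.decode('utf-8')
        return message


payload = {'event': 'signup', 'user_id': 42}  # made-up sample message
raw = producer_serializer(payload)

print(raw)                          # b'{"event": "signup", "user_id": 42}'
print(deserializer(raw))            # {'event': 'signup', 'user_id': 42}
print(deserializer(b'plain text'))  # plain text
```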

Now you can instantiate the BaseConsumer class with a topic name anywhere, like this -

consumer = BaseConsumer(topic='webhook')

You also need a consume() coroutine that reads messages from the consumer in a loop. The overall code looks like this -

from conf.consumer import BaseConsumer

consumer = BaseConsumer(topic='webhook')


async def consume():
    await consumer.start()
    try:
        async for msg in consumer:
            print(
                "consumed: ",
                msg.topic,
                msg.partition,
                msg.offset,
                msg.key,
                msg.value,
                msg.timestamp,
            )
            # Your business logic here...

    finally:
        await consumer.stop()

After this, create an asyncio task for consume() at startup, and stop the consumer when the application shuts down -

server.py or main.py -

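import asyncio

from conf.consume import consume, consumer  # assumed module holding the consume() code above


@app.on_event("startup")
async def startup():
    app.producer = await startup_producer()
    # Keep a reference so the running task is not garbage-collected.
    app.consumer_task = asyncio.create_task(consume())


@app.on_event("shutdown")
async def shutdown():
    await shutdown_producer(app.producer)
    await consumer.stop()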
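
The Python documentation for asyncio.create_task recommends saving a reference to the returned task, because the event loop only keeps weak references to tasks. One possible way to manage the consumer task is to keep that reference and cancel it on shutdown, letting the finally block in consume() stop the consumer. The sketch below shows this pattern under stated assumptions: FakeConsumer is a hypothetical stand-in for BaseConsumer so the code runs without a broker, and consume() takes the consumer as an argument only to keep the example self-contained -

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for BaseConsumer that yields messages forever."""

    def __init__(self):
        self.stopped = False

    async def start(self):
        pass

    async def stop(self):
        self.stopped = True

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0.01)
        return 'message'


async def consume(consumer, seen):
    await consumer.start()
    try:
        async for msg in consumer:
            seen.append(msg)
    finally:
        # Runs on cancellation too, so the consumer is always stopped.
        await consumer.stop()


async def main():
    consumer, seen = FakeConsumer(), []
    # Startup: keep a reference to the task so it can be cancelled later.
    task = asyncio.create_task(consume(consumer, seen))
    await asyncio.sleep(0.05)  # the application "runs" for a moment

    # Shutdown: cancel the task and wait for its cleanup to finish.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return consumer.stopped, len(seen) > 0


print(asyncio.run(main()))  # (True, True)
```

In the FastAPI handlers, the same idea would mean storing the task at startup (for example on the app object) and cancelling and awaiting it in the shutdown handler.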