Create a separate file, e.g. producer.py -
import json
import ssl
from aiokafka import AIOKafkaProducer
from conf.app_vars import KAFKA_HOST, KAFKA_USERNAME, KAFKA_PASSWORD
__all__ = ['startup_producer', 'shutdown_producer']
def producer_serializer(message):
    """Serialize *message* to UTF-8 JSON bytes for publishing to Kafka."""
    payload = json.dumps(message)
    return payload.encode('utf-8')
async def startup_producer():
    """Create, start, and return a Kafka producer configured for SASL_SSL.

    The producer JSON-encodes every value via producer_serializer.
    """
    kafka_producer = AIOKafkaProducer(
        bootstrap_servers=KAFKA_HOST,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        sasl_plain_username=KAFKA_USERNAME,
        sasl_plain_password=KAFKA_PASSWORD,
        ssl_context=ssl.create_default_context(),
        value_serializer=producer_serializer,
    )
    await kafka_producer.start()
    return kafka_producer
async def shutdown_producer(producer: AIOKafkaProducer):
await producer.stop()To start producer at the beginning of the server starting call this "startup_producer". Assume we have server.py or main.py -
from fastapi import FastAPI
from conf.producer import startup_producer, shutdown_producer
# Placeholder: list your middleware classes here.
# NOTE(review): the original `[#middleware classes]` is a syntax error —
# the comment swallows the closing bracket.
middleware = []
app = FastAPI(middleware=middleware)
@app.on_event("startup")
async def startup():
app.producer = await startup_producer()
@app.on_event("shutdown")
async def shutdown():
await shutdown_producer(app.producer)
Now you can use "app.producer" to send/publish data to Kafka like this -
from conf.main import app
producer = app.producer
await producer.send(topic='test', value=data)Now we need to write a consumer so that continuously we can get the message from the specific topic. For getting the kafka message continuously, we need to create asyncio task along with the uvicron server.
Write a BaseConsumer class in a consumer.py file -
import ssl
import json
from json.decoder import JSONDecodeError
from aiokafka import AIOKafkaConsumer
from conf import KAFKA_HOST, KAFKA_USERNAME, KAFKA_PASSWORD
__all__ = ['BaseConsumer']
class BaseConsumer(AIOKafkaConsumer):
def __init__(self, topic: list[str] | str):
super().__init__(topic,
bootstrap_servers=KAFKA_HOST,
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=KAFKA_USERNAME,
sasl_plain_password=KAFKA_PASSWORD,
ssl_context=ssl.create_default_context(),
value_deserializer=self.deserializer)
@staticmethod
def deserializer(message):
try:
return json.loads(message.decode('utf-8'))
except JSONDecodeError:
if isinstance(message, bytes):
return message.decode('utf-8')
else:
return messageNow you can instantiate BaseConsumer class with topic name any where like this way -
consumer = BaseConsumer(topic='webhook')Also you need to write a consume method who will be responsible to consume the consumer continuously. So the overall code looks like -
from conf.consumer import BaseConsumer
consumer = BaseConsumer(topic='webhook')
async def consume():
await consumer.start()
try:
async for msg in consumer:
print(
"consumed: ",
msg.topic,
msg.partition,
msg.offset,
msg.key,
msg.value,
msg.timestamp,
)
# Your business logic here...
finally:
await consumer.stop()After this, you need to create a asyncio task for the consume() method and also need to stop the consumer during shutdown the application -
server.py or main.py -
@app.on_event("startup")
async def startup():
app.producer = await startup_producer()
asyncio.create_task(consume())
@app.on_event("shutdown")
async def shutdown():
await shutdown_producer(app.producer)
await consumer.stop()