Skip to content

Instantly share code, notes, and snippets.

@akmamun
Last active September 16, 2023 09:54
Show Gist options
  • Select an option

  • Save akmamun/3babb80b228d6009c72b0b806152cab2 to your computer and use it in GitHub Desktop.

Select an option

Save akmamun/3babb80b228d6009c72b0b806152cab2 to your computer and use it in GitHub Desktop.

Revisions

  1. akmamun renamed this gist Sep 16, 2023. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. akmamun created this gist Sep 11, 2023.
    55 changes: 55 additions & 0 deletions event_listener.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,55 @@
    import json
    import logging
    from time import sleep

    from confluent_kafka import Consumer
    from django.core.management import BaseCommand

    logger = logging.getLogger(__name__)

    # pip install confluent_kafka

    """run commands
    python manage.py event_listener
    """
    CONSUMER_TIMEOUT = 5

    KAFKA_CONSUMER_RECONNECT_TIME = 5

    def connect_consumer():
    consumer_config = {'bootstrap.servers':"KAFKA_SERVERS_URLS", "group.id": "KAFKA_GROUP_ID"}
    consumer = Consumer(consumer_config)
    logger.info("started kafka consumer")
    return consumer


    class Command(BaseCommand):
    help = "Runs the event listener"

    def handle(self, *args, **options):
    SUBSCRIBE_LIST = []
    consumer = connect_consumer()
    consumer.subscribe(SUBSCRIBE_LIST)
    logger.debug("subscribed")
    try:
    while True:
    msg = consumer.poll(timeout=CONSUMER_TIMEOUT)
    logger.debug("message pulled success")
    if msg is None:
    logger.info("no message found to process")
    continue

    results = json.loads(msg.value().decode("utf-8"))

    # if msg.topic() == 'test_topic_name':
    # agent_settlement_event.settlement_status_update(results)

    except Exception as ex:
    logger.error(repr(ex))
    consumer.close()
    logger.error("kafka closed connection")
    sleep(KAFKA_CONSUMER_RECONNECT_TIME)
    consumer = connect_consumer()
    logger.info("connection reconnect success")
    consumer.subscribe(SUBSCRIBE_LIST)
    logger.debug("subscribed after reconnect")