Skip to content

Instantly share code, notes, and snippets.

@ehazlett
Created December 27, 2011 01:02
Show Gist options
  • Select an option

  • Save ehazlett/1522383 to your computer and use it in GitHub Desktop.

Select an option

Save ehazlett/1522383 to your computer and use it in GitHub Desktop.

Revisions

  1. ehazlett created this gist Dec 27, 2011.
    75 changes: 75 additions & 0 deletions transcribe.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,75 @@
    #!/usr/bin/env
    #
    # Requirements:
    # argparse==1.2.1
    # pymongo==2.1
    # redis==2.4.10

    import redis
    import pymongo
    import argparse
    import sys
    import logging
    from datetime import datetime
    import time

    logging.basicConfig(level=logging.DEBUG)

    def transcribe(redis_host='localhost', redis_port=6379, redis_password=None, redis_db=0,\
    redis_channels='*', mongo_host='127.0.0.1', mongo_port=27017, mongo_user=None, \
    mongo_password=None, mongo_db='transcribe'):
    """
    Logs messages from Redis channels into Mongo
    :keyword redis_host: Redis host/ip
    :keyword redis_port: Redis port
    :keyword redis_password: Redis password
    :keyword redis_db: Redis database
    :keyword redis_channels: Channel pattern to listen
    :keyword mongo_host: MongoDB host/ip
    :keyword mongo_port: MongoDB port
    :keyword mongo_user: MongoDB user
    :keyword mongo_password: MongoDB password
    :keyword mongo_db: MongoDB database
    """
    log = logging.getLogger('core')
    log.debug('Connecting to Redis: {0}:{1} with db {2}'.format(redis_host, redis_port, redis_db))
    rds = redis.Redis(host=redis_host, port=redis_port, db=redis_db, password=redis_password)
    log.debug('Connecting to MongoDB: {0}:{1} with db {2} (user: {3})'.format(mongo_host, mongo_port, mongo_db, mongo_user))
    mgo_db = pymongo.Connection(host=mongo_host, port=mongo_port)[mongo_db]
    if mongo_user and mongo_password:
    mgo_db.authenticate(mongo_user, mongo_pass)
    # subscribe
    ps = rds.pubsub()
    ps.psubscribe(redis_channels)
    # listen
    log.info('Listening for messages on channels: {0}'.format(redis_channels))
    for m in ps.listen():
    data = {
    'date': time.time(),
    'channel': m['channel'],
    'data': m['data'],
    }
    log.debug(' {0}:{1}: {2}'.format(datetime.fromtimestamp(data['date']), data['channel'], data['data']))
    mgo_db.messages.insert(data)

    if __name__=='__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--redis-host', action="store", dest="redis_host", default="127.0.0.1", help="Redis host/ip")
    parser.add_argument('--redis-port', action="store", type=int, dest="redis_port", default=6379, help="Redis port")
    parser.add_argument('--redis-password', action="store", dest="redis_password", default=None, help="Redis password")
    parser.add_argument('--redis-db', action="store", type=int, dest="redis_db", default=0, help="Redis DB")
    parser.add_argument('--redis-channels', action="store", dest="redis_channels", default='*', help="Redis Pub/Sub channel pattern")
    parser.add_argument('--mongo-host', action="store", dest="mongo_host", default='127.0.0.1', help="MongoDB host/ip")
    parser.add_argument('--mongo-port', action="store", type=int, dest="mongo_port", default=27017, help="MongoDB port")
    parser.add_argument('--mongo-user', action="store", dest="mongo_user", default=None, help="MongoDB user")
    parser.add_argument('--mongo-password', action="store", dest="mongo_password", default=None, help="MongoDB password")
    parser.add_argument('--mongo-db', action="store", dest="mongo_db", default='transcribe', help="MongoDB DB")

    opts = parser.parse_args()
    try:
    transcribe(**opts.__dict__)
    except KeyboardInterrupt:
    sys.exit(0)