Skip to content

Instantly share code, notes, and snippets.

@Doooooo0o
Created December 8, 2016 14:02
Show Gist options
  • Select an option

  • Save Doooooo0o/e876759409d8358f195ace40dedf2899 to your computer and use it in GitHub Desktop.

Select an option

Save Doooooo0o/e876759409d8358f195ace40dedf2899 to your computer and use it in GitHub Desktop.

Revisions

  1. theonlydoo created this gist Dec 8, 2016.
    101 changes: 101 additions & 0 deletions logfinder.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,101 @@
    #!/usr/bin/env python
    import os, time, re, socket, getopt, sys
    from jinja2 import Template

    # Matches any path whose name ends in ".log", case-insensitively.
    logfile = re.compile(r'.*\.log$', re.IGNORECASE)


    def finder(path, brokers, max_age=86400):
        """Collect the recently modified ``*.log`` files found under *path*.

        The directory tree rooted at *path* is walked recursively. Every file
        whose name matches ``logfile`` and whose modification time is at most
        *max_age* seconds old is recorded.

        Args:
            path: directory to scan.
            brokers: passed through untouched so the result can be handed
                straight to ``agent``.
            max_age: maximum age of a file, in seconds, for it to be kept.
                Defaults to one day.

        Returns:
            A ``(paths, brokers)`` tuple. ``paths`` maps a topic suffix to the
            absolute path of the log file. The suffix is the short host name
            followed by the walked file path (leading/trailing ``.`` and ``/``
            characters stripped), with every ``/`` and ``.`` replaced by ``-``.
        """
        host = socket.gethostname().split('.')[0]
        now = int(time.time())
        paths = {}
        for dpath, _dnames, fnames in os.walk(path):
            for name in fnames:
                fname = os.path.join(dpath, name)
                if not logfile.search(fname):
                    continue
                try:
                    mtime = int(os.path.getmtime(fname))
                except OSError:
                    # Dangling symlink, or the file vanished while we were
                    # walking: one unreadable entry must not abort the scan.
                    continue
                if now - mtime > max_age:
                    continue
                topic = host + '/' + fname.strip('./')
                topic = topic.replace('/', '-').replace('.', '-')
                # The path ends up in a `tail -F` command that is run from
                # another working directory, so it has to be absolute.
                paths[topic] = os.path.abspath(fname)
        return paths, brokers

    def agent(paths, brokers):
        """Print a Flume agent configuration that ships each log to Kafka.

        For every entry of *paths* one memory channel, one ``exec`` source
        running ``tail -F`` on the file, and one Kafka sink are declared; they
        share the same number (``c1``/``r1``/``k1``, ``c2``/``r2``/``k2``, ...).
        A header naming all sources, sinks and channels of agent ``a1`` is
        printed first, followed by one section per log file.

        Args:
            paths: mapping of topic suffix to log file path, as returned by
                ``finder``. The sink topic is ``logs-<suffix>``.
            brokers: value rendered into every sink's ``brokerList``.

        Returns:
            None. The configuration is written to standard output.
        """
        section = Template(
            "## channel configuration\n"
            "a1.channels.c{{ reader }}.type = memory\n"
            "a1.channels.c{{ reader }}.capacity = 100\n"
            "a1.channels.c{{ reader }}.transactionCapacity = 100\n"
            "# source\n"
            "a1.sources.r{{ reader }}.type = exec\n"
            "a1.sources.r{{ reader }}.command = tail -F {{ log }}\n"
            "a1.sources.r{{ reader }}.channels = c{{ reader }}\n"
            "# sink\n"
            "a1.sinks.k{{ reader }}.type = org.apache.flume.sink.kafka.KafkaSink\n"
            "a1.sinks.k{{ reader }}.topic = logs-{{ log_sanitized }}\n"
            "a1.sinks.k{{ reader }}.brokerList = {{ flume_brokers }}\n"
            "a1.sinks.k{{ reader }}.requiredAcks = 1\n"
            "a1.sinks.k{{ reader }}.batchSize = 20\n"
            "a1.sinks.k{{ reader }}.channel = c{{ reader }}\n"
        )
        header = Template(
            "# Name the components on this agent\n"
            "a1.sources = {{ sources }}\n"
            "a1.sinks = {{ sinks }}\n"
            "a1.channels = {{ channels }}\n"
        )

        # Number the readers in topic order so that two runs over the same
        # set of logs produce the same configuration.
        sections = [
            section.render(reader=number, log=path, log_sanitized=topic,
                           flume_brokers=brokers)
            for number, (topic, path) in enumerate(sorted(paths.items()), 1)
        ]

        numbers = range(1, len(sections) + 1)
        print(header.render(
            sources=' '.join('r%d' % n for n in numbers),
            sinks=' '.join('k%d' % n for n in numbers),
            channels=' '.join('c%d' % n for n in numbers),
        ))
        for text in sections:
            print(text)



    def usage():
        """Print the command-line help text to standard output."""
        # Built from explicit lines so the layout does not depend on how this
        # function happens to be indented in the source file.
        lines = (
            "",
            "PURPOSE :",
            "    Finds logs and configures flume to tail them and ship them to kafka brokers.",
            "USAGE :",
            "    logfinder -b brokers -p logpath",
            "OPTIONS :",
            "    -short, --long <value>        description",
            "    -b, --brokers= <brokers>      comma separated list of kafka brokers"
            " addresses and ports, e.g. : 127.0.0.1:9092,127.0.0.2:9092",
            "    -p, --path= <logs directory>  where to find the logs that you wish to tail",
            "    -h, --help                    prints help",
            "",
        )
        print("\n".join(lines))

    def main(argv=None):
        """Parse the command line, find the logs and print the Flume config.

        Args:
            argv: list of command-line arguments without the program name.
                Defaults to ``sys.argv[1:]``.

        Returns:
            The process exit status: 0 on success or when help was requested,
            2 when the command line is invalid or incomplete.
        """
        if argv is None:
            argv = sys.argv[1:]
        try:
            opts, _args = getopt.getopt(
                argv, 'b:p:h', ['brokers=', 'path=', 'help'])
        except getopt.GetoptError:
            usage()
            return 2

        brokers = None
        path = None
        for opt, arg in opts:
            if opt in ('-h', '--help'):
                usage()
                return 0
            if opt in ('-b', '--brokers'):
                brokers = arg
            elif opt in ('-p', '--path'):
                path = arg

        # Both options are mandatory. Counting the parsed options is not
        # enough: the same option given twice would leave the other unset.
        if not brokers or not path:
            usage()
            return 2

        paths, brokers = finder(path, brokers)
        agent(paths, brokers)
        return 0


    if __name__ == '__main__':
        sys.exit(main())