Created
December 8, 2016 14:02
-
-
Save Doooooo0o/e876759409d8358f195ace40dedf2899 to your computer and use it in GitHub Desktop.
Revisions
-
theonlydoo created this gist
Dec 8, 2016. There are no files selected for viewing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,101 @@ #!/usr/bin/env python import os, time, re, socket, getopt, sys from jinja2 import Template logfile=re.compile(ur'.*\.log$', re.IGNORECASE) def finder(path, brokers): paths={} here=os.getcwd() for dpath, dnames, fnames in os.walk(path): for i, fname in enumerate([os.path.join(dpath, fname) for fname in fnames]): if logfile.search(fname) and int(time.time()) - int(os.path.getmtime(fname)) <= 86400: t=socket.gethostname().split('.')[0]+'/'+fname.strip('./') t=t.replace('/', '-').replace('.', '-') paths[t]=fname.replace('./', "/") return paths, brokers def agent(paths, brokers): flume_brokers=brokers here=os.getcwd() readers=1 i=1 tails=[] r='' s='' c='' t=Template('''## channel configuration a1.channels.c{{ reader }}.type = memory a1.channels.c{{ reader }}.capacity = 100 a1.channels.c{{ reader }}.transactionCapacity = 100 # source a1.sources.r{{ reader }}.type = exec a1.sources.r{{ reader }}.command = tail -F {{ log }} a1.sources.r{{ reader }}.channels = c{{ reader }} # sink a1.sinks.k{{ reader }}.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k{{ reader }}.topic = logs-{{ log_sanitized }} a1.sinks.k{{ reader }}.brokerList = {{ flume_brokers }} a1.sinks.k{{ reader }}.requiredAcks = 1 a1.sinks.k{{ reader }}.batchSize = 20 a1.sources.r{{ reader }}.channels = c{{ reader }} a1.sinks.k{{ reader }}.channel = c{{ reader }} ''') for topic, path in paths.iteritems(): tails.append(t.render(reader=readers, log=path, log_sanitized=topic, flume_brokers=flume_brokers)) r=r+' r%i' % readers # counting the number of sources to append in file s=s+' k%i' % readers # counting the number of sinks c=c+' c%i' % readers # counting the number of sinks 
readers=readers+1 header=Template('''# Name the components on this agent a1.sources = {{ sources }} a1.sinks = {{ sinks }} a1.channels = {{ channels }} ''') print header.render(sources=r, sinks=s, channels=c) for i in tails: print i def usage(): print """ PURPOSE : Finds logs and configure flume to tail them and ship them to kafka brokers. USAGE : logfinder -b brokers -p logpath OPTIONS : -short, --long <value> description -b, --brokers= <brokers> comma separated list of kafka brokers addresses and ports, e.g. : 127.0.0.1:9092,127.0.0.2:9092 -p, --path= <logs directory> where to find the logs that you which to tail -h, --help prints help """ if __name__ == '__main__': try: opts, args = getopt.getopt(sys.argv[1:], 'b:p:h', ['brokers=', 'path=', 'help']) if len(opts) <= 1: usage() sys.exit(2) except getopt.GetoptError: usage() sys.exit(2) for opt, arg in opts: if opt in ('-h', '--help'): usage() sys.exit(2) elif opt in ('-b', '--brokers'): brokers = arg elif opt in ('-p', '--path'): path = arg else: usage() sys.exit(2) path, brokers=finder(path, brokers) agent(path, brokers)