"""Load a CSV of time-series data into TempoDB using a small thread pool.

The input file is expected to have no header row: column 0 is a timestamp
and each remaining column is one series of float values.
"""

import csv
import optparse
from queue import Queue
from threading import Thread

import dateutil.parser
import tempodb


class Worker(Thread):
    """Thread executing tasks from a given tasks queue."""

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True  # don't block interpreter exit on stuck writes
        self.start()

    def run(self):
        """Pull (func, args, kwargs) tuples off the queue forever."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # Report and keep the worker alive so one failed batch
                # does not stall the whole pool.
                print(e)
            finally:
                # Always acknowledge the task, even on failure, so that
                # wait_completion() cannot hang on a lost task_done().
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads):
        # Bounded queue: add_task() blocks once num_threads tasks are
        # pending, providing backpressure while the file is parsed.
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue."""
        self.tasks.join()


def main():
    # This script assumes that the input file is sorted by key.
    parser = optparse.OptionParser(usage="usage: %prog [options] filename",
                                   version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename",
                      help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key",
                      help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret",
                      help="tempodb database secret")
    parser.add_option("-H", "--host", dest="host",
                      default="api.tempo-db.com", help="tempodb host")
    parser.add_option("-P", "--port", dest="port", default=443,
                      help="tempodb port")
    parser.add_option("-S", "--secure", action="store_true", dest="secure",
                      default=True, help="tempodb secure")
    (options, args) = parser.parse_args()

    if not options.filename:
        parser.error("Enter a file to read from.")

    ##########################################################################

    # Create a connection to TempoDB.
    client = tempodb.Client(options.key, options.secret, options.host,
                            int(options.port), options.secure)

    # Init a thread pool with the desired number of threads.
    pool = ThreadPool(3)

    # How many data points to send in each write_key call.
    batch_size = 60

    # Begin parsing and enqueueing data.
    with open(options.filename) as source_file:
        reader = csv.reader(source_file)

        # Peek at the first row to discover how many columns this file
        # has, then rewind: the file has no header, so row one is data.
        column_count = len(next(reader))
        source_file.seek(0)

        # One buffer per value column; column 0 holds the timestamp and
        # needs no buffer of its own.
        series = {'series.' + str(index): []
                  for index in range(1, column_count)}

        # Process each line.
        for line in reader:
            input_date = None
            for i, value in enumerate(line):
                if i == 0:
                    # Column 0 is the timestamp shared by the whole row.
                    input_date = dateutil.parser.parse(value)
                    continue
                # Regenerate our series name.
                series_name = 'series.' + str(i)
                buffered = series[series_name]
                if len(buffered) >= batch_size:
                    # Send a full batch to TempoDB and flush the buffer...
                    pool.add_task(client.write_key, series_name, buffered)
                    buffered = series[series_name] = []
                # ...and ALWAYS record the current value.  (The original
                # appended only in the non-flush branch, silently dropping
                # the data point that triggered each flush.)
                buffered.append(tempodb.DataPoint(input_date, float(value)))

        # Pick up any partial batches left in the buffers.
        for series_name, data in series.items():
            if data:
                pool.add_task(client.write_key, series_name, data)

    # Wait for all queued writes to finish.
    pool.wait_completion()


if __name__ == '__main__':
    main()