#! /usr/bin/python
"""Multithreaded mass copier for Amazon S3.

Lists every key of a source bucket in a background thread, pushes the keys
onto a queue, and lets a pool of worker threads copy each key into the
destination bucket under the same name.

Usage:
    script -s SRC_BUCKET -d DST_BUCKET [-n THREADS] [-l LEVEL] [-L LOGFILE]
"""
import argparse
import logging
import sys
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 module name

try:
    import boto
except ImportError:
    # Deferred failure: lets the helpers below be imported (and --help work)
    # without boto installed; main() reports the missing dependency.
    boto = None

logger = logging.getLogger("S3 MultiThreadedCopy")


def parse_log_level(name):
    """Translate a logging level name (any case) into its numeric value.

    Used as an argparse ``type=`` callable so an unknown level is reported
    as a normal usage error instead of an AttributeError traceback.

    Raises:
        argparse.ArgumentTypeError: if ``logging`` has no such level.
    """
    level = getattr(logging, name.upper(), None)
    if not isinstance(level, int):
        raise argparse.ArgumentTypeError("unknown logging level: %r" % name)
    return level


def parse_args(argv=None):
    """Parse the command line and return the populated options namespace.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Namespace with ``src_bucket``, ``dst_bucket``, ``num_threads``,
        ``log_level`` (numeric logging level) and ``log_dest``.
    """
    parser = argparse.ArgumentParser(
        description="Multithreaded mass copier for Amazon S3")
    parser.add_argument("-s", help="Source bucket", dest="src_bucket",
                        type=str, required=True)
    parser.add_argument("-d", help="Destination bucket", dest="dst_bucket",
                        type=str, required=True)
    parser.add_argument("-n", help="Number of copier threads",
                        dest="num_threads", type=int, default=10)
    parser.add_argument("-l", help="Logging level", dest="log_level",
                        type=parse_log_level, default="WARN")
    parser.add_argument("-L", help="Logging file, default is STDOUT",
                        dest="log_dest", type=str, default="STDOUT")
    options = parser.parse_args(argv)
    if options.num_threads < 1:
        # With no workers nothing would ever drain the queue and the final
        # q.join() would block forever.
        parser.error("number of copier threads must be at least 1")
    return options


def configure_logging(log_dest, level):
    """Attach a handler to the module logger and set its level.

    Args:
        log_dest: the literal string "STDOUT" to log to standard output,
            otherwise the path of a file to log to.
        level: numeric logging level applied to both handler and logger.
    """
    if log_dest == "STDOUT":
        handler = logging.StreamHandler(sys.stdout)
    else:
        # A path needs FileHandler; StreamHandler expects an open stream.
        handler = logging.FileHandler(log_dest)
    handler.setFormatter(
        logging.Formatter('%(name)-2s: %(levelname)-8s %(message)s'))
    handler.setLevel(level)
    logger.addHandler(handler)
    logger.setLevel(level)


def get_q_worker(dst_bucket, q, listing_done=None, poll_interval=1):
    """Build the function run by one copier thread.

    Args:
        dst_bucket: destination bucket, passed unchanged as the first
            argument of each key's ``copy`` method.
        q: queue of key objects; each must expose ``name`` and
            ``copy(dst_bucket, name)`` (presumably boto S3 keys).
        listing_done: optional ``threading.Event`` set by the producer once
            no more keys will be queued. While it is unset an empty queue
            only means "wait". If ``None`` the worker stops at the first
            empty poll, which is only safe when the queue is already full.
        poll_interval: seconds to block on the queue before re-checking
            ``listing_done``.

    Returns:
        A zero-argument callable suitable as a ``threading.Thread`` target.
    """
    def thread_worker():
        while True:
            try:
                k = q.get(True, poll_interval)
            except Queue.Empty:
                if listing_done is None or listing_done.is_set():
                    logger.info("Queue is empty")
                    return
                # Producer is still listing; workers were just faster.
                continue
            try:
                logger.info("Copying %s" % k.name)
                k.copy(dst_bucket, k.name)
            except Exception as e:
                # Best effort: one failed key must not kill the worker.
                logger.error("Error while copying %s: %s" % (k.name, e))
            finally:
                # Always balance the get() so q.join() can return.
                q.task_done()
    return thread_worker


def fill_queue(src_bucket, q, listing_done=None):
    """Queue every key yielded by ``src_bucket.list()``.

    Args:
        src_bucket: object whose ``list()`` yields the keys to copy.
        q: queue receiving the keys.
        listing_done: optional ``threading.Event``; set when listing ends,
            even if it ends with an exception, so workers can terminate.
    """
    try:
        for k in src_bucket.list():
            q.put(k)
    finally:
        if listing_done is not None:
            listing_done.set()


def main(argv=None):
    """Run the copy: parse options, list the source, copy with workers."""
    options = parse_args(argv)
    configure_logging(options.log_dest, options.log_level)
    if boto is None:
        sys.exit("the 'boto' package is required to talk to S3")

    logger.info("Connecting to S3")
    s3 = boto.connect_s3()
    logger.warning("copying from %s to %s"
                   % (options.src_bucket, options.dst_bucket))

    q = Queue.Queue()
    listing_done = threading.Event()
    src_bucket = s3.get_bucket(options.src_bucket)
    src_listing = threading.Thread(target=fill_queue,
                                   args=(src_bucket, q, listing_done))
    src_listing.start()

    logger.info("Wating for keys from source bucket listing...")
    # Also stop waiting when listing finished without producing any key
    # (empty bucket), otherwise this loop would never end.
    while q.empty() and not listing_done.is_set():
        time.sleep(1)

    workers = []
    for i in range(options.num_threads):
        worker = threading.Thread(
            target=get_q_worker(options.dst_bucket, q, listing_done))
        logger.info("startning worker %d" % i)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    src_listing.join()
    logger.info("Done listing source")

    # Progress report while the workers drain what is left.
    while not q.empty():
        print("Q size is %d" % q.qsize())
        time.sleep(5)
    q.join()
    print("Done")


if __name__ == "__main__":
    main()