Skip to content

Instantly share code, notes, and snippets.

@copernicus
Forked from nukemberg/s3_multicopy.py
Created February 12, 2012 13:44
Show Gist options
  • Select an option

  • Save copernicus/1808631 to your computer and use it in GitHub Desktop.

Select an option

Save copernicus/1808631 to your computer and use it in GitHub Desktop.

Revisions

  1. Avishai Ish-Shalom created this gist Feb 1, 2012.
    76 changes: 76 additions & 0 deletions s3_multicopy.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@
    #! /usr/bin/python

    import threading, Queue
    import boto, sys, time
    import argparse
    import logging

    # Command-line interface: source/destination buckets are mandatory;
    # thread count and logging knobs are optional.
    parser = argparse.ArgumentParser(description="Multithreaded mass copier for Amazon S3")
    _arg_specs = [
        ("-s", dict(help="Source bucket", dest="src_bucket", type=str, required=True)),
        ("-d", dict(help="Destination bucket", dest="dst_bucket", type=str, required=True)),
        ("-n", dict(help="Number of copier threads", dest="num_threads", type=int, default=10)),
        ("-l", dict(help="Logging level", dest="log_level", type=str, default="WARN")),
        ("-L", dict(help="Logging file, default is STDOUT", dest="log_dest", type=str, default="STDOUT")),
    ]
    for _flag, _kwargs in _arg_specs:
        parser.add_argument(_flag, **_kwargs)

    options = parser.parse_args()

    # Configure the module logger.  Bug fixes vs. the original:
    #  * the parsed option attribute is ``log_dest`` (set via dest=), not
    #    ``log_dst`` -- the old code raised AttributeError on the file branch
    #  * a file destination needs logging.FileHandler; StreamHandler expects
    #    an already-open stream object, not a path string
    #  * the formatter is now applied on both branches, not only for STDOUT
    logger = logging.getLogger("S3 MultiThreadedCopy")
    formatter = logging.Formatter('%(name)-2s: %(levelname)-8s %(message)s')
    if options.log_dest == "STDOUT":
        log_dst = logging.StreamHandler(sys.stdout)
    else:
        log_dst = logging.FileHandler(options.log_dest)
    log_dst.setFormatter(formatter)
    log_dst.setLevel(getattr(logging, options.log_level.upper()))
    logger.addHandler(log_dst)
    logger.setLevel(getattr(logging, options.log_level.upper()))

    logger.info("Connecting to S3")
    s3 = boto.connect_s3()
    # Lazy %-args: logging formats the message only if the record is emitted.
    logger.warn("copying from %s to %s", options.src_bucket, options.dst_bucket)

    def get_q_worker(dst_bucket, q):
        """Return a thread target that drains boto Key objects from *q*.

        dst_bucket -- destination bucket name handed to boto's Key.copy()
        q          -- Queue of keys to copy

        The worker exits once the queue has stayed empty for a full second.
        """
        def thread_worker():
            while True:
                # BUG FIX: the original pre-checked ``q.empty()`` before the
                # timed get, which let workers exit prematurely whenever the
                # listing thread momentarily fell behind the copiers.  The
                # timed get + Queue.Empty already handles termination.
                try:
                    k = q.get(True, 1)
                except Queue.Empty:
                    logger.warn("Queue is empty")
                    return
                logger.info("Copying %s" % k.name)
                try:
                    k.copy(dst_bucket, k.name)
                except Exception as e:
                    # Log and keep going: one failed key must not kill the worker.
                    logger.error("Error while copying %s: %s" % (k.name, e))
                # Always acknowledge the item so q.join() can complete.
                q.task_done()
        return thread_worker


    def fill_queue(src_bucket, q):
        """Enqueue every key from *src_bucket*'s listing onto *q*."""
        for key in src_bucket.list():
            q.put(key)

    q = Queue.Queue()

    src_bucket = s3.get_bucket(options.src_bucket)
    src_listing = threading.Thread(target=lambda: fill_queue(src_bucket, q))
    src_listing.start()

    workers = []
    logger.info("Wating for keys from source bucket listing...")
    while q.empty():
    time.sleep(1)

    for i in range(options.num_threads):
    worker = threading.Thread(target=get_q_worker(options.dst_bucket, q))
    logger.info("startning worker %d" % i)
    worker.daemon = True
    worker.start()
    workers.append(worker)

    src_listing.join()
    logger.info("Done listing source")
    while not q.empty():
    print "Q size is %d" % q.qsize()
    time.sleep(5)

    q.join()
    print "Done"