# NOTE(review): stray patch hunk header ("@@ -0,0 +1,76 @@") kept as a comment so the file parses
#! /usr/bin/python
import threading , Queue
import boto , sys , time
import argparse
import logging
# ---- CLI options, logging configuration, and S3 connection ----
parser = argparse.ArgumentParser(description="Multithreaded mass copier for Amazon S3")
parser.add_argument("-s", help="Source bucket", dest="src_bucket", type=str, required=True)
parser.add_argument("-d", help="Destination bucket", dest="dst_bucket", type=str, required=True)
parser.add_argument("-n", help="Number of copier threads", dest="num_threads", type=int, default=10)
parser.add_argument("-l", help="Logging level", dest="log_level", type=str, default="WARN")
parser.add_argument("-L", help="Logging file, default is STDOUT", dest="log_dest", type=str, default="STDOUT")
options = parser.parse_args()

logger = logging.getLogger("S3 MultiThreadedCopy")
formatter = logging.Formatter('%(name)-2s: %(levelname)-8s %(message)s')
if options.log_dest == "STDOUT":
    log_dst = logging.StreamHandler(sys.stdout)
else:
    # BUG FIX: original used logging.StreamHandler(options.log_dst).
    # "log_dst" is not a parsed option (the dest is "log_dest"), so this
    # raised AttributeError -- and a filename needs a FileHandler anyway.
    log_dst = logging.FileHandler(options.log_dest)
# Apply formatter and level on BOTH branches (original set the formatter
# only for STDOUT and the handler level only for files).
log_dst.setFormatter(formatter)
log_dst.setLevel(getattr(logging, options.log_level.upper()))
logger.addHandler(log_dst)
logger.setLevel(getattr(logging, options.log_level.upper()))

logger.info("Connecting to S3")
s3 = boto.connect_s3()
logger.warn("copying from %s to %s" % (options.src_bucket, options.dst_bucket))
def get_q_worker(dst_bucket, q):
    """Return a worker closure that drains keys from *q* and copies each
    one into *dst_bucket* (a bucket name accepted by boto's Key.copy).

    The worker exits once the queue has stayed empty for a full second.
    """
    def thread_worker():
        while True:
            # BUG FIX / EAFP: the original guarded the loop with
            # "while not q.empty()", which races with q.get() when several
            # workers drain the queue concurrently -- a worker could die the
            # instant the queue was momentarily empty even though the
            # producer was still listing. Rely solely on the timed get: the
            # worker only gives up after one full second with no keys.
            try:
                k = q.get(True, 1)
            except Queue.Empty:
                logger.warn("Queue is empty")
                return
            logger.info("Copying %s" % k.name)
            try:
                k.copy(dst_bucket, k.name)
            except Exception as e:
                logger.error("Error while copying %s: %s" % (k.name, e))
            # Mark the item done even when the copy failed so q.join()
            # in the main thread can eventually complete.
            q.task_done()
    return thread_worker
def fill_queue(src_bucket, q):
    """Producer: push every key from the source bucket listing onto *q*."""
    for key in src_bucket.list():
        q.put(key)
q = Queue .Queue ()
src_bucket = s3 .get_bucket (options .src_bucket )
src_listing = threading .Thread (target = lambda : fill_queue (src_bucket , q ))
src_listing .start ()
workers = []
logger .info ("Wating for keys from source bucket listing..." )
while q .empty ():
time .sleep (1 )
for i in range (options .num_threads ):
worker = threading .Thread (target = get_q_worker (options .dst_bucket , q ))
logger .info ("startning worker %d" % i )
worker .daemon = True
worker .start ()
workers .append (worker )
src_listing .join ()
logger .info ("Done listing source" )
while not q .empty ():
print "Q size is %d" % q .qsize ()
time .sleep (5 )
q .join ()
print "Done"