Last active
December 21, 2017 19:03
-
-
Save lanre-ade/cbb9775b694b06f01e5784356090709b to your computer and use it in GitHub Desktop.
Concurrently list the objects in a large S3 bucket/path containing millions of objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "bytes" | |
| "flag" | |
| "fmt" | |
| "log" | |
| "os" | |
| "regexp" | |
| "strconv" | |
| "sync" | |
| "time" | |
| "github.com/aws/aws-sdk-go/aws" | |
| "github.com/aws/aws-sdk-go/aws/session" | |
| "github.com/aws/aws-sdk-go/service/s3" | |
| ) | |
// prefixes holds the first character after -path that each listing worker is
// responsible for: main starts one ListObjects pager per entry, scanning the
// key prefix -path+entry. Every entry must appear exactly once. A duplicate
// lists, writes and counts the same keys twice. A missing entry silently drops
// every key that starts with it.
//
// Limitation: keys whose first character after -path is not in this list
// (e.g. '_', '-', '.', '/', or non-ASCII) are not listed. Neither is a key that
// is exactly equal to -path.
var prefixes = []string{
	"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
	"a", "A", "b", "B", "c", "C", "d", "D", "e", "E",
	"f", "F", "g", "G", "h", "H", "i", "I", "j", "J",
	"k", "K", "l", "L", "m", "M", "n", "N", "o", "O",
	"p", "P", "q", "Q", "r", "R", "s", "S", "t", "T",
	"u", "U", "v", "V", "w", "W", "x", "X", "y", "Y",
	"z", "Z",
}
// check treats a non-nil error as fatal: it panics with e unchanged.
// A nil error is a no-op.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
| func main() { | |
| var bucket, region, path, regx, out string | |
| startTime := time.Now() | |
| defaultOut := "listing-" + strconv.Itoa(int(startTime.Unix())) + ".log" | |
| flag.StringVar(®ion, "region", "us-east-1", "AWS region") | |
| flag.StringVar(&bucket, "bucket", "s3-bucket", "Bucket name") | |
| flag.StringVar(&path, "path", "", "Path to recurse under") | |
| flag.StringVar(®x, "regx", "", "Regx string for pattern matching") | |
| flag.StringVar(&out, "out", defaultOut, "filename or absolute path to file for listing output") | |
| flag.Parse() | |
| var counter int64 | |
| var mutx sync.Mutex | |
| var listingFile *os.File | |
| regxCompiled := regexp.MustCompile(regx) // e.g regexp.MustCompile(`.*.mp4$`) | |
| pending := sync.WaitGroup{} | |
| svc := s3.New(session.New(), &aws.Config{ | |
| Region: aws.String(region), | |
| }) | |
| // logging | |
| listingFile, fileErr := os.Create(out) | |
| check(fileErr) | |
| defer listingFile.Close() | |
| go func() { // 30 second periodic updates | |
| for { | |
| log.Println("Elapsed time: ", time.Since(startTime).Truncate(time.Second), ", Processed files: ", counter) | |
| time.Sleep(30 * time.Second) | |
| } | |
| }() | |
| handleS3Listing := func(p *s3.ListObjectsOutput, lp bool) { | |
| var buffer bytes.Buffer | |
| var localCount int64 | |
| pending.Add(1) | |
| defer pending.Done() | |
| for _, object := range p.Contents { // extract matches | |
| key := *object.Key | |
| if regx == "" || regxCompiled.MatchString(key) { | |
| buffer.WriteString(key + "\n") | |
| localCount++ | |
| } | |
| } | |
| mutx.Lock() | |
| counter += localCount | |
| _, err := listingFile.Write(buffer.Bytes()) | |
| mutx.Unlock() | |
| check(err) | |
| } | |
| for i := 0; i < len(prefixes); i++ { | |
| go func(prefix string) { | |
| pending.Add(1) | |
| defer pending.Done() | |
| err := svc.ListObjectsPages(&s3.ListObjectsInput{ | |
| MaxKeys: aws.Int64(1000), | |
| Bucket: aws.String(bucket), | |
| Prefix: aws.String(path + prefix), | |
| }, func(page *s3.ListObjectsOutput, lastPage bool) bool { | |
| go handleS3Listing(page, lastPage) | |
| return true | |
| }) | |
| check(err) | |
| }(prefixes[i]) | |
| } | |
| pending.Wait() | |
| fmt.Println(" - ") | |
| log.Println("Total matching files: ", counter) | |
| log.Println("Elapsed time: ", time.Since(startTime)) | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
NOTE: the listing output is unordered; sort it with `$ cat out.log | sort >> sorted.log`. TODO