Skip to content

Instantly share code, notes, and snippets.

@lanre-ade
Last active December 21, 2017 19:03
Show Gist options
  • Select an option

  • Save lanre-ade/cbb9775b694b06f01e5784356090709b to your computer and use it in GitHub Desktop.

Select an option

Save lanre-ade/cbb9775b694b06f01e5784356090709b to your computer and use it in GitHub Desktop.
Concurrently list the objects in a large S3 bucket/path containing millions of objects
package main
import (
"bytes"
"flag"
"fmt"
"log"
"os"
"regexp"
"strconv"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// prefixes are the first characters (after -path) for which main starts one
// concurrent listing each: the digits 0-9, then each lowercase letter followed
// by its uppercase twin. Every entry must appear exactly once. A duplicate
// lists (and writes) the same keys twice; a missing entry silently skips all
// keys that start with it.
var prefixes = []string{
	"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
	"a", "A", "b", "B", "c", "C", "d", "D", "e", "E", "f", "F", "g", "G",
	"h", "H", "i", "I", "j", "J", "k", "K", "l", "L", "m", "M", "n", "N",
	"o", "O", "p", "P", "q", "Q", "r", "R", "s", "S", "t", "T", "u", "U",
	"v", "V", "w", "W", "x", "X", "y", "Y", "z", "Z",
}
// check is the script's fatal-error handler. A nil error is a no-op; any other
// error aborts the current goroutine (and so the program) by panicking with
// that same error value.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
func main() {
var bucket, region, path, regx, out string
startTime := time.Now()
defaultOut := "listing-" + strconv.Itoa(int(startTime.Unix())) + ".log"
flag.StringVar(&region, "region", "us-east-1", "AWS region")
flag.StringVar(&bucket, "bucket", "s3-bucket", "Bucket name")
flag.StringVar(&path, "path", "", "Path to recurse under")
flag.StringVar(&regx, "regx", "", "Regx string for pattern matching")
flag.StringVar(&out, "out", defaultOut, "filename or absolute path to file for listing output")
flag.Parse()
var counter int64
var mutx sync.Mutex
var listingFile *os.File
regxCompiled := regexp.MustCompile(regx) // e.g regexp.MustCompile(`.*.mp4$`)
pending := sync.WaitGroup{}
svc := s3.New(session.New(), &aws.Config{
Region: aws.String(region),
})
// logging
listingFile, fileErr := os.Create(out)
check(fileErr)
defer listingFile.Close()
go func() { // 30 second periodic updates
for {
log.Println("Elapsed time: ", time.Since(startTime).Truncate(time.Second), ", Processed files: ", counter)
time.Sleep(30 * time.Second)
}
}()
handleS3Listing := func(p *s3.ListObjectsOutput, lp bool) {
var buffer bytes.Buffer
var localCount int64
pending.Add(1)
defer pending.Done()
for _, object := range p.Contents { // extract matches
key := *object.Key
if regx == "" || regxCompiled.MatchString(key) {
buffer.WriteString(key + "\n")
localCount++
}
}
mutx.Lock()
counter += localCount
_, err := listingFile.Write(buffer.Bytes())
mutx.Unlock()
check(err)
}
for i := 0; i < len(prefixes); i++ {
go func(prefix string) {
pending.Add(1)
defer pending.Done()
err := svc.ListObjectsPages(&s3.ListObjectsInput{
MaxKeys: aws.Int64(1000),
Bucket: aws.String(bucket),
Prefix: aws.String(path + prefix),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
go handleS3Listing(page, lastPage)
return true
})
check(err)
}(prefixes[i])
}
pending.Wait()
fmt.Println(" - ")
log.Println("Total matching files: ", counter)
log.Println("Elapsed time: ", time.Since(startTime))
}
@lanre-ade
Copy link
Author

lanre-ade commented Dec 21, 2017

NOTE

  • This assumes every object key under the given path starts with an ASCII letter or digit; keys that start with any other character are not listed
  • To sort the output file -> $ sort out.log > sorted.log

TODO

  • Limit the number of concurrently running goroutines (use a bounded worker pool)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment