Created
September 21, 2024 14:06
-
-
Save marcusadair/3c13e83b1b7c39ab6f9b355952989cde to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env zsh
## Snippets for data munging and import to DynamoDB
## Requires AWS CLI and jq
## Playground data is CSV with 200k Jeopardy questions found on Kaggle:
## https://www.kaggle.com/datasets/tunguz/200000-jeopardy-questions
## DynamoDB local:
## https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html
## Snippet highlights:
## * JQ to convert some JSON format to DynamoDB PutRequest
## * JQ to convert a stream of PutRequests to BatchWriteItem
## * GNU Parallel to run many DynamoDB import processes
## * zsh to find the previous log number and increment it for the next run
## * zsh to interpolate a jq program into a subcommand string for GNU Parallel
## * zsh to set a var from a heredoc without any escaping
## * zsh to confirm before destructive actions
## * aws cli with DynamoDB local
## * aws cli for DynamoDB table management
## * aws cli for batch writes
# Strict-mode setup: reset all options to zsh defaults local to this script,
# then enable errexit/nounset/pipefail plus extended globbing.
# NOTE(review): `emulate -LR zsh -euo pipefail` passes set-style short flags
# to `emulate`; confirm the targeted zsh version accepts them here — the
# documented form is `-o optionname` pairs after the shell name.
emulate -LR zsh -euo pipefail -o extended_glob
# Absolute directory of this script (:a absolute path, :h head/dirname).
basedir=$ZSH_SCRIPT:a:h
# Colorize only when the colors module loads AND stderr is a terminal;
# otherwise blank out the color vars (reset_color is set by `colors`).
if autoload -Uz colors && colors && [[ -t 2 ]]
then okclr=$fg[yellow]; errclr=$fg[red]
else okclr=; errclr=; reset_color=
fi
| info() { print "${okclr}$*${reset_color}" >&2 } | |
| error() { print "${errclr}$*${reset_color}" >&2 } | |
# Log a command to stderr (each arg minimally shell-quoted via the (q-) flag
# so it can be copy-pasted), then run it — unless --noop set `noop`, in which
# case only the [NOOP] log line is printed. The command executes inside an
# anonymous function under a fresh `emulate -LR zsh` so option changes made by
# this script don't leak into the invoked command. On failure the exit status
# is logged and propagated to the caller.
cmd() {
  print "$okclr${noop+[NOOP] }Command:$reset_color ${${(@q-)*}}" >&2
  (($+noop)) || (){ emulate -LR zsh; "$@" } "$@" || {
    local err=$?
    print "${errclr}Command failed (err=$err):$reset_color ${${(@q-)*}}" >&2
    return $err
  }
}
# Ask the user a yes/no question ($1, default "Continue?") and read one
# keypress. Returns 0 on 'y', 1 (after a cancel message) on anything else.
confirm() {
  info "${1:-"Continue?"} [y/n]"
  if read -q yes && [[ $yes == y ]]
  then info "\nConfirmed"; return 0
  else error "\nCancelled by user"; return 1
  fi
}
| # Usage: req-arg $1 ${2:-} | |
| # If $2 is empty, print error and exit | |
| req-arg() { [[ -n ${2:-} ]] || { error "Missing arg: $1"; exit 1} } | |
# Whitelist of action names accepted on the command line; matching args are
# collected into $actions and dispatched in order at the bottom of the script.
ACTIONS=(
  create-table delete-table import-ndjson
  ndjson-to-putreqs ndjson-to-batches
)
# Default settings; command-line options below may override each of these.
table=jeopardy200k                 # DynamoDB table name (--table)
duckdb_file=jeopardy200k.duckdb    # duckdb playground file (not referenced in this snippet)
ndjson_import=jeopardy200k.ndjson  # NDJSON input file (--ndjson-import)
local_port=8000                    # DynamoDB local port (--local-port)
processes=8                        # GNU Parallel job count (-j/--processes)
# Flag switches start unset; their presence is tested later with $+name.
unset noop yes max_rows local_dynamodb aws_profile
# Print usage to stderr. The unquoted heredoc delimiter lets $ZSH_SCRIPT and
# the current default values expand, so the help text reflects live settings.
help() {
  cat >&2 <<EOF
Usage: $ZSH_SCRIPT:t [opts] <cmd> <[cmd] ...>
Commands:
create-table
delete-table
import-ndjson
ndjson-to-putreqs Show the stream of PutRequests instead of import
ndjson-to-batches Show the stream of BatchWriteItem instead of import
Warning: Doesn't properly split into batches of 25,
so use with --max-rows to stay sane
Options:
--noop Dry-run mode.
--yes Auto-confirm actions, maybe destructive
--pretty Format JSON for display (not ndjson compatible)
--max-rows Max import records to import or stream
--table <name> DynamoDB table name
default: $table
--ndjson-import <fnm> NDJSON import filename
default: $ndjson_import
--profile <name> AWS profile to use
default: <none> (for default aws cli behavior)
--local Configured aws cli for DynamoDB local
--local-port Port when using DynamoDB local
default: $local_port
• Files are in the current dir by default.
EOF
}
# Parse options and collect requested actions; no args shows help.
(($#)) || set -- --help
while (($#)); do case $1 in
  -h|--help) help; exit 0 ;;
  --noop) noop=; shift ;;
  --yes) yes=; shift ;;
  --pretty) pretty=; shift ;;
  --local) local_dynamodb=; shift ;;
  --local-port) req-arg $1 ${2:-}; local_port=$2; shift 2 ;;
  -j|--processes) req-arg $1 ${2:-}; processes=$2; shift 2 ;;
  --max-rows) req-arg $1 ${2:-}; max_rows=$2; shift 2 ;;
  --table) req-arg $1 ${2:-}; table=$2; shift 2 ;;
  --ndjson-import) req-arg $1 ${2:-}; ndjson_import=$2; shift 2 ;;
  --profile) req-arg $1 ${2:-}; aws_profile=$2; shift 2 ;;
  # Membership test: (Ie) yields the index of an exact match or 0 when absent.
  # BUG FIX: the original used (i), which returns length+1 (truthy!) for a
  # non-member, so the "Bad arg" branch was unreachable and any unknown word
  # was accepted as an action. Also report via error(), not info().
  *) if ((${ACTIONS[(Ie)$1]})); then
       actions+=($1)
     else
       error "Bad arg: $1"; exit 1
     fi
     shift ;;
esac; done
# JQ program to interpolate fields from a record into a DynamoDB PutRequest.
# The single-quoted heredoc delimiter ('EOF') keeps the program fully literal
# (no escaping needed); the anonymous function captures stdin into a global.
# Note: a null .amount is omitted from the Item entirely (the `empty` branch).
() { typeset -g ndjson_to_putreqs_jq=$(cat) } <<'EOF'
{
PutRequest: {
Item: {
shownum: {N: (.shownum | tostring)},
airdate: {S: .airdate},
round: {S: .round},
cat: {S: .cat},
id: {N: (.id | tostring)},
amount: (if .amount != null then {N: (.amount | tostring)} else empty end),
q: {S: .q},
a: {S: .a},
pk: {S: "question#shownum=\(.shownum)"},
sk: {S: "\(.cat)#\(.round)#\(.id)"}
}
}
}
EOF
# JQ program to take PutRequest JSONs in a stream and return a BatchWriteItem
# DynamoDB supports up to 25 items per batch (responsibility of caller)
# The {table} placeholder is substituted with the table name at use sites.
putreqs25_stream_to_batch_jq=$'{ {table}: . }'
# Helper command array for `aws dynamodb` calls
# Uses --profile <str> if included in args
# If --local then configures itself for DynamoDB local
# Uses --local-port <num> when local
aws_dynamodb_cmd=(
  aws dynamodb
  --output json
  --no-cli-auto-prompt
  --no-cli-pager
)
# BUG FIX: this originally appended to `cmd` (an unrelated name that also
# collides with the cmd() logging function), so --profile was silently
# dropped from every aws invocation.
(($+aws_profile)) && aws_dynamodb_cmd+=(--profile $aws_profile)
if (($+local_dynamodb)); then
  aws_dynamodb_cmd+=(
    --region local
    --endpoint-url "http://localhost:$local_port"
  )
fi
# Var with shell command to BatchWriteItem from JSON on stdin
# Reads the BatchWriteItem JSON from stdin (file:///dev/stdin lets the CLI
# read the request body straight from the pipe)
# Used by import-ndjson in parallel processes to write to DynamoDB
# Used by ndjson-to-batches command
batch_write_item_cmd=(
  $aws_dynamodb_cmd batch-write-item
  --request-items file:///dev/stdin
)
# Var with shell command to slurp a PutRequest stream and output one
# BatchWriteItem, from stdin to stdout
# AWS supports up to 25 PutRequests per batch write (handled by the caller)
# Used by ndjson-to-batches and (via the parallel subcommand) import-ndjson
# ${pretty--c}: when --pretty was given, `pretty` is set-but-empty and the
# unquoted empty word is elided; otherwise it expands to jq's -c (compact).
# The ${...//\{table\}/...} substitution injects the real table name into
# the jq program's {table} placeholder.
putreqs25_stream_to_batch_cmd=(
  jq -s ${pretty--c} ${putreqs25_stream_to_batch_jq//\{table\}/$table}
)
# Create the DynamoDB table with a pk (HASH) / sk (RANGE) string key schema
# and fixed 5/5 provisioned throughput.
create-table() {
  local create_args=(
    --table-name $table
    --attribute-definitions
      AttributeName=pk,AttributeType=S
      AttributeName=sk,AttributeType=S
    --key-schema
      AttributeName=pk,KeyType=HASH
      AttributeName=sk,KeyType=RANGE
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
  )
  cmd $aws_dynamodb_cmd create-table $create_args
}
# Delete the DynamoDB table, warning loudly and confirming first (unless --yes).
delete-table() {
  # BUG FIX: this tested $+local, a name never set anywhere; the --local
  # option sets local_dynamodb, so the remote-deletion warning never fired
  # (or always fired) incorrectly. Also use error() for the warning, matching
  # import-ndjson, and make the confirm exit explicit.
  (($+local_dynamodb)) || error "Warning: ABOUT TO DELETE REMOTE TABLE!"
  (($+yes)) || confirm "Delete DynamoDB table $table?" || exit 1
  local cmd=( $aws_dynamodb_cmd delete-table --table-name $table )
  cmd $cmd
}
# Command: Create PutRequest stream on stdout from NDJSON import file
# Each NDJSON record is converted to a DynamoDB PutRequest JSON; --max-rows
# limits the stream via head, otherwise jq reads the file directly.
# ${pretty--c}: compact output by default, pretty when --pretty was given.
ndjson-to-putreqs() {
  if (($+max_rows)); then
    cmd head -$max_rows $ndjson_import | cmd jq ${pretty--c} $ndjson_to_putreqs_jq
  else
    # BUG FIX: removed stray ${pipe+--pipe} — `pipe` is never set by this
    # script, and an inherited environment variable named `pipe` would have
    # injected a bogus `--pipe` first argument into cmd().
    cmd jq ${pretty--c} $ndjson_to_putreqs_jq $ndjson_import
  fi
}
# Command: Stream BatchWriteItem JSONs to stdout instead of writing to DynamoDB
# Pipes the PutRequest stream through the batching jq program.
ndjson-to-batches() {
  local batcher=( $putreqs25_stream_to_batch_cmd )
  ndjson-to-putreqs | cmd $batcher
}
# Command: Parallel stream from NDJSON import file to DynamoDB table
# Fans the PutRequest stream out to GNU Parallel workers, each batching 25
# requests into one BatchWriteItem, writing to DynamoDB, and logging the
# response; per-worker logs are then gathered into one numbered log file.
import-ndjson() {
  local pretty
  unset pretty # so we don't break the ndjson stream while importing
  local tmpdir="$(mktemp -d)"
  local log_prefix="dynamodb.import-ndjson"
  local tmp_log_prefix="$tmpdir/$log_prefix"
  # Newest previous log, if any. Glob qualifiers in (#q...): N = nullglob,
  # n = numeric sort, On = descending order by name — so element [1] is the
  # highest-numbered existing log (empty string when none exist).
  local prev_log=${${(@)$(echo $log_prefix.<->##.log.json(#qNnOn)):-}[1]}
  # :r:r strips .json then .log; :e takes the remaining extension — the run
  # number. An empty prev_log contributes 0, so the first run is numbered 1.
  local run_num=$(( $prev_log:r:r:e + 1 ))
  local new_log=$log_prefix.$run_num.log.json
  info "import-ndjson
run $run_num
from file $ndjson_import
to table $table
max rows ${max_rows:-None}
process ct $processes
local dynamo ${local_dynamodb+Yes}${local_dynamodb-No}
tmpdir $tmpdir
log $new_log"
  (($+local_dynamodb)) || error "Warning: Importing to remote DynamoDB table!"
  (($+yes)) || confirm "Import to table $table?" || exit 1
  # Build the per-worker pipeline as a string for GNU Parallel. ${(q-@)...}
  # minimally shell-quotes each array element; {#} is Parallel's job sequence
  # number, giving each job its own response log file.
  local subprocess_cmd=""
  subprocess_cmd+=${(q-@)putreqs25_stream_to_batch_cmd}
  subprocess_cmd+=" | ${(q-@)batch_write_item_cmd}"
  subprocess_cmd+=" | jq -c . > ${(q-)tmp_log_prefix}{#}.json"
  # -N25 feeds 25 stdin lines (one PutRequest each) to every job — DynamoDB's
  # batch-write limit; --halt-on-error now,fail=1 aborts on the first failure.
  local parallel_cmd=(
    parallel
    -j$processes
    --progress
    --halt-on-error now,fail=1
    -N25
    $subprocess_cmd
  )
  ndjson-to-putreqs | cmd $parallel_cmd
  info "Finished writing to table $table"
  info "Gathering remote logs to $new_log"
  # Concatenate per-job response logs into the new run log ((#qN): nullglob).
  cmd cat $tmp_log_prefix*.json(#qN) > $new_log
  info Done
}
# Dispatch each requested action in the order given on the command line.
# BUG FIX: if only options were given (e.g. just --noop), `actions` was never
# set and the bare $actions expansion aborted under nounset with a confusing
# "parameter not set" error; fail with a clear message instead.
if ! (($+actions)); then
  error "No action given"
  exit 1
fi
for action in $actions; do
  $action
done
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment