Skip to content

Instantly share code, notes, and snippets.

@marcusadair
Created September 21, 2024 14:06
Show Gist options
  • Select an option

  • Save marcusadair/3c13e83b1b7c39ab6f9b355952989cde to your computer and use it in GitHub Desktop.

Select an option

Save marcusadair/3c13e83b1b7c39ab6f9b355952989cde to your computer and use it in GitHub Desktop.
#!/usr/bin/env zsh
## Snippets for data munging and import to DynamoDB
## Requires AWS CLI and jq
## Playground data is CSV with 200k Jeopardy questions found on Kaggle:
## https://www.kaggle.com/datasets/tunguz/200000-jeopardy-questions
## DynamoDB local:
## https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html
## Snippet highlights:
## * JQ to convert some JSON format to DynamoDB PutRequest
## * JQ to convert a stream of PutRequests to a BatchWriteItem
## * GNU Parallel to run many DynamoDB import processes
## * zsh to find the previous log number and increment it for the next run
## * zsh to interpolate a jq program into a subcommand string for GNU Parallel
## * zsh to set a var from a heredoc without any escaping
## * zsh to confirm before destructive actions
## * aws cli with DynamoDB local
## * aws cli for DynamoDB table management
## * aws cli for batch writes
# Reset options to zsh defaults locally (-LR), then enable errexit,
# nounset, pipefail and extended_glob.
# NOTE(review): this relies on `emulate` forwarding set-style flags after
# the shell name — supported in recent zsh; confirm the minimum zsh
# version this script must run under.
emulate -LR zsh -euo pipefail -o extended_glob
# Absolute directory of this script (:a absolute path, :h dirname).
# NOTE(review): $ZSH_SCRIPT needs zsh 5.3+; basedir appears unused below.
basedir=$ZSH_SCRIPT:a:h
# Colorize diagnostics only when stderr is a TTY; otherwise blank the
# color variables so redirected output stays clean. $reset_color and
# $fg are set by the `colors` autoload function.
if autoload -Uz colors && colors && [[ -t 2 ]]
then okclr=$fg[yellow]; errclr=$fg[red]
else okclr=; errclr=; reset_color=
fi
# Print an informational message to stderr, colorized when enabled.
# `print` (deliberately without -r) still interprets escapes like \n,
# which callers such as confirm() rely on.
# Fix: `--` ends option parsing so a message starting with `-` is not
# misread as a print option.
info() { print -- "${okclr}$*${reset_color}" >&2 }
# Print an error/warning message to stderr, colorized when enabled.
# Fix: `--` ends option parsing so a message starting with `-` is not
# misread as a print option (escape interpretation is kept — callers
# pass \n deliberately).
error() { print -- "${errclr}$*${reset_color}" >&2 }
# Log a shell-quoted rendering of the args, then execute them as a
# command — unless --noop was given, in which case only the log line is
# printed. On failure, log the command again with its exit status and
# propagate that status to the caller.
cmd() {
# ${(@q-)*} quotes each positional arg minimally so the logged command
# line is copy-pasteable; the outer ${...} joins them with spaces.
print "$okclr${noop+[NOOP] }Command:$reset_color ${${(@q-)*}}" >&2
# Run the command inside an anonymous function that resets zsh options
# (emulate -LR) so it executes under default semantics. The trailing
# "$@" are the anonymous function's arguments; the inner "$@" invokes
# them as the command itself.
(($+noop)) || (){ emulate -LR zsh; "$@" } "$@" || {
local err=$?
print "${errclr}Command failed (err=$err):$reset_color ${${(@q-)*}}" >&2
return $err
}
}
# Prompt the user ($1 or "Continue?") and wait for a single keypress.
# Returns 0 when the user confirms with y/Y, 1 otherwise.
confirm() {
  info "${1:-"Continue?"} [y/n]"
  # read -q reads one key without Enter and succeeds only for y/Y,
  # storing the normalized answer in $yes.
  if read -q yes && [[ $yes == y ]]; then
    info "\nConfirmed"
    return 0
  fi
  error "\nCancelled by user"
  return 1
}
# Usage: req-arg $1 ${2:-}
# Validates that an option's value ($2) is present; $1 is the option
# name used in the error message. Exits the script when missing.
req-arg() {
  if [[ -z ${2:-} ]]; then
    error "Missing arg: $1"
    exit 1
  fi
}
# Known subcommands; any other non-option word is rejected by the parser.
ACTIONS=(
create-table
delete-table
import-ndjson
ndjson-to-putreqs
ndjson-to-batches
)
# Defaults, overridable via command-line options.
table=jeopardy200k
# NOTE(review): duckdb_file appears unused in this script — confirm
# before removing.
duckdb_file=jeopardy200k.duckdb
ndjson_import=jeopardy200k.ndjson
local_port=8000
processes=8
# Actions requested on the command line, filled in by the parser below.
# Fix: initialize explicitly so the dispatch loop never touches an
# unset parameter under nounset.
actions=()
# Clear flag variables so values inherited from the caller's environment
# cannot change behavior.
# Fix: `pretty` was missing from this list, so an exported pretty=...
# in the environment would silently enable --pretty formatting.
unset noop yes pretty max_rows local_dynamodb aws_profile
# Print usage to stderr.
# The bare `<<EOF >&2` redirection has no command word: zsh runs $NULLCMD
# (cat by default) for a redirection-only command, streaming the heredoc
# to stderr. The unquoted EOF delimiter lets $table etc. expand so the
# help text shows current defaults.
help() {
<<EOF >&2
Usage: $ZSH_SCRIPT:t [opts] <cmd> <[cmd] ...>
Commands:
create-table
delete-table
import-ndjson
ndjson-to-putreqs Show the stream of PutRequests instead of import
ndjson-to-batches Show the stream of BatchWriteItem instead of import
Warning: Doesn't properly split into batches of 25,
so use with --max-rows to stay sane
Options:
--noop Dry-run mode.
--yes Auto-confirm actions, maybe destructive
--pretty Format JSON for display (not ndjson compatible)
--max-rows Max import records to import or stream
--table <name> DynamoDB table name
default: $table
--ndjson-import <fnm> NDJSON import filename
default: $ndjson_import
--profile <name> AWS profile to use
default: <none> (for default aws cli behavior)
--local Configured aws cli for DynamoDB local
--local-port Port when using DynamoDB local
default: $local_port
• Files are in the current dir by default.
EOF
}
# With no arguments at all, show help and exit.
(($#)) || set -- --help
# Parse options; any non-option word must be a known action name.
while (($#)); do case $1 in
-h|--help) help; exit 0 ;;
--noop) noop=; shift ;;
--yes) yes=; shift ;;
--pretty) pretty=; shift ;;
--local) local_dynamodb=; shift ;;
--local-port) req-arg $1 ${2:-}; local_port=$2; shift 2 ;;
-j|--processes) req-arg $1 ${2:-}; processes=$2; shift 2 ;;
--max-rows) req-arg $1 ${2:-}; max_rows=$2; shift 2 ;;
--table) req-arg $1 ${2:-}; table=$2; shift 2 ;;
--ndjson-import) req-arg $1 ${2:-}; ndjson_import=$2; shift 2 ;;
--profile) req-arg $1 ${2:-}; aws_profile=$2; shift 2 ;;
# Fix: the original used ${ACTIONS[(i)$1]}, which yields the matching
# *index* — or array-length+1 (non-zero, i.e. truthy) when nothing
# matches — so unknown words were accepted as actions and only failed
# later at dispatch. (I) yields 0 when absent; (e) forces an exact
# string match instead of treating $1 as a pattern.
*) ((${ACTIONS[(Ie)$1]})) && actions+=($1) || { info "Bad arg: $1"; exit 1 }
shift ;;
esac; done
# JQ program to interpolate fields from a record into a DynamoDB PutRequest.
# The anonymous function reads the heredoc from stdin via $(cat) and
# stores it in a global; the quoted 'EOF' delimiter disables all shell
# expansion, so the jq program needs no escaping at all.
() { typeset -g ndjson_to_putreqs_jq=$(cat) } <<'EOF'
{
PutRequest: {
Item: {
shownum: {N: (.shownum | tostring)},
airdate: {S: .airdate},
round: {S: .round},
cat: {S: .cat},
id: {N: (.id | tostring)},
amount: (if .amount != null then {N: (.amount | tostring)} else empty end),
q: {S: .q},
a: {S: .a},
pk: {S: "question#shownum=\(.shownum)"},
sk: {S: "\(.cat)#\(.round)#\(.id)"}
}
}
}
EOF
# JQ program to take PutRequest JSONs in a stream and return a BatchWriteItem
# DynamoDB supports up to 25 items per batch (responsibility of caller)
# {table} is a placeholder replaced with the actual table name where the
# jq command line is assembled below.
# NOTE(review): the substituted name becomes an unquoted jq object key,
# which assumes the table name is a valid jq identifier — verify for
# names containing dashes or dots.
putreqs25_stream_to_batch_jq=$'{ {table}: . }'
# Helper for `aws dynamodb` calls
# Uses --profile <str> if included in args
# If --local then configures itself for DynamoDB local
# Uses --local-port <num> when local
aws_dynamodb_cmd=(
aws dynamodb
--output json
--no-cli-auto-prompt
--no-cli-pager
)
# Fix: the profile flag was appended to `cmd` — the name of the logging
# function, which merely created an unrelated array variable — instead
# of aws_dynamodb_cmd, so --profile was silently ignored on every call.
(($+aws_profile)) && aws_dynamodb_cmd+=(--profile $aws_profile)
if (($+local_dynamodb)); then
aws_dynamodb_cmd+=(
--region local
--endpoint-url "http://localhost:$local_port"
)
fi
# Var with shell command to BatchWriteItem from JSON on stdin
# Reads the BatchWriteItem JSON from stdin
# Used by import-ndjson in parallel processes to write to DynamoDB
# Used by ndjson-to-batches command
batch_write_item_cmd=(
$aws_dynamodb_cmd batch-write-item
--request-items file:///dev/stdin
)
# Var with shell command to PutRequest stream and output one BatchWriteItem
# From stdin to stdout
# AWS supports up to 25 PutRequests per batch write (handled by the caller)
# Used by ndjson-to-putreqs command
# ${pretty--c}: pass -c (compact output) unless --pretty was given.
# ${...//\{table\}/$table}: substitute the {table} placeholder in the jq
# program with the configured table name.
putreqs25_stream_to_batch_cmd=(
jq -s ${pretty--c} ${putreqs25_stream_to_batch_jq//\{table\}/$table}
)
# Command: create the DynamoDB table with pk (HASH) / sk (RANGE) string
# keys and modest provisioned throughput.
create-table() {
  # Named `args` rather than `cmd` to avoid (harmlessly, but confusingly)
  # shadowing the cmd() logging wrapper with a local array.
  local -a args
  args=(
    $aws_dynamodb_cmd create-table
    --table-name $table
    --attribute-definitions
    AttributeName=pk,AttributeType=S
    AttributeName=sk,AttributeType=S
    --key-schema
    AttributeName=pk,KeyType=HASH
    AttributeName=sk,KeyType=RANGE
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
  )
  cmd $args
}
# Command: delete the DynamoDB table, confirming first unless --yes.
delete-table() {
# Fix: this guard tested $+local, a variable that is never set anywhere;
# the --local option sets local_dynamodb, so the remote-table warning
# fired even when targeting DynamoDB local.
(($+local_dynamodb)) || info "Warning: ABOUT TO DELETE REMOTE TABLE!"
# Fix: exit explicitly when the user declines (matching import-ndjson)
# rather than relying on errexit to stop the script.
(($+yes)) || confirm "Delete DynamoDB table $table?" || exit 1
local cmd=( $aws_dynamodb_cmd delete-table --table-name $table )
cmd $cmd
}
# Command: Create PutRequest stream on stdout from NDJSON import file.
# With --max-rows, only the first N records are converted.
ndjson-to-putreqs() {
if (($+max_rows)); then
# -n is the portable spelling of head's line-count option.
cmd head -n $max_rows $ndjson_import | cmd jq ${pretty--c} $ndjson_to_putreqs_jq
else
# Fix: dropped a stray ${pipe+--pipe} — $pipe is never set anywhere in
# this script, and had it ever been set, `--pipe` would have been
# executed as the command word by the cmd() wrapper.
cmd jq ${pretty--c} $ndjson_to_putreqs_jq $ndjson_import
fi
}
# Command: Stream BatchWriteItem JSONs to stdout instead of writing to
# DynamoDB — useful for inspecting what an import would send.
ndjson-to-batches() {
  # Feed the PutRequest stream through the jq batcher command.
  local -a to_batch
  to_batch=($putreqs25_stream_to_batch_cmd)
  ndjson-to-putreqs | cmd $to_batch
}
# Command: Parallel stream from NDJSON import file to DynamoDB table.
# Converts records to PutRequests, batches them 25 at a time, and fans
# the batch writes out over GNU Parallel workers; per-batch responses
# are logged and gathered into a single numbered run log.
import-ndjson() {
local pretty
unset pretty # so we don't break the ndjson stream while importing
local tmpdir="$(mktemp -d)"
local log_prefix="dynamodb.import-ndjson"
local tmp_log_prefix="$tmpdir/$log_prefix"
# Find the most recent run log: <->## matches the run number, and the
# glob qualifiers (#qNnOn) mean N (expand to nothing when no match),
# n (numeric sort) and On (descending order) — so element [1] is the
# log with the highest run number, if any.
local prev_log=${${(@)$(echo $log_prefix.<->##.log.json(#qNnOn)):-}[1]}
# :r:r strips the .json and .log suffixes; :e then extracts the run
# number. An empty prev_log yields '' which arithmetic treats as 0.
local run_num=$(( $prev_log:r:r:e + 1 ))
local new_log=$log_prefix.$run_num.log.json
# ${local_dynamodb+Yes}${local_dynamodb-No}: prints Yes when the flag
# is set, No when unset.
info "import-ndjson
run $run_num
from file $ndjson_import
to table $table
max rows ${max_rows:-None}
process ct $processes
local dynamo ${local_dynamodb+Yes}${local_dynamodb-No}
tmpdir $tmpdir
log $new_log"
(($+local_dynamodb)) || error "Warning: Importing to remote DynamoDB table!"
(($+yes)) || confirm "Import to table $table?" || exit 1
# Build the per-batch worker pipeline: PutRequests -> BatchWriteItem
# JSON -> aws batch-write-item -> compact per-batch response log.
# ${(q-@)...} shell-quotes each array element so the command string
# survives re-parsing by the shell GNU Parallel spawns; {#} is
# Parallel's job sequence number.
local subprocess_cmd=""
subprocess_cmd+=${(q-@)putreqs25_stream_to_batch_cmd}
subprocess_cmd+=" | ${(q-@)batch_write_item_cmd}"
subprocess_cmd+=" | jq -c . > ${(q-)tmp_log_prefix}{#}.json"
# -N25: feed 25 input lines (PutRequests) per job — DynamoDB's
# BatchWriteItem maximum. fail=1: abort everything on the first error.
local parallel_cmd=(
parallel
-j$processes
--progress
--halt-on-error now,fail=1
-N25
$subprocess_cmd
)
ndjson-to-putreqs | cmd $parallel_cmd
info "Finished writing to table $table"
info "Gathering remote logs to $new_log"
# Concatenate every worker's per-batch response into the single run log;
# (#qN) avoids an error if no logs were produced.
cmd cat $tmp_log_prefix*.json(#qN) > $new_log
info Done
}
# Run each requested action in command-line order.
# Fix: expand ${actions:-} so that when only options were given (no
# actions) the loop simply runs zero times instead of tripping nounset
# on an unset parameter.
for action in ${actions:-}; do
$action
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment