#!/usr/bin/env bash
|
# PreCompact/SessionEnd hook: Upload session JSONL delta to Cloudflare R2. |
|
# |
|
# Reads hook input JSON from stdin: |
|
# {"transcript_path": "...", "cwd": "..."} |
|
# |
|
# Flags: |
|
# --async Fork upload to background (PreCompact). Parent exits immediately. |
|
# Delta is saved to pending dir; manifest enables retry on failure. |
|
# (default) Synchronous upload (SessionEnd). Also retries any pending uploads. |
|
# |
|
# Environment variables (pre-loaded via sops-secrets shell integration or set manually): |
|
# SESSION_R2_BUCKET (required) R2 bucket name |
|
# R2_S3_ACCESS_KEY_ID (required) R2 S3-compatible Access Key ID |
|
# R2_S3_SECRET_ACCESS_KEY (required) R2 S3-compatible Secret Access Key |
|
# R2_S3_ENDPOINT (required) R2 S3 endpoint URL |
|
set -euo pipefail

# Parse the single optional flag: --async selects the fork-and-detach
# upload path (PreCompact); anything else means synchronous (SessionEnd).
ASYNC_MODE=false
case "${1:-}" in
  --async) ASYNC_MODE=true ;;
esac
|
|
|
# --- Ensure HOME is set (hook env may not have it) ---
# When HOME is unset, bash expands a bare `~` via the passwd database, so
# no eval/whoami round-trip is needed (the old `eval echo ~$(whoami)` also
# misbehaved on usernames containing shell metacharacters).
if [ -z "${HOME:-}" ]; then
  unset HOME                 # ensure `~` falls back to the passwd lookup
  HOME=$(echo ~)
fi
export HOME

# --- Load minimal env (PATH for node/npx, env vars) ---
# Best effort: a missing or broken mise must not abort the hook.
if [ -x "$HOME/.local/bin/mise" ]; then
  eval "$("$HOME/.local/bin/mise" env -s bash 2>/dev/null)" || true
fi
|
|
|
# --- Load secrets from sops if not already in env ---
# Resolve this script's real location (readlink -f is GNU; fall back to
# python realpath on macOS). $0 is passed as argv, never interpolated into
# the python source, so paths containing quotes are safe.
SCRIPT_REAL="$(readlink -f "$0" 2>/dev/null || python3 -c 'import os, sys; print(os.path.realpath(sys.argv[1]))' "$0")"
SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_REAL")" && pwd)"

# Resolve sops secrets: repo checkout first, then skill-relative fallback
SOPS_SECRETS=""
for _candidate in \
  "${SCRIPT_DIR}/../../secrets/agents.enc.env" \
  "$HOME/.agents/skills/secrets/agents.enc.env" \
; do
  if [ -f "$_candidate" ]; then
    SOPS_SECRETS="$_candidate"
    break
  fi
done
unset _candidate

if [ -z "${R2_S3_ACCESS_KEY_ID:-}" ] && [ -n "$SOPS_SECRETS" ] && command -v sops &>/dev/null; then
  export SOPS_AGE_KEY_FILE="${SOPS_AGE_KEY_FILE:-$HOME/.config/sops/age/keys.txt}"
  # Export KEY=VALUE pairs from the decrypted dotenv. Blank lines, comment
  # lines, and malformed keys are skipped: `export '#foo=bar'` is an
  # invalid identifier, and under `set -e` a single such line used to
  # abort the whole script. With IFS='=' and two read targets, _v keeps
  # everything after the first '=' (values containing '=' are preserved).
  while IFS='=' read -r _k _v; do
    case "$_k" in
      '' | \#*) continue ;;
    esac
    [[ "$_k" =~ ^[A-Za-z_][A-Za-z0-9_]*$ ]] || continue
    export "$_k=$_v"
  done < <(sops decrypt --input-type dotenv --output-type dotenv "$SOPS_SECRETS" 2>/dev/null)
  unset _k _v
fi
|
|
|
# Bail out quietly unless all three required R2 credentials are present.
_creds_ok=true
for _required in R2_S3_ACCESS_KEY_ID R2_S3_SECRET_ACCESS_KEY R2_S3_ENDPOINT; do
  [ -n "${!_required:-}" ] || _creds_ok=false
done
unset _required
if [ "$_creds_ok" != true ]; then
  echo "R2 S3 credentials not available (env or sops), skipping" >&2
  exit 0
fi
|
|
|
# --- R2 upload helper (delegates to shared r2-s3v4.sh) ---
# Reuse SCRIPT_DIR computed above instead of resolving $0 a second time
# (the previous duplicate readlink/python pipeline was copy-pasted).
R2_SCRIPT="${SCRIPT_DIR}/../../scripts/r2-s3v4.sh"
if [ ! -x "$R2_SCRIPT" ]; then
  R2_SCRIPT="$HOME/.agents/scripts/r2-s3v4.sh"
fi
[ -x "$R2_SCRIPT" ] || { echo "r2-s3v4.sh not found" >&2; exit 1; }

# _s3_put BUCKET KEY FILE — upload FILE to r2://BUCKET/KEY via the shared
# SigV4 helper. Returns the helper's exit status.
_s3_put() {
  "$R2_SCRIPT" put "$1" "$2" "$3"
}
|
|
|
# --- Parse hook input ---
# The hook JSON arrives on stdin. Print the two fields on separate lines
# so paths containing spaces survive: the old single-line
# `read A B <<< ...` split transcript_path at its first space. Paths
# containing embedded newlines are not supported.
INPUT=$(cat)
PARSED=$(printf '%s' "$INPUT" | python3 -c '
import sys, json
try:
    d = json.loads(sys.stdin.read())
except Exception:
    d = {}
print(d.get("transcript_path", ""))
print(d.get("cwd", "."))
' 2>/dev/null) || PARSED=""
TRANSCRIPT_PATH=$(printf '%s\n' "$PARSED" | sed -n '1p')
CWD=$(printf '%s\n' "$PARSED" | sed -n '2p')
CWD="${CWD:-.}"

if [ -z "$TRANSCRIPT_PATH" ] || [ ! -f "$TRANSCRIPT_PATH" ]; then
  echo "No transcript found, skipping" >&2
  exit 0
fi
|
|
|
# --- Bucket ---
# The default makes SESSION_R2_BUCKET always non-empty past this point,
# so the old follow-up "-z → skip" branch was unreachable and is removed.
export SESSION_R2_BUCKET="${SESSION_R2_BUCKET:-claude-sessions}"

# --- Derive session ID ---
# First 12 chars of the transcript basename (a UUID-style session name).
SESSION_FILE=$(basename "$TRANSCRIPT_PATH" .jsonl)
SESSION_ID="${SESSION_FILE:0:12}"
|
|
|
# --- Derive repo info ---
# _parse_repo_prefix URL — print "owner/repo" from a git remote URL
# (scp-style `git@host:owner/repo.git` or `scheme://host/owner/repo`).
# Unlike the previous sed pattern ([^/.]+), this accepts repo names that
# contain dots (e.g. next.js). A URL with no path component is printed
# unchanged, mirroring the old sed fall-through.
_parse_repo_prefix() {
  local url="${1%.git}"     # drop .git suffix
  url="${url%/}"            # drop trailing slash
  url="${url#*://}"         # drop scheme (https://, ssh://, ...)
  url="${url##*@}"          # drop user@ prefix
  url="${url//:/\/}"        # scp-style host:owner/repo -> host/owner/repo
  if [[ "$url" != */* ]]; then
    printf '%s\n' "$1"
    return
  fi
  local repo="${url##*/}"
  local rest="${url%/*}"
  printf '%s/%s\n' "${rest##*/}" "$repo"
}

REPO_PREFIX="_noproject"
# (old code had a redundant `&>/dev/null 2>&1` double redirect here)
if git -C "$CWD" rev-parse --is-inside-work-tree >/dev/null 2>&1; then
  REMOTE=$(git -C "$CWD" remote get-url origin 2>/dev/null || true)
  if [ -n "$REMOTE" ]; then
    REPO_PREFIX=$(_parse_repo_prefix "$REMOTE")
  fi
fi
|
|
|
# --- Working directories (created up front) ---
# OFFSET_DIR  : per-session byte offsets already uploaded to R2
# PENDING_DIR : deltas + manifests awaiting retry after async failures
# TMPDIR_SAFE : private scratch space (avoids world-writable /tmp)
OFFSET_DIR="$HOME/.claude/session-r2-offsets"
PENDING_DIR="$HOME/.claude/session-r2-pending"
TMPDIR_SAFE="$HOME/.claude/tmp"
for _dir in "$OFFSET_DIR" "$PENDING_DIR" "$TMPDIR_SAFE"; do
  mkdir -p "$_dir"
done
unset _dir

# Records how many bytes of this transcript have already been uploaded.
OFFSET_FILE="$OFFSET_DIR/${SESSION_FILE}.offset"
|
|
|
# --- [rb-001] Lock offset file to prevent race conditions ---
# mkdir-based lock: atomic on both macOS and Linux, no flock dependency.
# The owner's PID is written inside the lock dir for liveness checks.
LOCK_DIR="$OFFSET_FILE.lk"

# Take the lock. Returns non-zero when another process already holds it.
_acquire_lock() {
  mkdir "$LOCK_DIR" 2>/dev/null || return 1
  echo $$ > "$LOCK_DIR/pid"
  return 0
}

# Release the lock, but only when this process still owns it — a
# stale-lock recovery in another process may have stolen it meanwhile.
_release_lock() {
  local recorded
  recorded=$(cat "$LOCK_DIR/pid" 2>/dev/null || true)
  if [ "$recorded" = "$$" ] && [ -f "$LOCK_DIR/pid" ]; then
    rm -f "$LOCK_DIR/pid"
    rmdir "$LOCK_DIR" 2>/dev/null || true
  fi
}
|
|
|
# Acquire the lock, recovering from stale locks left by crashed processes.
# Order matters throughout: liveness check, then age check, then an atomic
# mv-rename claim — do not reorder.
if ! _acquire_lock; then
  if [ -d "$LOCK_DIR" ]; then
    # Check if owning process is still alive
    OWNER_PID=$(cat "$LOCK_DIR/pid" 2>/dev/null || echo "")
    if [ -n "$OWNER_PID" ] && kill -0 "$OWNER_PID" 2>/dev/null; then
      echo "Another upload in progress (pid=$OWNER_PID), skipping" >&2
      exit 0
    fi
    # Owner is dead — check age as additional safety
    # python stat is used for the mtime because `stat` flags differ between
    # GNU and BSD. If it fails, `echo 0` makes LOCK_AGE huge, so the lock
    # is treated as stale.
    # NOTE(review): $LOCK_DIR is interpolated into the python source; a
    # path containing a single quote would break this — presumably paths
    # under $HOME/.claude never do, but verify.
    LOCK_AGE=$(( $(date +%s) - $(python3 -c "import os; print(int(os.stat('$LOCK_DIR').st_mtime))" 2>/dev/null || echo 0) ))
    if [ "$LOCK_AGE" -gt 60 ]; then
      echo "Removing stale lock (pid=$OWNER_PID, age=${LOCK_AGE}s)" >&2
      # Atomically claim the stale lock via mv (rb-007: prevents TOCTOU double-acquisition)
      STALE_DIR="$LOCK_DIR.stale.$$"
      if mv "$LOCK_DIR" "$STALE_DIR" 2>/dev/null; then
        rm -rf "$STALE_DIR"
        # We removed the stale lock; now race everyone else for the fresh one.
        if ! _acquire_lock; then
          echo "Another upload in progress, skipping" >&2
          exit 0
        fi
      else
        # mv failed: a concurrent process won the rename race.
        echo "Another process already claimed stale lock, skipping" >&2
        exit 0
      fi
    else
      echo "Lock held by recently exited process, skipping" >&2
      exit 0
    fi
  else
    # mkdir failed but LOCK_DIR is not a directory (e.g., permission error, ENOSPC, regular file)
    echo "Lock acquisition failed (not a directory collision), skipping" >&2
    exit 0
  fi
fi

# Default EXIT trap: release lock on any exit path.
# The sync path overrides this to also clean up TMPFILE.
trap '_release_lock' EXIT
|
|
|
# --- Compute the unsynced byte range of the transcript ---
PREV_OFFSET=0
if [ -f "$OFFSET_FILE" ]; then
  PREV_OFFSET=$(cat "$OFFSET_FILE")
fi
# Guard against a corrupted/empty offset file: fall back to a full upload
# rather than crashing in the comparison/arithmetic below.
[[ "$PREV_OFFSET" =~ ^[0-9]+$ ]] || PREV_OFFSET=0

CURRENT_SIZE=$(wc -c < "$TRANSCRIPT_PATH" | tr -d ' ')

if [ "$CURRENT_SIZE" -le "$PREV_OFFSET" ]; then
  echo "No new data (offset=$PREV_OFFSET, size=$CURRENT_SIZE), skipping" >&2
  exit 0
fi

DELTA_SIZE=$((CURRENT_SIZE - PREV_OFFSET))

# _log MSG... — write MSG to stderr, and also to the controlling terminal
# when one exists. The tty write is wrapped in a group so that, in the
# tty-less hook environment, the shell's "cannot open /dev/tty" error is
# itself suppressed (the old `> /dev/tty 2>/dev/null` ordering leaked that
# error to stderr, since redirections apply left to right).
_log() {
  echo "$*" >&2
  { echo "$*" > /dev/tty; } 2>/dev/null || true
}
_log "🚀 Uploading delta: offset=$PREV_OFFSET size=$DELTA_SIZE"
|
|
|
# --- Extract delta to temp file ---
# mktemp cannot portably append a suffix, so create the file first and
# then rename it to *.jsonl.
SCRATCH=$(mktemp "$TMPDIR_SAFE/session-to-r2.XXXXXX")
TMPFILE="${SCRATCH}.jsonl"
mv "$SCRATCH" "$TMPFILE"

# Copy exactly DELTA_SIZE bytes starting just past the uploaded range
# (tail -c +N is 1-based, hence the +1).
tail -c +"$((PREV_OFFSET + 1))" "$TRANSCRIPT_PATH" | head -c "$DELTA_SIZE" > "$TMPFILE"

# --- Build R2 object key ---
# Layout: <owner>/<repo>/<session-id>/<timestamp>_<start-offset>.jsonl
TIMESTAMP=$(date +"%Y%m%d_%H-%M-%S")
R2_KEY="${REPO_PREFIX}/${SESSION_ID}/${TIMESTAMP}_${PREV_OFFSET}.jsonl"
|
|
|
if $ASYNC_MODE; then
  # --- [rb-001] Update offset BEFORE forking (reserve the range) ---
  # If the background upload fails, the manifest below lets the next sync
  # run retry exactly this byte range.
  echo "$CURRENT_SIZE" > "$OFFSET_FILE"

  # --- [rb-002] Move delta to pending dir (avoid parent trap deleting it) ---
  DELTA_FILE="$PENDING_DIR/${SESSION_FILE}_${PREV_OFFSET}.jsonl"
  mv "$TMPFILE" "$DELTA_FILE"

  # Write manifest for retry. Built with json.dump so paths containing
  # quotes/backslashes cannot corrupt the JSON (the old heredoc
  # interpolated values raw). All values travel via argv.
  MANIFEST_FILE="$PENDING_DIR/${SESSION_FILE}_${PREV_OFFSET}.manifest.json"
  python3 -c '
import json, sys
out, key, delta, bucket, prev, size, offset = sys.argv[1:8]
with open(out, "w") as f:
    json.dump({"r2_key": key, "delta_path": delta, "bucket": bucket,
               "prev_offset": int(prev), "current_size": int(size),
               "offset_file": offset}, f)
' "$MANIFEST_FILE" "$R2_KEY" "$DELTA_FILE" "$SESSION_R2_BUCKET" \
    "$PREV_OFFSET" "$CURRENT_SIZE" "$OFFSET_FILE"

  # --- [rb-003] Fork upload with full detachment ---
  # Credentials travel via exported env, NOT interpolated into the bash -c
  # script: argv is world-readable in `ps`, and a quote character in any
  # value would have broken (or injected into) the old inline script.
  # Paths/keys are passed as positional args for the same reason.
  export R2_S3_ACCESS_KEY_ID R2_S3_SECRET_ACCESS_KEY R2_S3_ENDPOINT
  nohup bash -c '
    r2_script=$1 bucket=$2 key=$3 delta=$4 manifest=$5
    if "$r2_script" put "$bucket" "$key" "$delta"; then
      rm -f "$delta" "$manifest"
    fi
  ' _ "$R2_SCRIPT" "$SESSION_R2_BUCKET" "$R2_KEY" "$DELTA_FILE" "$MANIFEST_FILE" \
    >/dev/null 2>&1 &
  disown

  _log "🚀 Async upload queued: r2://${SESSION_R2_BUCKET}/${R2_KEY} (${DELTA_SIZE} bytes)"
else
  # --- Synchronous upload ---
  # Combine tmpfile cleanup with lock release (rb-001: bash trap replaces, not stacks)
  trap 'rm -f "$TMPFILE"; _release_lock' EXIT

  if _s3_put "${SESSION_R2_BUCKET}" "${R2_KEY}" "$TMPFILE"; then
    # Advance the offset only after a confirmed upload.
    echo "$CURRENT_SIZE" > "$OFFSET_FILE"
    _log "✅ Uploaded to r2://${SESSION_R2_BUCKET}/${R2_KEY} (${DELTA_SIZE} bytes)"
    echo "r2://${SESSION_R2_BUCKET}/${R2_KEY}"
  else
    _log "❌ Upload failed"
    exit 1
  fi

  # --- Retry any pending uploads from previous async failures ---
  for manifest in "$PENDING_DIR"/*.manifest.json; do
    [ -f "$manifest" ] || continue
    # Parse via argv + one-field-per-line output. No eval: manifest
    # contents can no longer execute as shell, and the old mixed
    # `A || B || C && continue` precedence footgun is gone. Field values
    # containing newlines are not supported.
    FIELDS=$(python3 -c '
import json, sys
try:
    m = json.load(open(sys.argv[1]))
except Exception:
    m = {}
for k in ("r2_key", "delta_path", "bucket"):
    print(m.get(k, ""))
' "$manifest" 2>/dev/null) || FIELDS=""
    P_R2_KEY=$(sed -n '1p' <<< "$FIELDS")
    P_DELTA=$(sed -n '2p' <<< "$FIELDS")
    P_BUCKET=$(sed -n '3p' <<< "$FIELDS")
    # Skip unreadable manifests; drop manifests whose delta is gone.
    if [ -z "$P_R2_KEY" ] || [ -z "$P_DELTA" ] || [ -z "$P_BUCKET" ]; then
      continue
    fi
    [ -f "$P_DELTA" ] || { rm -f "$manifest"; continue; }
    if _s3_put "${P_BUCKET}" "${P_R2_KEY}" "$P_DELTA"; then
      rm -f "$P_DELTA" "$manifest"
      _log "🔄 Retried pending: $P_R2_KEY"
    fi
  done
fi