Skip to content

Instantly share code, notes, and snippets.

@dui
Created April 10, 2026 00:39
Show Gist options
  • Select an option

  • Save dui/9286c859527fc286e145a8e97b57c788 to your computer and use it in GitHub Desktop.

Select an option

Save dui/9286c859527fc286e145a8e97b57c788 to your computer and use it in GitHub Desktop.
cc_cost — see how much you're spending on Claude Code Max in API-equivalent dollars
#!/usr/bin/env python3
"""
cc_cost — quick token / $-cost queries against ~/.claude/projects.
For LIVE current-session info, prefer the authoritative numbers from
Claude Code's statusline JSON (`cost.total_cost_usd`, `rate_limits.*`).
This script is for HISTORICAL queries (per-day, per-window, since-X) that
Claude Code doesn't surface directly.
Usage:
cc_cost Current 5h session window (auto-detected)
cc_cost --since 8pm Since 8pm today (BRT, UTC-3)
cc_cost --since 1h Last 1 hour
cc_cost --since 30m Last 30 minutes
cc_cost --since 2026-04-07 Since midnight BRT on a specific day
cc_cost --today Since 00:00 today (BRT)
cc_cost --day 2026-04-03 Activity on a specific BRT day
cc_cost --windows Show all 5h windows since Jan 20
cc_cost --windows --top 10 Top 10 windows by $ cost
cc_cost --raw Print raw token totals as well
cc_cost --pct 4 Treat current spend as N% of cap and show
implied 100% cap value
All queries print:
- $ cost at Opus 4.5/4.6 list rates ($5/$6.25/$0.50/$25 per MTok)
- input / cache_create / cache_read / output token breakdown
- first/last message timestamps
- implied % of session cap (if relevant), defaulting to a $200 cap
Dedup: messages are deduplicated globally on `msg_id`, keeping the FINAL
streaming chunk per file. Without this, raw totals are inflated ~14x by
streaming chunks (2-3 entries per response) and cross-file replication
(sprint subagents share context, auto-compaction copies parent messages).
Known limitations vs Claude Code's authoritative numbers:
- The cache_creation token type is split into 5m and 1h ephemeral caches in
the JSONL (`usage.cache_creation.ephemeral_5m_input_tokens` /
`ephemeral_1h_input_tokens`). 1h writes are billed at 2x base ($10/MTok),
not 1.25x. This script uses the 1.25x rate for both, which under-counts
cache writes when the 1h cache is dominant. For exact figures, use
Claude Code's `cost.total_cost_usd` instead.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
# Root of Claude Code's per-project transcript store.
PROJECTS_DIR = Path.home() / ".claude" / "projects"

# Opus 4.5 / 4.6 list pricing ($/MTok). Both models share the same tier.
PRICE = {
    "input_tokens": 5.00,
    "cache_creation_input_tokens": 6.25,  # 5-min cache write (1.25x base)
    "cache_read_input_tokens": 0.50,
    "output_tokens": 25.00,
}

BRT = timezone(timedelta(hours=-3))  # UTC-3 São Paulo
DEFAULT_SESSION_CAP_USD = 200.0  # rough fit from Apr 3/6 cap hits
WINDOW_HOURS = 5  # length of one rate-limit session window, in hours

# Known reset boundary anchoring the 5h window grid (Apr 6 22:00 UTC = 7pm BRT).
ANCHOR_UTC = datetime(2026, 4, 6, 22, 0, tzinfo=timezone.utc)
# ─────────────────────────────────────────────────────────── time parsing ──
def parse_since(expr: str) -> datetime:
    """
    Parse a flexible time expression into an aware UTC datetime.

    Accepted forms:
      "8pm", "2:30pm", "08:00"          — clock time today (BRT; rolled back
                                          to yesterday if still in the future)
      "1h", "30m", "2h15m"              — relative offset back from now
      "2026-04-07"                      — midnight of that date in BRT
                                          (naive dates/datetimes are taken as
                                          the user's local zone, NOT UTC)
      "2026-04-07T20:00"                — naive ISO, interpreted as BRT
      "2026-04-07T20:00:00Z" / "+..:00" — ISO with an explicit zone

    Raises SystemExit with a usage message when nothing matches.
    """
    s = expr.strip().lower()

    # Relative: 1h, 30m, 2h15m — offset backwards from the current instant.
    # The pattern also matches "", so require at least one captured group.
    m = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?", s)
    if m and (m.group(1) or m.group(2)):
        h = int(m.group(1) or 0)
        mn = int(m.group(2) or 0)
        return datetime.now(timezone.utc) - timedelta(hours=h, minutes=mn)

    # Clock time today (BRT): 8pm, 2:30pm, 14:00, 08:00
    m = re.fullmatch(r"(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", s)
    if m:
        hour = int(m.group(1))
        minute = int(m.group(2) or 0)
        ampm = m.group(3)
        if ampm == "pm" and hour != 12:
            hour += 12
        elif ampm == "am" and hour == 12:
            hour = 0
        now_brt = datetime.now(BRT)
        candidate = now_brt.replace(hour=hour, minute=minute, second=0, microsecond=0)
        # If the time is still ahead of us today, the user meant yesterday.
        if candidate > now_brt:
            candidate -= timedelta(days=1)
        return candidate.astimezone(timezone.utc)

    # ISO date / datetime (lowercase 'z' suffix normalized to an offset).
    try:
        dt = datetime.fromisoformat(s.replace("z", "+00:00"))
        if dt.tzinfo is None:
            # Naive ISO → interpret as BRT (the user's local zone).
            dt = dt.replace(tzinfo=BRT)
        return dt.astimezone(timezone.utc)
    except ValueError:
        pass
    raise SystemExit(f"cc_cost: unrecognized --since expression: {expr!r}")
def current_window_start(now: datetime | None = None) -> datetime:
    """Return the UTC start of the 5h session window containing `now`."""
    moment = datetime.now(timezone.utc) if now is None else now
    # Snap onto the fixed grid anchored at ANCHOR_UTC; floor division keeps
    # this correct for moments before the anchor as well.
    idx = int((moment - ANCHOR_UTC).total_seconds() // (WINDOW_HOURS * 3600))
    return ANCHOR_UTC + timedelta(hours=WINDOW_HOURS * idx)
def fmt_brt(dt: datetime) -> str:
    """Format an aware datetime as 'YYYY-MM-DD HH:MM BRT' (São Paulo time)."""
    local = dt.astimezone(BRT)
    return local.strftime("%Y-%m-%d %H:%M BRT")
def fmt_dur(td: timedelta) -> str:
    """Render a duration as 'XhMM' when hours are present, else 'Nm'."""
    total_minutes = int(td.total_seconds()) // 60
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}h{minutes:02d}" if hours else f"{minutes}m"
# ─────────────────────────────────────────────────────────────── scanning ──
def iter_assistant_usages(since: datetime | None = None,
                          until: datetime | None = None,
                          mtime_after: datetime | None = None):
    """
    Yield (timestamp, usage_dict) for each unique assistant message under
    ~/.claude/projects, deduped globally by message id.

    Within a single file the LAST record per id wins (the final streaming
    chunk carries the complete output count); across files, the FIRST file
    to contribute an id wins.

    `mtime_after` is a fast-path filter: files whose filesystem mtime is
    older are skipped entirely, which keeps status-line queries cheap —
    typically only 1-3 active files vs ~4000 total.
    """
    deduped: dict[str, tuple[datetime, dict]] = {}
    cutoff_epoch = mtime_after.timestamp() if mtime_after else None

    for project in sorted(PROJECTS_DIR.iterdir()):
        if not project.is_dir():
            continue
        for path in sorted(project.rglob("*.jsonl")):
            if cutoff_epoch is not None:
                try:
                    stale = path.stat().st_mtime < cutoff_epoch
                except OSError:
                    continue
                if stale:
                    continue
            try:
                per_file: dict[str, tuple[datetime, dict]] = {}
                with open(path) as fh:
                    for idx, raw in enumerate(fh):
                        try:
                            rec = json.loads(raw)
                        except json.JSONDecodeError:
                            continue
                        if rec.get("type") != "assistant":
                            continue
                        stamp = rec.get("timestamp")
                        if not stamp:
                            continue
                        try:
                            ts = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
                        except (ValueError, TypeError):
                            continue
                        if since and ts < since:
                            continue
                        if until and ts >= until:
                            continue
                        # `message` may be present-but-null in the JSONL.
                        message = rec.get("message", {}) or {}
                        usage = message.get("usage", {})
                        if not usage:
                            continue
                        key = (message.get("id")
                               or rec.get("requestId")
                               or f"_{path}_{idx}")
                        # Last occurrence in this file wins.
                        per_file[key] = (ts, usage)
                # First file to contribute an id wins globally.
                for key, payload in per_file.items():
                    deduped.setdefault(key, payload)
            except Exception:
                # One corrupt/unreadable file must not abort the whole scan.
                continue

    yield from deduped.values()
def cost_of(usage: dict) -> float:
    """Dollar cost of one usage dict at the PRICE list rates ($/MTok)."""
    total = 0.0
    for bucket, rate in PRICE.items():
        total += usage.get(bucket, 0) * rate
    return total / 1_000_000
# ─────────────────────────────────────────────────────────────── reports ──
def report_range(since: datetime, until: datetime | None,
                 label: str, cap_usd: float | None,
                 show_raw: bool = False, pct_now: float | None = None,
                 mtime_after: datetime | None = None):
    """
    Print a single time-range report: per-bucket token totals, $ cost,
    first/last timestamps, and optional cap-fit / implied-cap / raw sections.
    """
    totals = defaultdict(int)
    msgs = 0
    first_ts = last_ts = None
    for ts, usage in iter_assistant_usages(since=since, until=until,
                                           mtime_after=mtime_after):
        msgs += 1
        for bucket in PRICE:
            totals[bucket] += usage.get(bucket, 0)
        first_ts = ts if first_ts is None else min(first_ts, ts)
        last_ts = ts if last_ts is None else max(last_ts, ts)

    cost = cost_of(totals)
    raw = sum(totals[k] for k in PRICE)

    print(f"\n {label}")
    print(f" {'─' * 64}")
    print(f" since {fmt_brt(since)}")
    if until:
        print(f" until {fmt_brt(until)}")
    if first_ts:
        print(f" first msg {fmt_brt(first_ts)}")
        print(f" last msg {fmt_brt(last_ts)}")
        print(f" active {fmt_dur(last_ts - first_ts)}")
    print(f" msgs {msgs:,} (deduped)")
    print()

    # Per-bucket lines, then the grand total.
    for name, key in (("input", "input_tokens"),
                      ("cache write", "cache_creation_input_tokens"),
                      ("cache read", "cache_read_input_tokens"),
                      ("output", "output_tokens")):
        n = totals[key]
        c = n * PRICE[key] / 1_000_000
        print(f" {name:<14} {n:>15,} ${c:>9,.2f}")
    print(f" {'─' * 44}")
    print(f" {'TOTAL':<14} {raw:>15,} ${cost:>9,.2f}")

    if cap_usd:
        pct = 100.0 * cost / cap_usd
        bar = int(pct / 2.5)  # 40 cells = 100%
        print()
        print(f" cap fit ${cost:.2f} of ~${cap_usd:.0f} cap "
              f"= {pct:.1f}%")
        print(f" [{'█' * min(bar, 40)}{'·' * max(40 - bar, 0)}]")
    if pct_now is not None and cost > 0:
        implied = cost / (pct_now / 100)
        print()
        print(f" implied if current spend is {pct_now:.1f}% of cap, "
              f"100% ≈ ${implied:.2f}")
    if show_raw:
        print()
        print(" raw token counts:")
        for k in PRICE:
            print(f" {k:<32} {totals[k]:>15,}")
def report_windows(top: int | None, since: datetime | None):
    """
    Show 5h windows with $ cost per window.

    Without `top`: all windows in chronological order.
    With `top`: the N most expensive windows, kept in cost order.
    """
    windows: dict[datetime, dict] = defaultdict(lambda: {
        **{k: 0 for k in PRICE}, "msgs": 0,
        "first_ts": None, "last_ts": None,
    })
    floor = since or datetime(2026, 1, 1, tzinfo=timezone.utc)
    for ts, usage in iter_assistant_usages(since=floor):
        # Snap the message timestamp onto the fixed 5h window grid.
        idx = int((ts - ANCHOR_UTC).total_seconds() // (WINDOW_HOURS * 3600))
        start = ANCHOR_UTC + timedelta(hours=WINDOW_HOURS * idx)
        bucket = windows[start]
        for k in PRICE:
            bucket[k] += usage.get(k, 0)
        bucket["msgs"] += 1
        if bucket["first_ts"] is None or ts < bucket["first_ts"]:
            bucket["first_ts"] = ts
        if bucket["last_ts"] is None or ts > bucket["last_ts"]:
            bucket["last_ts"] = ts

    rows = [(start, w, sum(w[k] * PRICE[k] for k in PRICE) / 1_000_000)
            for start, w in windows.items()]
    rows.sort(key=lambda r: r[2], reverse=True)
    if top:
        rows = rows[:top]                 # leaderboard: keep cost order
    else:
        rows.sort(key=lambda r: r[0])     # otherwise: chronological

    print()
    print(f" {'window (BRT)':<22} {'active':>7} {'msgs':>6} {'$cost':>9}")
    print(f" {'─' * 50}")
    for start, w, c in rows:
        label = start.astimezone(BRT).strftime("%a %m-%d %H:%M")
        active = ""
        if w["first_ts"] and w["last_ts"]:
            active = fmt_dur(w["last_ts"] - w["first_ts"])
        print(f" {label:<22} {active:>7} {w['msgs']:>6,} ${c:>8,.2f}")
    print(f" {'─' * 50}")
    total_cost = sum(c for _, _, c in rows)
    total_msgs = sum(w["msgs"] for _, w, _ in rows)
    print(f" {'TOTAL':<22} {'':>7} {total_msgs:>6,} ${total_cost:>8,.2f}")
# ─────────────────────────────────────────────────────────────────── CLI ──
def _bucket_costs(totals: dict) -> dict:
    """Return per-bucket $ costs as a flat dict."""
    def usd(key: str) -> float:
        # token count × $/MTok rate → dollars
        return totals.get(key, 0) * PRICE[key] / 1_000_000

    return {
        "input_usd": usd("input_tokens"),
        "cache_write_usd": usd("cache_creation_input_tokens"),
        "cache_read_usd": usd("cache_read_input_tokens"),
        "output_usd": usd("output_tokens"),
    }
def _compute_window_state():
    """Compute current 5h window cost + metadata. Fast (mtime-filtered)."""
    window_start = current_window_start()
    totals = defaultdict(int)
    newest = None
    count = 0
    # mtime filter: only scan files touched since shortly before the window
    # opened — huge speedup vs scanning every transcript.
    for ts, usage in iter_assistant_usages(
            since=window_start,
            mtime_after=window_start - timedelta(minutes=30)):
        count += 1
        for k in PRICE:
            totals[k] += usage.get(k, 0)
        if newest is None or ts > newest:
            newest = ts

    spend = cost_of(totals)
    elapsed = datetime.now(timezone.utc) - window_start
    return {
        "since": window_start,
        "msgs": count,
        "cost_usd": spend,
        "cap_usd": DEFAULT_SESSION_CAP_USD,
        "pct": 100.0 * spend / DEFAULT_SESSION_CAP_USD,
        "elapsed": elapsed,
        "remaining": timedelta(hours=WINDOW_HOURS) - elapsed,
        "last_ts": newest,
        "buckets": _bucket_costs(totals),
    }
def _weekly_window_start() -> datetime:
"""
Return the UTC start of the current 7-day rate-limit window by reading
the reset timestamp from Claude Code's statusline JSON dump. The window
start is simply resets_at − 7 days.
Falls back to now − 7 days if the dump is missing or stale.
"""
dump = Path("/tmp/claude-statusline-last.json")
try:
data = json.loads(dump.read_text())
resets_at = data.get("rate_limits", {}).get("seven_day", {}).get("resets_at")
if resets_at:
reset_dt = datetime.fromtimestamp(resets_at, tz=timezone.utc)
return reset_dt - timedelta(days=7)
except (OSError, json.JSONDecodeError, TypeError, ValueError):
pass
# Fallback: rolling 7 days
return datetime.now(timezone.utc) - timedelta(days=7)
def _compute_weekly_state():
    """Compute 7-day window cost + msg count using the actual window boundary."""
    window_start = _weekly_window_start()
    totals = defaultdict(int)
    count = 0
    for _ts, usage in iter_assistant_usages(
            since=window_start,
            mtime_after=window_start - timedelta(minutes=30)):
        count += 1
        for k in PRICE:
            totals[k] += usage.get(k, 0)
    return {
        "since": window_start,
        "cost_usd": cost_of(totals),
        "msgs": count,
        "buckets": _bucket_costs(totals),
    }
def statusline():
    """One-line summary for the Claude Code status bar (text, no newline)."""
    state = _compute_window_state()
    summary = (
        f"${state['cost_usd']:.2f} ({state['pct']:.0f}% of ~${state['cap_usd']:.0f} cap) "
        f"· {state['msgs']} msgs · {fmt_dur(state['remaining'])} left in window"
    )
    sys.stdout.write(summary)
def _session_breakdown(transcript_path: str, since_epoch: float | None = None):
    """
    Per-bucket $ breakdown for ONE session (parent transcript + subagent files).

    Scope: the given transcript plus every *.jsonl under the sibling
    `<session_id>/` directory next to it. No global glob — fast and bounded
    to one session.

    `since_epoch` (optional, unix seconds) filters messages by timestamp.
    The status bar passes this to align scope with `cost.total_cost_usd`,
    which counts only the current Claude Code process invocation (resets on
    /compact and on session resume); without it the scan covers the full
    transcript history and diverges from CC's authoritative total.

    Output (shell-eval format) on stdout:
    IN_USD=... CW_USD=... CR_USD=... OUT_USD=... TOTAL_USD=...
    """
    transcript = Path(transcript_path)
    # Subagent transcripts live under a directory named after the session id.
    sub_dir = transcript.parent / transcript.stem
    files = [transcript]
    if sub_dir.exists():
        files.extend(sorted(sub_dir.rglob("*.jsonl")))

    since_dt = (
        datetime.fromtimestamp(since_epoch, tz=timezone.utc)
        if since_epoch is not None else None
    )

    # Dedup by message id; the LAST occurrence wins (final streaming chunk
    # carries the complete output count).
    latest: dict[str, dict] = {}
    for f in files:
        try:
            with open(f) as fh:
                for line_no, line in enumerate(fh):
                    try:
                        obj = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if obj.get("type") != "assistant":
                        continue
                    msg = obj.get("message", {}) or {}
                    usage = msg.get("usage")
                    if not usage:
                        continue
                    if since_dt is not None:
                        ts_str = obj.get("timestamp")
                        if not ts_str:
                            continue
                        try:
                            ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                        except (ValueError, TypeError):
                            continue
                        if ts < since_dt:
                            continue
                    key = msg.get("id") or obj.get("requestId") or f"_{f}_{line_no}"
                    latest[key] = usage
        except OSError:
            # FileNotFoundError is an OSError subclass, so one catch covers
            # missing and unreadable files alike.
            continue

    totals = {k: 0 for k in PRICE}
    for u in latest.values():
        for k in PRICE:
            totals[k] += u.get(k, 0)
    parts = {k: totals[k] / 1_000_000 * PRICE[k] for k in PRICE}
    grand = sum(parts.values())
    sys.stdout.write(
        f"IN_USD={parts['input_tokens']:.4f} "
        f"CW_USD={parts['cache_creation_input_tokens']:.4f} "
        f"CR_USD={parts['cache_read_input_tokens']:.4f} "
        f"OUT_USD={parts['output_tokens']:.4f} "
        f"TOTAL_USD={grand:.4f}\n"
    )
def cache_statusbar():
    """
    Write the JSON cache consumed by the status bar, atomically
    (tmp file + rename). Refreshed in the background by statusline.sh on a
    ~5min cadence. Fields cover the current 5h window and the 7-day window:
    timestamps (UTC iso), per-bucket $ costs, msg counts, and the seconds
    remaining in the 5h window.
    """
    window = _compute_window_state()
    weekly = _compute_weekly_state()

    cache_dir = Path.home() / ".cache" / "claude-stats"
    cache_dir.mkdir(parents=True, exist_ok=True)
    target = cache_dir / "cc_cost.json"

    payload = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "window_start": window["since"].isoformat(),
        "window_cost_usd": round(window["cost_usd"], 4),
        "window_in_usd": round(window["buckets"]["input_usd"], 4),
        "window_cw_usd": round(window["buckets"]["cache_write_usd"], 4),
        "window_cr_usd": round(window["buckets"]["cache_read_usd"], 4),
        "window_out_usd": round(window["buckets"]["output_usd"], 4),
        "window_msgs": window["msgs"],
        "window_remaining_sec": int(window["remaining"].total_seconds()),
        "weekly_start": weekly["since"].isoformat(),
        "weekly_cost_usd": round(weekly["cost_usd"], 4),
        "weekly_in_usd": round(weekly["buckets"]["input_usd"], 4),
        "weekly_cw_usd": round(weekly["buckets"]["cache_write_usd"], 4),
        "weekly_cr_usd": round(weekly["buckets"]["cache_read_usd"], 4),
        "weekly_out_usd": round(weekly["buckets"]["output_usd"], 4),
        "weekly_msgs": weekly["msgs"],
    }

    # Write-then-rename so the status bar never reads a half-written file.
    tmp = target.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(payload))
    tmp.replace(target)
def main():
    """CLI entry point: fast statusline paths first, then argparse dispatch."""
    # Statusline fast paths: skip argparse overhead entirely.
    if len(sys.argv) >= 2 and sys.argv[1] == "--statusline":
        statusline()
        return
    if len(sys.argv) >= 2 and sys.argv[1] == "--cache-statusbar":
        cache_statusbar()
        return
    if len(sys.argv) >= 3 and sys.argv[1] == "--session-breakdown":
        # Optional 3rd arg: unix epoch seconds — only count msgs after this.
        since = float(sys.argv[3]) if len(sys.argv) >= 4 else None
        _session_breakdown(sys.argv[2], since_epoch=since)
        return

    p = argparse.ArgumentParser(
        prog="cc_cost",
        description="Quick token / $-cost queries against ~/.claude/projects",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__.split("Usage:")[1] if "Usage:" in (__doc__ or "") else "",
    )
    # --since pairs with --windows OR drives a one-shot range report.
    p.add_argument("--since", help="Show usage since this time (see --help)")
    g = p.add_mutually_exclusive_group()
    g.add_argument("--today", action="store_true",
                   help="Since 00:00 today (BRT)")
    g.add_argument("--day", help="Activity on a specific BRT day (YYYY-MM-DD)")
    g.add_argument("--windows", action="store_true",
                   help="Show 5-hour window breakdown")
    g.add_argument("--statusline", action="store_true",
                   help="One-line status bar output (fast path)")
    p.add_argument("--top", type=int, default=None,
                   help="With --windows, show top N windows by cost")
    p.add_argument("--cap", type=float, default=DEFAULT_SESSION_CAP_USD,
                   help=f"5h session cap in USD (default ${DEFAULT_SESSION_CAP_USD:.0f})")
    # BUG FIX: argparse %-formats help strings, so literal percent signs must
    # be doubled — a bare "%" here made `--help` raise ValueError.
    p.add_argument("--pct", type=float, default=None,
                   help="If you know the current indicator %%, show implied 100%% cap")
    p.add_argument("--raw", action="store_true",
                   help="Also print raw token counts")
    args = p.parse_args()

    if args.windows:
        since = parse_since(args.since) if args.since else None
        report_windows(top=args.top, since=since)
        return
    if args.today:
        now_brt = datetime.now(BRT)
        since = now_brt.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc)
        report_range(since, until=None, label="Today (since 00:00 BRT)",
                     cap_usd=None, show_raw=args.raw, pct_now=args.pct)
        return
    if args.day:
        try:
            d = datetime.fromisoformat(args.day).replace(tzinfo=BRT)
        except ValueError:
            raise SystemExit(f"cc_cost: bad --day: {args.day}")
        since = d.astimezone(timezone.utc)
        until = (d + timedelta(days=1)).astimezone(timezone.utc)
        report_range(since, until=until, label=f"{args.day} (BRT day)",
                     cap_usd=None, show_raw=args.raw, pct_now=args.pct)
        return
    if args.since:
        since = parse_since(args.since)
        report_range(since, until=None, label=f"Since {args.since}",
                     cap_usd=args.cap, show_raw=args.raw, pct_now=args.pct)
        return

    # Default: current 5h session window
    since = current_window_start()
    report_range(since, until=None, label="Current 5h session window",
                 cap_usd=args.cap, show_raw=args.raw, pct_now=args.pct)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment