@dui
Last active April 7, 2026 23:31
Claude Code usage analysis — 5h window breakdown with cost ($) at Opus 4.6 list pricing, deduped on msg_id (corrected after discovering streaming-chunk + cross-file replication bugs)

Claude Code Usage Limit Analysis

  • User: @dui_toledo (Max 20x / $200 plan)
  • Models in dataset: claude-opus-4-5 (Jan 20 – Feb 5) → claude-opus-4-6 (Feb 6 → present), reasoning effort high throughout. Both share the same price tier ($5 / $6.25 / $0.50 / $25 per MTok).
  • Timezone: UTC-3 (BRT, São Paulo)
  • Original report: 2026-04-06 (since corrected — see below)
  • Latest revision: 2026-04-07 evening (added the $-cost view, then found a second over-count bug: duplicate streaming chunks plus cross-file replication. Numbers are now ~14x lower than the prior revision.)

Two methodology bugs (both now fixed)

  1. Non-recursive glob (found Apr 6). The scanner used Path.glob("*.jsonl") (non-recursive) on each project directory. Subagent sessions live in <project>/<parent-session-id>/subagents/agent-*.jsonl subdirectories, which were silently skipped. Fixed by switching to rglob("*.jsonl"). This made the dataset look ~30x larger.

  2. No deduplication of repeated message logs (found Apr 7 evening). Two distinct duplication patterns existed:

    • Streaming chunks within a file. Claude Code logs each streaming response in 2–3 entries per file. Each entry carries the full input / cache_create / cache_read totals, and only the output count grows across chunks. Naively summing every line over-counts input/cache 2–3x.
    • Cross-file replication. Sprint subagents share conversation history — the same msg_id (same request_id, identical usage) appears in 2–6 sibling subagent files because each persistent agent keeps its own local transcript. Auto-compaction subagents (agent-acompact-*.jsonl) also copy parent messages verbatim.

    Both fixed by deduplicating globally on msg_id, keeping the final-streaming-chunk entry per id. This compresses the dataset by another ~9x on top of the rglob fix.
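The two-stage dedup can be sketched as follows (a minimal, hypothetical helper — `dedupe_usage` is not from the scripts below, but the field names `type`, `message.id`, and `message.usage` match the jsonl layout described in this report):

```python
import json
from pathlib import Path

def dedupe_usage(jsonl_paths):
    """Return {msg_id: usage}, counting each API response exactly once.

    Within a file, 'last write wins' keeps the final streaming chunk
    (the only one with the complete output count). Across files, the
    first file to claim a msg_id wins, dropping cross-file replicas.
    """
    final_usage = {}  # msg_id -> usage dict of the chosen entry
    claimed = set()   # msg_ids already owned by an earlier file
    for path in jsonl_paths:
        local = {}    # last chunk seen per msg_id within this file
        for line in Path(path).read_text().splitlines():
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if obj.get("type") != "assistant":
                continue
            msg = obj.get("message") or {}
            if msg.get("id") and msg.get("usage"):
                local[msg["id"]] = msg["usage"]  # later chunk overwrites earlier
        for mid, usage in local.items():
            if mid not in claimed:  # cross-file replica? first occurrence wins
                claimed.add(mid)
                final_usage[mid] = usage
    return final_usage
```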

The first revision reported $58,081 of API list value over 78 days. The corrected number is $4,079 — about 14x lower. The 5-hour cap that previously appeared to fire at "$3,000 of API value" actually fires at ~$200. Apologies to anyone who saw the inflated version; the real picture is much more grounded.

Pricing assumptions ($/MTok, Opus 4.5 = Opus 4.6 list rates)

Verified against platform.claude.com/docs/en/docs/about-claude/pricing on 2026-04-07.

| Token type | Rate | Multiplier vs base |
|---|---|---|
| Input | $5.00 / MTok | 1x |
| 5-min cache write | $6.25 / MTok | 1.25x |
| Cache read (hit) | $0.50 / MTok | 0.10x |
| Output | $25.00 / MTok | 5x |
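At these rates, the $ list value of any usage record is a single weighted sum. A minimal sketch (plugging in the headline token totals from this report lands near the $4,079 figure, within rounding of the table's values):

```python
# Opus 4.5/4.6 list rates in $ per MTok, as quoted in the table above.
RATES = {
    "input_tokens": 5.00,
    "cache_creation_input_tokens": 6.25,  # 5-min cache write
    "cache_read_input_tokens": 0.50,      # cache hit
    "output_tokens": 25.00,
}

def list_value_usd(usage: dict) -> float:
    """$ list value of one usage record at Opus 4.6 pricing."""
    return sum(usage.get(k, 0) * rate for k, rate in RATES.items()) / 1_000_000

# Full-dataset totals (rounded values from the headline table):
totals = {
    "input_tokens": 1_240_000,
    "cache_creation_input_tokens": 184_400_000,
    "cache_read_input_tokens": 5_220_000_000,
    "output_tokens": 12_350_000,
}
print(f"${list_value_usd(totals):,.2f}")  # ≈ $4,077, consistent with the ~$4,079 headline
```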

Headline numbers (Jan 20 – Apr 7, 2026)

| Metric | Value |
|---|---|
| Active 5-hour windows | 94 |
| Unique API calls (deduped) | 55,485 |
| Total raw tokens | 5.42 B |
| ↳ input | 1.24 M |
| ↳ cache write | 184.4 M |
| ↳ cache read | 5.22 B |
| ↳ output | 12.35 M |
| Estimated API list value | $4,079 |
| Subscription paid | $200/mo × ~2.6 months ≈ $520 |
| ROI vs list price | ~7.8x |

Cache reads dominate the bill: ~$2,612 of the $4,079 (64%) comes from cache_read tokens at $0.50/MTok. Even at the 0.1x weighting of the 5h budget, they're the binding cost driver.

Top 25 single 5-hour windows (sorted by $ list value)

| Rank | Date | Day | Window (BRT) | Active | Msgs | Total | $ value | Notes |
|---|---|---|---|---|---|---|---|---|
| 1 | Mar 21 | Sat | 08:00–13:00 | 4h18 | 5,273 | 653.3 M | $482 | promo (2x off-peak) |
| 2 | Mar 28 | Sat | 10:00–15:00 | 4h59 | 3,659 | 469.1 M | $322 | promo (last day) |
| 3 | Apr 03 | Fri | 16:00–21:00 | 2h09 | 2,858 | 267.0 M | $194 | ← LIMIT HIT |
| 4 | Apr 06 | Mon | 19:00–00:00 | 2h12 | 2,189 | 243.1 M | $190 | indicator hit 79% (no cap fire) |
| 5 | Apr 02 | Thu | 20:00–01:00 | 2h44 | 2,310 | 242.0 M | $189 | |
| 6 | Mar 23 | Mon | 15:00–20:00 | 2h57 | 2,124 | 221.3 M | $181 | promo |
| 7 | Mar 23 | Mon | 20:00–01:00 | 2h44 | 2,026 | 237.6 M | $169 | promo |
| 8 | Apr 04 | Sat | 12:00–17:00 | 4h59 | 2,552 | 233.6 M | $155 | |
| 9 | Mar 30 | Mon | 17:00–22:00 | 2h56 | 1,950 | 177.9 M | $141 | post-promo |
| 10 | Mar 13 | Fri | 15:00–20:00 | 2h13 | 2,006 | 185.5 M | $141 | promo (day 1) |
| 11 | Mar 24 | Tue | 21:00–02:00 | 2h15 | 1,335 | 163.6 M | $124 | promo |
| 12 | Mar 24 | Tue | 16:00–21:00 | 2h00 | 1,700 | 170.1 M | $124 | promo |
| 13 | Apr 02 | Thu | 15:00–20:00 | 1h55 | 1,744 | 135.0 M | $103 | |
| 14 | Mar 21 | Sat | 13:00–18:00 | 2h00 | 887 | 136.6 M | $103 | promo |
| 15 | Apr 01 | Wed | 19:00–00:00 | 4h06 | 1,388 | 120.1 M | $95 | |
| 16 | Mar 26 | Thu | 18:00–23:00 | 4h59 | 1,362 | 116.6 M | $95 | promo |
| 17 | Apr 01 | Wed | 14:00–19:00 | 1h49 | 1,446 | 117.8 M | $94 | |
| 18 | Jan 21 | Wed | 19:00–00:00 | 4h40 | 1,582 | 134.6 M | $93 | pre-promo |
| 19 | Feb 19 | Thu | 23:00–04:00 | 2h07 | 1,262 | 101.2 M | $75 | pre-promo |
| 20 | Apr 04 | Sat | 17:00–22:00 | 2h16 | 1,001 | 94.1 M | $62 | |
| 21 | Feb 11 | Wed | 20:00–01:00 | 2h48 | 1,081 | 90.1 M | $61 | pre-promo |
| 22 | Feb 12 | Thu | 21:00–02:00 | 3h29 | 1,054 | 79.2 M | $61 | pre-promo |
| 23 | Jan 20 | Tue | 18:00–23:00 | 4h25 | 881 | 76.7 M | $54 | pre-promo |
| 24 | Mar 13 | Fri | 20:00–01:00 | 4h08 | 593 | 67.2 M | $52 | promo |
| 25 | Feb 17 | Tue | 16:00–21:00 | 2h15 | 739 | 66.5 M | $47 | pre-promo |

How the session limit appears to be metered

Two sessions in the dataset hit (or approached) the cap:

| Date | Window | Total tokens | $ list value | Indicator observation |
|---|---|---|---|---|
| Apr 03 Fri | 16:00–21:00 BRT | 267 M | $194 | cap fired at end of window |
| Apr 06 Mon | 19:00–00:00 BRT | 243 M | $190 | indicator reached 79%, no cap fire |

Largest non-capped post-promo window: Mon Mar 30 @ $141 (clearly under). Largest non-capped pre-promo window: Thu Feb 19 @ $75. Largest promo window: Sat Mar 21 @ $482 (about 2.5x the post-promo cap, consistent with "2x off-peak promo" plus headroom).

Cross-fitting against the live Settings → Usage percentage on Apr 6 (the user observed 49% → 55% → 67% → 79% during the 19-00 BRT window):

  • Final window value: $190 raw
  • If 79% indicator corresponds to $190 of value (i.e. 79% read at the END of the window), the implied 100% cap is **$240**
  • If 79% was midway and more spend followed, the cap is closer to $200

The 5-hour session cap therefore appears to fire somewhere between $190 and $240 of API list value per window, equivalent to roughly 220–280 M effective-weighted tokens. This is much lower than the previous (over-counted) estimate of $3,000.
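The implied-cap bound is a simple proportion; a sketch of the back-of-envelope (variable names are illustrative, figures from the Apr 6 observation above):

```python
# If the live indicator read 79% when ~$190 of list value had accrued,
# the implied 100% cap is value / fraction.
indicator_fraction = 0.79
value_at_reading = 190.0  # $ list value at the moment of the 79% reading

implied_cap = value_at_reading / indicator_fraction  # ≈ $240 if 79% was the
                                                     # end-of-window reading
print(f"implied 5h cap: ${implied_cap:.0f}")

# If more spend followed the 79% reading, the denominator understates the
# fraction spent, so the true cap is lower — closer to the ~$200 floor.
```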

Why Apr 3 hit the cap

Three things lined up on Apr 3:

  1. High concurrency. ~80 distinct subagent files in this window (down from "353" in the bad-data version, which counted each subagent file 4-5x via worktree replicas). Each spawned agent has its own initial cache_creation cost and growing context history.
  2. Aggregate cache_read volume. 258.6 M cache_read tokens in 2 hours, at $0.50/MTok = ~$129 of cache reads alone.
  3. No idle gaps long enough to amortize. Median message gap during sprint runs was ~3 seconds; max ~30 minutes; nothing long enough for cache TTLs.

The session bucket measures fan-out × duration × cache reuse intensity, weighted by API price tiers. Apr 3 maxed all three.

Promo vs post-promo: a smaller real difference than first thought

Mar 21 Sat 08-13 burned $482 during the Mar 13-28 promo window — about 2.5x the post-promo cap of ~$200. The earlier (over-counted) version of this report showed $22,718 for the same window and forced a "the cap was disabled during promo" hypothesis.

With correct dedup:

  • Mar 21 ($482) — fits a ~2.5x off-peak promo bump cleanly
  • Mar 28 ($322) — fits a 1.6x bump
  • Mar 23 ($181), Mar 24 ($124, $124) — well under the post-promo cap
  • Mar 30 (post-promo, $141) — well under

The pattern is consistent with a stable cap that was raised by ~2-2.5x during promo for off-peak hours, then returned to normal afterward. No need for a "cap was unenforced" hypothesis.

Correcting prior conclusions (twice corrected)

Original framing (Apr 6 morning, retracted):

  • "Apr 3 was the lowest-volume window to ever trigger a usage limit" — wrong, depended on the rglob bug.

Revised framing (Apr 6 evening, ALSO retracted):

  • ~~"Apr 3 burned ~$3,632 of API value"~~ — wrong, depended on counting streaming chunks and cross-file replicas as distinct calls.
  • ~~"Cap fires at ~$3,000 of value"~~ — wrong, was 14x over.
  • ~~"112x ROI vs subscription"~~ — wrong, was 14x over.
  • ~~"The promo period had the cap effectively disabled"~~ — wrong, the gap was 2-2.5x, not 7-15x.

Current best understanding (Apr 7 evening):

  • Apr 3 burned ~$194 of API list value over 2h09m of active sprint usage
  • 5-hour session cap fires at ~$200-240 of API list value
  • Promo cap was ~2-2.5x the post-promo cap (fits the announced "2x off-peak" promo)
  • Total dataset is ~$4,079 of API list value over 78 days = ~$52/day average on active days
  • ROI vs $200/month subscription is ~7-8x (still very good, but not "crazy")

Methodology

  • Scan all ~/.claude/projects/**/*.jsonl files recursively
  • Extract assistant message timestamps + usage fields (input, cache_creation, cache_read, output)
  • Dedupe by msg_id globally across all files. Within a file, keep only the LAST entry per msg_id (the final streaming chunk has the complete output count). Across files, the first occurrence wins.
  • Bucket by 5-hour window, anchored from a known reset (Apr 6 22:00 UTC)
  • Compute $ value at Opus 4.5/4.6 list prices
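The window-bucketing step (essentially the `get_window_start` logic in the second script attached below) reduces to walking backward from the known reset in 5-hour steps:

```python
from datetime import datetime, timedelta, timezone

ANCHOR_UTC = datetime(2026, 4, 6, 22, 0, tzinfo=timezone.utc)  # known reset
WINDOW_HOURS = 5

def window_start(ts: datetime) -> datetime:
    """Start of the 5-hour window containing ts, aligned so that a
    window boundary falls exactly on ANCHOR_UTC."""
    # Number of whole windows between ts and the anchor (negative if ts
    # is after the anchor; floor division handles both directions).
    steps_back = (ANCHOR_UTC - ts).total_seconds() // (WINDOW_HOURS * 3600)
    start = ANCHOR_UTC - timedelta(hours=WINDOW_HOURS * steps_back)
    if start > ts:  # landed one window too late; step back once
        start -= timedelta(hours=WINDOW_HOURS)
    return start
```

Every assistant message's deduped usage is then accumulated into the bucket keyed by `window_start(timestamp)`.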

Verification (what convinced me the dedup is real)

For one msg_id picked at random (msg_01PTaHVkaETBU8LARbjQtuMK, a Mar 21 sprint subagent message), the same request_id, same timestamp, and identical usage (in=3 cawr=12967 card=6696 out=2) appeared in 6 different subagent files across two sibling worktrees. Anthropic billed for that response once; my prior summation counted it 6 times.

For another msg_id (msg_012CNYvQ9u7jLWse), three entries in the same file appeared within 1 second of each other, with the same request_id and identical input/cache totals — but output_tokens grew from 4 → 4 → 206 across the three. This is a streaming response logged in 3 chunks; only the final entry should be counted.

Environment

  • CLI version: 2.1.x
  • Plan: Claude Max 20x ($200/mo)
  • Models in dataset: claude-opus-4-5 (Jan 20 – Feb 5) → claude-opus-4-6 (Feb 6 → present), reasoning effort: high
  • Subagent helper model: claude-haiku-4-5 (small fraction of messages)
  • Token source: ~/.claude/projects/**/*.jsonl (recursive, deduped on msg_id)
  • Live caps observed Apr 6:
    • Session (5h): ~$200 of API list value, ~220–280 M effective weighted tokens
    • Weekly (all models): not yet remeasured under corrected dedup; original "1.57B / 21%" reading is suspect for the same reason
    • Sonnet only: separate bucket, untouched
  • Promo period: Mar 13–28, 2026 — 2x off-peak usage. Apparent cap raised by ~2-2.5x during promo for off-peak hours.
#!/usr/bin/env python3
"""
Claude Code token usage analyzer.
Adapted from https://gist.github.com/kieranklaassen/7b2ebb39cbbb78cc2831497605d76cc6
Adds per-day breakdown and fixes paths for local use.
"""
import json
import os
import sys
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timedelta, timezone

PROJECTS_DIR = Path.home() / ".claude" / "projects"

# Filter: only include sessions that started within the last N days (None = all time)
SINCE_DAYS = int(os.environ.get("SINCE_DAYS", "0")) or None
SINCE_DATE = os.environ.get("SINCE_DATE")  # e.g. "2026-03-30"


def extract_text_content(content):
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, dict):
                if item.get("type") == "text":
                    parts.append(item.get("text", ""))
            elif isinstance(item, str):
                parts.append(item)
        return "\n".join(parts).strip()
    return ""


def is_human_prompt(msg_obj):
    content = msg_obj.get("message", {}).get("content", "")
    if isinstance(content, list):
        types = [i.get("type") for i in content if isinstance(i, dict)]
        if types and all(t == "tool_result" for t in types):
            return False
    return True


def parse_session(jsonl_path, is_subagent=False, seen_msg_ids=None):
    """Parse one session's jsonl. ``seen_msg_ids`` lets the caller pass a set
    that's shared across files to deduplicate messages globally — important
    because Claude Code logs the same API response in 2-3 streaming chunks per
    file AND replicates it across multiple files (sprint subagents share
    context, auto-compaction copies parent messages, etc.)."""
    usage_total = defaultdict(int)
    prompts = []
    agent_id = None
    session_id = None
    timestamp_start = None
    subagent_sessions = []
    if seen_msg_ids is None:
        seen_msg_ids = set()
    try:
        with open(jsonl_path) as f:
            lines = f.readlines()
    except Exception:
        return None
    # First pass: dedupe assistant entries by msg_id within this file, keeping
    # the LAST occurrence (final streaming chunk has the complete output count).
    # Records the *line index* of the chosen entry.
    final_idx_for_msg = {}  # msg_id -> line index of the chosen final entry
    for line_no, line in enumerate(lines):
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        if obj.get("type") != "assistant":
            continue
        msg = obj.get("message", {}) or {}
        if not msg.get("usage"):
            continue
        key = msg.get("id") or obj.get("requestId") or f"_unkeyed_{line_no}"
        final_idx_for_msg[key] = line_no
    chosen_indexes = set(final_idx_for_msg.values())
    for line_no, line in enumerate(lines):
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        msg_type = obj.get("type")
        ts = obj.get("timestamp")
        if ts and not timestamp_start:
            timestamp_start = ts
        if not agent_id:
            agent_id = obj.get("agentId")
        if not session_id:
            session_id = obj.get("sessionId")
        if msg_type == "assistant":
            if line_no not in chosen_indexes:
                continue
            msg = obj.get("message", {}) or {}
            mid = msg.get("id") or obj.get("requestId") or f"_unkeyed_{line_no}"
            if mid in seen_msg_ids:
                continue
            seen_msg_ids.add(mid)
            usage = msg.get("usage", {})
            usage_total["input_tokens"] += usage.get("input_tokens", 0)
            usage_total["cache_creation_input_tokens"] += usage.get("cache_creation_input_tokens", 0)
            usage_total["cache_read_input_tokens"] += usage.get("cache_read_input_tokens", 0)
            usage_total["output_tokens"] += usage.get("output_tokens", 0)
        elif msg_type == "user":
            user_type = obj.get("userType", "")
            is_sidechain = obj.get("isSidechain", False)
            content = obj.get("message", {}).get("content", "")
            text = extract_text_content(content)
            if text and not is_sidechain and is_human_prompt(obj) and user_type != "tool":
                prompts.append({
                    "text": text,
                    "timestamp": obj.get("timestamp"),
                    "entrypoint": obj.get("entrypoint", ""),
                })
    # Check for subagent sessions
    session_dir = jsonl_path.parent / jsonl_path.stem
    if session_dir.is_dir():
        subagents_dir = session_dir / "subagents"
        if subagents_dir.is_dir():
            for sub_file in subagents_dir.glob("*.jsonl"):
                sub_data = parse_session(sub_file, is_subagent=True, seen_msg_ids=seen_msg_ids)
                if sub_data:
                    sub_data["subagent_file"] = str(sub_file.name)
                    subagent_sessions.append(sub_data)
    total_tokens = (
        usage_total["input_tokens"]
        + usage_total["cache_creation_input_tokens"]
        + usage_total["cache_read_input_tokens"]
        + usage_total["output_tokens"]
    )
    return {
        "file": str(jsonl_path),
        "session_id": session_id or jsonl_path.stem,
        "agent_id": agent_id,
        "is_subagent": is_subagent,
        "timestamp_start": timestamp_start,
        "usage": dict(usage_total),
        "total_tokens": total_tokens,
        "prompts": prompts,
        "subagent_sessions": subagent_sessions,
    }


def get_project_name(project_dir_name):
    name = project_dir_name
    # Auto-detect username from home dir
    username = Path.home().name
    prefixes = [f"-Users-{username}-", f"Users-{username}-"]
    for p in prefixes:
        if name.startswith(p):
            name = name[len(p):]
            break
    return name or project_dir_name


def get_cutoff():
    if SINCE_DATE:
        return datetime.fromisoformat(SINCE_DATE).replace(tzinfo=timezone.utc)
    if SINCE_DAYS:
        return datetime.now(timezone.utc) - timedelta(days=SINCE_DAYS)
    return None


def session_in_range(session, cutoff):
    if not cutoff or not session["timestamp_start"]:
        return True
    try:
        ts = datetime.fromisoformat(session["timestamp_start"].replace("Z", "+00:00"))
        return ts >= cutoff
    except ValueError:
        return True


def get_session_date(session):
    """Extract date string (YYYY-MM-DD) from session timestamp."""
    ts = session.get("timestamp_start")
    if not ts:
        return "unknown"
    try:
        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        return dt.strftime("%Y-%m-%d")
    except ValueError:
        return "unknown"


def analyze_all():
    projects = defaultdict(list)
    cutoff = get_cutoff()
    seen_msg_ids = set()  # Global dedupe across all files in all projects.
    for project_dir in sorted(PROJECTS_DIR.iterdir()):
        if not project_dir.is_dir():
            continue
        project_name = get_project_name(project_dir.name)
        # Top-level files first; parse_session() recurses into <session>/subagents/.
        for jsonl_file in sorted(project_dir.glob("*.jsonl")):
            session = parse_session(jsonl_file, seen_msg_ids=seen_msg_ids)
            if session and session["total_tokens"] > 0 and session_in_range(session, cutoff):
                projects[project_name].append(session)
        # Sweep up any subagent files whose parent .jsonl wasn't at the top
        # level (e.g. nested worktree projects).
        for jsonl_file in sorted(project_dir.rglob("subagents/*.jsonl")):
            session = parse_session(jsonl_file, is_subagent=True, seen_msg_ids=seen_msg_ids)
            if session and session["total_tokens"] > 0 and session_in_range(session, cutoff):
                projects[project_name].append(session)
    return projects


def fmt(n):
    return f"{n:,}"


def daily_breakdown(projects):
    """Aggregate token usage by day across all projects."""
    by_day = defaultdict(lambda: {
        "input_tokens": 0,
        "cache_creation_input_tokens": 0,
        "cache_read_input_tokens": 0,
        "output_tokens": 0,
        "total_tokens": 0,
        "sessions": 0,
        "subagent_tokens": 0,
        "subagent_sessions": 0,
        "projects": set(),
    })
    for project_name, sessions in projects.items():
        for session in sessions:
            date = get_session_date(session)
            day = by_day[date]
            day["sessions"] += 1
            day["projects"].add(project_name)
            for k in ["input_tokens", "cache_creation_input_tokens", "cache_read_input_tokens", "output_tokens"]:
                day[k] += session["usage"].get(k, 0)
            day["total_tokens"] += session["total_tokens"]
            for sub in session["subagent_sessions"]:
                day["subagent_tokens"] += sub["total_tokens"]
                day["subagent_sessions"] += 1
    return dict(sorted(by_day.items()))


def estimate_cost(usage):
    """
    Cost estimate at Claude Opus 4.6 list prices ($/MTok), verified against
    https://platform.claude.com/docs/en/docs/about-claude/pricing on 2026-04-07.
    Opus 4.6: $5 input, $25 output, $0.50 cache read (0.1x), $6.25 cache write (1.25x)
    Sonnet 4.6 included as a lower bound for mixed-model sessions.
    """
    inp = usage.get("input_tokens", 0)
    cache_create = usage.get("cache_creation_input_tokens", 0)
    cache_read = usage.get("cache_read_input_tokens", 0)
    out = usage.get("output_tokens", 0)
    # Sonnet 4.6 rates (lower bound)
    sonnet = (inp * 3 + cache_create * 3.75 + cache_read * 0.30 + out * 15) / 1_000_000
    # Opus 4.6 rates (the model actually used in this dataset)
    opus = (inp * 5 + cache_create * 6.25 + cache_read * 0.50 + out * 25) / 1_000_000
    return sonnet, opus


def print_daily(projects):
    days = daily_breakdown(projects)
    if not days:
        print("No sessions found.")
        return
    print("\n" + "=" * 100)
    print("DAILY TOKEN USAGE BREAKDOWN")
    print("=" * 100)
    print(f"\n{'Date':<12} {'Sessions':>8} {'Total Tokens':>14} {'Input':>12} {'Cache Write':>12} {'Cache Read':>12} {'Output':>12} {'Sub Agents':>10} {'~Cost ($)':>12}")
    print("-" * 106)
    grand = defaultdict(int)
    for date, day in days.items():
        sonnet_cost, opus_cost = estimate_cost(day)
        print(
            f"{date:<12} "
            f"{day['sessions']:>8,} "
            f"{fmt(day['total_tokens']):>14} "
            f"{fmt(day['input_tokens']):>12} "
            f"{fmt(day['cache_creation_input_tokens']):>12} "
            f"{fmt(day['cache_read_input_tokens']):>12} "
            f"{fmt(day['output_tokens']):>12} "
            f"{day['subagent_sessions']:>10,} "
            f"{'$' + f'{sonnet_cost:.2f}':>5}-{'$' + f'{opus_cost:.2f}':<6}"
        )
        for k in ["input_tokens", "cache_creation_input_tokens", "cache_read_input_tokens", "output_tokens", "total_tokens", "sessions", "subagent_sessions", "subagent_tokens"]:
            grand[k] += day[k]
    print("-" * 106)
    s_total, o_total = estimate_cost(grand)
    print(
        f"{'TOTAL':<12} "
        f"{grand['sessions']:>8,} "
        f"{fmt(grand['total_tokens']):>14} "
        f"{fmt(grand['input_tokens']):>12} "
        f"{fmt(grand['cache_creation_input_tokens']):>12} "
        f"{fmt(grand['cache_read_input_tokens']):>12} "
        f"{fmt(grand['output_tokens']):>12} "
        f"{grand['subagent_sessions']:>10,} "
        f"{'$' + f'{s_total:.2f}':>5}-{'$' + f'{o_total:.2f}':<6}"
    )
    # Per-day detail: which projects contributed
    print("\n\nDAILY DETAIL (projects per day):")
    print("-" * 80)
    for date, day in days.items():
        projs = ", ".join(sorted(day["projects"]))
        print(f"  {date}: {day['sessions']} sessions — {projs}")


def print_summary(projects):
    summaries = []
    for project_name, sessions in projects.items():
        total = defaultdict(int)
        sub_tokens = 0
        sub_count = 0
        for session in sessions:
            for k, v in session["usage"].items():
                total[k] += v
            for sub in session["subagent_sessions"]:
                sub_tokens += sub["total_tokens"]
                sub_count += 1
        grand_total = sum(total.values())
        summaries.append({
            "project": project_name,
            "sessions": len(sessions),
            "usage": dict(total),
            "total_tokens": grand_total,
            "subagent_tokens": sub_tokens,
            "subagent_count": sub_count,
        })
    summaries.sort(key=lambda x: x["total_tokens"], reverse=True)
    grand_total = sum(s["total_tokens"] for s in summaries)
    total_sessions = sum(s["sessions"] for s in summaries)
    print(f"\n{'=' * 100}")
    print("PROJECT SUMMARY")
    print(f"{'=' * 100}")
    print(f"\nTotal: {fmt(grand_total)} tokens across {total_sessions} sessions in {len(summaries)} projects\n")
    print(f"{'Project':<50} {'Sessions':>8} {'Total Tokens':>14} {'Subagents':>10}")
    print("-" * 86)
    for s in summaries[:30]:
        print(f"{s['project']:<50} {s['sessions']:>8,} {fmt(s['total_tokens']):>14} {s['subagent_count']:>10,}")


def main():
    print("Scanning ~/.claude/projects/ ...")
    projects = analyze_all()
    print(f"Found {len(projects)} projects")
    print_daily(projects)
    print_summary(projects)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
Map token usage to 5-hour rolling windows.
Anchored from a known reset time, walks backward to assign every
assistant message to its window based on timestamp.
"""
import json
import os
import sys
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timedelta, timezone

PROJECTS_DIR = Path.home() / ".claude" / "projects"

# Known reset: Apr 6, 2026 7pm BRT = 22:00 UTC
ANCHOR_UTC = datetime(2026, 4, 6, 22, 0, tzinfo=timezone.utc)
WINDOW_HOURS = 5

# Claude Opus 4.6 pricing ($/MTok), verified against
# https://platform.claude.com/docs/en/docs/about-claude/pricing on 2026-04-07
PRICE_INPUT = 5.00
PRICE_CACHE_WRITE = 6.25  # 5-min cache write = 1.25x input
PRICE_CACHE_READ = 0.50   # cache hit = 0.10x input
PRICE_OUTPUT = 25.00


def usd_cost(usage):
    """Estimated $ cost at Opus 4.6 list prices."""
    return (
        usage.get("input_tokens", 0) * PRICE_INPUT
        + usage.get("cache_creation_input_tokens", 0) * PRICE_CACHE_WRITE
        + usage.get("cache_read_input_tokens", 0) * PRICE_CACHE_READ
        + usage.get("output_tokens", 0) * PRICE_OUTPUT
    ) / 1_000_000


def get_window_start(ts: datetime) -> datetime:
    """Find which 5-hour window a timestamp falls into."""
    # Walk backward from anchor in 5hr steps until we're before ts,
    # then step forward once
    diff = (ANCHOR_UTC - ts).total_seconds()
    windows_back = int(diff // (WINDOW_HOURS * 3600))
    candidate = ANCHOR_UTC - timedelta(hours=WINDOW_HOURS * windows_back)
    if candidate > ts:
        candidate -= timedelta(hours=WINDOW_HOURS)
    return candidate


def format_brt(dt: datetime) -> str:
    brt = dt - timedelta(hours=3)
    return brt.strftime("%b %d %H:%M")


def weekday_brt(dt: datetime) -> str:
    brt = dt - timedelta(hours=3)
    return brt.strftime("%a")


def parse_ts(ts_str: str) -> datetime:
    return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))


def scan_sessions(since_date: str = None):
    """Scan all sessions and yield (timestamp, usage_dict) per assistant message.

    Deduplicates by message id GLOBALLY across all files. Two sources of
    duplication exist in ~/.claude/projects:

    1. Streaming chunks. Claude Code logs each streaming response in 2-3 chunks;
       every chunk carries the full input / cache_create / cache_read totals
       and a growing output count. Keeping only the LAST entry for each
       message id gives the correct final usage tally.
    2. Cross-file copies. Auto-compaction subagents (agent-acompact-*.jsonl)
       and other agent files contain copies of messages from the parent
       session. Without global dedup, these get counted 2x or more.

    Together these inflate naive token sums by ~2.5-3x.
    """
    cutoff = None
    if since_date:
        cutoff = datetime.fromisoformat(since_date).replace(tzinfo=timezone.utc)
    # Global dedup map: msg_id -> (ts, usage). Last write wins.
    # Unkeyed entries (no msg_id) get a synthetic per-line key so they're not
    # collapsed into one another.
    latest = {}
    for project_dir in sorted(PROJECTS_DIR.iterdir()):
        if not project_dir.is_dir():
            continue
        # Recursive scan: subagents live in <session-id>/subagents/agent-*.jsonl
        for jsonl_file in sorted(project_dir.rglob("*.jsonl")):
            try:
                with open(jsonl_file) as f:
                    for line_no, line in enumerate(f):
                        try:
                            obj = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if obj.get("type") != "assistant":
                            continue
                        ts_str = obj.get("timestamp")
                        if not ts_str:
                            continue
                        try:
                            ts = parse_ts(ts_str)
                        except (ValueError, TypeError):
                            continue
                        if cutoff and ts < cutoff:
                            continue
                        msg = obj.get("message", {}) or {}
                        usage = msg.get("usage", {})
                        if not usage:
                            continue
                        key = (
                            msg.get("id")
                            or obj.get("requestId")
                            or f"_unkeyed_{jsonl_file}_{line_no}"
                        )
                        latest[key] = (ts, usage)
            except Exception:
                continue
    for ts, usage in latest.values():
        yield ts, usage


def main():
    since = sys.argv[1] if len(sys.argv) > 1 else None
    print(f"Scanning sessions{' since ' + since if since else ''}...")
    windows = defaultdict(lambda: {
        "input_tokens": 0,
        "cache_creation_input_tokens": 0,
        "cache_read_input_tokens": 0,
        "output_tokens": 0,
        "total": 0,
        "messages": 0,
        "first_ts": None,
        "last_ts": None,
    })
    count = 0
    for ts, usage in scan_sessions(since):
        w = get_window_start(ts)
        win = windows[w]
        for k in ["input_tokens", "cache_creation_input_tokens", "cache_read_input_tokens", "output_tokens"]:
            win[k] += usage.get(k, 0)
        win["total"] += sum(usage.get(k, 0) for k in ["input_tokens", "cache_creation_input_tokens", "cache_read_input_tokens", "output_tokens"])
        win["messages"] += 1
        if win["first_ts"] is None or ts < win["first_ts"]:
            win["first_ts"] = ts
        if win["last_ts"] is None or ts > win["last_ts"]:
            win["last_ts"] = ts
        count += 1
    print(f"Processed {count:,} assistant messages across {len(windows)} active windows\n")
    if not windows:
        print("No data found.")
        return
    # Sort by window start
    sorted_windows = sorted(windows.items())

    def fmt_m(n):
        """Format token count as millions or thousands."""
        if n >= 1_000_000:
            return f"{n/1_000_000:.1f}M"
        if n >= 1_000:
            return f"{n/1_000:.0f}K"
        return str(n)

    def short_brt(dt):
        brt = dt - timedelta(hours=3)
        return brt.strftime("%d %H:%M")

    print(f"{'Day':<4} {'Window':<17} {'Active':<15} {'Dur':>4} {'Msgs':>5} {'Total':>7} {'In':>5} {'CaWr':>5} {'CaRd':>7} {'Out':>5} {'$Cost':>9}")
    print("-" * 92)
    for w_start, w in sorted_windows:
        w_end = w_start + timedelta(hours=WINDOW_HOURS)
        day = weekday_brt(w_start)
        label = f"{short_brt(w_start)}-{short_brt(w_end)[-5:]}"
        if w["first_ts"] and w["last_ts"]:
            active = f"{short_brt(w['first_ts'])[-5:]}-{short_brt(w['last_ts'])[-5:]}"
            dur_min = int((w["last_ts"] - w["first_ts"]).total_seconds() / 60)
            dur = f"{dur_min // 60}h{dur_min % 60:02d}"
        else:
            active = ""
            dur = ""
        cost = usd_cost(w)
        print(
            f"{day:<4} "
            f"{label:<17} "
            f"{active:<15} "
            f"{dur:>4} "
            f"{w['messages']:>5} "
            f"{fmt_m(w['total']):>7} "
            f"{fmt_m(w['input_tokens']):>5} "
            f"{fmt_m(w['cache_creation_input_tokens']):>5} "
            f"{fmt_m(w['cache_read_input_tokens']):>7} "
            f"{fmt_m(w['output_tokens']):>5} "
            f"${cost:>8,.0f}"
        )
    # Summary
    print("-" * 102)
    totals = defaultdict(int)
    for w in windows.values():
        for k, v in w.items():
            if k in ("first_ts", "last_ts"):
                continue
            totals[k] += v
    grand_cost = usd_cost(totals)
    print(
        f"{'TOTAL':<28} "
        f"{totals['messages']:>5,} "
        f"{totals['total']:>12,} "
        f"{totals['input_tokens']:>10,} "
        f"{totals['cache_creation_input_tokens']:>10,} "
        f"{totals['cache_read_input_tokens']:>12,} "
        f"{totals['output_tokens']:>10,} "
        f"${grand_cost:>10,.2f}"
    )


if __name__ == "__main__":
    main()