@possibilities
Created April 11, 2026 21:35
hookctl pipe events — consumer guide for realtime UIs on top of Claude Code hook data

hookctl pipe events — consumer guide

A guide for app developers who want to build realtime UIs on top of hookctl's mutation stream. Covers both surfaces exposed by the pipeline: the raw per-mutation event stream and the reduced hook_jobs state slice. Pick whichever matches the shape your app actually needs — sometimes it's both.

The big picture

  Claude Code hook fires
        │
        ▼
  hooks-tracker.py
        │  writes ~/.local/state/claude/hooks-tracker.db
        │  queues one mutation envelope per SQL write
        │  flushes the batch as JSONL to a Unix socket
        ▼
  /tmp/prise-pipe-hooks.sock         ← surface #1 (raw mutations)
        │  (prisectl pipe-plug listens here)
        ▼
  prise store                         (apps/prisectl/init.lua)
        │  reducer applies mutations → state.hook_jobs slice
        │  subscriber bus-emits hook_jobs on every change
        ▼
  /tmp/prise-bus-events.sock          ← surface #2 (reduced snapshot)
        │  (prisectl bus-plug broadcasts here)
        ▼
  Your TUI / web UI / Zig binary / Python script / ...

The hook's SQL write is authoritative — the pipe is best-effort replication. If prise isn't running, the hook drops the batch and moves on; the next hook fire picks up fresh. This means your consumer never blocks the agent, and disaster recovery is "read hooks-tracker.db and rebuild from there."

Which surface should I use?

You have two real choices and one anti-choice:

| Surface | Format | Shape | Good for |
|---|---|---|---|
| Raw mutations (pipe.hooks) | JSONL over AF_UNIX | typed inserts & updates, one line per row change | Delta-reactive reducers, append-only logs, audit trails, counters, apps that want to see each change in context (e.g. "that PostToolUse was followed by an ExitPlanMode approval"), apps that need to react to events inserts (the only way to see per-hook activity) |
| Reduced snapshot (bus.hook_jobs) | JSON over AF_UNIX | full {job_id → job} dict emitted on every change | UIs that read current state (status dashboards, job lists, statuslines), apps that can re-render from a snapshot, clients that don't want to think about ordering |
| ❌ Reading hooks-tracker.db directly | SQLite | the db itself | Only for one-shot queries or cold-start bootstraps. Don't poll: you'll miss mid-flight state and thrash WAL |

If your app needs both — delta reactions and a current view — you can consume the mutation stream and reduce it yourself. Or you can subscribe to the bus snapshot for the current view and keep the mutation socket open for the narrow slices you need deltas on (e.g. a counter of "tool uses in the last hour" that the snapshot doesn't expose). The two surfaces are independent — opening one doesn't affect the other.

Rule of thumb: if you find yourself writing for diff in compute_diff(prev_snapshot, next_snapshot), you should have been reading the mutation stream instead. If you find yourself maintaining your own reducer for every table hookctl writes, you should have been reading the bus snapshot instead.


Surface #1: raw mutation stream

Connecting

It's a prisectl pipe-plug listener: a plain AF_UNIX stream socket at /tmp/prise-pipe-hooks.sock. The pipe plug fans incoming batches into prise as plug.pipe.hooks notifications, but the socket itself is just a write target: any client that connects and writes newline-delimited JSON is a valid sender. For reading, you want the bus socket (see surface #2); the pipe socket only accepts writes.

In other words: if you're building something that wants to receive mutations, you subscribe to the bus (surface #2, re-emitting the reduced slice) or you write your own reducer inside prise's init.lua and register store:on_plug_method("pipe.hooks", ...). The raw pipe is not a client-facing fanout — it's the inbound seam between the hook and prise.

If you actually want to watch the raw mutations as they arrive from outside prise, the easiest path is: have init.lua bus-emit the mutation batch unchanged under a new method name (e.g. bus_emit("hook_mutations", params.events)), then subscribe to the bus.

Envelope shape

Every flush writes one or more JSON objects, newline-terminated, in a single sendall. The pipe plug's coalescing window re-batches these into one plug.pipe.hooks notification with params.events = [mutation, mutation, ...]. Shape inside each mutation:

// Insert
{
  "op": "insert",
  "table": "events" | "jobs" | "job_sessions" | "job_name_history",
  "row": { ... }
}

// Update
{
  "op": "update",
  "table": "jobs",
  "pk": { "job_id": "..." },
  "set": { "column": "new_value", "other_column": ... }
}

No deletes. hookctl's tables are append-only on the event side and only jobs mutates in place — and even then, job rows are never removed, only state-transitioned through running → working → stopped → ended.
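
If you reduce the stream yourself, the two envelope shapes above fold into a job view with very little code. The following is a minimal sketch, not the Lua reducer from init.lua: `apply_mutation` and the plain-dict mirror are hypothetical names, and it only handles the jobs table.

```python
from typing import Any


def apply_mutation(jobs: dict[str, dict[str, Any]], m: dict[str, Any]) -> bool:
    """Apply one mutation to a jobs mirror. Returns True if the mirror changed."""
    op, table = m.get("op"), m.get("table")
    if table != "jobs":
        return False  # other tables are out of scope for this sketch

    if op == "insert":
        row = m.get("row") or {}
        job_id = row.get("job_id")
        if job_id is None:
            return False
        jobs.setdefault(job_id, {}).update(row)
        return True

    if op == "update":
        job_id = (m.get("pk") or {}).get("job_id")
        if job_id is None:
            return False
        # There is no replay, so an update can arrive for a job whose insert
        # we never saw. Create the entry on demand instead of failing.
        jobs.setdefault(job_id, {"job_id": job_id}).update(m.get("set") or {})
        return True

    return False  # unknown op: skip it
```

Because nothing is ever deleted, the mirror only grows or changes in place, which keeps the reducer free of tombstone handling.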

Mutation catalog

Here's every mutation hooks-tracker.py emits, what triggers it, and what the row looks like. This is the full surface. If it's not listed here, it's not in the stream.

insert into events

When: every hook fire. Once per Claude Code hook event.

{
  "op": "insert",
  "table": "events",
  "row": {
    "ts": 1712345678.901,
    "session_id": "sess-abc",
    "pid": 72840,
    "hook_event": "PreToolUse",         // Claude Code event name
    "event_type": "pre_tool_use",       // snake_cased, or "session_start"/"tool_use"/"stop"
    "tool_name": "Bash" | null,
    "matcher": "^(?!AskUser).*" | null,
    "cwd": "/Users/mike/..." | null,
    "permission_mode": "plan" | "act" | "acceptEdits" | null,
    "agent_id": "..." | null,           // subagent events only
    "agent_type": "Explore" | null,
    "stop_hook_active": true | false | null   // Stop events only
  }
}

What's NOT in the row: the bulky data column (the raw JSON input blob). If your consumer needs the full payload, read it from hooks-tracker.db by (ts, session_id) — the pipe event is the breadcrumb, not the whole thing.

Fires for every hook. Big list: SessionStart, SessionEnd, UserPromptSubmit, PreToolUse, PostToolUse, PostToolUseFailure, PermissionRequest, Stop, Notification, SubagentStart, SubagentStop, PreCompact, TeammateIdle, TaskCompleted, InstructionsLoaded, ConfigChange, WorktreeCreate, WorktreeRemove. See apps/hookctl/hooks/hooks.json for the full current registration.

insert into jobs

When: fresh job lifecycle starts. Specifically, SessionStart with source in {"startup", "new", ""} — a brand-new Claude session (not a resume, not a /clear).

{
  "op": "insert",
  "table": "jobs",
  "row": {
    "job_id": "sess-abc",
    "created_at": 1712345678.901
  }
}

job_id equals the session_id that initially created the job. The row is sparse by design — subsequent update events fill in name, mode, state, tmux/prise metadata, etc.

insert into job_sessions

When: a Claude process attaches to a job. Fires on every SessionStart regardless of source, because even resumes and clears attach a new (session_id, pid) pair.

{
  "op": "insert",
  "table": "job_sessions",
  "row": {
    "job_id": "sess-abc",
    "session_id": "sess-def",
    "pid": 72840,
    "added_at": 1712345678.901
  }
}

The first insert for a job usually has session_id == job_id (the session that created the job). Resumes add new (session_id, pid) rows. /clear adds new sessions under the same job_id — that's how job identity survives context clears.

Use this to build "what sessions belong to this job?" views. Every pid that has ever owned the job is here; you'll want to filter on liveness separately (check the parent state table or run ps).
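
For the liveness filter, one option that avoids shelling out to ps is signal 0. This is a sketch under the assumption that your consumer runs on the same POSIX host as the Claude processes; `pid_alive` is a hypothetical helper, not part of hookctl.

```python
import os


def pid_alive(pid: int) -> bool:
    """Best-effort check that a process with this pid currently exists (POSIX)."""
    if pid <= 0:
        return False  # 0 and negative values address process groups, not one pid
    try:
        os.kill(pid, 0)  # signal 0 only performs the existence/permission check
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True
```

Treat a True result as a hint only: the OS recycles pids, so an old job_sessions row can point at an unrelated process.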

insert into job_name_history

When: a new unique (job_id, name) pair is recorded. Sources:

  • SessionStart with a --name CLI flag picked up from the process tree walk
  • Stop events that detect a new transcript title (from /rename or Claude's auto-naming)
{
  "op": "insert",
  "table": "job_name_history",
  "row": {
    "job_id": "sess-abc",
    "name": "fix flaky test",
    "recorded_at": 1712345678.901
  }
}

Gotcha: the underlying table has UNIQUE(job_id, name) and the SQL is INSERT OR IGNORE. hookctl emits the mutation optimistically — if the row was a duplicate and the SQL actually no-op'd, the pipe event still fires. Your consumer should deduplicate by (job_id, name) if it cares about "is this a new name we haven't seen?".

This is an audit-trail surface. The current name lives on the jobs row and arrives via an update (below).

update on jobs {"state": ...}

When: the job transitions between lifecycle states. States:

  • running — job created, agent not actively thinking (initial state after SessionStart resume)
  • working — agent is doing work (set on UserPromptSubmit, also on SessionStart if a prompt was passed via -p / positional arg)
  • stopped — agent finished a turn and is idle (set on Stop)
  • ended — session terminated (set on SessionEnd)
{"op":"update","table":"jobs","pk":{"job_id":"sess-abc"},"set":{"state":"working"}}

Transitions fire one update each. working → stopped → working → stopped → ... is the normal heartbeat of an active session.

update on jobs {"mode": ...}

When: the job transitions between plan and act mode. Modes:

  • plan — Claude is planning, not executing
  • act — Claude is executing

Fires from several places:

  • Initial mode on SessionStart (settings.json + --permission-mode CLI flag)
  • PostToolUse on ExitPlanMode — plan approved, transitioning to act
  • SessionStart source="clear" immediately after an ExitPlanMode PreToolUse — plan approval option 1 (clear + act)
  • Any hook event with a permission_mode in the payload — keeps the state row current with the live permission mode
{"op":"update","table":"jobs","pk":{"job_id":"sess-abc"},"set":{"mode":"act"}}

update on jobs {"waiting_on": ...}

When: the job is blocked waiting on a human decision.

  • "plan_approval" — an ExitPlanMode permission request is pending
  • "question" — an AskUserQuestion permission request is pending
  • null — the permission request resolved (PostToolUse)
{"op":"update","table":"jobs","pk":{"job_id":"sess-abc"},"set":{"waiting_on":"plan_approval"}}

Drive your "this job needs attention" UI off this field.
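
A sketch of that attention tracker, fed with individual mutations (for example from a passthrough of the mutation batch). `update_attention` and the `waiting` dict are hypothetical names for illustration.

```python
def update_attention(waiting: dict, m: dict) -> None:
    """Maintain job_id -> waiting_on reason from jobs update mutations."""
    if m.get("op") != "update" or m.get("table") != "jobs":
        return
    changes = m.get("set") or {}
    if "waiting_on" not in changes:
        return  # this update touched other columns only
    job_id = (m.get("pk") or {}).get("job_id")
    if job_id is None:
        return
    reason = changes["waiting_on"]
    if reason is None:
        waiting.pop(job_id, None)  # request resolved
    else:
        waiting[job_id] = reason  # "plan_approval" or "question"
```

The check for key presence matters: an update that does not mention waiting_on must leave the current value alone, while an explicit null clears it.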

update on jobs {"name": ..., "name_dirty": 1}

When: the job's human-readable name is set or changed. Sources same as job_name_history inserts (--name CLI flag, transcript custom-title).

{"op":"update","table":"jobs","pk":{"job_id":"sess-abc"},"set":{"name":"fix flaky test","name_dirty":1}}

The name_dirty flag is an arthack-repo-internal "this needs to be synced" signal for downstream tooling. Feel free to ignore it.

update on jobs {"plan_text": ...}

When: an ExitPlanMode PreToolUse fires with a plan payload. The full plan text is stored so the dispatcher and downstream callbacks can introspect it.

{"op":"update","table":"jobs","pk":{"job_id":"sess-abc"},"set":{"plan_text":"..."}}

Plans can be long — prefer to render a preview (line count, first line, etc.) rather than displaying the whole text.
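
One way to build such a preview, as a small sketch. `plan_preview` and the 60-character default width are arbitrary choices for illustration.

```python
def plan_preview(plan_text: str, width: int = 60) -> str:
    """Summarize a plan as '<first non-blank line> (<n> lines)'."""
    lines = plan_text.splitlines()
    first = next((ln.strip() for ln in lines if ln.strip()), "")
    if not first:
        return "(empty plan)"
    if len(first) > width:
        first = first[: width - 3] + "..."  # keep the preview within `width`
    count = len(lines)
    return f"{first} ({count} line{'' if count == 1 else 's'})"
```

For example, `plan_preview("# Plan\n\n1. do x\n2. do y")` returns `"# Plan (4 lines)"`.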

update on jobs {"context_cleared": 0|1}

When: set to 1 when SessionStart source="clear" fires, meaning the user cleared context and the same job continues on a new session_id. Reset to 0 on ExitPlanMode PostToolUse (plan approved without clear).

{"op":"update","table":"jobs","pk":{"job_id":"sess-abc"},"set":{"context_cleared":1}}

update on jobs {"tmux_session":..., "tmux_window":..., "tmux_pane":...}

When: SessionStart inside tmux, new/startup source only. Captured via tmux display-message at job creation.

{
  "op": "update",
  "table": "jobs",
  "pk": {"job_id": "sess-abc"},
  "set": {
    "tmux_session": "arthack",
    "tmux_window": "1",
    "tmux_pane": "0"
  }
}

Useful for "send a message to that tmux pane" or "focus the human on this job's terminal." Sticky for the job lifetime — we don't track retargeting.
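
A sketch of turning those three columns into a tmux command line. It assumes the stored window and pane values are usable in tmux's standard session:window.pane target syntax, as the example values suggest; `tmux_target` and `tmux_send_argv` are hypothetical helpers and nothing here runs tmux.

```python
from typing import Optional


def tmux_target(job: dict) -> Optional[str]:
    """Build a 'session:window.pane' target, or None if any part is missing."""
    parts = [job.get(k) for k in ("tmux_session", "tmux_window", "tmux_pane")]
    if any(p is None or p == "" for p in parts):
        return None  # job was not started inside tmux
    session, window, pane = parts
    return f"{session}:{window}.{pane}"


def tmux_send_argv(job: dict, text: str) -> Optional[list[str]]:
    """argv for typing `text` into the job's pane (-l sends it literally)."""
    target = tmux_target(job)
    if target is None:
        return None
    return ["tmux", "send-keys", "-t", target, "-l", text]
```

Pass the result to `subprocess.run(argv, check=False)`. Because the capture is sticky, the target can be out of date if the pane was moved after job creation, so expect the command to fail occasionally.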

update on jobs {"prise_session":..., "prise_pty":..., "prise_socket":...}

When: SessionStart inside prise (env vars PRISE_SESSION, PRISE_PTY, PRISE_SOCKET present). Same timing as tmux capture.

{
  "op": "update",
  "table": "jobs",
  "pk": {"job_id": "sess-abc"},
  "set": {
    "prise_session": "arthack",
    "prise_pty": "42",
    "prise_socket": "/tmp/prise-501.sock"
  }
}

Analogous to the tmux capture. Use whichever set of fields matches your multiplexer.

Ordering guarantees

Within a single hook invocation: strictly ordered. The hook queues mutations in call order and flushes them with one sendall. Your consumer sees them in the exact order hooks-tracker.py performed the SQL writes — e.g. the events insert always precedes the jobs update it triggered.

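Across hook invocations: ordered by hook completion, not hook fire. Two concurrent Claude sessions running hooks at the same time will serialize through the kernel's socket queue. Whichever hook's _flush_mutations() hits sendall first goes first. For time-based logic, prefer the timestamp carried in the inserted row (ts on events, created_at on jobs, added_at on job_sessions, recorded_at on job_name_history) over arrival order. Update mutations carry no timestamp of their own, so date them by the events insert flushed in the same batch.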

After a reconnect: the stream has no replay. If your consumer drops the bus connection and reconnects, you get whatever arrives next — no backfill. For cold-start, read hooks-tracker.db once and then listen for deltas. The reduced hook_jobs snapshot (surface #2) re-emits on connect, so if you only need current state, consuming the bus is sufficient.

hookctl never emits delete events. Rows in events, job_sessions, and job_name_history are append-only; jobs rows are updated in place but never removed. If you want to prune ended jobs from your UI, drive that off state == "ended" or your own TTL.
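
Each inserted row carries its own timestamp column, named per table in the mutation catalog above. The sketch below extracts it and sorts events rows by it; `TS_COLUMN`, `mutation_ts`, and `events_by_time` are hypothetical names for illustration.

```python
from typing import Optional

# Timestamp column for each table, as listed in the mutation catalog.
TS_COLUMN = {
    "events": "ts",
    "jobs": "created_at",
    "job_sessions": "added_at",
    "job_name_history": "recorded_at",
}


def mutation_ts(m: dict) -> Optional[float]:
    """Timestamp of an insert mutation, or None (updates carry none)."""
    if m.get("op") != "insert":
        return None
    table = m.get("table")
    col = TS_COLUMN.get(table) if isinstance(table, str) else None
    if col is None:
        return None
    value = (m.get("row") or {}).get(col)
    return float(value) if isinstance(value, (int, float)) else None


def events_by_time(mutations: list) -> list:
    """events rows sorted by ts; the sort is stable, so ties keep arrival order."""
    rows = [
        m["row"] for m in mutations
        if m.get("table") == "events" and mutation_ts(m) is not None
    ]
    return sorted(rows, key=lambda r: r["ts"])
```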

Forward compatibility

The reducer in apps/prisectl/init.lua explicitly ignores unknown (op, table) pairs. If hookctl grows a new table or a new update shape, older consumers silently skip it and keep working on the tables they do understand. Write your consumer the same way — don't crash on unknown mutations, don't assert on keys you haven't seen before. row dicts may grow new columns between releases; set dicts may carry columns your code doesn't know yet.
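
A consumer-side sketch of the same tolerance: handlers are registered per (op, table) pair and anything unregistered is counted and skipped. `MutationRouter` is a hypothetical class, not something hookctl or prise ships.

```python
from typing import Callable

Handler = Callable[[dict], None]


class MutationRouter:
    """Dispatch mutations by (op, table); unknown pairs are skipped, not fatal."""

    def __init__(self) -> None:
        self._handlers: dict[tuple[str, str], Handler] = {}
        self.skipped = 0  # handy for noticing that the emitter grew new shapes

    def on(self, op: str, table: str, handler: Handler) -> None:
        self._handlers[(op, table)] = handler

    def dispatch(self, m: object) -> bool:
        if not isinstance(m, dict):
            self.skipped += 1
            return False
        op, table = m.get("op"), m.get("table")
        handler = None
        if isinstance(op, str) and isinstance(table, str):
            handler = self._handlers.get((op, table))
        if handler is None:
            self.skipped += 1
            return False
        handler(m)
        return True
```

Inside a handler, read columns with `.get()` rather than indexing so that new or missing columns don't raise.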


Surface #2: reduced state via the bus

The lua reducer in apps/prisectl/init.lua folds the mutation stream into state.hook_jobs, a per-job dict keyed by job_id. On every change, bus_emit("hook_jobs", state.hook_jobs) broadcasts the full snapshot over the bus plug's /tmp/prise-bus-events.sock.

Late-joiner bootstrap: when a new client connects to the bus, bus.client_connected fires and the full hook_jobs snapshot is re-emitted. Your consumer can connect fresh and the first message tells it "here's the current state" — no backfill dance required.

Connecting

import socket
import json

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect("/tmp/prise-bus-events.sock")
buf = b""
while True:
    chunk = s.recv(4096)
    if not chunk:
        break
    buf += chunk
    while b"\n" in buf:
        line, buf = buf.split(b"\n", 1)
        event = json.loads(line)
        if event["type"] == "hook_jobs":
            jobs = event["data"]
            for job_id, job in jobs.items():
                print(f"{job_id[:8]}  {job.get('state','?')}  {job.get('name','-')}")
        # (ignore other event types — pty_spawned, cwd_changed, plugs, etc.)

Bun / TypeScript: same idea with Bun.connect({ unix: ... }). Zig: std.net.connectUnixSocket(...) — prisectl-ui does it this way for plug-status.
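
The receive loop above repeats in every consumer, so it can help to factor it into a generator. This is a sketch that assumes the same newline-delimited JSON framing; `iter_bus_events` is a hypothetical helper. Unlike the inline loop, it skips blank or malformed lines instead of raising.

```python
import json
import socket
from typing import Iterator


def iter_bus_events(sock: socket.socket, bufsize: int = 4096) -> Iterator[dict]:
    """Yield one decoded JSON object per newline-terminated line until EOF."""
    buf = b""
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:
            return  # peer closed the connection
        buf += chunk
        while b"\n" in buf:
            line, buf = buf.split(b"\n", 1)
            if not line.strip():
                continue
            try:
                event = json.loads(line)
            except ValueError:  # covers JSONDecodeError and bad UTF-8
                continue
            if isinstance(event, dict):
                yield event
```

Usage: connect a socket to /tmp/prise-bus-events.sock as shown above, then `for event in iter_bus_events(s): ...` and branch on `event.get("type")`.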

hook_jobs shape

type HookJobs = Record<string, HookJob>;

interface HookJob {
  job_id: string;
  created_at: number;         // unix timestamp from the initial jobs insert

  // Filled in progressively by update events — may be missing for new/partial jobs
  name?: string;
  name_dirty?: 0 | 1;
  state?: "running" | "working" | "stopped" | "ended";
  mode?: "plan" | "act";
  waiting_on?: "plan_approval" | "question" | null;
  plan_text?: string;
  context_cleared?: 0 | 1;

  tmux_session?: string;
  tmux_window?: string;
  tmux_pane?: string;

  prise_session?: string;
  prise_pty?: string;
  prise_socket?: string;

  // Derived by the reducer, not from any single update:
  sessions: Record<string, { pid: number; added_at: number }>;
  last_event_at?: number;
  last_event_type?: string;   // snake_cased hook event type
  event_count: number;
}

sessions is built up from job_sessions inserts — one entry per (session_id, pid) pair ever attached to the job. If you care about "is this session still live?" you'll need to check the parent state or ps yourself; the reducer doesn't track liveness.

last_event_at / last_event_type / event_count come from the reducer seeing events inserts land on sessions it owns. This is your "is this job alive?" heartbeat signal — a job whose last_event_at is >5 minutes old is probably idle or crashed.
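
A sketch of that heartbeat check against one HookJob entry. The 300-second default mirrors the five-minute figure above; `heartbeat` and its labels are hypothetical.

```python
import time
from typing import Optional


def heartbeat(job: dict, now: Optional[float] = None, max_idle: float = 300.0) -> str:
    """Classify a job as 'ended', 'unknown', 'fresh', or 'stale'."""
    if job.get("state") == "ended":
        return "ended"
    last = job.get("last_event_at")
    if not isinstance(last, (int, float)):
        return "unknown"  # no events seen for this job yet
    now = time.time() if now is None else now
    return "stale" if now - last > max_idle else "fresh"
```

A "stale" label cannot distinguish a crashed session from one that is simply waiting on the human, so combine it with waiting_on before alarming anyone.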

Every emission is a full snapshot

When the reducer applies a mutation batch, it fans out the entire hook_jobs table, not a delta. This is deliberate:

  • Bus clients can disconnect and reconnect without missing state
  • Idempotent receive logic: just replace your local copy with the incoming snapshot
  • No ordering anxiety for your consumer

Cost: the snapshot grows with the number of jobs. For typical sessions this is a few KB at most, but a long-running background agent fleet could bloat it. If you're building something resource-sensitive, consider reading from the mutation stream instead and maintaining your own reduced view.


Example consumers

Minimal: "what jobs are currently waiting on the human?"

import json
import socket

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect("/tmp/prise-bus-events.sock")
buf = b""
while True:
    chunk = s.recv(4096)
    if not chunk:
        break
    buf += chunk
    while b"\n" in buf:
        line, buf = buf.split(b"\n", 1)
        event = json.loads(line)
        if event["type"] != "hook_jobs":
            continue
        waiting = [
            (jid, j) for jid, j in event["data"].items()
            if j.get("waiting_on")
        ]
        if not waiting:
            print("(nothing waiting)")
            continue
        for jid, j in waiting:
            print(f"{j.get('name', jid[:8])}: {j['waiting_on']}")

Every time hook_jobs changes, you get a re-renderable list. That's the whole program.

Delta-reactive: count tool uses in real time

The bus snapshot doesn't expose tool-use counts, so you want the mutation stream. Easiest path: add a reducer to init.lua that maintains a counter off events inserts, then bus-emit it. The reducer below keeps a single running total; key the counter by row.session_id if you need a per-session breakdown.

-- In apps/prisectl/init.lua, alongside the existing reducers:
store:on_plug_method("pipe.hooks", function(state, params)
    for _, m in ipairs(params.events or {}) do
        if m.op == "insert" and m.table == "events" then
            local row = m.row or {}
            if row.event_type == "tool_use" then
                state.tool_uses = (state.tool_uses or 0) + 1
            end
        end
    end
end)

store:subscribe(function(s) return s.tool_uses end, function(n)
    bus_emit("tool_uses", { count = n })
end)

Now your external consumer reads tool_uses events off the bus. This is the general pattern: put reducers in init.lua, bus-emit the computed slice, subscribe from outside.

Audit trail: log every mutation to a file

-- Dump the raw mutation batch under a new bus method:
store:on_plug_method("pipe.hooks", function(_state, params)
    bus_emit("hook_mutations", params.events or {})
end)

Consumer:

import json
import socket

with open("/tmp/hookctl-audit.jsonl", "a") as f:
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect("/tmp/prise-bus-events.sock")
    buf = b""
    while True:
        chunk = s.recv(4096)
        if not chunk:
            break
        buf += chunk
        while b"\n" in buf:
            line, buf = buf.split(b"\n", 1)
            event = json.loads(line)
            if event["type"] != "hook_mutations":
                continue
            for m in event["data"]:
                f.write(json.dumps(m) + "\n")
            f.flush()

Hybrid: current state + deltas

Build your reducer in the language you're most comfortable in. Open the bus; for every hook_jobs message, replace your local state. For every hook_mutations message, append to an activity feed.

class HookState:
    def __init__(self):
        self.jobs = {}        # replaced wholesale on hook_jobs
        self.recent = []      # append-only on hook_mutations

    def apply(self, event):
        if event["type"] == "hook_jobs":
            self.jobs = event["data"]
        elif event["type"] == "hook_mutations":
            self.recent.extend(event["data"])
            self.recent = self.recent[-500:]   # cap

The UI renders state.jobs for "what's going on" and taps state.recent for "show me the last 500 things that happened."


Gotchas and edge cases

The socket isn't there

Prise isn't running. hooks-tracker.py's flush is a no-op (it checks os.path.exists first), so there's no error; the db write still succeeds. Your consumer just waits. When prise comes up, the next hook fire delivers its mutations and you start receiving events.
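
To make the best-effort behaviour concrete, here is a sketch of an emitter-side flush in that style. It is not the code in hooks-tracker.py: `encode_batch`, `flush_best_effort`, and the 0.2 s timeout are assumptions for illustration.

```python
import json
import os
import socket


def encode_batch(mutations: list) -> bytes:
    """Serialize mutations as newline-terminated JSON, one object per line."""
    return b"".join(
        json.dumps(m, separators=(",", ":")).encode("utf-8") + b"\n"
        for m in mutations
    )


def flush_best_effort(path: str, mutations: list, timeout: float = 0.2) -> bool:
    """Send the batch in one sendall; drop it silently on any socket problem."""
    if not mutations or not os.path.exists(path):
        return False  # nothing to send, or nobody is listening
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect(path)
            s.sendall(encode_batch(mutations))
        return True
    except OSError:  # refused, timed out, stale socket file, ...
        return False
```

The return value is informational only. A caller in this style ignores it, because the SQL write has already happened and remains the record of truth.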

If you need replay on startup, read hooks-tracker.db directly (the schema matches what the mutation stream describes) and then listen for deltas. The CLI jobctl list-jobs is one ready-made reader.
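
If you would rather not depend on jobctl, a direct read might look like the sketch below. It assumes a jobs table with a job_id column, as the mutation catalog implies; `load_jobs` is a hypothetical helper, and the real schema may carry more columns than the catalog shows.

```python
import os
import sqlite3


def load_jobs(db_path: str = "~/.local/state/claude/hooks-tracker.db") -> dict:
    """Return {job_id: row_dict} from the jobs table, or {} if unavailable."""
    path = os.path.expanduser(db_path)  # sqlite3 does not expand "~" itself
    if not os.path.exists(path):
        return {}
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # rows become addressable by column name
    try:
        rows = conn.execute("SELECT * FROM jobs").fetchall()
        return {row["job_id"]: dict(row) for row in rows}
    except sqlite3.OperationalError:
        return {}  # e.g. the table does not exist yet
    finally:
        conn.close()
```

Connecting to the bus before running the query narrows the window in which a mutation can land between the read and the first delta.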

Ordering inside a batch

Mutations within a batch are in call order within a single hook. Across hooks, they arrive in whatever order the kernel socket queue accepts them. Treat the timestamp on each inserted row (ts, created_at, added_at, or recorded_at) as authoritative, and don't infer cross-hook ordering from arrival.

Bulky payloads

The events insert row deliberately omits the data column (the raw JSON blob). If you need it, read it from the db:

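import json
import os
import sqlite3

# sqlite3 does not expand "~", so resolve the path explicitly.
db_path = os.path.expanduser("~/.local/state/claude/hooks-tracker.db")
conn = sqlite3.connect(db_path)
# session_id and ts come from the events insert row received on the pipe.
row = conn.execute(
    "SELECT data FROM events WHERE session_id = ? AND ts = ?",
    (session_id, ts),
).fetchone()
blob = json.loads(row[0]) if row else None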

This is also the right pattern for historical queries: the pipe is for realtime, the db is for everything.

Dedupe on job_name_history

As noted in the mutation catalog: job_name_history inserts fire optimistically. If a name repeats and the SQL actually INSERT OR IGNORE'd, you still see the mutation. Dedupe by (job_id, name) if you're maintaining a unique-name set.
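
A sketch of that dedupe, using an in-memory set of (job_id, name) pairs. `is_new_name` is a hypothetical helper.

```python
def is_new_name(seen: set, m: dict) -> bool:
    """True only the first time a (job_id, name) pair is observed."""
    if m.get("op") != "insert" or m.get("table") != "job_name_history":
        return False
    row = m.get("row") or {}
    key = (row.get("job_id"), row.get("name"))
    if None in key or key in seen:
        return False  # incomplete row, or a repeat of an INSERT OR IGNORE no-op
    seen.add(key)
    return True
```

The set starts empty on every consumer restart, so seed it from the job_name_history table in hooks-tracker.db if "new" has to mean new across restarts.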

Jobs never disappear

No delete mutations exist. If your UI wants to drop ended jobs, filter on state != "ended" or age-out on created_at. The db doesn't garbage-collect either — hooks-tracker.db grows forever until someone truncates it.

Time resolution

ts is a Python float of Unix seconds with sub-second precision; created_at, recorded_at, and added_at use the same representation. Don't rely on monotonicity: NTP fixups and clock skew can make later writes have earlier timestamps in degenerate cases.


Related reading

  • apps/hookctl/hooks/hooks-tracker.py — the emitter. Read this if you want to know exactly when a mutation fires.
  • apps/hookctl/README.md#hooks-tracker — overview and config.
  • apps/prisectl/init.lua — the reducer. Read this if you want to add new reduced slices.
  • apps/prisectl/docs/plugs/pipe.md — the inbound pipe plug's protocol. The hooks socket is one of several.
  • apps/prisectl/docs/plugs/bus.md — the outbound bus plug's protocol. This is where your consumer connects.
  • apps/jobctl/jobctl/api.py — a ready-made reader for hooks-tracker.db if you want a synchronous view without the pipe round-trip. Use list_jobs() for a snapshot; list_job_name_history() for the audit trail.
  • ~/.local/state/claude/hooks-tracker.db — the source of truth. Schema matches the mutation catalog above. Safe to read while the hook is writing (WAL mode).

Summary — which thing should I reach for?

| You want... | Use |
|---|---|
| Current snapshot of all Claude jobs, re-rendered on change | bus_emit("hook_jobs"): subscribe once, read current state forever |
| React to one specific transition (e.g. waiting_on became non-null) | Same snapshot; compare that field against your previous copy. If this grows into a general snapshot diff, switch to the mutation stream |
| Per-hook activity feed ("show me the last 50 things") | Pipe a mutation-stream passthrough into the bus (see "Audit trail" example) |
| Custom derived counter / aggregation | Add a reducer in init.lua, bus-emit the result |
| Cold-start with full history | Read hooks-tracker.db directly (via jobctl.api), then listen for deltas |
| Ad-hoc query over historical data | hooks-tracker.db; don't use the pipe |
| Tail "has anything happened lately?" | events insert rate on the mutation stream, or watch last_event_at on hook_jobs |

If you're building something new and don't know which surface to start with, start with the bus hook_jobs snapshot. It's the simplest to consume, handles late-join cleanly, and covers the common case. Drop to the mutation stream only when your reducer needs to do more than "replace the current view."


Metadata

cwd: /Users/mike/code/arthack/apps/hookctl
session-id: 9ba42ccf-55ce-4128-a489-65cf09366f68
session-name: implement-pipe-hooks-realtime
path: /Users/mike/docs/hookctl-pipe-events.md
cd /Users/mike/code/arthack/apps/hookctl && claude --resume 9ba42ccf-55ce-4128-a489-65cf09366f68