Claude Code Git Worktree Manager
@nick0lay · Created April 18, 2026 19:47

#!/usr/bin/env python3
"""parallel-agents helper CLI.

Manages config/state for the parallel-agents plugin and performs the
mechanical parts of worktree/task orchestration. Intended to be called
from slash commands; emits JSON on stdout for easy consumption.
"""
from __future__ import annotations

import argparse
import copy
import datetime as _dt
import json
import os
import re
import signal
import subprocess
import sys
import time
import uuid
from pathlib import Path
from typing import Any

DEFAULT_CONFIG: dict[str, Any] = {
    "version": 1,
    "worktree_root": ".claude/worktrees",
    "max_worktrees_per_repo": 4,
    "monitoring": {
        "_comment_mode": (
            "live = after 'task start', the skill also runs 'task wait' in a "
            "Bash background and attaches the Monitor tool, so the assistant "
            "is notified when events arrive. wait = no live watcher; the "
            "assistant polls via ScheduleWakeup or when the user asks."
        ),
        "mode": "live",
        "_comment_event_granularity": (
            "Only applies when mode=live. Controls what 'task wait' emits on "
            "stdout (each line wakes the assistant up via Monitor). "
            "exit = one WATCHING line + one EXITED line per task — quiet, "
            "recommended default, good for 'notify me when it's done'. "
            "tool = also emit one TOOL line per tool-use event inside the "
            "headless session (Edit/Write/Bash/…). Useful for narrating "
            "progress; higher wake-up cost. "
            "all = stream every --include-hook-events JSON line. Very noisy, "
            "debug only."
        ),
        "event_granularity": "exit",
    },
    "repositories": [],
}

DEFAULT_STATE: dict[str, Any] = {
    "version": 1,
    "gen_seq": 0,
    "tasks": [],
}

CONFIG_FILE = "config.json"
STATE_FILE = "state.json"
EXCLUDE_MARKER = "# parallel-agents: managed worktree path"
EXCLUDE_PATTERN = ".claude/worktrees/"

def config_path(root: Path) -> Path:
    return root / CONFIG_FILE

def state_path(root: Path) -> Path:
    return root / STATE_FILE

def _load_json(path: Path, default: dict[str, Any]) -> dict[str, Any]:
    if not path.exists():
        return copy.deepcopy(default)
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)

def _save_json(path: Path, data: dict[str, Any]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")
    tmp.replace(path)

def _migrate_config(cfg: dict[str, Any]) -> bool:
    """Add missing top-level sections so consumers can rely on them being present.

    Returns True if `cfg` was mutated. Call before reading any optional section.
    """
    changed = False
    if "monitoring" not in cfg:
        cfg["monitoring"] = copy.deepcopy(DEFAULT_CONFIG["monitoring"])
        changed = True
    else:
        monitoring = cfg["monitoring"]
        if not isinstance(monitoring, dict):
            cfg["monitoring"] = copy.deepcopy(DEFAULT_CONFIG["monitoring"])
            changed = True
        else:
            if "mode" not in monitoring:
                monitoring["mode"] = DEFAULT_CONFIG["monitoring"]["mode"]
                changed = True
            if "event_granularity" not in monitoring:
                monitoring["event_granularity"] = DEFAULT_CONFIG["monitoring"]["event_granularity"]
                changed = True
    return changed

def load_config(root: Path) -> dict[str, Any]:
    cfg = _load_json(config_path(root), DEFAULT_CONFIG)
    _migrate_config(cfg)  # in-memory only — ensures defaults without touching the file
    return cfg

def save_config(root: Path, cfg: dict[str, Any]) -> None:
    _save_json(config_path(root), cfg)

def load_state(root: Path) -> dict[str, Any]:
    return _load_json(state_path(root), DEFAULT_STATE)

def save_state(root: Path, st: dict[str, Any]) -> None:
    _save_json(state_path(root), st)

def discover_sibling_git_repos(start: Path) -> list[Path]:
    """Return sibling directories of `start` that contain a `.git` entry."""
    start = start.resolve()
    parent = start.parent
    if not parent.is_dir():
        return []
    results: list[Path] = []
    for child in sorted(parent.iterdir()):
        if not child.is_dir():
            continue
        if child.resolve() == start:
            continue
        if (child / ".git").exists():
            results.append(child)
    return results

def _resolve_git_dir(repo_path: Path) -> Path:
    """Resolve the real `.git` directory for a repo or worktree checkout."""
    git_entry = repo_path / ".git"
    if git_entry.is_dir():
        return git_entry
    if git_entry.is_file():
        content = git_entry.read_text(encoding="utf-8")
        for line in content.splitlines():
            if line.startswith("gitdir:"):
                ref = line.split(":", 1)[1].strip()
                return (repo_path / ref).resolve()
    raise FileNotFoundError(f"No .git entry at {repo_path}")

def add_to_git_exclude(repo_path: Path, pattern: str = EXCLUDE_PATTERN) -> bool:
    """Append `pattern` to `<repo>/.git/info/exclude` if not already present.

    Returns True when the exclude file was modified.
    """
    git_dir = _resolve_git_dir(repo_path)
    info_dir = git_dir / "info"
    info_dir.mkdir(parents=True, exist_ok=True)
    exclude_file = info_dir / "exclude"
    existing = exclude_file.read_text(encoding="utf-8") if exclude_file.exists() else ""
    if pattern in existing.splitlines():
        return False
    new_content = existing
    if new_content and not new_content.endswith("\n"):
        new_content += "\n"
    new_content += f"{EXCLUDE_MARKER}\n{pattern}\n"
    exclude_file.write_text(new_content, encoding="utf-8")
    return True

_SLUG_RE = re.compile(r"[^a-z0-9]+")
_PRD_ID_RE = re.compile(r"^PRD-\d+$")
_GEN_ID_RE = re.compile(r"^GEN-\d+$")
ACTIVE_STATUSES = {"created", "running", "exited", "failed", "awaiting-merge"}
DEFAULT_CLAUDE_BIN = os.environ.get("PARALLEL_AGENTS_CLAUDE_BIN", "claude")

def slugify(text: str, *, max_len: int = 40) -> str:
    s = _SLUG_RE.sub("-", text.strip().lower()).strip("-")
    if len(s) > max_len:
        s = s[:max_len].rstrip("-")
    return s or "task"

def _now_iso() -> str:
    return _dt.datetime.now(_dt.timezone.utc).isoformat(timespec="seconds")

def allocate_generic_id(state: dict[str, Any]) -> str:
    """Mutate `state` to bump `gen_seq`, return the next `GEN-NNN` id."""
    state["gen_seq"] = int(state.get("gen_seq", 0)) + 1
    return f"GEN-{state['gen_seq']:03d}"

def _find_repo(cfg: dict[str, Any], name: str) -> dict[str, Any]:
    for r in cfg["repositories"]:
        if r["name"] == name:
            return r
    raise SystemExit(f"Unknown repository: {name}")

def _active_tasks_for_repo(state: dict[str, Any], repo_name: str) -> list[dict[str, Any]]:
    return [
        t for t in state["tasks"]
        if t["repo"] == repo_name and t["status"] in ACTIVE_STATUSES
    ]

def _find_active_task(state: dict[str, Any], task_id: str) -> dict[str, Any] | None:
    for t in state["tasks"]:
        if t["id"] == task_id and t["status"] in ACTIVE_STATUSES:
            return t
    return None

def _run_git(args: list[str], *, cwd: Path | None = None) -> subprocess.CompletedProcess[str]:
    cmd = ["git"]
    if cwd is not None:
        cmd += ["-C", str(cwd)]
    cmd += args
    return subprocess.run(cmd, capture_output=True, text=True)

def _git_worktree_add(repo_path: Path, worktree_path: Path, branch: str, base_branch: str) -> None:
    worktree_path.parent.mkdir(parents=True, exist_ok=True)
    result = _run_git(
        ["worktree", "add", "-b", branch, str(worktree_path), base_branch],
        cwd=repo_path,
    )
    if result.returncode != 0:
        raise SystemExit(f"git worktree add failed: {result.stderr.strip() or result.stdout.strip()}")

def _git_worktree_remove(repo_path: Path, worktree_path: Path, *, force: bool = False) -> None:
    args = ["worktree", "remove"]
    if force:
        args.append("--force")
    args.append(str(worktree_path))
    result = _run_git(args, cwd=repo_path)
    if result.returncode != 0:
        raise SystemExit(f"git worktree remove failed: {result.stderr.strip() or result.stdout.strip()}")

def _git_delete_branch(repo_path: Path, branch: str, *, force: bool = True) -> None:
    flag = "-D" if force else "-d"
    result = _run_git(["branch", flag, branch], cwd=repo_path)
    if result.returncode != 0:
        raise SystemExit(f"git branch delete failed: {result.stderr.strip() or result.stdout.strip()}")

def detect_default_branch(repo_path: Path) -> str | None:
    """Best-effort detection of the default branch for a registered repo."""
    try:
        result = subprocess.run(
            ["git", "-C", str(repo_path), "symbolic-ref", "--short", "refs/remotes/origin/HEAD"],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode == 0:
            ref = result.stdout.strip()
            if "/" in ref:
                return ref.split("/", 1)[1]
    except FileNotFoundError:
        return None
    return None

def cmd_init(root: Path, *, discover_from: Path | None = None) -> dict[str, Any]:
    cfg_existed = config_path(root).exists()
    st_existed = state_path(root).exists()
    # Load raw config so we know whether migration mutated it; load_config() would
    # return a migrated copy regardless of whether the on-disk file had the fields.
    cfg = _load_json(config_path(root), DEFAULT_CONFIG)
    mutated = _migrate_config(cfg)
    st = load_state(root)
    if not cfg_existed or mutated:
        save_config(root, cfg)
    if not st_existed:
        save_state(root, st)
    siblings: list[str] = []
    if discover_from is not None:
        siblings = [str(p) for p in discover_sibling_git_repos(discover_from)]
    return {
        "root": str(root),
        "config_created": not cfg_existed,
        "state_created": not st_existed,
        "config": cfg,
        "state": st,
        "siblings": siblings,
    }

def cmd_repo_add(
    root: Path,
    *,
    name: str,
    path: str,
    default_branch: str = "main",
    branch_prefix_prd: str = "prd/",
    branch_prefix_generic: str = "task/",
) -> dict[str, Any]:
    cfg = load_config(root)
    repo_path = Path(path).expanduser().resolve()
    if not repo_path.is_dir():
        raise SystemExit(f"Path does not exist: {repo_path}")
    if not (repo_path / ".git").exists():
        raise SystemExit(f"Not a git repo: {repo_path}")
    for r in cfg["repositories"]:
        if r["name"] == name:
            raise SystemExit(f"Repository '{name}' already registered")
        if Path(r["path"]).resolve() == repo_path:
            raise SystemExit(f"Path already registered under name '{r['name']}'")
    entry = {
        "name": name,
        "path": str(repo_path),
        "default_branch": default_branch,
        "branch_prefix_prd": branch_prefix_prd,
        "branch_prefix_generic": branch_prefix_generic,
    }
    cfg["repositories"].append(entry)
    save_config(root, cfg)
    excluded = add_to_git_exclude(repo_path)
    return {"added": entry, "exclude_updated": excluded}

def cmd_repo_list(root: Path) -> dict[str, Any]:
    return {"repositories": load_config(root)["repositories"]}

def cmd_discover(start: Path) -> dict[str, Any]:
    return {"siblings": [str(p) for p in discover_sibling_git_repos(start)]}

def cmd_task_create(
    root: Path,
    *,
    task_type: str,
    description: str,
    repo_name: str,
    task_id: str | None = None,
    slug: str | None = None,
    blocked_by: list[str] | None = None,
) -> dict[str, Any]:
    if task_type not in ("prd", "generic"):
        raise SystemExit(f"Invalid task type: {task_type}")
    cfg = load_config(root)
    state = load_state(root)
    repo = _find_repo(cfg, repo_name)
    cap = int(cfg.get("max_worktrees_per_repo", 4))
    active = _active_tasks_for_repo(state, repo_name)
    if len(active) >= cap:
        raise SystemExit(
            f"Worktree cap reached for repo '{repo_name}' ({len(active)}/{cap}). "
            f"Destroy a task first."
        )
    if task_type == "prd":
        if not task_id:
            raise SystemExit("PRD task requires --task-id")
        if not _PRD_ID_RE.fullmatch(task_id):
            raise SystemExit(f"Invalid PRD id: {task_id} (expected PRD-<digits>)")
    else:
        if task_id is None:
            task_id = allocate_generic_id(state)
        elif not _GEN_ID_RE.fullmatch(task_id):
            raise SystemExit(f"Invalid generic id: {task_id} (expected GEN-<digits>)")
    if _find_active_task(state, task_id) is not None:
        raise SystemExit(f"Active task already exists: {task_id}")
    effective_slug = slug or slugify(description)
    prefix = repo["branch_prefix_prd"] if task_type == "prd" else repo["branch_prefix_generic"]
    branch = f"{prefix}{task_id}-{effective_slug}"
    repo_path = Path(repo["path"])
    worktree_path = repo_path / cfg["worktree_root"] / task_id
    _git_worktree_add(repo_path, worktree_path, branch, repo["default_branch"])
    now = _now_iso()
    task = {
        "id": task_id,
        "type": task_type,
        "description": description,
        "repo": repo_name,
        "branch": branch,
        "worktree_path": str(worktree_path.resolve()),
        "status": "created",
        "session_id": None,
        "session_name": None,
        "log_path": None,
        "initial_prompt": None,
        "queued_prompts": [],
        "blocked_by": list(blocked_by or []),
        "process": None,
        "pr": None,
        "created_at": now,
        "updated_at": now,
    }
    state["tasks"].append(task)
    save_state(root, state)
    return {"task": task}

def cmd_task_list(root: Path, *, include_history: bool = False) -> dict[str, Any]:
    tasks = load_state(root)["tasks"]
    if not include_history:
        tasks = [t for t in tasks if t["status"] in ACTIVE_STATUSES]
    return {"tasks": tasks}

def _process_alive(pid: int) -> bool:
    """Return True if a process with `pid` currently exists and is not a zombie.

    Zombies matter because when paw.py spawned the process itself, its exit turns
    it into a zombie in our process table — `kill(pid, 0)` still says "alive"
    until someone waits on it. We waitpid non-blockingly first to reap any such
    zombie we own.
    """
    if pid <= 0:
        return False
    try:
        wpid, _status = os.waitpid(pid, os.WNOHANG)
        if wpid == pid:
            return False
        if wpid == 0:
            return True
    except ChildProcessError:
        pass  # Not our child (or already reaped) — fall through to kill probe.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    return True

def _kill_process_tree(pid: int, *, timeout: float = 2.0) -> bool:
    """SIGTERM a pid, wait up to `timeout`, SIGKILL if still alive. Returns True if it exited."""
    if not _process_alive(pid):
        return True
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        return True
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not _process_alive(pid):
            return True
        time.sleep(0.05)
    try:
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        return True
    time.sleep(0.05)
    return not _process_alive(pid)

def _tail_file(path: Path, n: int = 20) -> list[str]:
    if not path.exists():
        return []
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []
    return text.splitlines()[-n:]

def build_claude_command(
    *,
    prompt: str,
    session_id: str,
    session_name: str | None = None,
    resume: bool = False,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
    dangerously_skip_permissions: bool = True,
    include_hook_events: bool = True,
) -> list[str]:
    """Assemble argv for a headless `claude` invocation.

    First-launch shape: `claude -p <prompt> --session-id <id> --name <name> …`
    Resume shape: `claude --resume <id> -p <prompt> …`
    """
    args: list[str] = [claude_bin]
    if resume:
        args += ["--resume", session_id, "-p", prompt]
    else:
        args += ["-p", prompt, "--session-id", session_id]
    if session_name:
        args += ["--name", session_name]
    if dangerously_skip_permissions:
        args.append("--dangerously-skip-permissions")
    if include_hook_events:
        args.append("--include-hook-events")
    return args

BROADCAST_TARGET_STATUSES = {"created", "running", "exited"}

def _spawn_claude_for_task(
    root: Path,
    task: dict[str, Any],
    prompt: str,
    *,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
) -> None:
    """Start `claude` in the task's worktree, mutate task state to reflect it."""
    worktree_path = Path(task["worktree_path"])
    if not worktree_path.is_dir():
        raise SystemExit(f"Worktree missing for {task['id']}: {worktree_path}")
    is_resume = bool(task.get("session_id"))
    session_id = task.get("session_id") or str(uuid.uuid4())
    session_name = task.get("session_name") or f"{task['id']}: {task['description']}"
    log_path = _log_path_for(root, task["id"])
    log_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = build_claude_command(
        prompt=prompt,
        session_id=session_id,
        session_name=session_name,
        resume=is_resume,
        claude_bin=claude_bin,
    )
    log_fh = log_path.open("ab")
    try:
        proc = subprocess.Popen(
            cmd,
            cwd=str(worktree_path),
            stdout=log_fh,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
            start_new_session=True,
        )
    finally:
        log_fh.close()
    now = _now_iso()
    task["session_id"] = session_id
    task["session_name"] = session_name
    task["log_path"] = str(log_path)
    if not task.get("initial_prompt"):
        task["initial_prompt"] = prompt
    task["process"] = {
        "pid": proc.pid,
        "started_at": now,
        "exited_at": None,
        "exit_code": None,
    }
    task["status"] = "running"
    task["updated_at"] = now

def _drain_queue_if_idle(
    root: Path,
    task: dict[str, Any],
    *,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
) -> bool:
    """If the task is idle and has queued prompts, spawn the next one. Returns True if spawned."""
    if task["status"] not in ("created", "exited"):
        return False
    queue = task.get("queued_prompts") or []
    if not queue:
        return False
    next_prompt = queue.pop(0)
    task["queued_prompts"] = queue
    _spawn_claude_for_task(root, task, next_prompt, claude_bin=claude_bin)
    return True

def _log_path_for(root: Path, task_id: str) -> Path:
    return root / "logs" / f"{task_id}.jsonl"

def _reap_if_exited(task: dict[str, Any]) -> bool:
    """If the task is marked running but the process is gone, update it. Return True if changed."""
    if task.get("status") != "running":
        return False
    proc = task.get("process") or {}
    pid = proc.get("pid")
    if not isinstance(pid, int):
        return False
    if _process_alive(pid):
        return False
    proc["exited_at"] = _now_iso()
    task["process"] = proc
    task["status"] = "exited"
    task["updated_at"] = _now_iso()
    return True

def _find_task_any(state: dict[str, Any], task_id: str) -> dict[str, Any] | None:
    for t in state["tasks"]:
        if t["id"] == task_id and t["status"] != "destroyed":
            return t
    return None

def cmd_task_start(
    root: Path,
    *,
    task_id: str,
    prompt: str,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
    dry_run: bool = False,
) -> dict[str, Any]:
    state = load_state(root)
    task = _find_task_any(state, task_id)
    if task is None:
        raise SystemExit(f"No active task: {task_id}")
    # If task was previously running but the process is gone, reconcile first.
    _reap_if_exited(task)
    if task["status"] == "running":
        pid = (task.get("process") or {}).get("pid")
        raise SystemExit(f"Task {task_id} is already running (pid={pid})")
    worktree_path = Path(task["worktree_path"])
    if not worktree_path.is_dir():
        raise SystemExit(f"Worktree missing for {task_id}: {worktree_path}")
    if dry_run:
        is_resume = bool(task.get("session_id"))
        session_id = task.get("session_id") or str(uuid.uuid4())
        session_name = task.get("session_name") or f"{task_id}: {task['description']}"
        cmd = build_claude_command(
            prompt=prompt,
            session_id=session_id,
            session_name=session_name,
            resume=is_resume,
            claude_bin=claude_bin,
        )
        return {
            "dry_run": True,
            "task_id": task_id,
            "session_id": session_id,
            "session_name": session_name,
            "log_path": str(_log_path_for(root, task_id)),
            "cwd": str(worktree_path),
            "command": cmd,
        }
    _spawn_claude_for_task(root, task, prompt, claude_bin=claude_bin)
    save_state(root, state)
    return {"task": task}

def cmd_task_send(
    root: Path,
    *,
    task_id: str,
    prompt: str,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
) -> dict[str, Any]:
    """Enqueue a prompt for a task and drain the queue if the task is idle.

    Semantics:
    - If task is currently `running`: prompt is appended to `queued_prompts`.
    - If task is `created` or `exited`: the new prompt goes to the back of the
      queue and the head of the queue is immediately dispatched via spawn
      (using `--resume` when a session_id already exists).
    """
    state = load_state(root)
    task = _find_task_any(state, task_id)
    if task is None:
        raise SystemExit(f"No active task: {task_id}")
    _reap_if_exited(task)
    task.setdefault("queued_prompts", []).append(prompt)
    dispatched = _drain_queue_if_idle(root, task, claude_bin=claude_bin)
    task["updated_at"] = _now_iso()
    save_state(root, state)
    return {
        "task_id": task_id,
        "dispatched": dispatched,
        "queued_count": len(task.get("queued_prompts") or []),
        "task": task,
    }

def cmd_task_broadcast(
    root: Path,
    *,
    prompt: str,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
) -> dict[str, Any]:
    """Append a prompt to every broadcast-eligible task. Drain idle ones."""
    state = load_state(root)
    results: list[dict[str, Any]] = []
    for task in state["tasks"]:
        if task["status"] not in BROADCAST_TARGET_STATUSES:
            continue
        _reap_if_exited(task)
        task.setdefault("queued_prompts", []).append(prompt)
        dispatched = _drain_queue_if_idle(root, task, claude_bin=claude_bin)
        task["updated_at"] = _now_iso()
        results.append({
            "task_id": task["id"],
            "dispatched": dispatched,
            "queued_count": len(task.get("queued_prompts") or []),
        })
    save_state(root, state)
    return {"total": len(results), "results": results}

def cmd_task_status(
    root: Path,
    *,
    task_id: str,
    tail_lines: int = 20,
    claude_bin: str = DEFAULT_CLAUDE_BIN,
) -> dict[str, Any]:
    state = load_state(root)
    task = _find_task_any(state, task_id)
    if task is None:
        raise SystemExit(f"No task: {task_id}")
    reaped = _reap_if_exited(task)
    drained = _drain_queue_if_idle(root, task, claude_bin=claude_bin)
    if reaped or drained:
        task["updated_at"] = _now_iso()
        save_state(root, state)
    log_path = task.get("log_path")
    tail: list[str] = []
    if log_path:
        tail = _tail_file(Path(log_path), n=tail_lines)
    return {"task": task, "log_tail": tail, "reaped": reaped, "drained": drained}

def _gh_pr_info(
    repo_path: Path,
    branch: str,
    *,
    gh_bin: str = "gh",
) -> dict[str, Any] | None:
    """Look up the most recent PR for `branch`. Returns raw dict or None."""
    try:
        result = subprocess.run(
            [
                gh_bin, "pr", "list",
                "--head", branch,
                "--state", "all",
                "--json", "number,state,mergedAt,url",
                "--limit", "1",
            ],
            cwd=str(repo_path),
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        return None
    if result.returncode != 0:
        return None
    try:
        data = json.loads(result.stdout or "[]")
    except json.JSONDecodeError:
        return None
    if not isinstance(data, list) or not data:
        return None
    return data[0]

def cmd_task_pr_check(
    root: Path,
    *,
    task_id: str,
    gh_bin: str = "gh",
) -> dict[str, Any]:
    """Query gh for PR state on this task's branch. Persist into task.pr."""
    cfg = load_config(root)
    state = load_state(root)
    task = _find_task_any(state, task_id)
    if task is None:
        raise SystemExit(f"No task: {task_id}")
    repo = _find_repo(cfg, task["repo"])
    info = _gh_pr_info(Path(repo["path"]), task["branch"], gh_bin=gh_bin)
    if info is not None:
        task["pr"] = {
            "number": info.get("number"),
            "state": info.get("state"),
            "merged_at": info.get("mergedAt"),
            "url": info.get("url"),
        }
        task["updated_at"] = _now_iso()
        save_state(root, state)
    return {
        "task_id": task_id,
        "branch": task["branch"],
        "found": info is not None,
        "pr": task.get("pr"),
    }

def cmd_task_attach(root: Path, *, task_id: str) -> dict[str, Any]:
    state = load_state(root)
    task = _find_task_any(state, task_id)
    if task is None:
        raise SystemExit(f"No task: {task_id}")
    session_id = task.get("session_id")
    if not session_id:
        raise SystemExit(f"Task {task_id} has no session yet")
    worktree = task["worktree_path"]
    resume_cmd = f'cd "{worktree}" && claude --resume {session_id}'
    return {
        "task_id": task_id,
        "session_id": session_id,
        "session_name": task.get("session_name"),
        "worktree_path": worktree,
        "resume_command": resume_cmd,
    }

def _extract_tool_use(line: str) -> str | None:
    """Best-effort parser for `--include-hook-events` JSON lines.

    Returns the tool name if the line looks like a tool-use hook event.
    Returns None for log noise, non-JSON, or non-tool events.
    Tolerant of schema variations — we don't know the exact shape at
    write-time, so we check several common field names.
    """
    line = line.strip()
    if not line or not line.startswith("{"):
        return None
    try:
        data = json.loads(line)
    except (json.JSONDecodeError, ValueError):
        return None
    if not isinstance(data, dict):
        return None
    event = str(
        data.get("hook_event_name")
        or data.get("event")
        or data.get("type")
        or ""
    )
    tool = data.get("tool_name") or data.get("tool")
    if not tool:
        tool_input = data.get("tool_input")
        if isinstance(tool_input, dict):
            tool = tool_input.get("name")
    if not tool:
        payload = data.get("payload") or data.get("data")
        if isinstance(payload, dict):
            tool = payload.get("tool_name") or payload.get("tool")
    if not tool:
        return None
    event_lower = event.lower()
    if "tooluse" in event_lower or "tool_use" in event_lower or event == "":
        return str(tool)
    return None

def _emit(line: str) -> None:
    """Line-buffered stdout write, flushed immediately so Monitor sees it."""
    sys.stdout.write(line + "\n")
    sys.stdout.flush()

def cmd_task_wait(
    root: Path,
    *,
    task_id: str,
    granularity: str = "exit",
    timeout: float | None = None,
    poll_interval: float = 0.5,
) -> int:
    """Block until the task's headless process exits, emitting events to stdout.

    Designed to be launched via `Bash(run_in_background=true)` and attached to
    the `Monitor` tool — each emitted line becomes a notification.

    Granularities:
    - `exit`: one WATCHING line up front, one EXITED line when the process dies.
      Quietest; one wake-up per task.
    - `tool`: additionally emit a TOOL line for each parsed tool-use event in
      the log stream. Higher wake-up volume; narrates progress.
    - `all`: emit every line from the log as a LOG event. Very noisy; debug only.
    """
    if granularity not in ("exit", "tool", "all"):
        _emit(f"ERROR task={task_id} reason=invalid_granularity={granularity}")
        return 2
    state = load_state(root)
    task = _find_task_any(state, task_id)
    if task is None:
        _emit(f"ERROR task={task_id} reason=not_found")
        return 3
    proc = task.get("process") or {}
    pid = proc.get("pid")
    if not isinstance(pid, int):
        _emit(f"ERROR task={task_id} reason=no_process")
        return 4
    log_path_str = task.get("log_path")
    log_fh = None
    if granularity in ("tool", "all") and log_path_str:
        log_path = Path(log_path_str)
        if log_path.exists():
            try:
                log_fh = log_path.open("r", encoding="utf-8", errors="replace")
                # Seek to start so we replay everything the child has written
                # so far, then follow forward.
                log_fh.seek(0, 0)
            except OSError:
                log_fh = None
    _emit(f"WATCHING task={task_id} pid={pid} granularity={granularity}")
    deadline = (time.monotonic() + timeout) if timeout else None
    try:
        while True:
            if log_fh is not None:
                while True:
                    line = log_fh.readline()
                    if not line:
                        break
                    stripped = line.rstrip("\n")
                    if granularity == "all":
                        _emit(f"LOG task={task_id}: {stripped}")
                    elif granularity == "tool":
                        tool_name = _extract_tool_use(stripped)
                        if tool_name:
                            _emit(f"TOOL task={task_id} tool={tool_name}")
            if not _process_alive(pid):
                # Drain any last-written bytes now that the process is gone.
                if log_fh is not None:
                    while True:
                        line = log_fh.readline()
                        if not line:
                            break
                        stripped = line.rstrip("\n")
                        if granularity == "all":
                            _emit(f"LOG task={task_id}: {stripped}")
                        elif granularity == "tool":
                            tool_name = _extract_tool_use(stripped)
                            if tool_name:
                                _emit(f"TOOL task={task_id} tool={tool_name}")
                _emit(f"EXITED task={task_id}")
                return 0
            if deadline is not None and time.monotonic() > deadline:
                _emit(f"TIMEOUT task={task_id}")
                return 5
            time.sleep(poll_interval)
    finally:
        if log_fh is not None:
            log_fh.close()

def cmd_task_cleanup_merged(
    root: Path,
    *,
    gh_bin: str = "gh",
    dry_run: bool = False,
) -> dict[str, Any]:
    """Refresh PR state for every active PRD task and destroy those whose PR is MERGED.

    Generic tasks are never touched (they have no PR concept).
    Tasks where gh returns no data or a non-merged state are left alone.
    """
    state = load_state(root)
    # Snapshot active PRD task ids so destroy operations don't invalidate iteration.
    candidate_ids = [
        t["id"]
        for t in state["tasks"]
        if t["type"] == "prd" and t["status"] in ACTIVE_STATUSES
    ]
    results: list[dict[str, Any]] = []
    for tid in candidate_ids:
        try:
            pr_out = cmd_task_pr_check(root, task_id=tid, gh_bin=gh_bin)
        except SystemExit as e:
            results.append({"task_id": tid, "action": "error", "error": str(e)})
            continue
        pr = pr_out.get("pr")
        pr_state = str((pr or {}).get("state", "")).upper()
        if pr_state != "MERGED":
            results.append({
                "task_id": tid,
                "action": "skipped",
                "reason": f"pr_state={pr_state or 'none'}",
                "pr": pr,
            })
            continue
        if dry_run:
            results.append({"task_id": tid, "action": "would_destroy", "pr": pr})
            continue
        try:
            destroy_out = cmd_task_destroy(
                root, task_id=tid, delete_branch=True, force=False,
            )
            results.append({
                "task_id": tid,
                "action": "destroyed",
                "pr": pr,
                "detail": destroy_out,
            })
        except SystemExit as e:
            results.append({
                "task_id": tid,
                "action": "error",
                "error": str(e),
                "pr": pr,
            })
    destroyed = sum(1 for r in results if r["action"] == "destroyed")
    return {"results": results, "scanned": len(candidate_ids), "destroyed": destroyed}

def cmd_task_destroy(
    root: Path,
    *,
    task_id: str,
    delete_branch: bool = False,
    force: bool = False,
) -> dict[str, Any]:
    cfg = load_config(root)
    state = load_state(root)
    task = _find_active_task(state, task_id)
    if task is None:
        raise SystemExit(f"No active task: {task_id}")
    repo = _find_repo(cfg, task["repo"])
    repo_path = Path(repo["path"])
    worktree_path = Path(task["worktree_path"])
    branch = task["branch"]
    process_killed = False
    proc = task.get("process") or {}
    pid = proc.get("pid") if isinstance(proc, dict) else None
    if isinstance(pid, int) and _process_alive(pid):
        process_killed = _kill_process_tree(pid)
    worktree_removed = False
    if worktree_path.exists():
        _git_worktree_remove(repo_path, worktree_path, force=force)
        worktree_removed = True
    branch_deleted = False
    branch_delete_error: str | None = None
    if delete_branch:
        try:
            _git_delete_branch(repo_path, branch, force=True)
            branch_deleted = True
        except SystemExit as e:
            branch_delete_error = str(e)
    task["status"] = "destroyed"
    task["updated_at"] = _now_iso()
    save_state(root, state)
    return {
        "destroyed": task_id,
        "worktree_removed": worktree_removed,
        "branch_deleted": branch_deleted,
        "branch_delete_error": branch_delete_error,
        "process_killed": process_killed,
    }

def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(prog="paw", description="parallel-agents helper")
    p.add_argument(
        "--root",
        type=Path,
        default=Path.cwd() / ".claude" / "parallel-agents",
        help="Plugin data root (holds config.json and state.json)",
    )
    sub = p.add_subparsers(dest="command", required=True)

    sp_init = sub.add_parser("init", help="Create config/state if missing")
    sp_init.add_argument("--discover-from", type=Path, default=None)

    sp_discover = sub.add_parser("discover", help="List sibling git repos of a path")
    sp_discover.add_argument("--from", dest="from_", type=Path, default=Path.cwd())

    sp_repo = sub.add_parser("repo", help="Repository management")
    sp_repo_sub = sp_repo.add_subparsers(dest="repo_command", required=True)
    sp_repo_add = sp_repo_sub.add_parser("add", help="Register a repository")
    sp_repo_add.add_argument("--name", required=True)
    sp_repo_add.add_argument("--path", required=True)
    sp_repo_add.add_argument("--default-branch", default="main")
    sp_repo_add.add_argument("--branch-prefix-prd", default="prd/")
    sp_repo_add.add_argument("--branch-prefix-generic", default="task/")
    sp_repo_sub.add_parser("list", help="List registered repositories")

    sp_task = sub.add_parser("task", help="Task/worktree management")
    sp_task_sub = sp_task.add_subparsers(dest="task_command", required=True)

    sp_task_create = sp_task_sub.add_parser("create", help="Create a worktree + task entry")
    sp_task_create.add_argument("--type", dest="task_type", choices=["prd", "generic"], required=True)
    sp_task_create.add_argument("--repo", dest="repo_name", required=True)
    sp_task_create.add_argument("--description", required=True)
    sp_task_create.add_argument("--task-id", default=None, help="Required for PRD; auto-allocated for generic")
    sp_task_create.add_argument("--slug", default=None, help="Override the derived branch slug")
    sp_task_create.add_argument(
        "--blocked-by",
        default="",
        help="Comma-separated task ids that block this one (informational, no enforcement)",
    )

    sp_task_list = sp_task_sub.add_parser("list", help="List tasks")
    sp_task_list.add_argument("--include-history", action="store_true")

    sp_task_destroy = sp_task_sub.add_parser("destroy", help="Remove worktree, mark task destroyed")
    sp_task_destroy.add_argument("--task-id", required=True)
    sp_task_destroy.add_argument("--delete-branch", action="store_true")
    sp_task_destroy.add_argument("--force", action="store_true", help="Pass --force to git worktree remove")

    sp_task_start = sp_task_sub.add_parser("start", help="Launch a headless claude session in the worktree")
    sp_task_start.add_argument("--task-id", required=True)
    sp_task_start.add_argument("--prompt", required=True)
    sp_task_start.add_argument("--claude-bin", default=DEFAULT_CLAUDE_BIN)
    sp_task_start.add_argument("--dry-run", action="store_true")

    sp_task_status = sp_task_sub.add_parser("status", help="Inspect one task (reconciles running→exited)")
    sp_task_status.add_argument("--task-id", required=True)
    sp_task_status.add_argument("--tail", type=int, default=20, dest="tail_lines")

    sp_task_attach = sp_task_sub.add_parser("attach", help="Print the resume command for a task")
    sp_task_attach.add_argument("--task-id", required=True)

    sp_task_send = sp_task_sub.add_parser("send", help="Queue a prompt for a task (drains if idle)")
    sp_task_send.add_argument("--task-id", required=True)
    sp_task_send.add_argument("--prompt", required=True)
    sp_task_send.add_argument("--claude-bin", default=DEFAULT_CLAUDE_BIN)

    sp_task_broadcast = sp_task_sub.add_parser("broadcast", help="Append a prompt to every active task")
    sp_task_broadcast.add_argument("--prompt", required=True)
    sp_task_broadcast.add_argument("--claude-bin", default=DEFAULT_CLAUDE_BIN)

    sp_task_pr = sp_task_sub.add_parser("pr-check", help="Look up PR state via gh")
    sp_task_pr.add_argument("--task-id", required=True)
    sp_task_pr.add_argument("--gh-bin", default="gh")

    sp_task_cleanup = sp_task_sub.add_parser(
        "cleanup",
        help="Destroy all active PRD tasks whose PR is MERGED (refreshes PR state first).",
    )
    sp_task_cleanup.add_argument("--gh-bin", default="gh")
    sp_task_cleanup.add_argument("--dry-run", action="store_true")

    sp_task_wait = sp_task_sub.add_parser(
        "wait",
        help="Block until the task exits; streams events for live monitoring.",
    )
    sp_task_wait.add_argument("--task-id", required=True)
    sp_task_wait.add_argument(
        "--granularity",
        choices=["exit", "tool", "all"],
        default="exit",
        help="exit = one EXITED line only; tool = also one TOOL line per tool use; all = every log line",
    )
    sp_task_wait.add_argument("--timeout", type=float, default=None)
    sp_task_wait.add_argument("--poll-interval", type=float, default=0.5)
    return p

def main(argv: list[str] | None = None) -> int:
    args = build_parser().parse_args(argv)
    root: Path = args.root
    if args.command == "init":
        out = cmd_init(root, discover_from=args.discover_from)
    elif args.command == "discover":
        out = cmd_discover(args.from_)
    elif args.command == "repo" and args.repo_command == "add":
        out = cmd_repo_add(
            root,
            name=args.name,
            path=args.path,
            default_branch=args.default_branch,
            branch_prefix_prd=args.branch_prefix_prd,
            branch_prefix_generic=args.branch_prefix_generic,
        )
    elif args.command == "repo" and args.repo_command == "list":
        out = cmd_repo_list(root)
    elif args.command == "task" and args.task_command == "create":
        blocked_by_list = [s.strip() for s in (args.blocked_by or "").split(",") if s.strip()]
        out = cmd_task_create(
            root,
            task_type=args.task_type,
            repo_name=args.repo_name,
            description=args.description,
            task_id=args.task_id,
            slug=args.slug,
            blocked_by=blocked_by_list,
        )
    elif args.command == "task" and args.task_command == "list":
        out = cmd_task_list(root, include_history=args.include_history)
    elif args.command == "task" and args.task_command == "destroy":
        out = cmd_task_destroy(
            root,
            task_id=args.task_id,
            delete_branch=args.delete_branch,
            force=args.force,
        )
    elif args.command == "task" and args.task_command == "start":
        out = cmd_task_start(
            root,
            task_id=args.task_id,
            prompt=args.prompt,
            claude_bin=args.claude_bin,
            dry_run=args.dry_run,
        )
    elif args.command == "task" and args.task_command == "status":
        out = cmd_task_status(
            root,
            task_id=args.task_id,
            tail_lines=args.tail_lines,
        )
    elif args.command == "task" and args.task_command == "attach":
        out = cmd_task_attach(root, task_id=args.task_id)
    elif args.command == "task" and args.task_command == "send":
        out = cmd_task_send(
            root,
            task_id=args.task_id,
            prompt=args.prompt,
            claude_bin=args.claude_bin,
        )
    elif args.command == "task" and args.task_command == "broadcast":
        out = cmd_task_broadcast(
            root,
            prompt=args.prompt,
            claude_bin=args.claude_bin,
        )
    elif args.command == "task" and args.task_command == "pr-check":
        out = cmd_task_pr_check(
            root,
            task_id=args.task_id,
            gh_bin=args.gh_bin,
        )
    elif args.command == "task" and args.task_command == "cleanup":
        out = cmd_task_cleanup_merged(
            root,
            gh_bin=args.gh_bin,
            dry_run=args.dry_run,
        )
    elif args.command == "task" and args.task_command == "wait":
        # Streams plain-text event lines to stdout instead of a JSON blob,
        # so the Monitor tool can read line-by-line notifications.
        return cmd_task_wait(
            root,
            task_id=args.task_id,
            granularity=args.granularity,
            timeout=args.timeout,
            poll_interval=args.poll_interval,
        )
    else:
        print(f"Unknown command: {args.command}", file=sys.stderr)
        return 2
    json.dump(out, sys.stdout, indent=2)
    sys.stdout.write("\n")
    return 0

if __name__ == "__main__":
    raise SystemExit(main())

name: parallel-agents
description: Orchestrate parallel headless Claude Code sessions in isolated git worktrees. Each task (a PRD implementation OR a free-form generic prompt) runs in its own worktree with its own session. Activate when the user wants to run one or more tasks in parallel ("implement PRD-010 and PRD-012 in parallel", "start PRD-010 in a worktree", "run this prompt in the background", "investigate the flaky calendar test in a worktree", "try this refactor in isolation"), check on parallel tasks they already started ("how's PRD-010 going", "any updates", "what's running"), send a follow-up prompt to a running worktree ("tell PRD-010 to also update the CHANGELOG", "broadcast — run the linter in every worktree"), jump into a running session ("take me into PRD-010", "open PRD-010 in VSCode"), or clean up worktrees after PRs have merged ("close all completed tasks", "destroy PRD-010"). Also activates on the first mention of parallel-agents in a project that hasn't been configured yet — runs the init sub-flow inline to register repositories before proceeding.
argument-hint: start <PRD-id[s]|prompt> | status [id] | send <id|all> <prompt> | attach <id> | destroy <id> | cleanup

parallel-agents skill

You are orchestrating parallel headless Claude Code sessions, each running inside an isolated git worktree on a dedicated branch. This skill is the user-facing layer; the mechanical parts go through a helper script paw.py that owns config, state, worktrees, process lifecycle, and PR detection.

Task types — both are first-class

The plugin tracks two kinds of tasks, and both behave the same way structurally (worktree + branch + headless session + TaskCreate todo + queue):

  • PRD tasks (type: prd, id PRD-<N>) — implement a concrete, pre-written Product Requirements Document. The launch prompt is always /implement-prd PRD-<N>. The source of truth for PRD content is $PWD/prd/PRD-<N>*.md in the planning project.
  • Generic tasks (type: generic, id GEN-<NNN> auto-allocated) — arbitrary free-form prompts the user wants to run in isolation. Examples: "investigate why the calendar test is flaky", "try refactoring the filter bar to use reducers", "spike out an alternative approach to X". Ids are assigned by the helper, not by the user.
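
For orientation, a task entry in state.json looks like this (the field set is taken from the helper's task create; repo name, description, and paths here are illustrative):

{
  "id": "GEN-001",
  "type": "generic",
  "description": "investigate flaky calendar test",
  "repo": "myapp",
  "branch": "task/GEN-001-investigate-flaky-calendar-test",
  "worktree_path": "…/myapp/.claude/worktrees/GEN-001",
  "status": "created",
  "session_id": null,
  "queued_prompts": [],
  "blocked_by": [],
  "process": null,
  "pr": null
}

(session_name, log_path, initial_prompt, and the created_at/updated_at timestamps are elided here.)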

Both types support the same actions: start, status, send follow-up, broadcast-receive, attach, destroy. The only differences are:

  • PRD tasks get PR-state tracking via gh; generic tasks do not.
  • task cleanup (bulk merged-cleanup) only touches PRD tasks — generic tasks are never auto-destroyed because there's no "merged" signal for them.
  • The launch prompt is different (see above).

Never refuse a generic task because it "isn't a PRD". Never try to cram a generic request into a PRD id. If the user says "spin up a worktree and figure out X", that is a perfectly valid generic task.

Absolute rules

  1. Every shell command that runs under this skill is spelled out explicitly in this file. Do not invent flags. Do not call claude or paw.py with arguments you did not see here.
  2. Each spawned claude process runs with its cwd set to the worktree directory. paw.py enforces this when it calls Popen(cwd=<worktree_path>). Never suggest shell commands that cd elsewhere before invoking claude for an existing task.
  3. One live claude process per worktree. If a target task is currently running, follow-up prompts are queued, not launched in parallel inside the same worktree. This is how we preserve edit-consistency.
  4. You mirror parallel-agents state into Claude Code's native task list via the TaskCreate / TaskUpdate / TaskList tools. The user sees the task list — that is your primary UI. Keep it flat (one todo per parallel session); use the content field to note dependencies with "blocked by <ids>".
  5. Do not auto-destroy worktrees. When you detect a merged PR, mark the corresponding todo as completed. The user will later ask you to "close all completed tasks" (or similar); that is when you run the cleanup command.
  6. Run a status-refresh pass whenever you are about to spawn or destroy tasks. Do not run one on every turn — only around mutations.
  7. Honour the monitoring config. When monitoring.mode == "live", every successful task start must be followed by launching a live watcher (see below). When monitoring.mode == "wait", do not launch a watcher — rely on status refreshes.

Monitoring configuration

The config file has a monitoring block that controls whether the skill watches spawned tasks live or polls on demand. Read it with:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" init

(init is idempotent and returns the full config; the fields you care about are config.monitoring.mode and config.monitoring.event_granularity).
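
For reference, the defaults written on first init are (the _comment_* keys omitted):

"monitoring": {
  "mode": "live",
  "event_granularity": "exit"
}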

monitoring.mode

  • live (current default) — after every task start, the skill runs paw.py task wait in a Bash background and attaches the Monitor tool. The assistant gets woken up when events arrive.
  • wait — no live watcher is launched. The skill only learns about completion when the user asks for status or when a subsequent mutation triggers a refresh pass.

monitoring.event_granularity (only used when mode == "live")

  • exit (default) — task wait emits one WATCHING line on attach and one EXITED line when the process dies. Quietest; one wake-up per task. Best for "notify me when it's done" workflows.
  • tool — additionally emits TOOL task=<id> tool=<name> for each parsed tool-use event in the log stream. Narrates progress at the cost of more wake-ups (one per Edit/Write/Bash/etc. inside the child).
  • all — streams every --include-hook-events log line as a LOG task=<id>: … event. Very noisy; use only when debugging what a headless session is actually doing.

The plugin auto-migrates older config files on first load: if monitoring is missing, defaults are filled in in-memory on read and persisted on the next init.

Preconditions — auto-init on first use

Config + state live at $PWD/.claude/parallel-agents/. Every action under this skill must start with this check, and if the check fails you run the init sub-flow inline — do not tell the user to run a separate command. This is a skill responsibility, not a slash command.

Check:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" repo list

Three possible states:

  1. Helper fails / files missing — config hasn't been created yet. Run the full init sub-flow below.
  2. repositories array is empty — files exist but no repos registered. Run the init sub-flow from Step 3 onward.
  3. repositories array has entries — ready to use. Proceed with the requested action.
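
In the ready state, the repo list response looks like this (name and path are illustrative; the shape matches what repo add stores):

{
  "repositories": [
    {
      "name": "myapp",
      "path": "/…/myapp",
      "default_branch": "main",
      "branch_prefix_prd": "prd/",
      "branch_prefix_generic": "task/"
    }
  ]
}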

Init sub-flow (runs inline, no slash command)

Use this whenever you detect state 1 or 2 above. Tell the user once: "I need to set up parallel-agents before I can do that — this only happens once." Then:

Step 1 — Create config + state files and scan for sibling repos:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  init --discover-from "$PWD"

Parse the JSON. Note config_created, state_created, and the siblings array (sibling directories of the current project that contain a git repo).

Step 2 — Offer sibling repos for registration via AskUserQuestion (multiSelect). Options: one entry per sibling path, plus "Custom path…" (user types an absolute path) and "None — I'll add later".

If the user picks "None", tell them they can run the setup again later by asking for any parallel-agents action. Stop the init sub-flow without registering anything (but the original request they made still needs a registered repo, so you'll re-prompt next time).

Step 3 — Register each selected repo. For each chosen path:

  1. Propose a short name (default: basename of the path). Only ask via AskUserQuestion if the name collides with an existing registered repo.
  2. Detect the default branch:
    git -C "<path>" symbolic-ref --short refs/remotes/origin/HEAD 2>/dev/null | sed 's|origin/||'
    If empty, ask the user via AskUserQuestion (singleSelect with the output of git -C <path> branch --format '%(refname:short)').
  3. Register:
    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      repo add \
      --name "<name>" \
      --path "<path>" \
      --default-branch "<branch>"
    Registration also appends .claude/worktrees/ to <repo>/.git/info/exclude so future worktrees don't show as untracked in the parent repo.
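
    The appended block is exactly the marker comment plus the pattern:

    # parallel-agents: managed worktree path
    .claude/worktrees/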

Step 4 — Confirm final state:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" repo list

Show the user the registered repos.

Step 5 — Resume the user's original request. Go back to whatever action triggered the init sub-flow (start / status / send / destroy / cleanup) and execute it. The user should not have to repeat themselves.

Helper command reference

This is the exhaustive list of paw.py calls you are allowed to make. All of them emit JSON on stdout.

Data root argument (always the same): --root "$PWD/.claude/parallel-agents"

Repo discovery / listing:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" repo list

Task creation — new worktree, new branch, state entry (no session yet):

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task create \
  --type prd \
  --repo "<repo-name>" \
  --task-id "PRD-<N>" \
  --description "<short label>"
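
The response echoes the created task. Branch and worktree are derived as <prefix><id>-<slug> and <repo>/.claude/worktrees/<id>; an illustrative excerpt:

{
  "task": {
    "id": "PRD-12",
    "branch": "prd/PRD-12-add-csv-export",
    "worktree_path": "/…/myapp/.claude/worktrees/PRD-12",
    "status": "created"
  }
}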

Generic variant (auto-allocates GEN-NNN):

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task create \
  --type generic \
  --repo "<repo-name>" \
  --description "<short label>"

With dependency annotation:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task create \
  --type prd \
  --repo "<repo-name>" \
  --task-id "PRD-<N>" \
  --description "<short label>" \
  --blocked-by "PRD-<M>,PRD-<K>"

--blocked-by is informational only — it populates task.blocked_by so you can render "blocked by PRD-<M>" in the todo content. No enforcement; you are still free to start the task.

Start (first session launch) — spawn headless claude in the worktree:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task start \
  --task-id "<id>" \
  --prompt "<prompt>"

The prompt for a PRD task is literally /implement-prd PRD-<N>. For a generic task it's the raw user text. paw.py assembles the full claude argv:

  • claude -p "<prompt>" --session-id <new-uuid> --name "<id>: <description>" --dangerously-skip-permissions --include-hook-events
  • cwd set to the worktree path.
  • stdout/stderr redirected to <root>/logs/<task-id>.jsonl.
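
To preview the exact spawn without launching anything, task start also accepts --dry-run (supported by the helper; useful for debugging). Instead of spawning, it returns the assembled command, cwd, session_id, and log_path as JSON:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task start \
  --task-id "<id>" \
  --prompt "<prompt>" \
  --dry-run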

Send follow-up prompt (queue-aware):

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task send \
  --task-id "<id>" \
  --prompt "<prompt>"

  • If target task is running, the prompt is queued.
  • If target task is exited or created, paw.py spawns immediately via claude --resume <session-id> -p "<prompt>" (keeps the same session history).
  • Response JSON has dispatched: bool and queued_count: int.
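
An illustrative response when the prompt was queued behind a running session (task payload elided):

{
  "task_id": "PRD-12",
  "dispatched": false,
  "queued_count": 2,
  "task": { … }
}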

Broadcast to all active tasks:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task broadcast \
  --prompt "<prompt>"

Destroyed tasks are ignored. Running tasks receive the prompt in their queue; exited ones dispatch immediately.

Status of one task — reconciles running→exited transitions and drains the queue if the task is now idle:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task status \
  --task-id "<id>" \
  --tail 20

List all active tasks (used by status refresh pass):

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task list

Add --include-history to see destroyed ones too.

PR lookup for a task (PRD tasks only — uses gh pr list --head <branch>):

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task pr-check \
  --task-id "<id>"

Attach info — returns the shell command for interactive resume:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task attach \
  --task-id "<id>"

Destroy one task — removes worktree, kills any live process, marks destroyed:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task destroy \
  --task-id "<id>" \
  [--delete-branch] \
  [--force]

  • --delete-branch force-deletes the local branch; only use it when the PR is merged or for generic tasks the user wants fully purged.
  • --force is needed if the worktree has uncommitted changes.

Bulk cleanup of merged PRD tasks — refreshes PR state for every active PRD task and destroys those whose PR is MERGED, with --delete-branch:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task cleanup

Add --dry-run to see what would be destroyed without touching anything.
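
A dry run reports one entry per scanned PRD task: action is would_destroy for merged PRs and skipped (with a pr_state reason) otherwise. Illustrative, abbreviated output:

{
  "results": [
    {"task_id": "PRD-10", "action": "would_destroy", "pr": {"number": 42, "state": "MERGED"}},
    {"task_id": "PRD-12", "action": "skipped", "reason": "pr_state=OPEN"}
  ],
  "scanned": 2,
  "destroyed": 0
}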

Live watcher — blocks until the task exits, emits event lines to stdout. Designed to be launched via Bash(run_in_background=true) and consumed via Monitor:

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
  task wait \
  --task-id "<id>" \
  --granularity <exit|tool|all>

Output line formats:

  • WATCHING task=<id> pid=<N> granularity=<mode> — always emitted once on attach
  • LOG task=<id>: <raw log line> — only when granularity=all
  • TOOL task=<id> tool=<name> — only when granularity=tool and a tool-use event was parsed
  • EXITED task=<id> — always emitted when the process dies (exit line)
  • TIMEOUT task=<id> — if a --timeout was set and it elapsed before the process died
  • ERROR task=<id> reason=<reason> — misconfiguration (no session, invalid granularity, etc.)

Exit codes: 0 normal exit, 2 invalid granularity, 3 task not found, 4 no process to watch, 5 timeout hit.
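
If you ever need to consume these lines outside Monitor, a minimal sketch (paths illustrative; not part of the skill flow):

import subprocess

# Follow a task's wait stream and react to event-line prefixes.
proc = subprocess.Popen(
    ["python3", "scripts/paw.py", "--root", ".claude/parallel-agents",
     "task", "wait", "--task-id", "PRD-12", "--granularity", "tool"],
    stdout=subprocess.PIPE, text=True,
)
for line in proc.stdout:
    kind = line.split(" ", 1)[0]  # WATCHING / TOOL / LOG / EXITED / TIMEOUT / ERROR
    if kind == "TOOL":
        print("progress:", line.strip())
    elif kind in ("EXITED", "TIMEOUT", "ERROR"):
        break
proc.wait()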


Action: start one or more parallel tasks

Triggered when the user asks to implement/start/run a PRD, a batch of PRDs, or a generic prompt in a worktree.

Step 1 — Status refresh pass (mandatory before spawning)

Run task list to see what's already in flight, so you can:

  • detect tasks whose sessions have exited (the status call will reap them)
  • show the user an up-to-date picture before adding more
  • avoid duplicating a task id that already exists

python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" task list

For each task in the response with status == "running", also call task status --task-id <id> to trigger per-task reconciliation + queue drain. This also refreshes merged-PR detection if the task has a PR.

Sync the TaskCreate todo list with what you just learned:

  • For tasks you didn't have a todo for yet, create one via TaskCreate with status: in_progress and content describing it (id, repo, branch, worktree path, session id, initial prompt).
  • For tasks whose state changed since the last todo snapshot (exited, PR merged), call TaskUpdate accordingly:
    • session running → in_progress
    • session exited, no PR yet → in_progress with a content note "session ended — awaiting PR"
    • PR merged → completed (but do not destroy)
    • PR open / closed (not merged) → in_progress with note about PR state

Step 2 — Classify intent

Apply to $ARGUMENTS or to the user's natural-language message:

  1. Pure PRD implementation — matches ^\s*(PRD-\d+[,\s]*)+$ OR contains PRD ids with explicit implementation verbs (implement, build, start, ship, run, do). Go to Step 3a with the extracted list of PRD ids.

  2. PRD destructive/status intent — close PRD-010, destroy PRD-010, finish PRD-010, mark PRD-010 done. Jump to the Destroy one task action instead.

  3. PRD with ambiguous intent — PRD ids present but the verb is unclear (look into PRD-010, check PRD-010, what's in PRD-010). Use AskUserQuestion (singleSelect, per PRD): "Implement it", "Research it (generic task)", "Cancel".

  4. Generic prompt — no PRD ids, or PRDs are only incidental references. Go to Step 3b with the user's prompt as the task description.

Step 3a — Per-PRD worktree + session

For each PRD id:

  1. Pick the target repo. If exactly one is registered, use it. Otherwise use AskUserQuestion (singleSelect, options = repo names) to ask which repo this PRD goes into. Ask per-PRD since different ones might go different places.

  2. Derive a short description (≤60 chars). Try to read the PRD title from $PWD/prd/PRD-<N>*.md (first # heading or frontmatter title). Fall back to the literal id.

  3. Detect dependencies. If the PRD file has a Depends on: line in its frontmatter and any of those dependency ids are currently active tasks, pass them via --blocked-by. Also add a note in the TaskCreate content: "blocked by: <ids>" (see the frontmatter sketch after this list).

  4. Create the worktree:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task create \
      --type prd \
      --repo "<repo>" \
      --task-id "PRD-<N>" \
      --description "<short>" \
      [--blocked-by "PRD-<M>,…"]
  5. Launch the session with /implement-prd:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task start \
      --task-id "PRD-<N>" \
      --prompt "/implement-prd PRD-<N>"
  6. Attach live watcher if monitoring.mode == "live". See the "Live watcher launch" procedure below. Do this once per task, right after task start succeeds and before moving to the next task.

  7. Create the todo via TaskCreate:

    • content: PRD-<N>: <short description> (+ "blocked by …" if relevant)
    • status: in_progress
    • activeForm: Running PRD-<N> or similar
  8. If any step fails, surface the stderr message and move on to the next PRD — don't abort the whole batch. Reflect failed ones in todos too (in_progress with a note about the failure or skip creating the todo entirely).
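
A sketch of the kind of PRD frontmatter the dependency detection in step 3 looks for (the exact layout of your PRD files is an assumption; only the Depends on: line matters):

---
title: Add CSV export
Depends on: PRD-8, PRD-9
---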

Step 3b — One generic task

  1. Ask for a short label (≤60 chars) via AskUserQuestion if not obvious from the prompt.
  2. Ask for the repo if >1 registered.
  3. Create + start using the same two commands, with --type generic and the raw user text as --prompt:
    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task create \
      --type generic \
      --repo "<repo>" \
      --description "<label>"
    Note the allocated GEN-NNN id from the JSON response.
    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task start \
      --task-id "GEN-<N>" \
      --prompt "<raw user text>"
  4. Attach live watcher if monitoring.mode == "live" (see procedure below).
  5. Create a TaskCreate todo entry for it.

Live watcher launch (Procedure)

Run this any time a new headless process was just spawned and monitoring.mode == "live". The trigger points are:

  • After every successful task start (always a fresh spawn).
  • After task send or task broadcast on a task where the response shows dispatched: true (means the helper spawned a new claude --resume process). Do not launch a watcher when dispatched: false (that means the prompt was queued behind a currently-running process — the existing watcher, if any, already covers that pid; and if there's no existing watcher, the next dispatched: true will be your hook).

Skip this procedure entirely when monitoring.mode == "wait".

Steps

  1. Ensure Monitor is loaded. Monitor is a deferred Claude Code tool — it may not be in the current tool set. If you don't already see it as available, load it once via ToolSearch:

    ToolSearch { query: "select:Monitor", max_results: 1 }
    

    Idempotent: loading an already-loaded tool is a no-op.

  2. Launch the watcher directly via Monitor. Monitor takes the shell command as its own parameter — do not wrap it in a Bash(run_in_background=true) call first. One Monitor tool call per task:

    Monitor {
      command: 'python3 /absolute/path/to/.claude/plugins/parallel-agents/scripts/paw.py --root /absolute/path/to/.claude/parallel-agents task wait --task-id <task-id> --granularity <granularity> --poll-interval 1.0',
      description: "<task-id> live watcher (<granularity>)",
      timeout_ms: 900000,
      persistent: false
    }
    

    Notes on the arguments:

    • Use absolute paths for both paw.py and --root. ${CLAUDE_PLUGIN_ROOT} and $PWD get expanded by shells but not always reliably inside Monitor — safest to hardcode the resolved paths. You can get them once via echo "$PWD" at the start of the skill invocation.
    • <granularity> = the value of config.monitoring.event_granularity (exit, tool, or all).
    • --poll-interval 1.0 gives cmd_task_wait a 1s heartbeat — fast enough to catch exits within a second, slow enough to not burn CPU.
    • timeout_ms — 900000 (15 min) is a reasonable ceiling for most PRD-style runs. Raise to 3600000 (1 h, max) for long refactors. Use persistent: true if you truly don't know how long the task will take; the watcher will run for the life of the session.
    • description shows in every event notification; make it specific (include the task id and granularity).
  3. Each stdout line from task wait becomes an in-session notification. The notification body contains the exact line task wait printed. Expected event types:

    | Event line | When | What to do |
    | --- | --- | --- |
    | WATCHING task=<id> pid=<N> granularity=<mode> | Fires once on attach | Acknowledge in summary; no action needed |
    | TOOL task=<id> tool=<name> | Only if granularity=tool, per parsed tool-use event in the child's log | Optionally narrate ("GEN-001 is editing now") |
    | LOG task=<id>: <raw line> | Only if granularity=all, per raw log line | Debug-only; don't surface to user unless asked |
    | EXITED task=<id> | Child process died | Run task status --task-id <id>; for PRD tasks run task pr-check; update the TaskCreate todo |
    | TIMEOUT task=<id> | --timeout deadline hit (rare — we don't set one) | Treat as a tool-side error; run manual status |
    | ERROR task=<id> reason=<…> | Watcher startup failure | Log it; fall back to manual task status checks for this task |

    You will also see a stream-ended lifecycle notification once the Monitor command exits. That is Monitor's own bookkeeping, not a task wait event — ignore it.

  4. One watcher per (task, pid) generation, not per task. Every time the helper spawns a fresh process for a task (initial start, or a dispatched: true send/broadcast), that's a new pid, and you should attach a new Monitor — the old watcher has already exited with EXITED when its pid died. Do NOT attach two monitors to the same live pid.

  5. Destroy cleanup is automatic. When the user destroys a task, paw.py task destroy sends SIGTERM to the live pid; the watcher sees the process disappear and emits EXITED within the poll interval (≤1s). The watcher process exits cleanly on its own — you don't need to TaskStop it.
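
Step 5's promise that EXITED arrives within one poll interval falls out of the shape of the wait loop. A minimal sketch of the idea (not paw.py's actual implementation, which also parses the child's log for TOOL/LOG events):

    import os
    import time

    def wait_for_exit(task_id: str, pid: int, poll_interval: float = 1.0) -> None:
        """Poll a pid we don't own; print one line per lifecycle event."""
        print(f"WATCHING task={task_id} pid={pid} granularity=exit", flush=True)
        while True:
            try:
                os.kill(pid, 0)  # signal 0 performs an existence check, delivers nothing
            except ProcessLookupError:
                print(f"EXITED task={task_id}", flush=True)
                return
            time.sleep(poll_interval)

Each flushed stdout line is what Monitor turns into an in-session notification.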

Step 4 — Summary

Print a compact table of what was just created in this invocation, with columns: id, type, repo, branch, worktree, session.
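
For example (all values hypothetical; branch and path shapes depend on your setup):

    id       type  repo    branch    worktree                   session
    PRD-012  prd   webapp  prd-012   .claude/worktrees/prd-012  6f9a2c…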

Remind the user they can say "how's it going" for an update, "take me into <id>" to jump in, or "close all completed" to clean up merged work.


Action: check status

Triggered by things like "how's it going", "any updates", "status", "what's running", "check on PRD-010".

  1. Run task list (see the Helper command reference). For each task with status == "running", also run task status --task-id <id> to reap + drain. (A sketch of this pass follows the section.)
  2. For each PRD task whose session is exited and whose pr field hasn't been refreshed recently, run task pr-check --task-id <id>.
  3. Update the TaskCreate todos to reflect what you learned (see Step 1 rules in the "start" action).
  4. Summarize to the user:
    • Tasks that changed state since last time
    • Tasks whose PRs just went MERGED — flag these loudly with "ready to close"
    • Tasks still running
    • Any queued prompts waiting to dispatch

Do not destroy anything in this action, no matter what you find.
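
A sketch of the refresh pass in steps 1 and 2, assuming task list prints a JSON object whose tasks array carries task_id, status, and type fields (those field names are assumptions):

    import json
    import subprocess

    PAW = ["python3", "/absolute/path/to/paw.py",
           "--root", "/absolute/path/to/.claude/parallel-agents"]

    listing = subprocess.run(PAW + ["task", "list"],
                             capture_output=True, text=True, check=True)
    for t in json.loads(listing.stdout)["tasks"]:  # field names assumed
        if t["status"] == "running":
            # Reap + drain: the status refresh is also what dispatches queued prompts.
            subprocess.run(PAW + ["task", "status", "--task-id", t["task_id"]], check=True)
        elif t["type"] == "prd" and t["status"] == "exited":
            subprocess.run(PAW + ["task", "pr-check", "--task-id", t["task_id"]], check=True)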


Action: send a follow-up prompt

Triggered by "tell PRD-010 to also X", "ask GEN-001 to Y", "broadcast Z to every worktree".

  1. Parse the target and the prompt. Target can be a specific id or all (broadcast).

  2. If the prompt is ambiguous, clarify with AskUserQuestion. If the target is ambiguous and there are multiple tasks, list them via task list and ask which.

  3. For a single target:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task send \
      --task-id "<id>" \
      --prompt "<prompt>"
  4. For broadcast:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task broadcast \
      --prompt "<prompt>"
  5. Inspect the dispatch result per task. For task send, read the top-level dispatched field. For task broadcast, read results[].dispatched for every target. (A parsing sketch follows this section.)

  6. Launch a fresh live watcher for each task where dispatched: true — only when monitoring.mode == "live". A dispatched: true response means the helper just spawned a brand-new claude --resume process inside the worktree; its pid is different from any previous generation and any prior watcher has already exited. Follow the Live watcher launch procedure once per freshly dispatched task.

    Do not launch a watcher when dispatched: false — that means the prompt was appended to the queue of a process that is already running, and any existing watcher for that pid is still covering it. The queued prompt will fire on next idle, at which point calling task status (or the next task send) will drain the queue and dispatch with a new pid — attach a watcher then, not now.

  7. Report per-task:

    • dispatched: true → "PRD-010: dispatched (resume, pid <pid>), watcher attached"
    • dispatched: false → "GEN-001: queued (position N, behind running pid <pid>)"
    • broadcast: show each target in the list with the same format.
  8. Optionally update the corresponding TaskCreate todos with a content note about the queued or dispatched follow-up.

Queued prompts fire the next time the task goes idle and something calls task status, task send, or task broadcast for it — this is the "status refresh around mutations" pattern; there's no background drainer. The live watcher has no effect on queue semantics — it only tells the assistant when the current pid has died. To actually drain the queue, something must call into paw.py again after the watcher fires EXITED.
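
A sketch of the per-target inspection from steps 5 and 6, using a broadcast response (results[].dispatched is documented above; the per-entry task_id field name is an assumption):

    import json
    import subprocess

    PAW = ["python3", "/absolute/path/to/paw.py",
           "--root", "/absolute/path/to/.claude/parallel-agents"]
    MONITORING_LIVE = True  # config.monitoring.mode == "live"

    out = subprocess.run(PAW + ["task", "broadcast", "--prompt", "also run the linter"],
                         capture_output=True, text=True, check=True)
    for result in json.loads(out.stdout)["results"]:
        if MONITORING_LIVE and result["dispatched"]:
            # Fresh pid: any prior watcher has already exited. Attach a new Monitor
            # per the Live watcher launch procedure (a tool call, not doable from here).
            print(f"{result['task_id']}: dispatched, attach a watcher")
        else:
            print(f"{result['task_id']}: queued behind the running pid")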


Action: jump into a session

Triggered by "take me into PRD-010", "open PRD-010 in VSCode", "attach to PRD-010", "how do I resume PRD-010 interactively".

  1. Run task attach for the id:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task attach \
      --task-id "<id>"

    This returns worktree_path, session_id, session_name, and resume_command. (A consumption sketch follows this section.)

  2. Decide execute vs. print based on the user's verb:

    • "Open [in VSCode]" — the user wants the editor opened now. Execute code "<worktree_path>" via Bash (or cursor "<worktree_path>" if that's what's installed). Do not ask for confirmation first — opening an editor is local and reversible. After running it, report that the worktree was opened and then also print the terminal resume command (see below) so the user can jump the session too if they want.

    • "How do I attach / resume / jump into" — the user wants instructions. Print both commands without executing. In this mode never run either.

  3. Resume in terminal is always printed (not executed) because claude --resume is an interactive TUI that can't run under Claude Code's Bash tool:

    cd "<worktree_path>" && claude --resume <session_id>
    

    (this is the resume_command field from the JSON; use it verbatim)

  4. Open in VSCode command shape:

    code "<worktree_path>"
    

    VSCode treats each git worktree as its own workspace — the parent repo's main window is unaffected.

If task attach fails with "has no session yet", run the start action for this task instead.
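
A sketch of consuming the attach payload (the four fields are documented above; everything else here is an assumption):

    import json
    import subprocess

    out = subprocess.run(
        ["python3", "/absolute/path/to/paw.py",
         "--root", "/absolute/path/to/.claude/parallel-agents",
         "task", "attach", "--task-id", "PRD-010"],
        capture_output=True, text=True, check=True,
    )
    info = json.loads(out.stdout)
    print(info["resume_command"])   # always print, never execute: interactive TUI
    # "Open in VSCode" verb only:
    # subprocess.run(["code", info["worktree_path"]], check=True)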


Action: destroy one task

Triggered by "close PRD-010", "destroy PRD-010", "remove the PRD-010 worktree".

  1. Run task list to confirm the task exists and is active.
  2. If task type is prd, run task pr-check:
    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task pr-check \
      --task-id "<id>"
    Interpret the pr field (a decision sketch follows this list):
    • merged → destroy + delete branch recommended
    • open or closed (not merged) → destroy worktree but keep the branch so no work is lost
    • no PR → warn that the branch hasn't been pushed; destroying now abandons any uncommitted work
  3. Confirm with the user via AskUserQuestion (multiSelect): "Also delete local branch" (preselected only if the PR is merged) and "Force" (off by default; needed for a dirty worktree).
  4. Run destroy:
    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task destroy \
      --task-id "<id>" \
      [--delete-branch] \
      [--force]
  5. If the response's branch_delete_error is non-null, surface it verbatim (usually "branch not fully merged" — tells the user they'd be losing unmerged commits).
  6. Update the corresponding TaskCreate todo to completed.
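
Step 2's interpretation, condensed into a decision sketch (the payload shape, a dict with a state key, is an assumption; adapt it to the real pr-check output):

    def destroy_recommendation(pr: dict | None) -> tuple[bool, str]:
        """Map a pr-check payload to (preselect branch deletion, user-facing note)."""
        if pr is None:
            return False, "no PR: branch never pushed; destroying abandons uncommitted work"
        if pr.get("state") == "MERGED":
            return True, "PR merged: safe to delete the local branch too"
        return False, "PR open/closed but not merged: keep the branch so no work is lost"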

Action: clean up all completed

Triggered by "close all completed tasks", "clean up merged worktrees", "destroy anything that's done".

  1. Run the bulk cleanup, which internally refreshes PR state for every active PRD task and destroys the merged ones:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task cleanup

    Consider running with --dry-run first and showing the user what will happen:

    python3 ${CLAUDE_PLUGIN_ROOT}/scripts/paw.py --root "$PWD/.claude/parallel-agents" \
      task cleanup --dry-run

    Then confirm, then re-run without --dry-run.

  2. Each results[] entry has an action (a grouping sketch follows this list):

    • destroyed — worktree removed, local branch deleted, task marked destroyed.
    • would_destroy — dry-run only.
    • skipped — reason in reason field (usually pr_state=OPEN or pr_state=none).
    • error — show the error and the pr payload if present.
  3. For every destroyed task, set its TaskCreate todo to completed (it should already be, but confirm).

  4. Generic tasks are never auto-cleaned. If the user wants to remove generic tasks, use the single-destroy action on each.
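
A sketch of the dry-run-first flow from step 1, grouping results[] by action before asking the user to confirm (the action values are documented above; the per-entry task_id field name is an assumption):

    import json
    import subprocess
    from collections import defaultdict

    PAW = ["python3", "/absolute/path/to/paw.py",
           "--root", "/absolute/path/to/.claude/parallel-agents"]

    dry = subprocess.run(PAW + ["task", "cleanup", "--dry-run"],
                         capture_output=True, text=True, check=True)
    by_action: dict[str, list] = defaultdict(list)
    for entry in json.loads(dry.stdout)["results"]:
        by_action[entry["action"]].append(entry)

    print(f"{len(by_action['would_destroy'])} merged worktree(s) would be destroyed")
    for entry in by_action["skipped"]:
        print(f"skipped {entry.get('task_id')}: {entry.get('reason')}")
    # After the user confirms, re-run the same command without --dry-run.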


TaskCreate integration rules (summary)

  • One todo per parallel task. Flat, no nesting.
  • Todo content includes: id, repo, branch, worktree path, session id, initial prompt (truncated), and any "blocked by <ids>" annotation.
  • Status mapping (a condensed sketch follows this list):
    • created or running in paw state → in_progress
    • exited → in_progress with a content note "session ended"
    • PR state MERGED → completed (but worktree still exists until cleanup)
    • destroyed in paw state → completed (worktree is gone)
  • Updates happen in status-refresh passes, not on every user turn. The triggers are: before spawning, before destroying, when the user asks for status, before a cleanup pass.
  • Never fabricate todos for tasks that don't exist in paw.py's state. The source of truth is task list JSON.
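
The status mapping, condensed (a sketch; the task record's field names, and a pr sub-object carrying state, are assumptions):

    def todo_status(task: dict) -> tuple[str, str | None]:
        """Map a paw task record to (TaskCreate status, optional content note)."""
        if task["status"] == "destroyed":
            return "completed", None                    # worktree is gone
        if (task.get("pr") or {}).get("state") == "MERGED":
            return "completed", None                    # worktree lingers until cleanup
        if task["status"] == "exited":
            return "in_progress", "session ended"
        return "in_progress", None                      # created or running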

Slash command shortcuts still available

These exist for quick inspection or on-demand docs without invoking the skill:

  • /pa-list — raw table dump of active tasks (and --all for history)
  • /pa-status <id> — one-task detail view with log tail
  • /pa-attach <id> — prints the resume command, opens VSCode
  • /pa-help — static reference card: task types, actions, config, modes, file locations
  • /pa-demo — interactive guided demo that runs the full lifecycle against a real repo (creates 3 tasks, opens real PRs, lets the user play, then cleans up)

Use them when the user asks for them explicitly or when a quick raw view is simpler than the conversational flow. Init is not a slash command — it runs inline via the preconditions check at the top of this file whenever it's needed. Everything else (start / send / broadcast / destroy / cleanup / jump / init) goes through this skill.
