Created
January 24, 2026 20:43
-
-
Save RezaAmbler/2a225e2c1da746dcc8a16b4a901ca6ba to your computer and use it in GitHub Desktop.
Staged Prompt Orchestrator for Claude Code Description: A Python wrapper for the Claude Code CLI that automates sequential prompt execution. Features multi-pass iterative refinement, automatic Git versioning/tagging, dynamic context injection between phases, and deployment verification loops.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Staged Prompt Orchestrator for Claude Code | |
| Processes prompts sequentially, accumulating context between phases. | |
| Supports multiple passes through the full sequence for iterative refinement. | |
| """ | |
| import subprocess | |
| import json | |
| import re | |
| import shutil | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, List, Tuple | |
| class PhaseOrchestrator: | |
| def __init__(self, prompts_dir: str = "prompts", project_dir: str = ".", deploy_prompt: Optional[str] = None): | |
| self.prompts_dir = Path(prompts_dir) | |
| self.project_dir = Path(project_dir) | |
| self.summaries_dir = self.prompts_dir / "summaries" | |
| self.logs_dir = self.prompts_dir / "logs" | |
| self.state_file = self.prompts_dir / ".orchestrator_state.json" | |
| self.deploy_prompt_file = Path(deploy_prompt) if deploy_prompt else None | |
| # Ensure directories exist | |
| self.summaries_dir.mkdir(exist_ok=True) | |
| self.logs_dir.mkdir(exist_ok=True) | |
| def get_ordered_prompts(self) -> List[Path]: | |
| """Return prompt files sorted by numeric prefix (01_, 02_, etc.)""" | |
| prompts = list(self.prompts_dir.glob("[0-9][0-9]_*.md")) | |
| return sorted(prompts, key=lambda p: int(re.match(r"(\d+)", p.name).group(1))) | |
| def load_state(self) -> dict: | |
| """Load orchestrator state for resume capability""" | |
| if self.state_file.exists(): | |
| return json.loads(self.state_file.read_text()) | |
| return {"completed_phases": [], "last_run": None, "current_pass": 0} | |
| def save_state(self, state: dict): | |
| """Persist state to disk""" | |
| state["last_run"] = datetime.now().isoformat() | |
| self.state_file.write_text(json.dumps(state, indent=2)) | |
| def ensure_baseline_summaries(self): | |
| """Move existing flat summaries to baseline/ if not already organized""" | |
| baseline_dir = self.summaries_dir / "baseline" | |
| # Check if we have flat summary files (not in subdirs) | |
| flat_summaries = [f for f in self.summaries_dir.glob("*_summary.md") if f.is_file()] | |
| if flat_summaries and not baseline_dir.exists(): | |
| print(" β Migrating existing summaries to baseline/...") | |
| baseline_dir.mkdir(exist_ok=True) | |
| for summary in flat_summaries: | |
| shutil.move(str(summary), str(baseline_dir / summary.name)) | |
| print(f" β Moved {len(flat_summaries)} summaries to baseline/") | |
| elif not baseline_dir.exists(): | |
| baseline_dir.mkdir(exist_ok=True) | |
| def get_pass_summary_dir(self, pass_num: int) -> Path: | |
| """Get the summary directory for a specific pass""" | |
| if pass_num == 0: | |
| return self.summaries_dir / "baseline" | |
| return self.summaries_dir / f"pass_{pass_num:02d}" | |
| def ensure_pass_summary_dir(self, pass_num: int) -> Path: | |
| """Ensure the summary directory for a pass exists""" | |
| pass_dir = self.get_pass_summary_dir(pass_num) | |
| pass_dir.mkdir(exist_ok=True) | |
| return pass_dir | |
| def get_git_diff_since_tag(self, tag: str) -> str: | |
| """Get git diff since a specific tag/commit""" | |
| try: | |
| result = subprocess.run( | |
| ["git", "diff", tag, "--stat"], | |
| capture_output=True, text=True, cwd=self.project_dir | |
| ) | |
| return result.stdout if result.returncode == 0 else "" | |
| except Exception: | |
| return "" | |
| def get_summary_for_phase(self, phase_num: int, pass_num: int) -> Optional[str]: | |
| """Get summary content for a specific phase from a specific pass""" | |
| pass_dir = self.get_pass_summary_dir(pass_num) | |
| pattern = f"{phase_num:02d}_*_summary.md" | |
| matches = list(pass_dir.glob(pattern)) | |
| if matches: | |
| return matches[0].read_text() | |
| return None | |
| def build_context(self, phase_num: int, current_pass: int) -> str: | |
| """ | |
| Assemble context from: | |
| 1. Baseline summary for this phase (what was originally built) | |
| 2. All previous pass summaries for this phase (refinements history) | |
| 3. Current pass summaries for earlier phases (what we've done this pass) | |
| """ | |
| parts = [] | |
| # 1. Baseline summary for this phase | |
| baseline_summary = self.get_summary_for_phase(phase_num, 0) | |
| if baseline_summary: | |
| parts.append(f"## Baseline Implementation (Phase {phase_num:02d})\n\n{baseline_summary}") | |
| # 2. Previous pass summaries for this phase | |
| previous_pass_summaries = [] | |
| for prev_pass in range(1, current_pass): | |
| prev_summary = self.get_summary_for_phase(phase_num, prev_pass) | |
| if prev_summary: | |
| previous_pass_summaries.append(f"### Pass {prev_pass} Refinements\n\n{prev_summary}") | |
| if previous_pass_summaries: | |
| parts.append(f"## Previous Refinement Passes (Phase {phase_num:02d})\n\n" + | |
| "\n\n---\n\n".join(previous_pass_summaries)) | |
| # 3. Current pass summaries for earlier phases | |
| if current_pass > 0: | |
| current_pass_earlier = [] | |
| for earlier_phase in range(1, phase_num): | |
| earlier_summary = self.get_summary_for_phase(earlier_phase, current_pass) | |
| if earlier_summary: | |
| current_pass_earlier.append(f"### Phase {earlier_phase:02d}\n\n{earlier_summary}") | |
| if current_pass_earlier: | |
| parts.append(f"## Current Pass ({current_pass}) - Earlier Phases\n\n" + | |
| "\n\n---\n\n".join(current_pass_earlier)) | |
| # 4. Git diff context (if available) | |
| if current_pass > 0: | |
| prev_tag = f"pass-{current_pass - 1:02d}-complete" if current_pass > 1 else "baseline-complete" | |
| diff = self.get_git_diff_since_tag(prev_tag) | |
| if diff: | |
| parts.append(f"## Git Changes Since Last Pass\n\n```\n{diff}\n```") | |
| if not parts: | |
| if current_pass == 0: | |
| return "*No previous context - this is the initial build.*" | |
| else: | |
| return f"*Pass {current_pass} refinement - review existing code and improve.*" | |
| # Add pass context header | |
| if current_pass > 0: | |
| header = f"# Refinement Pass {current_pass}\n\n" | |
| header += "Your goal this pass: Review the existing implementation, identify imperfections, " | |
| header += "edge cases, or areas for improvement, and refine the code quality.\n\n" | |
| return header + "\n\n---\n\n".join(parts) | |
| return "\n\n---\n\n".join(parts) | |
| def inject_context(self, prompt_content: str, context: str) -> str: | |
| """Replace context placeholder with actual context""" | |
| placeholder = "<!-- ORCHESTRATOR_CONTEXT_INJECTION -->" | |
| if placeholder in prompt_content: | |
| return prompt_content.replace(placeholder, context) | |
| else: | |
| # If no placeholder, prepend context | |
| return f"{context}\n\n---\n\n{prompt_content}" | |
| def run_claude_code(self, prompt: str, log_file: Path) -> Tuple[bool, str]: | |
| """Execute Claude Code with prompt, capture stream-json output""" | |
| try: | |
| process = subprocess.Popen( | |
| [ | |
| "claude", | |
| "--dangerously-skip-permissions", | |
| "-p", prompt, | |
| "--output-format", "stream-json", | |
| "--verbose" | |
| ], | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| cwd=self.project_dir | |
| ) | |
| output_lines = [] | |
| with open(log_file, "w") as log: | |
| for line in process.stdout: | |
| log.write(line) | |
| log.flush() | |
| output_lines.append(line) | |
| self._display_stream_event(line) | |
| return_code = process.wait() | |
| return return_code == 0, "".join(output_lines) | |
| except Exception as e: | |
| return False, str(e) | |
| def _display_stream_event(self, line: str): | |
| """Parse stream-json event and display relevant info""" | |
| try: | |
| event = json.loads(line.strip()) | |
| event_type = event.get("type", "") | |
| if event_type == "assistant": | |
| message = event.get("message", {}) | |
| if message.get("content"): | |
| for block in message["content"]: | |
| if block.get("type") == "text": | |
| text = block.get("text", "")[:100] | |
| if text: | |
| print(f" π¬ {text}...") | |
| elif event_type == "tool_use": | |
| tool = event.get("tool", {}) | |
| tool_name = tool.get("name", "unknown") | |
| print(f" π§ Using tool: {tool_name}") | |
| elif event_type == "result": | |
| result = event.get("result", "")[:100] | |
| print(f" β Result: {result}...") | |
| elif event_type == "error": | |
| error = event.get("error", {}) | |
| print(f" β Error: {error.get('message', 'unknown')}") | |
| except json.JSONDecodeError: | |
| pass | |
| def run_claude_init(self) -> bool: | |
| """Run claude /init to update CLAUDE.md""" | |
| try: | |
| result = subprocess.run( | |
| ["claude", "-p", "Run /init to update CLAUDE.md with current project state"], | |
| capture_output=True, text=True, cwd=self.project_dir | |
| ) | |
| return result.returncode == 0 | |
| except Exception: | |
| return False | |
| def git_commit_and_tag(self, phase_name: str, phase_num: int, pass_num: int) -> bool: | |
| """Commit all changes and tag the phase completion""" | |
| try: | |
| subprocess.run(["git", "add", "-A"], cwd=self.project_dir, check=True) | |
| if pass_num == 0: | |
| msg = f"Phase {phase_num:02d} complete: {phase_name}" | |
| tag = f"phase-{phase_num:02d}-complete" | |
| else: | |
| msg = f"Pass {pass_num} - Phase {phase_num:02d} refined: {phase_name}" | |
| tag = f"pass-{pass_num:02d}-phase-{phase_num:02d}" | |
| # Use --allow-empty in case no changes were made | |
| subprocess.run(["git", "commit", "--allow-empty", "-m", msg], cwd=self.project_dir, check=True) | |
| # Use -f to force overwrite existing tags | |
| subprocess.run(["git", "tag", "-f", tag], cwd=self.project_dir, check=True) | |
| return True | |
| except subprocess.CalledProcessError: | |
| return False | |
| def git_tag_pass_complete(self, pass_num: int) -> bool: | |
| """Tag the completion of a full pass""" | |
| try: | |
| if pass_num == 0: | |
| tag = "baseline-complete" | |
| else: | |
| tag = f"pass-{pass_num:02d}-complete" | |
| # Use -f to force overwrite existing tags | |
| subprocess.run(["git", "tag", "-f", tag], cwd=self.project_dir, check=True) | |
| return True | |
| except subprocess.CalledProcessError: | |
| return False | |
| def run_deployment(self, pass_num: int) -> bool: | |
| """Run deployment prompt and fix any issues discovered""" | |
| if not self.deploy_prompt_file or not self.deploy_prompt_file.exists(): | |
| print(" β No deployment prompt configured, skipping deployment step") | |
| return True | |
| pass_label = "Baseline" if pass_num == 0 else f"Pass {pass_num}" | |
| print(f"\n{'='*60}") | |
| print(f"[{pass_label}] DEPLOYMENT STAGE") | |
| print(f"{'='*60}") | |
| # Read deployment prompt | |
| deploy_content = self.deploy_prompt_file.read_text() | |
| # Inject deployment-specific instructions | |
| deploy_context = f"""# Deployment Stage - {pass_label} | |
| You are running a deployment verification after completing all phases in {pass_label}. | |
| ## Your Mission | |
| 1. Execute the deployment steps in this prompt | |
| 2. Identify any issues, errors, or failures during deployment | |
| 3. **FIX any issues you discover** - do not just report them | |
| 4. After fixing, re-run the deployment to verify the fix works | |
| ## Commit Convention | |
| If you make ANY fixes during this deployment stage, commit them with this prefix: | |
| ``` | |
| git commit -m "fixed in deployment stage: <description of what was fixed>" | |
| ``` | |
| ## Important | |
| - Be thorough - deployment often reveals issues that tests miss | |
| - Fix runtime errors, missing env vars, integration issues, startup failures | |
| - Each fix should be a separate commit with the prefix above | |
| - Re-verify after each fix before moving on | |
| --- | |
| """ | |
| full_prompt = deploy_context + deploy_content | |
| # Run deployment | |
| log_file = self.logs_dir / f"pass{pass_num:02d}_deployment.log" | |
| print(f" β Running deployment (logging to {log_file.name})...") | |
| success, output = self.run_claude_code(full_prompt, log_file) | |
| if not success: | |
| print(f" β Deployment failed. Check {log_file}") | |
| return False | |
| # Commit any uncommitted changes from deployment fixes | |
| print(" β Checking for deployment fixes to commit...") | |
| self.commit_deployment_fixes(pass_num) | |
| print(f" β [{pass_label}] Deployment complete") | |
| return True | |
| def commit_deployment_fixes(self, pass_num: int): | |
| """Commit any staged/unstaged changes from deployment fixes""" | |
| try: | |
| # Check if there are any changes | |
| result = subprocess.run( | |
| ["git", "status", "--porcelain"], | |
| capture_output=True, text=True, cwd=self.project_dir | |
| ) | |
| if result.stdout.strip(): | |
| # There are changes - commit them | |
| subprocess.run(["git", "add", "-A"], cwd=self.project_dir, check=True) | |
| subprocess.run( | |
| ["git", "commit", "-m", f"fixed in deployment stage: pass {pass_num} deployment fixes"], | |
| cwd=self.project_dir | |
| ) | |
| print(" β Committed deployment fixes") | |
| else: | |
| print(" β No additional fixes needed") | |
| except subprocess.CalledProcessError: | |
| pass # No changes or commit failed - that's okay | |
| def verify_summary_exists(self, phase_num: int, phase_name: str, pass_num: int) -> bool: | |
| """Check that Claude created the expected summary file""" | |
| pass_dir = self.get_pass_summary_dir(pass_num) | |
| pattern = f"{phase_num:02d}_*_summary.md" | |
| return len(list(pass_dir.glob(pattern))) > 0 | |
| def run_phase(self, prompt_file: Path, pass_num: int) -> bool: | |
| """Execute a single phase""" | |
| match = re.match(r"(\d+)_(.+)\.md", prompt_file.name) | |
| if not match: | |
| print(f" β Invalid prompt filename format: {prompt_file.name}") | |
| return False | |
| phase_num = int(match.group(1)) | |
| phase_name = match.group(2) | |
| pass_label = "Baseline" if pass_num == 0 else f"Pass {pass_num}" | |
| print(f"\n{'='*60}") | |
| print(f"[{pass_label}] Phase {phase_num:02d}: {phase_name}") | |
| print(f"{'='*60}") | |
| # Ensure summary directory for this pass exists | |
| pass_summary_dir = self.ensure_pass_summary_dir(pass_num) | |
| # Build context and inject | |
| print(" β Building context from previous phases and passes...") | |
| context = self.build_context(phase_num, pass_num) | |
| prompt_content = prompt_file.read_text() | |
| # Add instruction about where to save summary | |
| summary_instruction = f"\n\n---\n\n**IMPORTANT**: Save your phase summary to: `{pass_summary_dir}/{phase_num:02d}_{phase_name}_summary.md`" | |
| prompt_content += summary_instruction | |
| full_prompt = self.inject_context(prompt_content, context) | |
| # Run Claude Code | |
| log_file = self.logs_dir / f"pass{pass_num:02d}_{phase_num:02d}_{phase_name}.log" | |
| print(f" β Running Claude Code (logging to {log_file.name})...") | |
| success, output = self.run_claude_code(full_prompt, log_file) | |
| if not success: | |
| print(f" β Claude Code failed. Check {log_file}") | |
| return False | |
| # Verify summary was created | |
| print(" β Verifying summary file created...") | |
| if not self.verify_summary_exists(phase_num, phase_name, pass_num): | |
| print(f" β Warning: Summary file not found for phase {phase_num} in {pass_summary_dir.name}") | |
| # Git commit and tag | |
| print(" β Committing and tagging...") | |
| if not self.git_commit_and_tag(phase_name, phase_num, pass_num): | |
| print(" β Warning: Git commit/tag failed") | |
| # Update CLAUDE.md | |
| print(" β Updating CLAUDE.md via /init...") | |
| self.run_claude_init() | |
| print(f" β [{pass_label}] Phase {phase_num:02d} complete") | |
| return True | |
| def run(self, start_from: int = 1, stop_after: int = None, passes: int = 1, start_pass: int = 0, force: bool = False): | |
| """ | |
| Run all phases sequentially with multiple passes. | |
| Args: | |
| start_from: Phase number to start from | |
| stop_after: Phase number to stop after (None = run all) | |
| passes: Number of full passes through all prompts (1 = single pass, 2+ = refinement) | |
| start_pass: Pass number to start from (0 = baseline, 1+ = refinement only) | |
| force: Force re-run even if phases are marked complete | |
| """ | |
| state = self.load_state() | |
| prompts = self.get_ordered_prompts() | |
| if not prompts: | |
| print("No prompt files found matching pattern: XX_name.md") | |
| return | |
| # Ensure baseline summaries are organized | |
| self.ensure_baseline_summaries() | |
| print(f"Found {len(prompts)} phases:") | |
| for p in prompts: | |
| print(f" - {p.name}") | |
| print(f"\nPlanned passes: {passes} (starting from pass {start_pass})") | |
| # Determine starting pass - use start_pass arg, or resume from state if not forcing | |
| if force: | |
| actual_start_pass = start_pass | |
| else: | |
| actual_start_pass = max(start_pass, state.get("current_pass", 0)) | |
| for current_pass in range(actual_start_pass, passes): | |
| pass_label = "Baseline Build" if current_pass == 0 else f"Refinement Pass {current_pass}" | |
| print(f"\n{'#'*60}") | |
| print(f"# {pass_label}") | |
| print(f"{'#'*60}") | |
| # Track completed phases for this pass | |
| pass_key = f"pass_{current_pass}_completed" | |
| if force: | |
| state[pass_key] = [] | |
| for prompt_file in prompts: | |
| phase_num = int(re.match(r"(\d+)", prompt_file.name).group(1)) | |
| # Skip if before start_from (only for first pass or if resuming) | |
| if phase_num < start_from and current_pass == actual_start_pass: | |
| print(f"\nSkipping phase {phase_num} (before start_from={start_from})") | |
| continue | |
| # Stop if after stop_after | |
| if stop_after and phase_num > stop_after: | |
| print(f"\nStopping after phase {stop_after}") | |
| break | |
| # Skip if already completed in this pass (unless forcing) | |
| completed_in_pass = state.get(pass_key, []) | |
| if prompt_file.name in completed_in_pass and not force: | |
| print(f"\nSkipping phase {phase_num} (already completed in pass {current_pass})") | |
| continue | |
| # Run the phase | |
| success = self.run_phase(prompt_file, current_pass) | |
| if success: | |
| if pass_key not in state: | |
| state[pass_key] = [] | |
| state[pass_key].append(prompt_file.name) | |
| state["current_pass"] = current_pass | |
| self.save_state(state) | |
| else: | |
| print(f"\nβ Phase {phase_num} failed in pass {current_pass}. Stopping.") | |
| print(f" Resume with: python orchestrator.py --start-from {phase_num}") | |
| return | |
| # Tag pass completion | |
| print(f"\n β Tagging {pass_label} complete...") | |
| self.git_tag_pass_complete(current_pass) | |
| # Run deployment if configured | |
| if self.deploy_prompt_file: | |
| deploy_success = self.run_deployment(current_pass) | |
| if not deploy_success: | |
| print(f"\nβ Deployment failed after {pass_label}. Stopping.") | |
| print(f" Fix issues and resume with: python orchestrator.py --start-pass {current_pass + 1}") | |
| return | |
| # Reset start_from for subsequent passes | |
| start_from = 1 | |
| print("\n" + "="*60) | |
| print(f"Orchestration complete - {passes} pass(es) finished") | |
| print("="*60) | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Run staged prompt orchestration") | |
| parser.add_argument("--start-from", type=int, default=1, help="Phase number to start from") | |
| parser.add_argument("--stop-after", type=int, help="Phase number to stop after") | |
| parser.add_argument("--prompts-dir", default="prompts", help="Directory containing prompts") | |
| parser.add_argument("--list", action="store_true", help="List phases without running") | |
| parser.add_argument("--passes", type=int, default=1, help="Number of full passes (1=baseline only, 2+=refinement)") | |
| parser.add_argument("--start-pass", type=int, default=0, help="Pass number to start from (0=baseline, 1+=refinement)") | |
| parser.add_argument("--force", action="store_true", help="Force re-run, ignore completion state") | |
| parser.add_argument("--deploy-prompt", type=str, help="Path to deployment prompt file (runs after each pass)") | |
| args = parser.parse_args() | |
| orchestrator = PhaseOrchestrator( | |
| prompts_dir=args.prompts_dir, | |
| deploy_prompt=args.deploy_prompt | |
| ) | |
| if args.list: | |
| prompts = orchestrator.get_ordered_prompts() | |
| print("Phases:") | |
| for p in prompts: | |
| print(f" {p.name}") | |
| else: | |
| orchestrator.run( | |
| start_from=args.start_from, | |
| stop_after=args.stop_after, | |
| passes=args.passes, | |
| start_pass=args.start_pass, | |
| force=args.force | |
| ) |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Staged Prompt Orchestrator for Claude Code
This script acts as a powerful automation wrapper around the Anthropic
claudeCLI tool ("Claude Code"). It allows you to break complex software development tasks into small, sequential phases and execute them automatically, managing context, git history, and iterative refinement along the way.π Why use this?
When building large projects with LLMs, context windows often become a bottleneck. If you feed everything into one session, the model forgets earlier instructions. This orchestrator solves that by:
01_init.md,02_database.md, etc.).β¨ Key Features
1. The "Passes" System
The script distinguishes between Baseline Builds and Refinement Passes:
2. Context Injection
The script looks for a placeholder `` in your prompt files. It dynamically replaces this with:
3. Automated Git Management
No need to manually commit. The orchestrator:
phase-01-completeandpass-01-complete.CLAUDE.mdautomatically via/init.4. Deployment Verification
If you provide a
--deploy-prompt, the script runs a verification loop after every full pass. If the deployment check fails, it instructs Claude to fix the specific errors found, commits the fix, and verifies again.π Directory Structure
Your project should look like this:
π Usage
Basic Run (Baseline Build)
Run all prompts in the
prompts/directory in numeric order (01, 02, 03...).Iterative Refinement
Run the baseline build, then immediately run a "refinement pass" to polish the code.
# Runs Pass 0 (Build) then Pass 1 (Refinement) python3 orchestrator.py --passes 2Resume Interrupted Run
If the script crashes or you stop it, it saves state. You can resume specifically:
# Start from Phase 3 python3 orchestrator.py --start-from 3Deployment Hook
Run the deployment verification prompt after the pass completes.
π Prompt File Format
Your markdown prompt files (e.g.,
01_setup.md) should look like this:# Phase 1: Project Setup Create a basic Flask application structure. ... (rest of your prompt)Note: The script automatically appends an instruction telling Claude to save a summary of its work to a specific file, which is used to feed the next phase.