Last active
March 11, 2026 23:32
-
-
Save byt3bl33d3r/061dad31daa37b1d81dfa8cc67d45168 to your computer and use it in GitHub Desktop.
Python script that uses Claude Code's voice mode STT web socket URL to transcribe voice to text (requires Claude Code subscription)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "websockets", | |
| # ] | |
| # /// | |
| """ | |
| Claude Code Voice Mode — Python Proof of Concept | |
| Reverse-engineered from the Claude Code CLI binary (v2.1.72). | |
| Replicates the push-to-talk voice flow: | |
| 1. Hold SPACE to record audio via SoX (rec) | |
| 2. Stream raw PCM audio over WebSocket to claude.ai STT endpoint | |
| 3. Receive interim/final transcripts | |
| 4. Print the final transcript as a prompt | |
| Requirements: | |
| pip install websockets | |
| # SoX must be installed: apt install sox / brew install sox | |
| # ============================================================================= | |
| # REVERSE ENGINEERING NOTES (from Claude Code CLI v2.1.72) | |
| # ============================================================================= | |
| # | |
| # Source: Bun single-file executable at ~/.local/share/claude/versions/2.1.72 | |
| # Also readable via VS Code extension at: | |
| # ~/.vscode-server/extensions/anthropic.claude-code-*/extension.js | |
| # | |
| # --- WebSocket Endpoint --- | |
| # Path: /api/ws/speech_to_text/voice_stream | |
| # Base URL derived from CLAUDE_AI_AUTHORIZE_URL (default: https://claude.ai/oauth/authorize) | |
| # -> origin = https://claude.ai -> wss://claude.ai | |
| # Override: VOICE_STREAM_BASE_URL env var | |
| # | |
| # --- Authentication Headers (exactly 3) --- | |
| # Authorization: Bearer <oauth_access_token> | |
| # User-Agent: claude-cli/<VERSION> (external, <CLAUDE_CODE_ENTRYPOINT>) | |
| # x-app: cli | |
| # | |
| # The User-Agent is built by iv() in the binary: | |
| # `claude-cli/${VERSION} (external, ${process.env.CLAUDE_CODE_ENTRYPOINT}${sdk_suffix}${workload_suffix})` | |
| # The "x-app: cli" header is REQUIRED — without it, the server returns 403. | |
| # | |
| # NO other headers are set (no cookies, no Origin, no Referer, no x-client-app). | |
| # | |
| # --- OAuth Token --- | |
| # Read from ~/.claude/.credentials.json: | |
| # { "claudeAiOauth": { "accessToken": "sk-ant-oat01-...", "expiresAt": ..., ... } } | |
| # Token is refreshed via S$() before each connection (call `claude /login` to re-auth). | |
| # Can also be provided via CLAUDE_CODE_OAUTH_TOKEN env var. | |
| # Required scopes: user:inference, user:mcp_servers, user:profile, user:sessions:claude_code | |
| # | |
| # --- Query Parameters --- | |
| # encoding=linear16 Audio format: signed 16-bit little-endian PCM | |
| # sample_rate=16000 16kHz sample rate | |
| # channels=1 Mono audio | |
| # endpointing_ms=300 Endpoint detection window | |
| # utterance_end_ms=1000 Utterance end detection | |
| # language=en Language (configurable) | |
| # | |
| # Optional (feature-gated behind "tengu_cobalt_frost"): | |
| # use_conversation_engine=true | |
| # stt_provider=deepgram-nova3 | |
| # keyterms=<term> Repeated param for recognition hints | |
| # | |
| # --- WebSocket Protocol --- | |
| # Client -> Server: | |
| # {"type": "KeepAlive"} Sent immediately on open, then every 8 seconds (hlO=8000) | |
| # <binary PCM chunks> Raw audio data (100ms chunks = 3200 bytes at 16kHz/16bit/mono) | |
| # {"type": "CloseStream"} Signals end of audio input | |
| # | |
| # Server -> Client: | |
| # {"type": "TranscriptText", "data": "..."} Interim transcription result | |
| # {"type": "TranscriptEndpoint"} Marks end of utterance (finalizes preceding text) | |
| # {"type": "TranscriptError", "description": "...", "error_code": "..."} Error | |
| # {"type": "error", "message": "..."} Server error | |
| # | |
| # --- Finalize Timeouts --- | |
| # safety: 5000ms Max time to wait for final transcript after CloseStream | |
| # noData: 1500ms Time to wait with no new data before resolving | |
| # | |
| # --- TLS / Cloudflare Notes --- | |
| # The endpoint is behind Cloudflare bot protection which uses TLS fingerprinting | |
| # (JA3/JA4). Python's OpenSSL version affects the fingerprint: | |
| # - OpenSSL 3.5+ (Debian trixie): passes Cloudflare challenge | |
| # - OpenSSL 3.0.x (Debian bookworm): gets 403 "cf-mitigated: challenge" | |
| # Both produce the same exit IP — it's purely the TLS handshake fingerprint. | |
| # If running in Docker, use a trixie-based image (e.g. python:3.14-slim-trixie). | |
| # | |
| # --- Connection Options (Bun vs Node) --- | |
| # Bun: { headers, proxy: $U(url), tls: JS() || undefined } | |
| # Node: { headers, agent: HU(url), ...JS() } | |
| # JS() returns custom TLS/cert settings; $U()/HU() handle HTTP proxy from env. | |
| # ============================================================================= | |
| """ | |
| import websockets | |
| import asyncio | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
# --- Config ---
# Credentials file read by load_oauth_token() when no env-var token is set.
CREDENTIALS_PATH = Path.home() / ".claude" / ".credentials.json"
# Path component of the speech-to-text WebSocket URL.
STT_ENDPOINT = "/api/ws/speech_to_text/voice_stream"
# HTTP origin; build_ws_url() rewrites its scheme to ws(s).
CLAUDE_AI_ORIGIN = "https://claude.ai"
# Version string embedded in the User-Agent header by build_headers().
CLI_VERSION = "2.1.72"
# Audio format advertised to the server via query parameters.
AUDIO_ENCODING = "linear16"
SAMPLE_RATE = 16000
CHANNELS = 1
# Endpoint / utterance-end detection windows (milliseconds), sent as query parameters.
ENDPOINTING_MS = 300
UTTERANCE_END_MS = 1000
LANGUAGE = "en"
KEEPALIVE_INTERVAL = 8  # seconds (hlO=8000 in the CLI binary)
# Max seconds stop_recording() waits for the final transcript after CloseStream.
CLOSE_TIMEOUT = 5.0
# Mirrors the CLI's 1500 ms "noData" finalize timeout from the notes above.
# NOTE(review): not referenced by any code visible in this file.
NO_DATA_TIMEOUT = 1.5
CHUNK_DURATION_MS = 100  # send audio every 100ms
# Bytes per chunk: sample rate * 2 bytes per 16-bit sample * channels * duration.
CHUNK_SIZE = SAMPLE_RATE * 2 * CHANNELS * CHUNK_DURATION_MS // 1000  # 3200 bytes
def load_oauth_token() -> str:
    """Return the OAuth access token used to authenticate the STT socket.

    Lookup order:
      1. The ``CLAUDE_CODE_OAUTH_TOKEN`` environment variable, if non-empty.
      2. ``claudeAiOauth.accessToken`` inside the JSON file at
         ``CREDENTIALS_PATH``.

    Returns:
        The access token string.

    Raises:
        SystemExit: with status 1 (after printing a hint) when the
            credentials file is missing, unreadable, not valid JSON, or does
            not contain a non-empty string token.
    """
    # Check env var override first (matches CLI behavior)
    env_token = os.environ.get("CLAUDE_CODE_OAUTH_TOKEN")
    if env_token:
        return env_token

    # Read directly instead of exists()-then-read: avoids a race and lets us
    # report every failure mode with a message instead of a traceback.
    try:
        raw = CREDENTIALS_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        print(f"No credentials found at {CREDENTIALS_PATH}")
        print("Run 'claude /login' first to authenticate.")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Could not read credentials at {CREDENTIALS_PATH}: {exc}")
        sys.exit(1)

    try:
        creds = json.loads(raw)
    except json.JSONDecodeError as exc:
        print(f"Credentials file {CREDENTIALS_PATH} is not valid JSON: {exc}")
        print("Run 'claude /login' first to authenticate.")
        sys.exit(1)

    # Either level may be missing or null, so check the shape explicitly
    # rather than chaining .get() on a possibly non-dict value.
    oauth = creds.get("claudeAiOauth") if isinstance(creds, dict) else None
    token = oauth.get("accessToken") if isinstance(oauth, dict) else None
    if not token or not isinstance(token, str):
        print("No OAuth access token found. Run 'claude /login' first.")
        sys.exit(1)
    return token
def build_ws_url() -> str:
    """Build the STT WebSocket URL, including its query string.

    The base comes from the ``VOICE_STREAM_BASE_URL`` environment variable
    when set, otherwise from ``CLAUDE_AI_ORIGIN``. In both cases an
    ``http://`` / ``https://`` scheme is mapped to ``ws://`` / ``wss://`` and
    a trailing slash is dropped so the joined URL has exactly one ``/``
    before the endpoint path.

    Returns:
        ``<base><STT_ENDPOINT>?<query>`` where the query carries the audio
        format, endpointing windows and language, URL-encoded.
    """
    from urllib.parse import urlencode

    base = os.environ.get("VOICE_STREAM_BASE_URL") or CLAUDE_AI_ORIGIN

    # A WebSocket client needs a ws(s) scheme; accept an http(s) origin for
    # the override too, the same way the default origin is handled.
    if base.startswith("https://"):
        base = "wss://" + base.removeprefix("https://")
    elif base.startswith("http://"):
        base = "ws://" + base.removeprefix("http://")
    base = base.rstrip("/")

    # urlencode escapes values, so a configured value containing spaces or
    # '&' cannot corrupt the query string. Parameter order is preserved.
    query = urlencode(
        {
            "encoding": AUDIO_ENCODING,
            "sample_rate": SAMPLE_RATE,
            "channels": CHANNELS,
            "endpointing_ms": ENDPOINTING_MS,
            "utterance_end_ms": UTTERANCE_END_MS,
            "language": LANGUAGE,
        }
    )
    return f"{base}{STT_ENDPOINT}?{query}"
def build_headers(token: str) -> dict[str, str]:
    """Return the three HTTP headers sent with the WebSocket handshake.

    Args:
        token: OAuth access token placed in the ``Authorization`` header.

    Returns:
        A dict with ``Authorization``, ``User-Agent`` (built from
        ``CLI_VERSION``) and ``x-app``, in that order.
    """
    user_agent = f"claude-cli/{CLI_VERSION} (external, cli)"
    headers: dict[str, str] = {}
    headers["Authorization"] = f"Bearer {token}"
    headers["User-Agent"] = user_agent
    headers["x-app"] = "cli"
    return headers
def check_sox() -> bool:
    """Report whether the SoX ``rec`` command can be launched.

    Probes by running ``rec --help`` with a 5 second timeout; the exit
    status is ignored, only the ability to start the program matters.

    Returns:
        True if the command started and finished within the timeout, False
        if it could not be executed (not found, not executable, or any other
        OS-level launch error) or timed out.
    """
    try:
        subprocess.run(
            ["rec", "--help"],
            capture_output=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as PermissionError
        # (e.g. a non-executable file named "rec" on PATH).
        return False
    return True
def check_arecord() -> bool:
    """Report whether ``arecord`` (Linux ALSA fallback) can be launched.

    Probes by running ``arecord --help`` with a 5 second timeout; the exit
    status is ignored, only the ability to start the program matters.

    Returns:
        True if the command started and finished within the timeout, False
        if it could not be executed (not found, not executable, or any other
        OS-level launch error) or timed out.
    """
    try:
        subprocess.run(
            ["arecord", "--help"],
            capture_output=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as PermissionError
        # (e.g. a non-executable file named "arecord" on PATH).
        return False
    return True
def start_recording_process() -> subprocess.Popen:
    """Launch the first available recorder and return its process handle.

    SoX ``rec`` is tried first and ``arecord`` second. Either tool is asked
    for headerless signed 16-bit PCM at ``SAMPLE_RATE`` / ``CHANNELS``; the
    audio is exposed on the returned process's ``stdout`` pipe and the
    tool's stderr is discarded.

    Raises:
        SystemExit: with status 1, after printing an install hint, when
            neither recorder is available.
    """
    rate = str(SAMPLE_RATE)
    channel_count = str(CHANNELS)

    argv = None
    if check_sox():
        # SoX: raw signed-integer samples, 16 bits wide.
        argv = ["rec", "--no-show-progress"]
        argv += ["--rate", rate, "--channels", channel_count]
        argv += ["--encoding", "signed-integer", "--bits", "16"]
        argv += ["--type", "raw"]
        argv.append("-")  # "-" sends the audio to stdout
    elif check_arecord():
        # ALSA: same format, spelled as S16_LE; no filename is given.
        argv = [
            "arecord",
            "--format=S16_LE",
            "--rate=" + rate,
            "--channels=" + channel_count,
            "--file-type=raw",
            "--quiet",
        ]

    if argv is None:
        print("Neither SoX (rec) nor arecord found.")
        print("Install SoX: apt install sox libsox-fmt-all / brew install sox")
        sys.exit(1)

    return subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
| class VoiceSession: | |
| """Manages one push-to-talk voice recording session.""" | |
| def __init__(self, token: str): | |
| self.token = token | |
| self.ws = None | |
| self.recording_process = None | |
| self.final_transcript = "" | |
| self.interim_transcript = "" | |
| self.connected = asyncio.Event() | |
| self.done = asyncio.Event() | |
| self.closed = False | |
| async def connect(self): | |
| """Connect to the STT WebSocket endpoint.""" | |
| url = build_ws_url() | |
| headers = build_headers(self.token) | |
| self.ws = await websockets.connect(url, additional_headers=headers) | |
| self.connected.set() | |
| # Send initial KeepAlive (matches CLI behavior) | |
| await self.ws.send(json.dumps({"type": "KeepAlive"})) | |
| async def send_keepalives(self): | |
| """Send periodic KeepAlive messages (every 8s, matching CLI's hlO=8000).""" | |
| try: | |
| while not self.closed: | |
| await asyncio.sleep(KEEPALIVE_INTERVAL) | |
| if self.ws and not self.closed: | |
| await self.ws.send(json.dumps({"type": "KeepAlive"})) | |
| except Exception: | |
| pass | |
| async def receive_messages(self): | |
| """Receive and process transcript messages from the server.""" | |
| try: | |
| async for message in self.ws: | |
| data = json.loads(message) | |
| msg_type = data.get("type") | |
| if msg_type == "TranscriptText": | |
| text = data.get("data", "") | |
| if text: | |
| self.interim_transcript = text | |
| # Clear line and show interim transcript | |
| sys.stdout.write(f"\r\033[K >> {text}") | |
| sys.stdout.flush() | |
| elif msg_type == "TranscriptEndpoint": | |
| # Final segment — commit the interim transcript | |
| if self.interim_transcript: | |
| if self.final_transcript: | |
| self.final_transcript += " " | |
| self.final_transcript += self.interim_transcript | |
| self.interim_transcript = "" | |
| if self.closed: | |
| self.done.set() | |
| return | |
| elif msg_type == "TranscriptError": | |
| desc = data.get("description") or data.get("error_code") or "unknown" | |
| sys.stdout.write(f"\r\033[K [STT Error: {desc}]\n") | |
| sys.stdout.flush() | |
| elif msg_type == "error": | |
| # Server-level error (different from TranscriptError) | |
| msg = data.get("message", "unknown server error") | |
| sys.stdout.write(f"\r\033[K [Server Error: {msg}]\n") | |
| sys.stdout.flush() | |
| except websockets.ConnectionClosed: | |
| pass | |
| finally: | |
| self.done.set() | |
| async def stream_audio(self): | |
| """Read audio from recording process and send to WebSocket.""" | |
| await self.connected.wait() | |
| self.recording_process = start_recording_process() | |
| try: | |
| while not self.closed: | |
| chunk = self.recording_process.stdout.read(CHUNK_SIZE) | |
| if not chunk: | |
| break | |
| if self.ws and not self.closed: | |
| await self.ws.send(chunk) | |
| # Small yield to let other tasks run | |
| await asyncio.sleep(0) | |
| except Exception: | |
| pass | |
| async def stop_recording(self): | |
| """Stop recording and finalize the transcript.""" | |
| self.closed = True | |
| # Kill the recording subprocess | |
| if self.recording_process: | |
| self.recording_process.terminate() | |
| try: | |
| self.recording_process.wait(timeout=2) | |
| except subprocess.TimeoutExpired: | |
| self.recording_process.kill() | |
| # Send CloseStream to finalize (matches CLI behavior) | |
| if self.ws: | |
| try: | |
| await self.ws.send(json.dumps({"type": "CloseStream"})) | |
| except Exception: | |
| pass | |
| # Wait for final transcript with timeout | |
| try: | |
| await asyncio.wait_for(self.done.wait(), timeout=CLOSE_TIMEOUT) | |
| except asyncio.TimeoutError: | |
| pass | |
| # Promote any unreported interim transcript | |
| if self.interim_transcript: | |
| if self.final_transcript: | |
| self.final_transcript += " " | |
| self.final_transcript += self.interim_transcript | |
| # Close WebSocket | |
| if self.ws: | |
| try: | |
| await self.ws.close() | |
| except Exception: | |
| pass | |
| return self.final_transcript | |
def setup_raw_terminal():
    """Switch stdin to cbreak mode so single keypresses can be read.

    Returns:
        The terminal attributes in effect before the switch, suitable for
        passing to ``restore_terminal``.
    """
    import termios
    import tty

    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    # cbreak: characters are delivered without waiting for a full line.
    tty.setcbreak(stdin_fd)
    return saved_attrs
def restore_terminal(old_settings):
    """Re-apply previously saved terminal attributes to stdin.

    Args:
        old_settings: Attribute list as returned by ``termios.tcgetattr``.
    """
    import termios

    # TCSADRAIN: apply after pending output has been written.
    termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_settings)
async def wait_for_key_release(key=" "):
    """Wait until stdin yields something other than ``key``.

    Reads one character at a time in a worker thread, so the event loop
    keeps running, and skips every character equal to ``key``.

    NOTE(review): this is not true key-release detection. It presumably
    relies on terminal auto-repeat producing a stream of ``key`` while the
    key is held; the function only returns once a different character (or
    end of input) is read.

    Args:
        key: Character to ignore while waiting.

    Returns:
        The first character read that differs from ``key``; the empty
        string if stdin reached end of input.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine.
    loop = asyncio.get_running_loop()
    while True:
        char = await loop.run_in_executor(None, sys.stdin.read, 1)
        if char != key:
            return char
async def wait_for_key_press(key=" "):
    """Wait until ``key`` is read from stdin.

    Reads one character at a time in a worker thread, so the event loop
    keeps running. Characters other than ``key`` and the quit keys are
    ignored.

    Args:
        key: Character to wait for.

    Raises:
        KeyboardInterrupt: when Ctrl+C (``\\x03``), Escape (``\\x1b``) or
            ``q`` is read, or when stdin reaches end of input.
    """
    loop = asyncio.get_running_loop()
    while True:
        char = await loop.run_in_executor(None, sys.stdin.read, 1)
        if char == key:
            return
        if char == "":
            # End of input: read() would keep returning "" forever, so
            # treat it like a quit request instead of spinning.
            raise KeyboardInterrupt
        if char in ("\x03", "\x1b", "q"):  # Ctrl+C, Escape, q
            raise KeyboardInterrupt
async def run_voice_session(token: str) -> str:
    """Run a single push-to-talk voice session and return its transcript.

    Connects, starts the keepalive / receiver / audio tasks, waits for
    ``wait_for_key_release`` to return, then finalizes via
    ``stop_recording``. The background tasks are always cancelled and
    awaited on the way out, including when this coroutine is cancelled or
    an error is raised.

    Args:
        token: OAuth access token for the STT endpoint.

    Returns:
        The transcript returned by ``VoiceSession.stop_recording``.
    """
    session = VoiceSession(token)
    await session.connect()

    # Start background tasks
    tasks = [
        asyncio.create_task(session.send_keepalives()),
        asyncio.create_task(session.receive_messages()),
        asyncio.create_task(session.stream_audio()),
    ]
    try:
        # Wait for spacebar release to stop
        try:
            await wait_for_key_release()
        except (KeyboardInterrupt, EOFError):
            pass
        sys.stdout.write("\r\033[K Processing...\n")
        sys.stdout.flush()

        # Stop recording and get transcript
        return await session.stop_recording()
    finally:
        if not session.closed:
            # stop_recording() never ran: make the loops exit and do not
            # leave the recorder subprocess running.
            session.closed = True
            recorder = session.recording_process
            if recorder is not None and recorder.poll() is None:
                recorder.kill()
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects each task's CancelledError or
        # failure as a result, without swallowing a cancellation aimed at
        # this coroutine itself.
        await asyncio.gather(*tasks, return_exceptions=True)
async def main():
    """Interactive entry point: verify prerequisites, then loop on push-to-talk.

    Prints a banner, loads the OAuth token, checks that a recorder and an
    interactive terminal are available, and then repeatedly waits for
    SPACE, runs one voice session and prints its transcript until a quit
    key is pressed. The terminal mode is restored on every exit path.

    Raises:
        SystemExit: with status 1 when no audio recorder is found or stdin
            is not a terminal (also raised by ``load_oauth_token`` when no
            token is available).
    """
    print("=" * 50)
    print(" Claude Code Voice Mode — Python PoC")
    print("=" * 50)
    print()

    # Load token
    token = load_oauth_token()
    print("[OK] OAuth token loaded")

    # Check audio recording
    if check_sox():
        print("[OK] SoX (rec) available")
    elif check_arecord():
        print("[OK] arecord available")
    else:
        print("[ERR] No audio recorder found")
        print(" Install SoX: apt install sox libsox-fmt-all")
        sys.exit(1)

    # Keypress detection needs a real terminal; fail with a message rather
    # than a termios error when stdin is piped or redirected.
    if not sys.stdin.isatty():
        print("[ERR] stdin is not a terminal; push-to-talk needs an interactive TTY")
        sys.exit(1)

    print()
    print("Hold SPACE to record, release to send.")
    print("Press 'q' or Escape to quit.")
    print()

    old_settings = setup_raw_terminal()
    try:
        while True:
            sys.stdout.write(" Press and hold SPACE to speak... ")
            sys.stdout.flush()
            try:
                await wait_for_key_press(" ")
            except KeyboardInterrupt:
                break
            sys.stdout.write("\r\033[K Recording... (release SPACE to stop)\n")
            sys.stdout.flush()

            transcript = await run_voice_session(token)

            sys.stdout.write("\r\033[K")
            if transcript.strip():
                print(f"\n Transcript: {transcript.strip()}")
                print(" (This would be sent as your prompt to Claude)")
            else:
                print("\n (No speech detected)")
            print()
    except KeyboardInterrupt:
        pass
    finally:
        restore_terminal(old_settings)
        print("\n\nVoice mode disabled.")
# Script entry point: run the interactive loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment