-
-
Save krish240574/2e3b848b4a9b8afcff72880a7398490e to your computer and use it in GitHub Desktop.
Nano Harness: Agent Harness in 223 lines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import os | |
| import re | |
| import shlex | |
| import subprocess | |
| import sys | |
| import time | |
| import traceback | |
| import urllib.error | |
| import urllib.request | |
| from pathlib import Path | |
| TASK = "Inspect the workspace and provide a short summary." | |
| MODEL = os.getenv("NANO_MODEL", "zai-org/GLM-5") | |
| BASE_URL = os.getenv("OPENAI_BASE_URL", "https://router.huggingface.co/v1") | |
| API_KEY = os.getenv("HF_TOKEN", "") | |
| WORKSPACE = str(Path.cwd()) | |
| MAX_STEPS = 50 | |
| TIMEOUT_S = 30 | |
| REQUEST_TIMEOUT_S = 180 | |
| TEMPERATURE = 0.2 | |
| SEED = None | |
| MAX_TOKENS = 10000 | |
| MAX_CHARS = 8000 | |
| ALLOW_COMMANDS = ["ls", "cat", "pwd", "echo", "head", "tail", "wc", "rg"] | |
| ALLOW_WRITE = True | |
| MODEL_RETRIES = 2 | |
| MOCK_MODEL = os.getenv("NANO_MOCK", "0") == "1" | |
| SYSTEM_PROMPT = f"""You are a code-first agent. | |
| Reply with executable Python only. | |
| Use tools as Python functions and call final_answer(value) when done. | |
| If task needs a file, create it early with write_file. | |
| No prose/markdown outside python code. | |
| Tools: list_dir(path='.'), read_file(path,max_chars=4000), write_file(path,content), exec_cmd(args) | |
| Allowed exec_cmd first token: {ALLOW_COMMANDS} | |
| """ | |
| RETRY_PROMPT = "Invalid format. Respond ONLY with executable Python in one ```python code block. No prose." | |
def clip(x, n=MAX_CHARS):
    """Stringify *x* and truncate it to at most *n* characters.

    When truncation occurs, a trailing notice records how many characters
    were dropped so the reader knows the text is incomplete.
    """
    text = str(x)
    if len(text) <= n:
        return text
    dropped = len(text) - n
    return text[:n] + f"\n...[truncated {dropped} chars]"
def run(task):
    """Drive one agent session end to end.

    Loop: ask the model for executable Python, run it in a restricted
    sandbox with workspace-confined tools, feed the observation back,
    and repeat until final_answer() is called or MAX_STEPS is reached.

    Args:
        task: natural-language task string given to the model.

    Returns:
        Process exit code: 0 when the agent calls final_answer,
        2 when MAX_STEPS is exhausted, 4 when the model endpoint
        cannot be reached after retries.
    """
    ws = Path(WORKSPACE).resolve()

    def safe_path(path):
        """Resolve *path* inside the workspace; raise ValueError if it escapes."""
        p = Path(path)
        if not p.is_absolute():
            p = ws / p
        p = p.resolve()
        # Accept the workspace root itself or any descendant; resolve()
        # above neutralizes ".." segments and symlinks before the check.
        if p == ws or ws in p.parents:
            return p
        raise ValueError(f"path escapes workspace: {path}")

    def list_dir(path="."):
        """Return sorted entry names of a workspace dir; dirs get a trailing '/'."""
        p = safe_path(path)
        if not p.is_dir():
            raise NotADirectoryError(str(p))
        return sorted(x.name + ("/" if x.is_dir() else "") for x in p.iterdir())

    def read_file(path, max_chars=4000):
        """Read a workspace file as UTF-8 (bad bytes replaced), truncated."""
        p = safe_path(path)
        return clip(p.read_text(encoding="utf-8", errors="replace"), min(max_chars, MAX_CHARS))

    def write_file(path, content):
        """Write *content* to a workspace file, creating parent directories."""
        if not ALLOW_WRITE:
            raise PermissionError("write_file disabled")
        p = safe_path(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(str(content), encoding="utf-8")
        return f"wrote {len(str(content))} bytes to {p}"

    def exec_cmd(args):
        """Run an allowlisted command (list or shell-style string) in the workspace.

        Returns a dict with exit_code and clipped stdout/stderr.
        """
        cmd = shlex.split(args) if isinstance(args, str) else [str(x) for x in args]
        if not cmd:
            raise ValueError("empty command")
        if cmd[0] not in ALLOW_COMMANDS:
            raise PermissionError(f"command not allowed: {cmd[0]}")
        p = subprocess.run(
            cmd,
            cwd=str(ws),
            capture_output=True,
            text=True,
            timeout=TIMEOUT_S,
            shell=False,  # never a shell: the first-token allowlist would be bypassable
            check=False,
        )
        return {"exit_code": p.returncode, "stdout": clip(p.stdout or ""), "stderr": clip(p.stderr or "")}

    tools = {"list_dir": list_dir, "read_file": read_file, "write_file": write_file, "exec_cmd": exec_cmd}

    def call_model(messages):
        """POST one chat-completion request, retrying transient failures.

        Raises RuntimeError (with the last error text) when all attempts fail.
        """
        payload = {
            "model": MODEL,
            "messages": messages,
            "temperature": TEMPERATURE,
            "max_tokens": MAX_TOKENS,
            "stream": False,
        }
        # FIX: SEED was declared in the config block but never sent to the
        # API; forward it when set so runs can be made reproducible.
        # (Default None preserves prior behavior exactly.)
        if SEED is not None:
            payload["seed"] = SEED
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_KEY}"}
        req = urllib.request.Request(
            url=BASE_URL.rstrip("/") + "/chat/completions",
            data=json.dumps(payload).encode("utf-8"),
            headers=headers,
            method="POST",
        )
        last = "unknown"
        for i in range(MODEL_RETRIES + 1):
            try:
                with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT_S) as r:
                    d = json.loads(r.read().decode("utf-8"))
                    return d["choices"][0]["message"]["content"]
            except urllib.error.HTTPError as e:
                last = f"HTTP {e.code}: {e.read().decode('utf-8', errors='replace')}"
            except Exception as e:
                last = f"{type(e).__name__}: {e}"
            if i < MODEL_RETRIES:
                time.sleep(1.2 * (i + 1))  # linear backoff between attempts
        raise RuntimeError(f"model request failed: {last}")

    def parse_code(text):
        """Extract executable Python from a model reply.

        Accepts, in order of preference: a JSON object with a "code" field,
        the first compilable ```python fenced block, or the raw reply itself.
        Raises ValueError when nothing compiles.
        """
        t = str(text).strip()
        if t.startswith("{") and t.endswith("}"):
            try:
                j = json.loads(t)
                if isinstance(j, dict) and isinstance(j.get("code"), str):
                    return j["code"].strip()
            except Exception:
                pass  # not JSON — fall through to fence extraction
        for block in re.findall(r"```(?:python|py)\s*(.*?)```", t, re.DOTALL | re.IGNORECASE):
            c = block.strip()
            if not c:
                continue
            try:
                compile(c, "<agent>", "exec")
                return c
            except Exception:
                pass  # try the next fenced block
        try:
            compile(t, "<agent>", "exec")
            return t
        except Exception as e:
            raise ValueError(f"not executable python: {e} | preview={clip(t, 220).replace(chr(10), ' ')}")

    def exec_code(code):
        """Execute agent code with a minimal builtins set plus the tools.

        Returns (done, value, stdout, error): done is True only if the code
        called final_answer(); error is a clipped traceback or None.

        NOTE: exec() on model output is inherently risky; the restricted
        builtins dict plus __import__ still allows arbitrary imports, so
        this sandbox limits accidents, not a determined adversary.
        """
        printed, done = [], {"v": None}

        def p(*a, **k):
            # print() shim: capture output instead of writing to real stdout.
            printed.append(k.get("sep", " ").join(str(x) for x in a) + k.get("end", "\n"))

        def final_answer(v):
            # StopIteration doubles as a control-flow signal out of exec().
            done["v"] = v
            raise StopIteration

        env = {
            "__builtins__": {
                "print": p,
                "len": len,
                "range": range,
                "str": str,
                "int": int,
                "float": float,
                "bool": bool,
                "list": list,
                "dict": dict,
                "set": set,
                "tuple": tuple,
                "enumerate": enumerate,
                "zip": zip,
                "sum": sum,
                "min": min,
                "max": max,
                "sorted": sorted,
                "Exception": Exception,
                "__import__": __import__,
            },
            "__name__": "__nano_agent__",
            "final_answer": final_answer,
            **tools,
        }
        try:
            exec(code, env)
            return False, None, clip("".join(printed)), None
        except StopIteration:
            return True, done["v"], clip("".join(printed)), None
        except Exception:
            return False, None, clip("".join(printed)), clip(traceback.format_exc())

    # ---- Main agent loop: model -> parse -> execute -> observe ----
    messages = [{"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": task}]
    for step in range(1, MAX_STEPS + 1):
        print(f"[{step}/{MAX_STEPS}] : {messages[-1]['content'][:100]}")
        try:
            text = call_model(messages)
        except Exception as e:
            print(f"[{step}/{MAX_STEPS}] : model call failed: {e}", file=sys.stderr)
            return 4
        try:
            code = parse_code(text)
        except Exception as e:
            # Unparseable reply: keep it in history and ask for code-only format.
            print(f"[{step}/{MAX_STEPS}] : parse error: {e}", file=sys.stderr)
            messages += [{"role": "assistant", "content": text}, {"role": "user", "content": RETRY_PROMPT}]
            continue
        done, value, out, err = exec_code(code)
        # Record the normalized code (not the raw reply) as the assistant turn.
        messages.append({"role": "assistant", "content": f"```python\n{code}\n```"})
        if done:
            print(f"[{step}/{MAX_STEPS}] : {value}", file=sys.stderr)
            return 0
        messages.append(
            {
                "role": "user",
                "content": f"Observation from step {step}: stdout={clip(out, 1200)} error={clip(err, 1200)} Continue.",
            }
        )
    print(f"[{MAX_STEPS}/{MAX_STEPS}] : max steps reached", file=sys.stderr)
    return 2
| if __name__ == "__main__": | |
| raise SystemExit(run(" ".join(sys.argv[1:]).strip() or TASK)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment