Created
February 19, 2026 01:54
-
-
Save quinncomendant/b787b205a0c9806e3be9dfe9570e5e70 to your computer and use it in GitHub Desktop.
Summarize images or videos via chat completions API: `summarize-image.py --model qwen3-vl-2b-instruct-mlx@bf16 --stats /path/to/video.mp4`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "requests>=2.31", | |
| # "opencv-python-headless>=4.8", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import base64 | |
| import json | |
| import mimetypes | |
| import os | |
| import sys | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Sequence, Tuple | |
| import requests | |
| try: | |
| import cv2 | |
| except ImportError: | |
| cv2 = None # Will be caught when trying to process video | |
# Default OpenAI-compatible chat-completions endpoint on localhost
# (port 1234 — presumably a local inference server; confirm for your setup).
DEFAULT_URL = "http://localhost:1234/v1/chat/completions"
# Default prompts; override with -S/--system and -p/--prompt.
DEFAULT_SYSTEM = "You are a helpful assistant."
DEFAULT_PROMPT = """Describe the contents of this image or video.
"""
@dataclass
class OneResult:
    """Outcome of one summarize request; errors are captured, not raised."""
    image_path: str  # path or URL exactly as given on the command line
    summary: str  # model's text response; "" when the request failed
    wall_s: float  # wall-clock seconds for the request (including retries)
    prompt_tokens: Optional[int]  # from the response's usage block, if present
    completion_tokens: Optional[int]  # from the response's usage block, if present
    total_tokens: Optional[int]  # from the response's usage block, if present
    error: Optional[str] = None  # error message when the request failed
def eprint(*args: object, **kwargs: object) -> None:
    """Write a diagnostic line to STDERR, keeping STDOUT pipe-friendly."""
    print(*args, **kwargs, file=sys.stderr)
def guess_mime(path: str) -> str:
    """Best-effort MIME type for *path*, defaulting to JPEG when unknown."""
    guessed, _encoding = mimetypes.guess_type(path)
    # Unknown extensions get a reasonable image default.
    return guessed or "image/jpeg"
def is_video(path: str) -> bool:
    """Return True when *path* looks like a video (MIME type first, then extension)."""
    mime, _ = mimetypes.guess_type(path)
    if mime is not None and mime.startswith("video/"):
        return True
    # MIME lookup was inconclusive: fall back to well-known container extensions.
    video_extensions = (".mp4", ".webm", ".mov", ".avi", ".mkv")
    return path.lower().endswith(video_extensions)
def get_video_metadata(path: str) -> Tuple[float, int]:
    """
    Derive frame-sampling parameters for a local video file.

    Returns:
        (video_fps, video_max_frames): the sampling rate sent to the server
        (0.2 fps == one frame every 5 seconds) and a duration-based cap on
        how many frames to extract (at most 64).

    Raises:
        RuntimeError: if OpenCV is not installed or the file cannot be opened.
    """
    if cv2 is None:
        raise RuntimeError("opencv-python-headless is required for video processing. "
                           "Install with: uv pip install opencv-python-headless")
    capture = cv2.VideoCapture(path)
    if not capture.isOpened():
        raise RuntimeError(f"Failed to open video file: {path}")
    try:
        # Source FPS; some containers report 0 or garbage, so assume 30 then.
        source_fps = capture.get(cv2.CAP_PROP_FPS)
        if source_fps <= 0:
            source_fps = 30
        frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        duration_s = frame_total / source_fps
        # Sample one frame every 5 seconds for reasonable temporal coverage;
        # e.g. a 60-second video yields 12 frames. Cap at 64 frames so long
        # videos don't blow up the request size.
        sample_fps = 0.2
        frame_cap = min(int(duration_s * sample_fps) + 1, 64)
        return sample_fps, frame_cap
    finally:
        capture.release()
def file_to_data_url(path: str) -> str:
    """Read *path* and return it as a base64 data: URL with a guessed MIME type."""
    with open(path, "rb") as fh:
        encoded = base64.b64encode(fh.read()).decode("ascii")
    return f"data:{guess_mime(path)};base64,{encoded}"
def truncate_data_uris(obj: Any, max_len: int = 50) -> Any:
    """
    Return a copy of *obj* with every embedded ``data:`` URI payload truncated.

    Used for verbose logging so multi-megabyte base64 blobs don't flood STDERR.
    Recurses through dicts and lists; non-container, non-string values are
    returned unchanged. Unlike the original, this also truncates data: URIs
    that appear directly inside lists or as the top-level value — previously
    only dict string values were handled, contrary to the docstring.

    Args:
        obj: arbitrary JSON-like structure (dicts, lists, scalars).
        max_len: maximum number of base64 characters to keep per URI.
    """
    if isinstance(obj, str):
        if obj.startswith("data:"):
            # Truncate data URIs like "data:video/mp4;base64,AAAA…"
            prefix, sep, data = obj.partition(",")
            if sep and len(data) > max_len:
                return f"{prefix},{data[:max_len]}…"
        return obj
    if isinstance(obj, dict):
        return {k: truncate_data_uris(v, max_len) for k, v in obj.items()}
    if isinstance(obj, list):
        return [truncate_data_uris(item, max_len) for item in obj]
    return obj
def build_payload(
    model: str,
    system_prompt: str,
    user_prompt: str,
    media_path: str,
    temperature: float,
    max_tokens: int,
    *,
    is_video: bool = False,
    is_remote_url: bool = False,
    video_fps: Optional[float] = None,
    video_max_frames: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Assemble the chat-completions request body.

    The media is attached as the second part of the user message:
      - remote video: {"type": "video_url", "video_url": {"url": "https://..."}}
      - local video:  {"type": "video", "video": "/absolute/path"}
      - image:        {"type": "image_url", "image_url": {"url": "data:image/..."}}
    When the media is a video, the sampling hints (video_fps /
    video_max_frames) are placed at the payload's top level if provided.
    """
    # Build the media part first so the message content can be assembled in one go.
    if is_video:
        if is_remote_url:
            media_part: Dict[str, Any] = {"type": "video_url", "video_url": {"url": media_path}}
        else:
            media_part = {"type": "video", "video": media_path}
    else:
        media_part = {"type": "image_url", "image_url": {"url": media_path}}

    user_content: List[Dict[str, Any]] = [
        {"type": "text", "text": user_prompt},
        media_part,
    ]
    payload: Dict[str, Any] = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        "temperature": temperature,
        "max_tokens": max_tokens,
        "stream": False,
    }
    if is_video:
        # Video processing parameters live at the top level of the payload.
        if video_fps is not None:
            payload["video_fps"] = video_fps
        if video_max_frames is not None:
            payload["video_max_frames"] = video_max_frames
    return payload
def extract_summary(resp_json: Dict[str, Any]) -> str:
    """Pull the assistant's text out of an OpenAI-style response, best effort."""
    try:
        content = resp_json["choices"][0]["message"]["content"]
    except Exception:
        # Unexpected schema: dump the whole response so nothing is lost.
        return json.dumps(resp_json, ensure_ascii=False)
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        # Structured content: collect every {"text": ...} fragment.
        texts = [
            part["text"]
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        if texts:
            return "\n".join(t.strip() for t in texts if t.strip())
    return str(content).strip()
def extract_usage(resp_json: Dict[str, Any]) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Return (prompt_tokens, completion_tokens, total_tokens), None where absent."""
    usage = resp_json.get("usage")
    if not isinstance(usage, dict):
        return None, None, None

    def as_int(value: Any) -> Optional[int]:
        # Only trust proper ints; servers may omit or null these fields.
        return int(value) if isinstance(value, int) else None

    return (
        as_int(usage.get("prompt_tokens")),
        as_int(usage.get("completion_tokens")),
        as_int(usage.get("total_tokens")),
    )
def post_with_retries(
    url: str,
    headers: Dict[str, str],
    payload: Dict[str, Any],
    timeout_s: float,
    retries: int,
) -> Dict[str, Any]:
    """
    POST *payload* as JSON, retrying up to *retries* additional times on any failure.

    HTTP >= 400 responses are surfaced as RuntimeError carrying the JSON error
    body when available, otherwise the first 500 characters of the text body.
    Backoff is a short linear sleep, since the target is usually localhost.

    Raises:
        The last exception encountered once all attempts are exhausted.
    """
    failure: Optional[Exception] = None
    for attempt in range(retries + 1):
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=timeout_s)
            if response.status_code >= 400:
                # Prefer the structured JSON error body when the server sends one.
                try:
                    body = response.json()
                    raise RuntimeError(f"HTTP {response.status_code}: {json.dumps(body, ensure_ascii=False)}")
                except ValueError:
                    raise RuntimeError(f"HTTP {response.status_code}: {response.text[:500]}")
            return response.json()
        except Exception as ex:
            failure = ex
            if attempt < retries:
                time.sleep(0.25 * (attempt + 1))
    assert failure is not None
    raise failure
def run_one(
    *,
    image_path: str,
    url: str,
    api_key: Optional[str],
    model: str,
    system_prompt: str,
    user_prompt: str,
    temperature: float,
    max_tokens: int,
    timeout_s: float,
    retries: int,
    verbose: bool = False,
) -> OneResult:
    """
    Summarize a single image or video and return a OneResult.

    Request failures are captured in the result's ``error`` field rather than
    raised. Images are inlined as base64 data URLs; videos are referenced by
    absolute path (local) or URL (remote), with sampling hints attached.
    """
    treat_as_video = is_video(image_path)
    remote = image_path.startswith(("http://", "https://"))

    fps: Optional[float] = None
    max_frames: Optional[int] = None
    if not treat_as_video:
        # Image: travels inline as a base64 data URL.
        media_ref = file_to_data_url(image_path)
    elif remote:
        # Remote video: pass the URL through. Probing metadata would require
        # downloading, so use conservative sampling defaults instead.
        media_ref = image_path
        fps, max_frames = 0.2, 16
    else:
        # Local video: the server loads it by absolute path, not base64.
        media_ref = os.path.abspath(image_path)
        try:
            fps, max_frames = get_video_metadata(image_path)
        except Exception as e:
            eprint(f"[warning] Failed to get video metadata for {image_path}: {e}")
            # Fall back to the same defaults used for remote videos.
            fps, max_frames = 0.2, 16

    payload = build_payload(
        model=model,
        system_prompt=system_prompt,
        user_prompt=user_prompt,
        media_path=media_ref,
        temperature=temperature,
        max_tokens=max_tokens,
        is_video=treat_as_video,
        is_remote_url=remote,
        video_fps=fps,
        video_max_frames=max_frames,
    )

    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    if verbose:
        eprint(f"[verbose] Request URL: {url}")
        eprint(f"[verbose] Request payload:")
        eprint(json.dumps(truncate_data_uris(payload), indent=2, ensure_ascii=False))
        eprint(f"[verbose] Request headers: {json.dumps(headers)}")

    started = time.perf_counter()
    try:
        resp_json = post_with_retries(url, headers, payload, timeout_s=timeout_s, retries=retries)
        elapsed = time.perf_counter() - started
        if verbose:
            eprint(f"[verbose] Response JSON:")
            eprint(json.dumps(resp_json, indent=2, ensure_ascii=False))
        pt, ct, tt = extract_usage(resp_json)
        return OneResult(
            image_path=image_path,
            summary=extract_summary(resp_json),
            wall_s=elapsed,
            prompt_tokens=pt,
            completion_tokens=ct,
            total_tokens=tt,
        )
    except Exception as ex:
        return OneResult(
            image_path=image_path,
            summary="",
            wall_s=time.perf_counter() - started,
            prompt_tokens=None,
            completion_tokens=None,
            total_tokens=None,
            error=str(ex),
        )
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """
    Define and parse the command-line interface.

    Environment variables SUMMARIZE_IMAGE_MODEL and SUMMARIZE_IMAGE_API_KEY
    provide defaults for --model and --api-key respectively.
    """
    parser = argparse.ArgumentParser(
        description="Send images or videos to an OpenAI-compatible /v1/chat/completions API and print summaries."
    )
    # Inputs may be given positionally and/or via repeated -i flags.
    parser.add_argument(
        "images",
        nargs="*",
        help="Image/video file paths or URLs (positional). You can also pass repeated -i/--image.",
    )
    parser.add_argument(
        "-i",
        "--image",
        action="append",
        default=[],
        help="Image/video file path or URL (repeatable). Supports local files and http(s) URLs for videos.",
    )
    parser.add_argument("--url", default=DEFAULT_URL, help=f"Chat Completions URL (default: {DEFAULT_URL})")
    parser.add_argument("--model", default=os.getenv("SUMMARIZE_IMAGE_MODEL"), help="Model identifier (or set SUMMARIZE_IMAGE_MODEL).")
    parser.add_argument("--api-key", default=os.getenv("SUMMARIZE_IMAGE_API_KEY"), help="API key (or set SUMMARIZE_IMAGE_API_KEY).")
    parser.add_argument("-S", "--system", default=DEFAULT_SYSTEM, help="System prompt.")
    parser.add_argument("-p", "--prompt", default=DEFAULT_PROMPT, help="User prompt to apply to each image.")
    parser.add_argument("--temperature", type=float, default=0.0, help="Sampling temperature.")
    parser.add_argument("--max-tokens", type=int, default=1000, help="Max completion tokens.")
    parser.add_argument("--timeout", type=float, default=1500.0, help="Request timeout (seconds).")
    parser.add_argument("--retries", type=int, default=0, help="Retry count on failure.")
    parser.add_argument("-s", "--stats", action="store_true", help="Print per-image and average performance metrics to STDERR.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Print full request/response JSON to STDERR.")
    parser.add_argument("--fail-fast", action="store_true", help="Exit immediately on first error.")
    return parser.parse_args(list(argv))
| def main(argv: Sequence[str]) -> int: | |
| args = parse_args(argv) | |
| if not args.model: | |
| eprint("error: --model is required (or set SUMMARIZE_IMAGE_MODEL).") | |
| return 2 | |
| images: List[str] = [] | |
| images.extend(args.images or []) | |
| images.extend(args.image or []) | |
| if not images: | |
| eprint("error: provide at least one image or video path/URL (positional or -i/--image).") | |
| return 2 | |
| # Validate paths early (skip validation for URLs) | |
| missing = [] | |
| for p in images: | |
| if not (p.startswith("http://") or p.startswith("https://")): | |
| if not os.path.isfile(p): | |
| missing.append(p) | |
| if missing: | |
| eprint("error: missing files:") | |
| for p in missing: | |
| eprint(f" - {p}") | |
| return 2 | |
| results: List[OneResult] = [] | |
| start_all = time.perf_counter() | |
| for img in images: | |
| r = run_one( | |
| image_path=img, | |
| url=args.url, | |
| api_key=args.api_key, | |
| model=args.model, | |
| system_prompt=args.system, | |
| user_prompt=args.prompt, | |
| temperature=args.temperature, | |
| max_tokens=args.max_tokens, | |
| timeout_s=args.timeout, | |
| retries=args.retries, | |
| verbose=args.verbose, | |
| ) | |
| results.append(r) | |
| if r.error: | |
| eprint(f"[error] {img}: {r.error}") | |
| if args.fail_fast: | |
| return 1 | |
| continue | |
| # Summaries to STDOUT (clean + pipe-friendly) | |
| print(f"{img}\t{r.summary}", flush=True) | |
| # Optional per-image stats to STDERR | |
| if args.stats: | |
| tps = None | |
| if r.completion_tokens is not None and r.wall_s > 0: | |
| tps = r.completion_tokens / r.wall_s | |
| eprint( | |
| f"[stats] {img}: wall={r.wall_s:.3f}s" | |
| + (f", prompt_tokens={r.prompt_tokens}" if r.prompt_tokens is not None else "") | |
| + (f", completion_tokens={r.completion_tokens}" if r.completion_tokens is not None else "") | |
| + (f", total_tokens={r.total_tokens}" if r.total_tokens is not None else "") | |
| + (f", completion_tok/s={tps:.2f}" if tps is not None else "") | |
| ) | |
| end_all = time.perf_counter() | |
| ok = [r for r in results if not r.error] | |
| failed = [r for r in results if r.error] | |
| if args.stats and ok: | |
| avg_wall = sum(r.wall_s for r in ok) / len(ok) | |
| avg_pt = (sum(r.prompt_tokens for r in ok if r.prompt_tokens is not None) / sum(1 for r in ok if r.prompt_tokens is not None)) if any(r.prompt_tokens is not None for r in ok) else None | |
| avg_ct = (sum(r.completion_tokens for r in ok if r.completion_tokens is not None) / sum(1 for r in ok if r.completion_tokens is not None)) if any(r.completion_tokens is not None for r in ok) else None | |
| avg_tt = (sum(r.total_tokens for r in ok if r.total_tokens is not None) / sum(1 for r in ok if r.total_tokens is not None)) if any(r.total_tokens is not None for r in ok) else None | |
| total_elapsed = end_all - start_all | |
| rps = (len(ok) / total_elapsed) if total_elapsed > 0 else None | |
| eprint("[stats] ---- Averages (successful requests) ----") | |
| eprint(f"[stats] count_ok={len(ok)}, count_failed={len(failed)}") | |
| eprint(f"[stats] avg_wall_s={avg_wall:.3f}") | |
| if avg_pt is not None: | |
| eprint(f"[stats] avg_prompt_tokens={avg_pt:.1f}") | |
| if avg_ct is not None: | |
| eprint(f"[stats] avg_completion_tokens={avg_ct:.1f}") | |
| if avg_tt is not None: | |
| eprint(f"[stats] avg_total_tokens={avg_tt:.1f}") | |
| if rps is not None: | |
| eprint(f"[stats] overall_req_per_s={rps:.3f}") | |
| return 0 if not failed else 1 | |
| if __name__ == "__main__": | |
| raise SystemExit(main(sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment