Skip to content

Instantly share code, notes, and snippets.

@quinncomendant
Created February 19, 2026 01:54
Show Gist options
  • Select an option

  • Save quinncomendant/b787b205a0c9806e3be9dfe9570e5e70 to your computer and use it in GitHub Desktop.

Select an option

Save quinncomendant/b787b205a0c9806e3be9dfe9570e5e70 to your computer and use it in GitHub Desktop.
Summarize images or videos via chat completions API: `summarize-image.py --model qwen3-vl-2b-instruct-mlx@bf16 --stats /path/to/video.mp4`
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "requests>=2.31",
# "opencv-python-headless>=4.8",
# ]
# ///
from __future__ import annotations
import argparse
import base64
import json
import mimetypes
import os
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple
import requests
# OpenCV is optional at import time; get_video_metadata() checks for None
# before it touches the module.
try:
    import cv2
except ImportError:
    cv2 = None  # Will be caught when trying to process video

# Defaults used by the command-line options when nothing is supplied.
DEFAULT_URL = "http://localhost:1234/v1/chat/completions"
DEFAULT_SYSTEM = "You are a helpful assistant."
DEFAULT_PROMPT = "Describe the contents of this image or video.\n"
@dataclass
class OneResult:
    """Outcome of a single request for one input file or URL."""

    # The path or URL exactly as the caller supplied it.
    image_path: str
    # Model output text; empty string when the request failed.
    summary: str
    # Wall-clock seconds measured around the request.
    wall_s: float
    # Token counts from the response's "usage" object, when it provides them.
    prompt_tokens: Optional[int]
    completion_tokens: Optional[int]
    total_tokens: Optional[int]
    # Error text for a failed request; None on success.
    error: Optional[str] = None
def eprint(*args: object, **kwargs: object) -> None:
    """Print to standard error; accepts the same arguments as print()."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def guess_mime(path: str) -> str:
    """Return the MIME type guessed from *path*, or "image/jpeg" if unknown."""
    guessed = mimetypes.guess_type(path)[0]
    # Unknown extensions fall back to JPEG as a reasonable default.
    return guessed or "image/jpeg"
def is_video(path: str) -> bool:
    """Return True if *path* looks like a video, by MIME type or by extension."""
    guessed, _encoding = mimetypes.guess_type(path)
    mime_says_video = guessed is not None and guessed.startswith("video/")
    # The extension list covers containers the MIME table may not know.
    return mime_says_video or path.lower().endswith(
        (".mp4", ".webm", ".mov", ".avi", ".mkv")
    )
def get_video_metadata(path: str) -> Tuple[float, int]:
    """
    Derive frame-sampling parameters for a local video file.

    The file is opened with OpenCV and its duration is estimated from the
    reported frame count and native FPS.

    Returns:
        (video_fps, video_max_frames): the fixed sampling rate (0.2 = 1 frame
        per 5 seconds) and the number of frames to sample at that rate for
        the estimated duration, always between 1 and 64.

    Raises:
        RuntimeError: if OpenCV is not installed or the file cannot be opened.
    """
    if cv2 is None:
        raise RuntimeError("opencv-python-headless is required for video processing. "
                           "Install with: uv pip install opencv-python-headless")
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video file: {path}")
    try:
        # Native FPS of the file. `not (x > 0)` rejects zero, negative and
        # NaN readings alike, so the division below is always well defined.
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        if not (original_fps > 0):
            original_fps = 30  # Default fallback
        # Frame count; a non-positive or NaN reading is treated as "unknown"
        # (0) so it cannot produce a zero or negative frame budget.
        raw_frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        total_frames = int(raw_frame_count) if raw_frame_count > 0 else 0
        # Estimated duration in seconds
        duration = total_frames / original_fps
        # Sampling rate (not the native FPS): 0.2 fps = 1 frame every 5 seconds
        video_fps = 0.2
        # e.g. a 60 second video at 0.2 fps -> 13 frames; capped at 64
        video_max_frames = min(int(duration * video_fps) + 1, 64)
        return video_fps, video_max_frames
    finally:
        cap.release()
def file_to_data_url(path: str) -> str:
    """Read *path* and return its bytes as a base64 ``data:`` URL."""
    with open(path, "rb") as fh:
        raw = fh.read()
    encoded = base64.b64encode(raw).decode("ascii")
    return "data:" + guess_mime(path) + ";base64," + encoded
def truncate_data_uris(obj: Any, max_len: int = 50) -> Any:
    """
    Recursively truncate data: URIs in dicts/lists for verbose output.

    Any string that starts with "data:" and contains a comma has the part
    after the first comma cut to *max_len* characters, followed by "…". This
    applies wherever the string appears: as a dict value, as a list element,
    or as *obj* itself. Dicts and lists are copied, never mutated; every
    other value is returned unchanged.
    """
    if isinstance(obj, str):
        if not obj.startswith("data:"):
            return obj
        # e.g. "data:video/mp4;base64,AAAA…" -> prefix "data:video/mp4;base64"
        prefix, comma, data = obj.partition(",")
        if not comma or len(data) <= max_len:
            return obj
        return f"{prefix},{data[:max_len]}…"
    if isinstance(obj, dict):
        return {k: truncate_data_uris(v, max_len) for k, v in obj.items()}
    if isinstance(obj, list):
        return [truncate_data_uris(item, max_len) for item in obj]
    return obj
def build_payload(
    model: str,
    system_prompt: str,
    user_prompt: str,
    media_path: str,
    temperature: float,
    max_tokens: int,
    *,
    is_video: bool = False,
    is_remote_url: bool = False,
    video_fps: Optional[float] = None,
    video_max_frames: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Assemble the non-streaming chat completion request body.

    The user message carries the text prompt followed by one media part:
    - remote video: {"type": "video_url", "video_url": {"url": "https://..."}}
    - local video:  {"type": "video", "video": "/absolute/path"}
    - image:        {"type": "image_url", "image_url": {"url": "data:image/..."}}

    For videos only, ``video_fps`` and ``video_max_frames`` are added as
    top-level keys when they are not None.
    """
    # Choose the media part first so the payload can be written as one literal.
    if not is_video:
        media_part: Dict[str, Any] = {
            "type": "image_url",
            "image_url": {"url": media_path},
        }
    elif is_remote_url:
        media_part = {"type": "video_url", "video_url": {"url": media_path}}
    else:
        media_part = {"type": "video", "video": media_path}

    request: Dict[str, Any] = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [{"type": "text", "text": user_prompt}, media_part],
            },
        ],
        "temperature": temperature,
        "max_tokens": max_tokens,
        "stream": False,
    }

    if is_video:
        # Sampling hints go at the top level, after the standard keys.
        hints = {"video_fps": video_fps, "video_max_frames": video_max_frames}
        request.update({k: v for k, v in hints.items() if v is not None})
    return request
def extract_summary(resp_json: Dict[str, Any]) -> str:
    """
    Pull the assistant text out of a chat completion response.

    Reads the OpenAI-style ``choices[0].message.content`` field:
    - a string is returned stripped;
    - ``None`` (a message without text content) yields "";
    - a list of parts is flattened by joining the non-empty, stripped
      ``"text"`` values with newlines;
    - anything else is stringified.

    If the response does not have the expected shape, the whole response is
    returned as JSON text so the caller still sees what the server sent.
    """
    try:
        content = resp_json["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        # Fallback for unexpected schemas
        return json.dumps(resp_json, ensure_ascii=False)
    if content is None:
        # Avoid reporting the literal string "None" as a summary.
        return ""
    if isinstance(content, str):
        return content.strip()
    # Some servers may return structured content; best-effort flattening
    if isinstance(content, list):
        texts = [
            part["text"].strip()
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        if texts:
            return "\n".join(t for t in texts if t)
    return str(content).strip()
def extract_usage(resp_json: Dict[str, Any]) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """
    Return (prompt_tokens, completion_tokens, total_tokens) from the response.

    Each value is None when the "usage" object is missing, is not a dict, or
    holds a non-integer for that key.
    """
    usage = resp_json.get("usage")
    if not isinstance(usage, dict):
        return None, None, None

    def _int_or_none(key: str) -> Optional[int]:
        value = usage.get(key)
        return int(value) if isinstance(value, int) else None

    return (
        _int_or_none("prompt_tokens"),
        _int_or_none("completion_tokens"),
        _int_or_none("total_tokens"),
    )
def post_with_retries(
    url: str,
    headers: Dict[str, str],
    payload: Dict[str, Any],
    timeout_s: float,
    retries: int,
) -> Dict[str, Any]:
    """
    POST *payload* as JSON and return the decoded JSON response.

    One attempt is always made; up to *retries* further attempts follow a
    failure, with a short linear backoff (0.25s, 0.5s, ...) between them.
    A negative *retries* is treated as 0.

    Raises:
        RuntimeError: for an HTTP status >= 400; the message carries the JSON
            error body, or the first 500 characters of the text body.
        Exception: whatever the final attempt raised (connection errors,
            timeouts, a response body that is not JSON, ...).
    """
    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        try:
            r = requests.post(url, headers=headers, json=payload, timeout=timeout_s)
            # If API returns JSON error bodies, surface them
            if r.status_code >= 400:
                try:
                    detail = json.dumps(r.json(), ensure_ascii=False)
                except ValueError:
                    detail = r.text[:500]
                raise RuntimeError(f"HTTP {r.status_code}: {detail}")
            return r.json()
        except Exception:
            if attempt == attempts - 1:
                # Out of attempts: let the caller see the last failure.
                raise
            # Simple backoff (small, since localhost usually)
            time.sleep(0.25 * (attempt + 1))
    # The loop always returns or raises; this only guards against edits.
    raise RuntimeError("post_with_retries: no attempt was made")
def run_one(
    *,
    image_path: str,
    url: str,
    api_key: Optional[str],
    model: str,
    system_prompt: str,
    user_prompt: str,
    temperature: float,
    max_tokens: int,
    timeout_s: float,
    retries: int,
    verbose: bool = False,
) -> OneResult:
    """
    Send one image or video to the chat completions endpoint.

    Media handling:
    - http(s) inputs are passed to the server by URL, for images as well as
      videos; remote videos use default sampling (0.2 fps, 16 frames) because
      their metadata cannot be read without downloading.
    - Local videos are passed by absolute path with sampling parameters from
      get_video_metadata(), falling back to the same defaults if that fails.
    - Local images are embedded as base64 data URLs.

    Errors raised while preparing the payload or performing the request are
    caught and reported through ``OneResult.error`` instead of propagating.
    With *verbose*, the request (data URIs truncated, API key redacted) and
    the response are printed to stderr.
    """
    is_remote_url = image_path.startswith(("http://", "https://"))
    is_video_file = is_video(image_path)
    # Sampling parameters used when a video's metadata is unavailable.
    fallback_fps, fallback_max_frames = 0.2, 16

    t0 = time.perf_counter()
    try:
        media_path: str
        video_fps: Optional[float] = None
        video_max_frames: Optional[int] = None
        if is_remote_url:
            # Remote media: hand the URL to the server as-is. Opening it as a
            # local file would fail.
            media_path = image_path
            if is_video_file:
                video_fps, video_max_frames = fallback_fps, fallback_max_frames
        elif is_video_file:
            # Local video file: use absolute path (not base64)
            media_path = os.path.abspath(image_path)
            try:
                video_fps, video_max_frames = get_video_metadata(image_path)
            except Exception as e:
                eprint(f"[warning] Failed to get video metadata for {image_path}: {e}")
                video_fps, video_max_frames = fallback_fps, fallback_max_frames
        else:
            # Local image: use base64 data URL
            media_path = file_to_data_url(image_path)

        payload = build_payload(
            model=model,
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            media_path=media_path,
            temperature=temperature,
            max_tokens=max_tokens,
            is_video=is_video_file,
            is_remote_url=is_remote_url,
            video_fps=video_fps,
            video_max_frames=video_max_frames,
        )
        headers = {"Content-Type": "application/json"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"

        # Verbose: print request
        if verbose:
            # Never echo the API key into logs.
            shown_headers = {
                k: ("Bearer [redacted]" if k == "Authorization" else v)
                for k, v in headers.items()
            }
            eprint(f"[verbose] Request URL: {url}")
            eprint("[verbose] Request payload:")
            eprint(json.dumps(truncate_data_uris(payload), indent=2, ensure_ascii=False))
            eprint(f"[verbose] Request headers: {json.dumps(shown_headers)}")

        # Restart the clock so wall_s measures the request only, not the
        # payload preparation above.
        t0 = time.perf_counter()
        resp_json = post_with_retries(url, headers, payload, timeout_s=timeout_s, retries=retries)
        wall_s = time.perf_counter() - t0

        # Verbose: print response
        if verbose:
            eprint("[verbose] Response JSON:")
            eprint(json.dumps(resp_json, indent=2, ensure_ascii=False))

        summary = extract_summary(resp_json)
        pt, ct, tt = extract_usage(resp_json)
        return OneResult(
            image_path=image_path,
            summary=summary,
            wall_s=wall_s,
            prompt_tokens=pt,
            completion_tokens=ct,
            total_tokens=tt,
        )
    except Exception as ex:
        return OneResult(
            image_path=image_path,
            summary="",
            wall_s=time.perf_counter() - t0,
            prompt_tokens=None,
            completion_tokens=None,
            total_tokens=None,
            error=str(ex),
        )
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the command-line arguments in *argv* (program name excluded)."""
    parser = argparse.ArgumentParser(
        description="Send images or videos to an OpenAI-compatible /v1/chat/completions API and print summaries."
    )
    add = parser.add_argument

    # Inputs: positional paths and/or repeated -i/--image.
    add(
        "images",
        nargs="*",
        help="Image/video file paths or URLs (positional). You can also pass repeated -i/--image.",
    )
    add(
        "-i",
        "--image",
        action="append",
        default=[],
        help="Image/video file path or URL (repeatable). Supports local files and http(s) URLs for videos.",
    )

    # Endpoint and credentials; model and key can come from the environment.
    add("--url", default=DEFAULT_URL, help=f"Chat Completions URL (default: {DEFAULT_URL})")
    add(
        "--model",
        default=os.getenv("SUMMARIZE_IMAGE_MODEL"),
        help="Model identifier (or set SUMMARIZE_IMAGE_MODEL).",
    )
    add(
        "--api-key",
        default=os.getenv("SUMMARIZE_IMAGE_API_KEY"),
        help="API key (or set SUMMARIZE_IMAGE_API_KEY).",
    )

    # Prompting and generation settings.
    add("-S", "--system", default=DEFAULT_SYSTEM, help="System prompt.")
    add("-p", "--prompt", default=DEFAULT_PROMPT, help="User prompt to apply to each image.")
    add("--temperature", type=float, default=0.0, help="Sampling temperature.")
    add("--max-tokens", type=int, default=1000, help="Max completion tokens.")

    # Transport settings.
    add("--timeout", type=float, default=1500.0, help="Request timeout (seconds).")
    add("--retries", type=int, default=0, help="Retry count on failure.")

    # Boolean switches.
    switches = (
        (("-s", "--stats"), "Print per-image and average performance metrics to STDERR."),
        (("-v", "--verbose"), "Print full request/response JSON to STDERR."),
        (("--fail-fast",), "Exit immediately on first error."),
    )
    for flags, help_text in switches:
        add(*flags, action="store_true", help=help_text)

    return parser.parse_args(list(argv))
def main(argv: Sequence[str]) -> int:
    """
    Run the tool: one request per input, summaries on stdout, the rest on stderr.

    Returns the process exit code: 2 for usage problems (no model, no inputs,
    missing local files), 1 if any request failed, 0 otherwise.
    """
    args = parse_args(argv)
    if not args.model:
        eprint("error: --model is required (or set SUMMARIZE_IMAGE_MODEL).")
        return 2

    images: List[str] = [*(args.images or []), *(args.image or [])]
    if not images:
        eprint("error: provide at least one image or video path/URL (positional or -i/--image).")
        return 2

    # Validate paths early (skip validation for URLs)
    missing = [
        p
        for p in images
        if not p.startswith(("http://", "https://")) and not os.path.isfile(p)
    ]
    if missing:
        eprint("error: missing files:")
        for p in missing:
            eprint(f" - {p}")
        return 2

    def _mean_present(values: List[Optional[int]]) -> Optional[float]:
        """Average of the non-None entries, or None when there are none."""
        present = [v for v in values if v is not None]
        return sum(present) / len(present) if present else None

    results: List[OneResult] = []
    started = time.perf_counter()
    for img in images:
        r = run_one(
            image_path=img,
            url=args.url,
            api_key=args.api_key,
            model=args.model,
            system_prompt=args.system,
            user_prompt=args.prompt,
            temperature=args.temperature,
            max_tokens=args.max_tokens,
            timeout_s=args.timeout,
            retries=args.retries,
            verbose=args.verbose,
        )
        results.append(r)
        if r.error:
            eprint(f"[error] {img}: {r.error}")
            if args.fail_fast:
                return 1
            continue

        # Summaries to STDOUT (clean + pipe-friendly)
        print(f"{img}\t{r.summary}", flush=True)

        # Optional per-image stats to STDERR
        if args.stats:
            fields = [f"wall={r.wall_s:.3f}s"]
            if r.prompt_tokens is not None:
                fields.append(f"prompt_tokens={r.prompt_tokens}")
            if r.completion_tokens is not None:
                fields.append(f"completion_tokens={r.completion_tokens}")
            if r.total_tokens is not None:
                fields.append(f"total_tokens={r.total_tokens}")
            if r.completion_tokens is not None and r.wall_s > 0:
                fields.append(f"completion_tok/s={r.completion_tokens / r.wall_s:.2f}")
            eprint(f"[stats] {img}: " + ", ".join(fields))
    total_elapsed = time.perf_counter() - started

    ok = [r for r in results if not r.error]
    failed = [r for r in results if r.error]

    if args.stats and ok:
        avg_wall = sum(r.wall_s for r in ok) / len(ok)
        eprint("[stats] ---- Averages (successful requests) ----")
        eprint(f"[stats] count_ok={len(ok)}, count_failed={len(failed)}")
        eprint(f"[stats] avg_wall_s={avg_wall:.3f}")
        token_averages = (
            ("avg_prompt_tokens", _mean_present([r.prompt_tokens for r in ok])),
            ("avg_completion_tokens", _mean_present([r.completion_tokens for r in ok])),
            ("avg_total_tokens", _mean_present([r.total_tokens for r in ok])),
        )
        for label, value in token_averages:
            if value is not None:
                eprint(f"[stats] {label}={value:.1f}")
        if total_elapsed > 0:
            eprint(f"[stats] overall_req_per_s={len(ok) / total_elapsed:.3f}")

    return 1 if failed else 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment