Skip to content

Instantly share code, notes, and snippets.

@Vovanda
Last active July 5, 2025 23:49
Show Gist options
  • Select an option

  • Save Vovanda/6c0a1d570a0f6d00a8baafefe6ec9f36 to your computer and use it in GitHub Desktop.

Select an option

Save Vovanda/6c0a1d570a0f6d00a8baafefe6ec9f36 to your computer and use it in GitHub Desktop.
Streaming proxy with chunk filtering for Open WebUI and Zed IDE — resolves invalid chunk errors from custom models and enables function calling emulation

Open Web UI Stream Chunk Filter for Zed IDE

This adapter script resolves the following error in Zed IDE:

data did not match any variant of untagged enum ResponseStreamResult

which occurs when using a custom Open Web UI model (pipe functions). Open Web UI sends invalid chunks in the stream, causing parsing exceptions.


What the adapter does

  • Proxies requests to Open Web UI
  • Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses
  • Supports streaming
  • Allows overriding system messages in requests (optional)
  • Simulates OpenAI-style function calling (tool_calls) using special stream markers controlled by the environment variable EMULATE_TOOLS_CALLING (default: enabled)

Tool Calling Emulation

If your model outputs function definitions inside the following block:

<tools_calling>
{ ... json ... }
</tools_calling>

The adapter will:

  • Detect the block
  • Extract and parse the JSON
  • Convert it into OpenAI-compatible tool_calls chunks
  • Send those chunks to the client as if native function calling were supported

Expected JSON Structure

The JSON inside <tools_calling>...</tools_calling> must have the following structure:

{
  "model": "<string>",
  "functions": [
    {
      "index": <integer>,
      "function_call": {
        "name": "<string>",
        "arguments": "<JSON string>"
      },
      "finish_reason": "function_call" | null
    },
    ...
  ]
}

Notes:

  • The arguments field must be a valid stringified JSON object
  • The outer structure must contain a functions list
  • Each entry represents one tool call

Usage

  1. Clone or download the script
  2. Set environment variables:
    • OPENWEBUI_URL — URL of your Open Web UI server
    • OPENWEBUI_API_KEY — API key for access
    • (optional) ZED_SYSTEM_PROMPT_FILE — path to a file with a system prompt
    • (optional) ZED_SYSTEM_PROMPT_MODE — mode for handling system messages, one of:
      • default (leave system messages unchanged)
      • replace (replace all system messages with the one from the file)
      • disable (remove all system messages)
    • (optional) EMULATE_TOOLS_CALLING — true or false to enable/disable tool calling emulation (default: true)
  3. Install dependencies:
pip install fastapi httpx uvicorn python-dotenv
  4. Run the server:
python OpenWEBUI_tool_calling_proxy.py
  5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI

Important notes

  • The invalid chunk issue only occurs with Open Web UI custom models (pipe functions)
  • Models served via OpenRouter are not affected
  • The tool calling emulation requires output in the <tools_calling>...</tools_calling> format
  • This is a temporary and pragmatic workaround to enable compatibility with OpenAI-like clients

If you have any questions, feel free to reach out.


Author: Savkin Vladimir
Date: 2025

import copy
import os
import json
import logging
from urllib.parse import urljoin
import httpx
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from dotenv import load_dotenv
# ========================
# Configuration and initialization
# ========================
load_dotenv()

LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper()
logging.basicConfig(level=LOG_LEVEL)
logger = logging.getLogger(__name__)

app = FastAPI()

# Upstream Open WebUI endpoint and credentials
OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here")
API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
TIMEOUT = 30.0  # seconds, applied to every upstream httpx call

# System-prompt override: optional prompt file plus handling mode
# (one of: default | replace | disable)
ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()

# Tool-calling emulation toggle. BUG FIX: this previously read
# ZED_SYSTEM_PROMPT_MODE (copy-paste error) instead of the documented
# EMULATE_TOOLS_CALLING variable, and os.getenv returns a *string*, so even
# "false" was truthy. Parse the correct variable into a real boolean
# (default: enabled, matching the README).
EMULATE_FUNCTION_CALLING = os.getenv("EMULATE_TOOLS_CALLING", "true").strip().lower() in ("1", "true", "yes", "on")

# Markers the model uses to delimit an emulated tool-calling JSON block
START_MARKER = "<tools_calling>"
END_MARKER = "</tools_calling>"
MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER))
# ========================
# Loading and handling the system prompt
# ========================
def load_system_prompt() -> str | None:
    """Return the system prompt read from ZED_SYSTEM_PROMPT_FILE, or None.

    None is returned when the variable is unset or the file does not exist.
    """
    path = ZED_SYSTEM_PROMPT_FILE
    if not path or not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as fh:
        return fh.read().strip()
def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]:
if mode == "disable":
return [m for m in messages if m.get("role") != "system"]
if mode == "replace" and custom_prompt:
filtered = [m for m in messages if m.get("role") != "system"]
filtered.insert(0, {"role": "system", "content": custom_prompt})
return filtered
return messages
def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    """Embed the OpenAI `tools` definition into `messages` as a system message.

    The note is inserted right after the first system message, or at the very
    front when there is none. `messages` is mutated in place; no-op when
    `tools` is empty.
    """
    if not tools:
        return
    rendered = json.dumps(tools, indent=2, ensure_ascii=False)
    note = {
        "role": "system",
        "content": f"Ниже перечислены встроенные инструменты (function calling):\n{rendered}"
    }
    first_system = next((i for i, m in enumerate(messages) if m.get("role") == "system"), None)
    if first_system is None:
        messages.insert(0, note)
    else:
        messages.insert(first_system + 1, note)
# ========================
# Main endpoint
# ========================
@app.post("/v1/chat/completions")
async def openai_proxy(request: Request):
    """Proxy a chat-completions request to Open WebUI.

    Applies the system-prompt policy and, when emulation is enabled, moves the
    OpenAI `tools` definition into the message list before forwarding.

    NOTE(review): the response is always a StreamingResponse with media type
    text/event-stream, even when the request body has "stream": false —
    confirm non-streaming clients are never pointed at this endpoint.
    """
    logger.info(">>> Вызван openai_proxy")
    body = await request.json()
    # Deep copy so the "was the body modified?" comparison below is meaningful.
    original_body = copy.deepcopy(body)
    # System prompt policy (ZED_SYSTEM_PROMPT_MODE / ZED_SYSTEM_PROMPT_FILE)
    system_prompt = load_system_prompt()
    body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt)
    # Tool-calling emulation: remove `tools` and embed it into the messages
    if EMULATE_FUNCTION_CALLING:
        tools = body.pop("tools", None)
        if tools:
            inject_tools_as_prompt(tools, body.get("messages", []))
            logger.info("Инструменты встроены в messages, ключ 'tools' удалён")
    if body != original_body:
        logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}")
    else:
        logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}")
    # Reuse the caller's Authorization header if present, else the configured key
    auth_header = request.headers.get("Authorization", f"Bearer {API_KEY}")
    headers = {
        "Authorization": auth_header,
        "Content-Type": "application/json",
        "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }
    # Choose the streaming pipeline: tool-calling post-processing or passthrough
    generator = func_calling_event_generator if EMULATE_FUNCTION_CALLING else default_event_generator
    return StreamingResponse(generator(body, headers), media_type="text/event-stream")
# ========================
# Proxy for all /v1/* paths
# ========================
@app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_all(request: Request, path: str):
    """Transparently forward any /v1/* request to Open WebUI.

    chat/completions is delegated to openai_proxy; every other path is
    replayed against OPENWEBUI_URL with the configured API key, and the
    upstream JSON body is returned to the caller.
    """
    if path == "chat/completions":
        return await openai_proxy(request)
    target_url = urljoin(f"{OPENWEBUI_URL}/", path)
    try:
        payload = None
        if request.method in ("POST", "PUT"):
            try:
                payload = await request.json()
            except json.JSONDecodeError:
                # Body was not JSON (or empty): forward without one.
                payload = None
        forward_headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            upstream = await client.request(
                method=request.method,
                url=target_url,
                headers=forward_headers,
                json=payload,
                params=dict(request.query_params),
            )
        # Drop hop-by-hop / length headers that would conflict with the
        # re-encoded response body.
        dropped = {"content-encoding", "content-length", "transfer-encoding", "connection"}
        passthrough = {k: v for k, v in upstream.headers.items() if k.lower() not in dropped}
        return JSONResponse(
            content=upstream.json(),
            status_code=upstream.status_code,
            headers=passthrough,
        )
    except httpx.ReadTimeout:
        logger.error("Таймаут при обращении к Open WebUI")
        raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI")
    except Exception as e:
        logger.error(f"Ошибка проксирования: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# ========================
# Event generators
# ========================
async def default_event_generator(body: dict, headers: dict):
    """Stream Open WebUI chat completions, dropping chunks Zed cannot parse.

    Chunks whose JSON payload contains a "sources" key (Open WebUI citation
    chunks) are filtered out; all other lines are forwarded unchanged.
    """
    max_log_chunk = 200  # cap for logged chunk snippets
    try:
        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
                if response.status_code != 200:
                    text = await response.aread()
                    logger.error(f"OpenWebUI error: {text.decode()}")
                    yield format_error_event(text.decode())
                    return
                async for line in response.aiter_lines():
                    # aiter_lines() strips newlines, so blank SSE separator
                    # lines arrive as empty strings and are skipped here.
                    if not line.strip():
                        continue
                    if line.startswith("data: "):
                        json_str = line[len("data: "):].strip()
                        try:
                            data = json.loads(json_str)
                            if "sources" in data:
                                snippet = json_str[:max_log_chunk].replace("\n", " ")
                                logger.info(f"Пропущен чанк с 'sources': {snippet}...")
                                continue
                        except json.JSONDecodeError:
                            # Not JSON (e.g. "data: [DONE]") — forward as-is.
                            pass
                    logger.info(line)
                    # NOTE(review): events are re-emitted with a single "\n",
                    # but the SSE spec separates events with a blank line
                    # ("\n\n"). Confirm downstream clients accept this framing.
                    yield f"{line}\n"
    except Exception as e:
        logger.error(f"Ошибка стриминга: {e}")
        yield format_error_event("Internal server error")
async def func_calling_event_generator(body: dict, headers: dict):
    """Stream upstream chat completions through the tool-calling post-processor.

    On any failure a single JSON error event is emitted instead.
    """
    try:
        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            # BUG FIX: httpx's .stream() returns an async context manager — it
            # must be entered with `async with`, not awaited (awaiting it
            # raises TypeError at runtime and the stream never starts). The
            # `async with` also guarantees the response is closed.
            async with client.stream(
                "POST",
                f"{OPENWEBUI_URL}/api/chat/completions",
                json=body,
                headers=headers,
            ) as response:
                async for event in process_response_stream(response):
                    yield event
    except Exception as e:
        logger.error(f"Ошибка стриминга: {e}")
        yield format_error_event("Internal server error")
# ========================
# Stream processing
# ========================
async def process_response_stream(response):
if response.status_code != 200:
error_text = (await response.aread()).decode()
logger.error(f"Ошибка от API: {error_text}")
yield format_error_event(error_text)
return
text_accumulator = []
async for line in response.aiter_lines():
processed = await process_stream_line(line, text_accumulator)
if processed:
yield processed
if processed.endswith("[DONE]\n\n"):
return
async def process_stream_line(line, text_accumulator):
if not line.startswith("data: "):
return None
try:
data = parse_stream_data(line)
if not data or "sources" in data:
return None
text_accumulator.append(data["content"])
return process_accumulated_text("".join(text_accumulator), data)
except Exception:
return None
def parse_stream_data(line):
data_part = line[len("data: "):].strip()
if not data_part:
return None
data = json.loads(data_part)
choice = data.get("choices", [{}])[0]
content = choice.get("delta", {}).get("content", "")
return {"data": data, "content": content} if content else None
def process_accumulated_text(text, context_data):
start_pos = text.find(START_MARKER)
end_pos = text.find(END_MARKER)
if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos):
return process_partial_text(text, context_data)
return process_marked_content(text, start_pos, end_pos, context_data)
def process_partial_text(text, context_data):
cutoff = find_safe_cutoff(text, START_MARKER)
safe_part = text[:cutoff]
if safe_part.strip():
yield emit_with_content(context_data["data"], safe_part)
return {"remaining": text[cutoff:]}
def process_marked_content(text, start_pos, end_pos, context_data):
pre_text = text[:start_pos].strip()
if pre_text:
yield emit_with_content(context_data["data"], pre_text)
json_block = text[start_pos + len(START_MARKER):end_pos]
json_str = extract_json(json_block)
if json_str:
try:
parsed = json.loads(json_str)
if "functions" in parsed:
async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]):
logger.info(chunk)
yield chunk + '\n'
except json.JSONDecodeError:
pass
yield "data: [DONE]\n\n"
def extract_json(block):
start = block.find("{")
end = block.rfind("}")
return block[start:end + 1] if start != -1 and end != -1 else None
def emit_with_content(base_json: dict, content: str) -> str:
base = dict(base_json)
base["choices"][0]["delta"] = {"role": "assistant", "content": content}
return f"data: {json.dumps(base)}\n\n"
# ========================
# Streaming tool-call output
# ========================
async def stream_tool_calls(base_json: dict, functions: list):
    """Yield OpenAI-compatible `tool_calls` SSE chunks for `functions`.

    Each entry in `functions` is expected to carry a "function_call" dict with
    "name" and "arguments" (a JSON string), per the README. The final chunk
    carries finish_reason "tool_calls" so clients know the sequence is done.

    BUG FIXES: the previous revision used nested same-quote f-strings
    (f"{base_json["id"]}"), which is a SyntaxError before Python 3.12
    (PEP 701), and it stringified `created`, although OpenAI streams it as an
    integer — exactly the kind of invalid chunk this proxy exists to remove.
    """
    last = len(functions) - 1
    for i, func in enumerate(functions):
        fc = func.get("function_call", {})
        chunk = {
            # Identity fields are forwarded from the source chunk unchanged.
            "id": base_json["id"],
            "object": "chat.completion.chunk",
            "model": base_json["model"],
            "created": base_json["created"],
            "choices": [
                {
                    "index": 0,
                    "delta": {
                        "tool_calls": [
                            {
                                "id": f"call_{i}",
                                "index": i,
                                "type": "function",
                                "function": {
                                    "name": fc.get("name", ""),
                                    "arguments": fc.get("arguments", ""),
                                },
                            }
                        ]
                    },
                    "finish_reason": "tool_calls" if i == last else None,
                    "native_finish_reason": "tool_calls" if i == last else None,
                }
            ],
        }
        # Single "\n" here: the consumer appends the SSE blank-line separator.
        yield f"data: {json.dumps(chunk)}\n"
# ========================
# Utilities
# ========================
def format_error_event(message) -> str:
    """Return an SSE error event carrying `message` as a JSON payload.

    BUG FIX: the message used to be interpolated straight into a JSON literal,
    so quotes or newlines in upstream error text (e.g. an HTML error page)
    produced malformed JSON. json.dumps escapes it correctly.
    """
    return f"data: {json.dumps({'error': str(message)})}\n\n"
def find_safe_cutoff(buffer: str, marker: str) -> int:
    """Return the largest index such that buffer[:index] is safe to emit.

    "Safe" means the emitted prefix cannot contain `marker` or end with a
    partial `marker` that is still streaming in.
    """
    marker_pos = buffer.find(marker)
    if marker_pos != -1:
        return marker_pos
    # Longest-prefix-first: does the buffer end with the start of the marker?
    for prefix_len in range(len(marker) - 1, 0, -1):
        if buffer.endswith(marker[:prefix_len]):
            return len(buffer) - prefix_len
    return len(buffer)
if __name__ == "__main__":
    import uvicorn

    # Local development entry point; binds to loopback only.
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level=LOG_LEVEL.lower())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment