Open Web UI Stream Chunk Filter for Zed IDE

A streaming proxy with chunk filtering for Open Web UI: it resolves invalid chunk errors from custom models and enables function calling emulation for Zed IDE.

This adapter script resolves the following error in Zed IDE:

data did not match any variant of untagged enum ResponseStreamResult

which occurs when using a custom Open Web UI model (pipe functions). Open Web UI sends invalid chunks in the stream, causing parsing exceptions.
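For illustration, the offending chunks are extra metadata events that do not match the OpenAI chunk schema. A typical example (shape assumed from the filtering logic in the script below, which drops any chunk carrying a "sources" key):

data: {"sources": [ ... citation data from Open Web UI ... ]}

Because such an event has no choices array, Zed's untagged-enum parser cannot match it to any known variant and reports the error above.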


What the adapter does

  • Proxies requests to Open Web UI
  • Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses
  • Supports streaming
  • Allows overriding system messages in requests (optional)
  • Simulates OpenAI-style function calling (tool_calls) using special stream markers controlled by the environment variable EMULATE_TOOLS_CALLING (default: enabled)

Tool Calling Emulation

If your model emits function calls inside the following block:

<tools_calling>
{ ... json ... }
</tools_calling>

The adapter will:

  • Detect the block
  • Extract and parse the JSON
  • Convert it into OpenAI-compatible tool_calls chunks
  • Send those chunks to the client as if native function calling were supported (see the example chunk below)
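For example, a block containing a single function produces a chunk shaped like this (field values are illustrative; id, model, and created are copied from the source chunk):

data: {"id": "chatcmpl-123", "object": "chat.completion.chunk", "model": "my-pipe-model", "created": 1720000000, "choices": [{"index": 0, "delta": {"tool_calls": [{"id": "call_0", "index": 0, "type": "function", "function": {"name": "get_weather", "arguments": "{\"city\": \"Berlin\"}"}}]}, "finish_reason": "tool_calls", "native_finish_reason": "tool_calls"}]}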

Expected JSON Structure

The JSON inside <tools_calling>...</tools_calling> must have the following structure:

{
  "model": "<string>",
  "functions": [
    {
      "index": <integer>,
      "function_call": {
        "name": "<string>",
        "arguments": "<JSON string>"
      },
      "finish_reason": "function_call" | null
    },
    ...
  ]
}

Notes:

  • The arguments field must be a valid stringified JSON object
  • The outer structure must contain a functions list
  • Each entry represents one tool call (see the complete example below)
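A complete, valid example (model name and argument values are illustrative):

{
  "model": "my-pipe-model",
  "functions": [
    {
      "index": 0,
      "function_call": {
        "name": "get_weather",
        "arguments": "{\"city\": \"Berlin\"}"
      },
      "finish_reason": "function_call"
    }
  ]
}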

Usage

  1. Clone or download the script
  2. Set environment variables:
    • OPENWEBUI_URL — URL of your Open Web UI server
    • OPENWEBUI_API_KEY — API key for access
    • (optional) ZED_SYSTEM_PROMPT_FILE — path to a file with a system prompt
    • (optional) ZED_SYSTEM_PROMPT_MODE — mode for handling system messages, one of:
      • default (leave system messages unchanged)
      • replace (replace all system messages with the one from the file)
      • disable (remove all system messages)
    • (optional) EMULATE_TOOLS_CALLING — true or false to enable/disable tool calling emulation (default: true)
  3. Install dependencies:
pip install fastapi httpx uvicorn python-dotenv
  4. Run the server:
python OpenWEBUI_tool_calling_proxy.py
  5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI (see the example configuration below)
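A minimal .env for the proxy might look like this (all values are placeholders):

OPENWEBUI_URL=http://localhost:3000
OPENWEBUI_API_KEY=sk-your-key-here
ZED_SYSTEM_PROMPT_MODE=default
EMULATE_TOOLS_CALLING=true

In Zed, point the OpenAI-compatible provider at the proxy. A possible settings.json fragment is sketched below; the exact schema may differ between Zed versions, and the model name is a placeholder:

{
  "language_models": {
    "openai": {
      "api_url": "http://127.0.0.1:5000/v1",
      "available_models": [
        { "name": "my-pipe-model", "max_tokens": 32768 }
      ]
    }
  }
}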

Important notes

  • The invalid chunk issue only occurs with Open Web UI custom models (pipe functions)
  • Models served via OpenRouter are not affected
  • The tool calling emulation requires output in the <tools_calling>...</tools_calling> format
  • This is a temporary and pragmatic workaround to enable compatibility with OpenAI-like clients
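Once the server is running, you can smoke-test the proxy without Zed (the model name is a placeholder for one of your Open Web UI models):

curl -N http://127.0.0.1:5000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "my-pipe-model", "stream": true, "messages": [{"role": "user", "content": "Hello"}]}'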

If you have any questions, feel free to reach out.


Author: Savkin Vladimir
Date: 2025

OpenWEBUI_tool_calling_proxy.py

import copy
import os
import json
import logging

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import httpx
from dotenv import load_dotenv

# --- Configuration ---
load_dotenv()  # load .env before reading any environment variables
LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper()
logging.basicConfig(level=LOG_LEVEL)
logger = logging.getLogger(__name__)
app = FastAPI()
OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here")
API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
TIMEOUT = 30.0
ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
# One of: replace, disable, default
ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()
# Tool calling emulation toggle (default: enabled)
EMULATE_FUNCTION_CALLING = os.getenv("EMULATE_TOOLS_CALLING", "true").lower() in ("1", "true", "yes")
START_MARKER = "<tools_calling>"
END_MARKER = "</tools_calling>"
MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER))

def load_system_prompt() -> str | None:
    if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
        with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
            return f.read().strip()
    return None

def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]:
    if mode == "disable":
        return [m for m in messages if m.get("role") != "system"]
    if mode == "replace" and custom_prompt:
        filtered = [m for m in messages if m.get("role") != "system"]
        filtered.insert(0, {"role": "system", "content": custom_prompt})
        return filtered
    return messages  # default: leave messages unchanged

def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    if not tools:
        return
    tools_text = json.dumps(tools, indent=2, ensure_ascii=False)
    tools_message = {
        "role": "system",
        "content": f"The built-in tools (function calling) are listed below:\n{tools_text}"
    }
    # Insert right after the first system message, or at the front if there is none
    for i, msg in enumerate(messages):
        if msg.get("role") == "system":
            messages.insert(i + 1, tools_message)
            break
    else:
        messages.insert(0, tools_message)

@app.post("/v1/chat/completions")
async def openai_proxy(request: Request):
    logger.info(">>> openai_proxy called")
    body = await request.json()
    # Copy the body so changes can be detected later
    original_body = copy.deepcopy(body)
    # Load and apply the system prompt policy
    system_prompt = load_system_prompt()
    body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt)
    # Function calling emulation: embed the tools into messages
    if EMULATE_FUNCTION_CALLING:
        tools = body.pop("tools", None)
        if tools:
            inject_tools_as_prompt(tools, body.get("messages", []))
            logger.info("Tools embedded into messages, 'tools' key removed")
    # Log any changes to the request body
    if body != original_body:
        logger.info(f"Request body modified: {json.dumps(body, ensure_ascii=False)}")
    else:
        logger.info(f"Request body unchanged: {json.dumps(body, ensure_ascii=False)}")
    logger.info(f"Handling chat/completions for model: {body.get('model')}")
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }
    event_generator = default_event_generator
    if EMULATE_FUNCTION_CALLING:
        # Pass the generator function itself; it is called with (body, headers) below
        event_generator = func_calling_event_generator
    return StreamingResponse(event_generator(body, headers), media_type="text/event-stream")

async def default_event_generator(body: dict, headers: dict):
    max_log_chunk = 200
    try:
        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
                if response.status_code != 200:
                    text = await response.aread()
                    logger.error(f"OpenWebUI error: {text.decode()}")
                    yield format_error_event(text.decode())
                    return
                async for line in response.aiter_lines():
                    try:
                        if not line.strip():
                            continue
                        if line.startswith("data: "):
                            json_str = line[len("data: "):].strip()
                            try:
                                data = json.loads(json_str)
                                if "sources" in data:
                                    snippet = json_str[:max_log_chunk].replace("\n", " ")
                                    logger.info(f"Skipped chunk with 'sources': {snippet}...")
                                    continue
                            except json.JSONDecodeError:
                                pass
                        logger.info(line)
                        # The blank line terminates the SSE event
                        yield f"{line}\n\n"
                    except Exception as inner_e:
                        logger.error(f"Error while processing stream line: {inner_e}")
                        # Do not abort the generator; try to keep the stream going
                        continue
    except Exception as e:
        logger.error(f"Streaming error: {e}")
        yield format_error_event("Internal server error")

async def func_calling_event_generator(body: dict, headers: dict):
    """Event generator that processes a streaming response with JSON markers."""
    try:
        async with httpx.AsyncClient(timeout=60) as client:
            # client.stream() returns a context manager, not an awaitable
            async with client.stream(
                "POST",
                f"{OPENWEBUI_URL}/api/chat/completions",
                json=body,
                headers=headers
            ) as response:
                async for event in process_response_stream(response):
                    yield event
    except Exception as e:
        logger.error(f"Streaming error: {e}")
        yield format_error_event("Internal server error")

async def process_response_stream(response):
    """Processes the streaming response and emits events."""
    if response.status_code != 200:
        error_text = (await response.aread()).decode()
        logger.error(f"API error: {error_text}")
        yield format_error_event(error_text)
        return
    text_accumulator = []
    async for line in response.aiter_lines():
        async for event in process_stream_line(line, text_accumulator):
            yield event
            if event.endswith("[DONE]\n\n"):
                return

async def process_stream_line(line, text_accumulator):
    """Processes a single line from the stream."""
    if not line.startswith("data: "):
        return
    try:
        data = parse_stream_data(line)
        if not data or "sources" in data["data"]:
            return
        text_accumulator.append(data["content"])
        async for event in process_accumulated_text(text_accumulator, data):
            yield event
    except Exception:
        return

def parse_stream_data(line):
    """Parses the payload of a stream line."""
    data_part = line[len("data: "):].strip()
    if not data_part:
        return None
    data = json.loads(data_part)
    choice = data.get("choices", [{}])[0]
    content = choice.get("delta", {}).get("content", "")
    return {"data": data, "content": content} if content else None

async def process_accumulated_text(text_accumulator, context_data):
    """Processes the accumulated text and emits events."""
    text = "".join(text_accumulator)
    start_pos = text.find(START_MARKER)
    end_pos = text.find(END_MARKER)
    if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos):
        async for event in process_partial_text(text, context_data, text_accumulator):
            yield event
        return
    async for event in process_marked_content(text, start_pos, end_pos, context_data):
        yield event

async def process_partial_text(text, context_data, text_accumulator):
    """Handles partial data that does not yet contain a complete marker pair."""
    cutoff = find_safe_cutoff(text, START_MARKER)
    safe_part = text[:cutoff]
    if safe_part.strip():
        yield emit_with_content(context_data["data"], safe_part)
    # Keep only the unsent tail (a possible marker prefix) in the accumulator
    text_accumulator[:] = [text[cutoff:]]

async def process_marked_content(text, start_pos, end_pos, context_data):
    """Handles the text between the markers."""
    pre_text = text[:start_pos].strip()
    if pre_text:
        yield emit_with_content(context_data["data"], pre_text)
    json_block = text[start_pos + len(START_MARKER):end_pos]
    json_str = extract_json(json_block)
    if json_str:
        try:
            parsed = json.loads(json_str)
            if "functions" in parsed:
                async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]):
                    yield chunk
        except json.JSONDecodeError:
            pass
    yield "data: [DONE]\n\n"

def extract_json(block):
    """Extracts a JSON string from the block."""
    start = block.find("{")
    end = block.rfind("}")
    return block[start:end + 1] if start != -1 and end != -1 else None

def emit_with_content(base_json: dict, content: str) -> str:
    # Deep copy so the shared chunk dict is not mutated between events
    base = copy.deepcopy(base_json)
    base["choices"][0]["delta"] = {"role": "assistant", "content": content}
    return f"data: {json.dumps(base)}\n\n"

async def stream_tool_calls(base_json: dict, functions: list):
    """Streams the functions from the list as tool_calls chunks."""
    for i, func in enumerate(functions):
        fc = func.get("function_call", {})
        chunk = {
            "id": base_json["id"],
            "object": "chat.completion.chunk",
            "model": base_json["model"],
            "created": base_json["created"],
            "choices": [
                {
                    "index": 0,
                    "delta": {
                        "tool_calls": [
                            {
                                "id": f"call_{i}",
                                "index": i,
                                "type": "function",
                                "function": {
                                    "name": fc.get("name", ""),
                                    "arguments": fc.get("arguments", "")
                                }
                            }
                        ]
                    },
                    "finish_reason": "tool_calls" if i == len(functions) - 1 else None,
                    "native_finish_reason": "tool_calls" if i == len(functions) - 1 else None
                }
            ]
        }
        yield f"data: {json.dumps(chunk)}\n\n"

def format_error_event(message):
    """Formats an error message as an SSE event."""
    # json.dumps escapes quotes and newlines inside the message
    return f"data: {json.dumps({'error': message})}\n\n"

def find_safe_cutoff(buffer: str, marker: str) -> int:
    """Finds a safe split position so a partially received marker is never emitted."""
    pos = buffer.find(marker)
    if pos != -1:
        return pos
    # If the buffer ends with a prefix of the marker, hold that prefix back
    for i in range(len(marker) - 1, 0, -1):
        if buffer.endswith(marker[:i]):
            return len(buffer) - i
    return len(buffer)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")