Skip to content

Instantly share code, notes, and snippets.

@Vovanda
Last active July 5, 2025 23:49
Show Gist options
  • Select an option

  • Save Vovanda/6c0a1d570a0f6d00a8baafefe6ec9f36 to your computer and use it in GitHub Desktop.

Select an option

Save Vovanda/6c0a1d570a0f6d00a8baafefe6ec9f36 to your computer and use it in GitHub Desktop.
Streaming proxy with chunk filtering for Open WebUI and Zed IDE — resolves invalid chunk errors from custom models and enables function calling emulation

Open Web UI Stream Chunk Filter for Zed IDE

This adapter script resolves the following error in Zed IDE:

data did not match any variant of untagged enum ResponseStreamResult

which occurs when using a custom Open Web UI model (pipe functions). Open Web UI sends invalid chunks in the stream, causing parsing exceptions.


What the adapter does

  • Proxies requests to Open Web UI
  • Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses
  • Supports both streaming and regular JSON responses
  • Allows overriding system messages in requests (optional)

Usage

  1. Clone or download the script
  2. Set environment variables:
    • OPENWEBUI_URL — URL of your Open Web UI server
    • OPENWEBUI_API_KEY — API key for access
    • (optional) ZED_SYSTEM_PROMPT_FILE — path to a file with a system prompt
    • (optional) ZED_SYSTEM_PROMPT_MODE — mode for handling system messages, one of:
      • default (leave system messages unchanged)
      • replace (replace all system messages with the one from the file)
      • disable (remove all system messages)
  3. Install dependencies:
pip install fastapi httpx uvicorn python-dotenv
  1. Run the server:
python openwebui_filter_proxy.py
  1. Configure Zed IDE to use this proxy instead of direct connection to Open Web UI

Important notes

  • The invalid chunk issue only occurs when using Open Web UI custom models (pipe functions)
  • No such problems occur when using models via OpenRouter
  • This script is intended as a temporary workaround for smooth operation with Zed IDE

If you have any questions, feel free to reach out.


Author: Savkin Vladimir Date: 2025


Фильтр невалидных чанков Open Web UI для Zed IDE

Этот скрипт-адаптер решает проблему с ошибкой в Zed IDE:

data did not match any variant of untagged enum ResponseStreamResult

Возникающую при использовании кастомной модели Open Web UI (pipe functions), когда Open Web UI отправляет невалидные чанки в stream, что вызывает исключения при парсинге.


Что делает адаптер

  • Проксирует запросы к Open Web UI
  • Фильтрует и удаляет невалидные чанки из stream, позволяя Zed IDE корректно принимать ответы
  • Поддерживает как streaming, так и обычные JSON-ответы
  • Позволяет переопределять системные сообщения в запросах (опционально)

Использование

  1. Клонируйте или скачайте скрипт
  2. Задайте переменные окружения:
    • OPENWEBUI_URL — URL вашего Open Web UI сервера
    • OPENWEBUI_API_KEY — API ключ для доступа
    • (опционально) ZED_SYSTEM_PROMPT_FILE — путь к файлу с системным сообщением
    • (опционально) ZED_SYSTEM_PROMPT_MODE — режим обработки системных сообщений, одно из:
      • default (оставить системные сообщения без изменений)
      • replace (заменить все системные сообщения на указанное в файле)
      • disable (удалить все системные сообщения)
  3. Установите зависимости:
pip install fastapi httpx uvicorn python-dotenv
  1. Запустите сервер:
python openwebui_filter_proxy.py
  1. Настройте Zed IDE использовать этот прокси вместо прямого подключения к Open Web UI

Важные замечания

  • Проблема с невалидными чанками возникает только при использовании кастомной модели Open Web UI (pipe functions)
  • При использовании моделей через OpenRouter подобных проблем не наблюдается
  • Скрипт сделан как временное решение для корректной работы с Zed IDE

Если возникнут вопросы — обращайтесь.


Автор: Владимир Савкин
Дата: 2025

import os
import json
import logging
import re
from urllib.parse import urljoin
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
import httpx
from dotenv import load_dotenv
# --- Конфигурация ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
app = FastAPI()
OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "https://chat.sawking.tech")
API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
TIMEOUT = 30.0
ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
#replace или disable, или default
ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()
def override_system_messages(messages: list[dict]) -> list[dict]:
ZED_SYSTEM_PROMPT = None
if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
ZED_SYSTEM_PROMPT = f.read().strip()
if ZED_SYSTEM_PROMPT_MODE == "disable":
# Удалить все system-сообщения, ничего не добавлять
return [m for m in messages if m.get("role") != "system"]
if ZED_SYSTEM_PROMPT_MODE == "replace" and ZED_SYSTEM_PROMPT:
# Удалить все system и вставить один кастомный
messages = [m for m in messages if m.get("role") != "system"]
messages.insert(0, {"role": "system", "content": ZED_SYSTEM_PROMPT})
return messages
# default — не трогаем системный промт
return messages
@app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_all(request: Request, path: str):
if path == "chat/completions":
return await openai_proxy(request)
target_url = urljoin(f"{OPENWEBUI_URL}/", path)
try:
request_body = None
if request.method in ["POST", "PUT"]:
try:
request_body = await request.json()
except json.JSONDecodeError:
request_body = None
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.request(
method=request.method,
url=target_url,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
},
json=request_body,
params=dict(request.query_params),
)
filtered_headers = {
k: v for k, v in response.headers.items()
if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
}
return JSONResponse(
content=response.json(),
status_code=response.status_code,
headers=filtered_headers,
)
except httpx.ReadTimeout:
logger.error("Таймаут при обращении к Open WebUI")
raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI")
except Exception as e:
logger.error(f"Ошибка проксирования: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/v1/chat/completions")
async def openai_proxy(request: Request):
body = await request.json()
original_messages = body.get("messages", [])
modified_messages = override_system_messages(original_messages)
if modified_messages != original_messages:
body["messages"] = modified_messages
logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}")
else:
logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}")
logger.info(f"Обработка chat/completions для модели: {body.get('model')}")
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"Accept": "text/event-stream" if body.get("stream") else "application/json",
}
if body.get("stream"):
return StreamingResponse(event_generator(body, headers), media_type="text/event-stream")
else:
return await get_json_response(body, headers)
async def get_json_response(body: dict, headers: dict):
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
response = await client.post(f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers)
if response.status_code != 200:
detail = response.json().get("detail", "Open WebUI error")
logger.error(f"Open WebUI error: {detail}")
raise HTTPException(status_code=response.status_code, detail=detail)
data = response.json()
return {
"id": data.get("id"),
"object": "chat.completion",
"created": data.get("created"),
"choices": [{
"index": 0,
"message": data["choices"][0]["message"],
"finish_reason": "stop"
}]
}
async def event_generator(body: dict, headers: dict):
max_log_chunk = 200
try:
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
if response.status_code != 200:
text = await response.aread()
logger.error(f"OpenWebUI error: {text.decode()}")
yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n"
return
async for line in response.aiter_lines():
try:
if not line.strip():
continue
if line.startswith("data: "):
json_str = line[len("data: "):].strip()
try:
data = json.loads(json_str)
if "sources" in data:
snippet = json_str[:max_log_chunk].replace("\n", " ")
logger.info(f"Пропущен чанк с 'sources': {snippet}...")
continue
except json.JSONDecodeError:
pass
logger.info(line)
yield f"{line}\n"
except Exception as inner_e:
logger.error(f"Ошибка при обработке строки стрима: {inner_e}")
# Не прерываем генератор, чтобы попытаться продолжить стрим
continue
except Exception as e:
logger.error(f"Ошибка стриминга: {e}")
yield f"data: {{\"error\": \"Internal server error\"}}\n\n"
if __name__ == "__main__":
import uvicorn
uvicorn.run("openAI_adapter:app", host="127.0.0.1", port=5000, log_level="info")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment