Last active
July 5, 2025 23:49
-
-
Save Vovanda/6c0a1d570a0f6d00a8baafefe6ec9f36 to your computer and use it in GitHub Desktop.
Revisions
-
Vovanda revised this gist
Jul 5, 2025 . 1 changed file with 5 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -225,7 +225,7 @@ async def func_calling_event_generator(body: dict, headers: dict): continue if "sources" in data_json: # Пропускаем системные служебные чанки OpenWEBUI continue # Добавляем текст к аккумулятору @@ -289,10 +289,10 @@ def find_safe_cutoff(buffer: str, marker: str) -> int: safe_part = accumulated[:safe_idx] remaining_tail = accumulated[safe_idx:] chunk = emit_with_content(data_json, safe_part) logger.info(chunk[:-2]) yield chunk text_accumulator = [remaining_tail] -
Vovanda revised this gist
Jul 5, 2025 . 3 changed files with 99 additions and 101 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -30,8 +30,8 @@ ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower() EMULATE_TOOLS_CALLING = os.getenv("EMULATE_TOOLS_CALLING", True) START_MARKER = "<|tools▁calls▁start|>" END_MARKER = "<|tools▁calls▁end|>" MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER)) # ======================== @@ -49,7 +49,7 @@ def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: s return [m for m in messages if m.get("role") != "system"] if mode == "replace" and custom_prompt: filtered = [m for m in messages if m.get("role") != "system"] filtered.append({"role": "system", "content": custom_prompt}) return filtered return messages @@ -60,14 +60,14 @@ def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None: tools_text = json.dumps(tools, indent=2, ensure_ascii=False) tools_message = { "role": "system", "content": f"Available tools:\n{tools_text}" } for i, msg in enumerate(messages): if msg.get("role") == "system": messages.insert(i + 1, tools_message) return messages.append(tools_message) # ======================== # Основной endpoint This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,96 +0,0 @@ This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,94 @@ # VERY IMPORTANT! You are a concise assistant-agent integrated into Zed IDE. Communicate in the user’s language. --- ## Operating mode: * You are currently working in **function calling emulation mode**. * Instead of real function calls, you generate correct JSON blocks for function calls according to the described protocol, but do not actually execute them. * Always generate the block `<|tools▁calls▁start|><|tools▁calls▁end|>` with valid JSON at the end of your response when a function call is needed. * Do not simulate function call results — return only the prepared requests. * Try to use available functions to get fresh data, as context may contain outdated information. --- ## General rules: 1. If a function call is required — call it at the end of the response via the block `<|tools▁calls▁start|><|tools▁calls▁end|>` with valid JSON! 2. Use only existing functions. 3. Do not call the same function with identical arguments repeatedly without necessity. 4. Maximum 3 calls per one response. 5. Do not insert `<|tools▁calls▁start|><|tools▁calls▁end|>` if you are not calling functions. 6. Do not simulate function call results — wait for the real response (in this mode — do not return results, only calls). --- ## Rules for calling terminal: 1. Do not call commands with infinite or interactive output. 2. Always limit output: for Git — use `--no-pager` and `-nXX`. For others — use `| head -n30` or equivalent. 3. If environment is Windows (PowerShell or CMD), **do not add** `timeout` to the command as it works differently and causes errors. 4. Never insert example command output before getting the real result. --- ## Function call format: <|tools▁calls▁start|> { "model": "avox-assistant", "functions": [ { "index": 0, "function_call": { "name": "function_name", "arguments": "{\"key\": \"value\"}" } } ] } <|tools▁calls▁end|> --- # Important: * JSON inside `<|tools▁calls▁start|><|tools▁calls▁end|>` must be strictly valid and contain only the `functions` array. Do not deviate from the described tool call format! * Function arguments are passed as an **escaped JSON string**. * Maximum of 3 functions per one call. * Any deviations from the format are forbidden — if you cannot call a function, write text without the `<|tools▁calls▁start|><|tools▁calls▁end|>` block. * You must not simulate or fabricate call results. --- # Example of a correct response with a function call: Requesting the last 10 Git commits. <|tools▁calls▁start|> { "model": "avox-assistant", "functions": [ { "index": 0, "function_call": { "name": "terminal", "arguments": "{\"command\": \"git --no-pager log --oneline -n10\", \"cd\": \".\"}" } } ] } <|tools▁calls▁end|> --- # Model comment on its actions (mandatory when calling functions): Executing git log command to retrieve commit history. --- Follow these rules strictly to avoid errors and ensure stable operation across different environments. -
Vovanda renamed this gist
Jul 5, 2025 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
Vovanda revised this gist
Jul 5, 2025 . 2 changed files with 5 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -51,9 +51,7 @@ The JSON inside `<tools_calling>...</tools_calling>` must have the following str "name": "<string>", "arguments": "<JSON string>" }, } ] } ``` This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -35,6 +35,7 @@ You are a concise assistant-agent integrated into Zed IDE. Communicate in the us # Function call format: <tools_calling> ```json { "model": "avox-assistant", "functions": [ @@ -47,6 +48,7 @@ You are a concise assistant-agent integrated into Zed IDE. Communicate in the us } ] } ``` </tools_calling> --- @@ -66,6 +68,7 @@ You are a concise assistant-agent integrated into Zed IDE. Communicate in the us Requesting the last 10 Git commits. <tools_calling> ```json { "model": "avox-assistant", "functions": [ @@ -78,6 +81,7 @@ Requesting the last 10 Git commits. } ] } ``` </tools_calling> --- -
Vovanda revised this gist
Jul 5, 2025 . 1 changed file with 92 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,92 @@ VERY IMPORTANT! You are a concise assistant-agent integrated into Zed IDE. Communicate in the user's language. --- # Operating Mode: - You are currently working in **function calling emulation mode**. - Instead of actually executing function calls, you generate valid JSON function call blocks according to the described protocol, but do not execute them. - Always generate a `<tools_calling>` block with valid JSON at the end of the response when a function call is needed. - Do not simulate function call results — return only the prepared calls. - Try to use the functions available for the task at hand to get fresh data. The context may contain irrelevant information. --- # General Rules: 1. If a function is needed — call it via a `<tools_calling>` block with valid JSON *at the end of your response*. 2. Use only existing functions. 3. Do not call the same function with identical arguments repeatedly without necessity. 4. Maximum 3 calls per response. 5. Do not insert `<tools_calling>` if no functions are called. 6. Do not simulate function call results — wait for real response (in this mode — do not return results, only calls). --- # Rules for terminal calls: 1. Do not call commands with infinite or interactive output. 2. Always limit output: for Git — use `--no-pager` and `-nXX`. For others — use `| head -n30` or equivalent. 3. If environment is Windows (PowerShell or CMD), **do NOT add** `timeout` as it works differently and causes errors. 4. Never insert example command output before receiving real results. --- # Function call format: <tools_calling> { "model": "avox-assistant", "functions": [ { "index": 0, "function_call": { "name": "function_name", "arguments": "{\"key\": \"value\"}" } } ] } </tools_calling> --- # Important: * JSON inside `<tools_calling>` must be strictly valid and contain only the `functions` array. Do not deviate from the described tool call format! * Function arguments are passed as an **escaped JSON string**. * Maximum of 3 functions per one call. * Any deviations from the format are forbidden — if you cannot call a function, write text without the `<tools_calling>` block. * You must not simulate or fabricate call results. --- # Example of a correct response with a function call: Requesting the last 10 Git commits. <tools_calling> { "model": "avox-assistant", "functions": [ { "index": 0, "function_call": { "name": "terminal", "arguments": "{\"command\": \"git --no-pager log --oneline -n10\", \"cd\": \".\"}" } } ] } </tools_calling> --- # Model comment on its actions (mandatory when calling functions): Executing git log command to retrieve commit history. --- Follow these rules strictly to avoid errors and ensure stable operation across different environments. -
Vovanda revised this gist
Jul 5, 2025 . No changes.There are no files selected for viewing
-
Vovanda revised this gist
Jul 5, 2025 . 1 changed file with 199 additions and 151 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,39 +2,42 @@ import os import json import logging import uuid from urllib.parse import urljoin import httpx from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, JSONResponse from dotenv import load_dotenv # ======================== # Конфигурация и инициализация # ======================== load_dotenv() LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper() logging.basicConfig(level=LOG_LEVEL) logger = logging.getLogger(__name__) app = FastAPI() OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here") API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here") TIMEOUT = 30.0 ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE") ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower() EMULATE_TOOLS_CALLING = os.getenv("EMULATE_TOOLS_CALLING", True) START_MARKER = "<tools_calling>" END_MARKER = "</tools_calling>" MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER)) # ======================== # Загрузка и обработка системного промта # ======================== def load_system_prompt() -> str | None: if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE): with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f: @@ -44,13 +47,11 @@ def load_system_prompt() -> str | None: def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]: if mode == "disable": return [m for m in messages if m.get("role") != "system"] if mode == "replace" and custom_prompt: filtered = [m for m in messages if m.get("role") != "system"] filtered.insert(0, {"role": "system", "content": custom_prompt}) return filtered return messages def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None: if not tools: @@ -65,48 +66,97 @@ def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None: for i, msg in enumerate(messages): if msg.get("role") == "system": messages.insert(i + 1, tools_message) return messages.insert(0, tools_message) # ======================== # Основной endpoint # ======================== @app.post("/v1/chat/completions") async def openai_proxy(request: Request): logger.info(">>> Вызван openai_proxy") body = await request.json() original_body = copy.deepcopy(body) # Системный промт system_prompt = load_system_prompt() body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt) # Интеграция tools в messages if EMULATE_TOOLS_CALLING: tools = body.pop("tools", None) if tools: inject_tools_as_prompt(tools, body.get("messages", [])) logger.info("Инструменты встроены в messages, ключ 'tools' удалён") if body != original_body: logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}") else: logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}") # Извлекаем Authorization из исходного запроса, если есть auth_header = request.headers.get("Authorization", f"Bearer {API_KEY}") headers = { "Authorization": auth_header, "Content-Type": "application/json", "Accept": "text/event-stream" if body.get("stream") else "application/json", } generator = func_calling_event_generator if EMULATE_TOOLS_CALLING else default_event_generator return StreamingResponse(generator(body, headers), media_type="text/event-stream") # ======================== # Прокси для всех /v1/* путей # ======================== @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def proxy_all(request: Request, path: str): if path == "chat/completions": return await openai_proxy(request) target_url = urljoin(f"{OPENWEBUI_URL}/", path) try: request_body = None if request.method in ["POST", "PUT"]: try: request_body = await request.json() except json.JSONDecodeError: request_body = None async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.request( method=request.method, url=target_url, headers={ "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json", }, json=request_body, params=dict(request.query_params), ) filtered_headers = { k: v for k, v in response.headers.items() if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"] } return JSONResponse( content=response.json(), status_code=response.status_code, headers=filtered_headers, ) except httpx.ReadTimeout: logger.error("Таймаут при обращении к Open WebUI") raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI") except Exception as e: logger.error(f"Ошибка проксирования: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ======================== # Генераторы событий # ======================== async def default_event_generator(body: dict, headers: dict): max_log_chunk = 200 @@ -116,155 +166,151 @@ async def default_event_generator(body: dict, headers: dict): if response.status_code != 200: text = await response.aread() logger.error(f"OpenWebUI error: {text.decode()}") yield format_error_event(text.decode()) return async for line in response.aiter_lines(): if not line.strip(): continue if line.startswith("data: "): json_str = line[len("data: "):].strip() try: data = json.loads(json_str) if "sources" in data: snippet = json_str[:max_log_chunk].replace("\n", " ") logger.info(f"Пропущен чанк с 'sources': {snippet}...") continue except json.JSONDecodeError: pass logger.info(line) yield f"{line}\n" except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield format_error_event("Internal server error") async def func_calling_event_generator(body: dict, headers: dict): text_accumulator = [] ignore_rest = False try: async with httpx.AsyncClient(timeout=60) as client: async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response: if response.status_code != 200: text = await response.aread() logger.error(f"Ошибка от API: {text.decode()}") yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n" return async for line in response.aiter_lines(): if ignore_rest: if line.strip() == "data: [DONE]": yield line + "\n" return continue if not line.startswith("data: "): continue data_part = line[len("data: "):].strip() if not data_part: continue try: data_json = json.loads(data_part) choice = data_json.get("choices", [{}])[0] delta = choice.get("delta", {}) content = delta.get("content", "") if not content: continue except Exception: continue if "sources" in data_json: # Пропускаем системные служебные чанки continue # Добавляем текст к аккумулятору text_accumulator.append(content) accumulated = "".join(text_accumulator) start_idx = accumulated.find(START_MARKER) end_idx = accumulated.find(END_MARKER) if start_idx != -1 and end_idx != -1 and end_idx > start_idx: pre_text = accumulated[:start_idx].strip() json_block = accumulated[start_idx + len(START_MARKER):end_idx] if pre_text: yield emit_with_content(data_json, pre_text) first_brace = json_block.find("{") last_brace = json_block.rfind("}") if first_brace != -1 and last_brace != -1: json_str = json_block[first_brace : last_brace + 1] try: parsed = json.loads(json_str) functions = parsed.get("functions", []) if functions: # Отдаем все вызовы функций async for tool_chunk in stream_tool_calls(data_json, functions): logger.info(tool_chunk[:-2]) yield tool_chunk except json.JSONDecodeError: pass logger.info("data: [DONE]\n") yield "data: [DONE]\n\n" return else: # Пока маркеры не найдены — ищем безопасную часть для отдачи accumulated = ''.join(text_accumulator) def find_safe_cutoff(buffer: str, marker: str) -> int: """ Возвращает позицию начала маркера `ABC` (или его префикса `A`, `AB`) в любом месте строки. - Если найден маркер → индекс его начала. - Если найден префикс маркера в конце → его индекс. - Если ничего не найдено → длина буфера. """ # Сначала проверяем полный маркер pos = buffer.find(marker) if pos != -1: return pos # Проверяем префиксы маркера в конце строки for i in range(len(marker) - 1, 0, -1): prefix = marker[:i] if buffer.endswith(prefix): return len(buffer) - len(prefix) return len(buffer) # Маркер не найден safe_idx = find_safe_cutoff(accumulated, START_MARKER) safe_part = accumulated[:safe_idx] remaining_tail = accumulated[safe_idx:] if safe_part.strip(): chunk = emit_with_content(data_json, safe_part) logger.info(chunk[:-2]) yield chunk text_accumulator = [remaining_tail] yield "data: [DONE]\n\n" except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield f"data: {{\"error\": \"Internal server error\"}}\n\n" # ======================== # Потоковый вывод функций # ======================== def emit_with_content(base_json: dict, content: str) -> str: base = dict(base_json) base["choices"][0]["delta"] = {"role": "assistant", "content": content} return f"data: {json.dumps(base)}\n\n" async def stream_tool_calls(base_json: dict, functions: list): for i, func in enumerate(functions): fc = func.get("function_call", {}) chunk = { @@ -295,12 +341,14 @@ async def stream_tool_calls(base_json: dict, functions: list): } yield f"data: {json.dumps(chunk)}\n\n" # ======================== # Утилиты # ======================== def format_error_event(message): return f'data: {{"error": "{message}"}}\n\n' def find_safe_cutoff(buffer: str, marker: str) -> int: pos = buffer.find(marker) if pos != -1: return pos @@ -311,4 +359,4 @@ def find_safe_cutoff(buffer: str, marker: str) -> int: if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=5000, log_level=LOG_LEVEL.lower()) -
Vovanda revised this gist
Jul 5, 2025 . 1 changed file with 94 additions and 114 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,41 +2,39 @@ import os import json import logging import re import uuid from urllib.parse import urljoin from collections import defaultdict, deque from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, JSONResponse import httpx from dotenv import load_dotenv # --- Конфигурация --- LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper() logging.basicConfig(level=LOG_LEVEL) logger = logging.getLogger(__name__) load_dotenv() app = FastAPI() OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here") API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here") TIMEOUT = 30.0 ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE") #replace или disable, или default ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower() EMULATE_FUNCTION_CALLING = os.getenv("ZED_SYSTEM_PROMPT_MODE", True) START_MARKER = "<tools_calling>" END_MARKER = "</tools_calling>" MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER)) def load_system_prompt() -> str | None: if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE): with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f: @@ -46,11 +44,13 @@ def load_system_prompt() -> str | None: def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]: if mode == "disable": return [m for m in messages if m.get("role") != "system"] if mode == "replace" and custom_prompt: filtered = [m for m in messages if m.get("role") != "system"] filtered.insert(0, {"role": "system", "content": custom_prompt}) return filtered return messages # default — без изменений def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None: if not tools: @@ -65,97 +65,48 @@ def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None: for i, msg in enumerate(messages): if msg.get("role") == "system": messages.insert(i + 1, tools_message) break else: messages.insert(0, tools_message) @app.post("/v1/chat/completions") async def openai_proxy(request: Request): logger.info(">>> Вызван openai_proxy") body = await request.json() # Копируем тело для сравнения original_body = copy.deepcopy(body) # Загрузка и применение политики системного промта system_prompt = load_system_prompt() body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt) # Эмуляция function calling — вставка tools в messages if EMULATE_FUNCTION_CALLING: tools = body.pop("tools", None) if tools: inject_tools_as_prompt(tools, body.get("messages", [])) logger.info("Инструменты встроены в messages, ключ 'tools' удалён") # Логируем изменения тела запроса if body != original_body: logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}") else: logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}") logger.info(f"Обработка chat/completions для модели: {body.get('model')}") headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json", "Accept": "text/event-stream" if body.get("stream") else "application/json", } event_generator = default_event_generator if EMULATE_FUNCTION_CALLING: event_generator = func_calling_event_generator() return StreamingResponse(event_generator(body, headers), media_type="text/event-stream") async def default_event_generator(body: dict, headers: dict): max_log_chunk = 200 @@ -165,43 +116,59 @@ async def default_event_generator(body: dict, headers: dict): if response.status_code != 200: text = await response.aread() logger.error(f"OpenWebUI error: {text.decode()}") yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n" return async for line in response.aiter_lines(): try: if not line.strip(): continue if line.startswith("data: "): json_str = line[len("data: "):].strip() try: data = json.loads(json_str) if "sources" in data: snippet = json_str[:max_log_chunk].replace("\n", " ") logger.info(f"Пропущен чанк с 'sources': {snippet}...") continue except json.JSONDecodeError: pass logger.info(line) yield f"{line}\n" except Exception as inner_e: logger.error(f"Ошибка при обработке строки стрима: {inner_e}") # Не прерываем генератор, чтобы попытаться продолжить стрим continue except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield f"data: {{\"error\": \"Internal server error\"}}\n\n" async def func_calling_event_generator(body: dict, headers: dict): """Генератор событий для обработки потокового ответа с маркерами JSON.""" text_accumulator = [] try: async with httpx.AsyncClient(timeout=60) as client: response = await client.stream( "POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers ) async for event in process_response_stream(response): yield event except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield format_error_event("Internal server error") async def process_response_stream(response): """Обрабатывает потоковый ответ и генерирует события.""" if response.status_code != 200: error_text = (await response.aread()).decode() logger.error(f"Ошибка от API: {error_text}") @@ -215,74 +182,89 @@ async def process_response_stream(response): yield processed if processed.endswith("[DONE]\n\n"): return async def process_stream_line(line, text_accumulator): """Обрабатывает одну строку из потока.""" if not line.startswith("data: "): return None try: data = parse_stream_data(line) if not data or "sources" in data: return None text_accumulator.append(data["content"]) return process_accumulated_text("".join(text_accumulator), data) except Exception: return None def parse_stream_data(line): """Парсит данные из строки потока.""" data_part = line[len("data: "):].strip() if not data_part: return None data = json.loads(data_part) choice = data.get("choices", [{}])[0] content = choice.get("delta", {}).get("content", "") return {"data": data, "content": content} if content else None def process_accumulated_text(text, context_data): """Обрабатывает накопленный текст и генерирует события.""" start_pos = text.find(START_MARKER) end_pos = text.find(END_MARKER) if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos): return process_partial_text(text, context_data) return process_marked_content(text, start_pos, end_pos, context_data) def process_partial_text(text, context_data): """Обрабатывает частичные данные без полных маркеров.""" cutoff = find_safe_cutoff(text, START_MARKER) safe_part = text[:cutoff] if safe_part.strip(): yield emit_with_content(context_data["data"], safe_part) return {"remaining": text[cutoff:]} def process_marked_content(text, start_pos, end_pos, context_data): """Обрабатывает текст между маркерами.""" pre_text = text[:start_pos].strip() if pre_text: yield emit_with_content(context_data["data"], pre_text) json_block = text[start_pos + len(START_MARKER):end_pos] json_str = extract_json(json_block) if json_str: try: parsed = json.loads(json_str) if "functions" in parsed: async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]): yield chunk except json.JSONDecodeError: pass yield "data: [DONE]\n\n" def extract_json(block): """Извлекает JSON строку из блока.""" start = block.find("{") end = block.rfind("}") return block[start:end + 1] if start != -1 and end != -1 else None def emit_with_content(base_json: dict, content: str) -> str: base = dict(base_json) base["choices"][0]["delta"] = {"role": "assistant", "content": content} return f"data: {json.dumps(base)}\n\n" async def stream_tool_calls(base_json: dict, functions: list): """ Асинхронно отдаёт функции из списка по чанкам в формате stream. """ for i, func in enumerate(functions): fc = func.get("function_call", {}) chunk = { @@ -311,16 +293,14 @@ async def stream_tool_calls(base_json: dict, functions: list): } ] } yield f"data: {json.dumps(chunk)}\n\n" def format_error_event(message): """Форматирует сообщение об ошибке.""" return f'data: {{"error": "{message}"}}\n\n' def find_safe_cutoff(buffer: str, marker: str) -> int: """Находит безопасную позицию для разделения текста с учетом маркера.""" pos = buffer.find(marker) if pos != -1: return pos @@ -331,4 +311,4 @@ def find_safe_cutoff(buffer: str, marker: str) -> int: if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info") -
Vovanda revised this gist
Jul 5, 2025 . 2 changed files with 1 addition and 1 deletion.There are no files selected for viewing
File renamed without changes.This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -86,7 +86,7 @@ pip install fastapi httpx uvicorn python-dotenv 4. Run the server: ```bash python OpenWEBUI_tool_calling_proxy.py ``` 5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI -
Vovanda revised this gist
Jul 5, 2025 . 3 changed files with 390 additions and 255 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,181 +0,0 @@ This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,334 @@ import copy import os import json import logging from urllib.parse import urljoin import httpx from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, JSONResponse from dotenv import load_dotenv # ======================== # Конфигурация и инициализация # ======================== load_dotenv() LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper() logging.basicConfig(level=LOG_LEVEL) logger = logging.getLogger(__name__) app = FastAPI() OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here") API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here") TIMEOUT = 30.0 ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE") ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower() EMULATE_FUNCTION_CALLING = os.getenv("ZED_SYSTEM_PROMPT_MODE", True) START_MARKER = "<tools_calling>" END_MARKER = "</tools_calling>" MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER)) # ======================== # Загрузка и обработка системного промта # ======================== def load_system_prompt() -> str | None: if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE): with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f: return f.read().strip() return None def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]: if mode == "disable": return [m for m in messages if m.get("role") != "system"] if mode == "replace" and custom_prompt: filtered = [m for m in messages if m.get("role") != "system"] filtered.insert(0, {"role": "system", "content": custom_prompt}) return filtered return messages def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None: if not tools: return tools_text = json.dumps(tools, indent=2, ensure_ascii=False) tools_message = { "role": "system", "content": f"Ниже перечислены встроенные инструменты (function calling):\n{tools_text}" } for i, msg in enumerate(messages): if msg.get("role") == "system": messages.insert(i + 1, tools_message) return messages.insert(0, tools_message) # ======================== # Основной endpoint # ======================== @app.post("/v1/chat/completions") async def openai_proxy(request: Request): logger.info(">>> Вызван openai_proxy") body = await request.json() original_body = copy.deepcopy(body) # Системный промт system_prompt = load_system_prompt() body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt) # Интеграция tools в messages if EMULATE_FUNCTION_CALLING: tools = body.pop("tools", None) if tools: inject_tools_as_prompt(tools, body.get("messages", [])) logger.info("Инструменты встроены в messages, ключ 'tools' удалён") if body != original_body: logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}") else: logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}") # Извлекаем Authorization из исходного запроса, если есть auth_header = request.headers.get("Authorization", f"Bearer {API_KEY}") headers = { "Authorization": auth_header, "Content-Type": "application/json", "Accept": "text/event-stream" if body.get("stream") else "application/json", } generator = func_calling_event_generator if EMULATE_FUNCTION_CALLING else default_event_generator return StreamingResponse(generator(body, headers), media_type="text/event-stream") # ======================== # Прокси для всех /v1/* путей # ======================== @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def proxy_all(request: Request, path: str): if path == "chat/completions": return await openai_proxy(request) target_url = urljoin(f"{OPENWEBUI_URL}/", path) try: request_body = None if request.method in ["POST", "PUT"]: try: request_body = await request.json() except json.JSONDecodeError: request_body = None async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.request( method=request.method, url=target_url, headers={ "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json", }, json=request_body, params=dict(request.query_params), ) filtered_headers = { k: v for k, v in response.headers.items() if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"] } return JSONResponse( content=response.json(), status_code=response.status_code, headers=filtered_headers, ) except httpx.ReadTimeout: logger.error("Таймаут при обращении к Open WebUI") raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI") except Exception as e: logger.error(f"Ошибка проксирования: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ======================== # Генераторы событий # ======================== async def default_event_generator(body: dict, headers: dict): max_log_chunk = 200 try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response: if response.status_code != 200: text = await response.aread() logger.error(f"OpenWebUI error: {text.decode()}") yield format_error_event(text.decode()) return async for line in response.aiter_lines(): if not line.strip(): continue if line.startswith("data: "): json_str = line[len("data: "):].strip() try: data = json.loads(json_str) if "sources" in data: snippet = json_str[:max_log_chunk].replace("\n", " ") logger.info(f"Пропущен чанк с 'sources': {snippet}...") continue except json.JSONDecodeError: pass logger.info(line) yield f"{line}\n" except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield format_error_event("Internal server error") async def func_calling_event_generator(body: dict, headers: dict): try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) async for event in process_response_stream(response): yield event except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield format_error_event("Internal server error") # ======================== # Обработка стрима # ======================== async def process_response_stream(response): if response.status_code != 200: error_text = (await response.aread()).decode() logger.error(f"Ошибка от API: {error_text}") yield format_error_event(error_text) return text_accumulator = [] async for line in response.aiter_lines(): processed = await process_stream_line(line, text_accumulator) if processed: yield processed if processed.endswith("[DONE]\n\n"): return async def process_stream_line(line, text_accumulator): if not line.startswith("data: "): return None try: data = parse_stream_data(line) if not data or "sources" in data: return None text_accumulator.append(data["content"]) return process_accumulated_text("".join(text_accumulator), data) except Exception: return None def parse_stream_data(line): data_part = line[len("data: "):].strip() if not data_part: return None data = json.loads(data_part) choice = data.get("choices", [{}])[0] content = choice.get("delta", {}).get("content", "") return {"data": data, "content": content} if content else None def process_accumulated_text(text, context_data): start_pos = text.find(START_MARKER) end_pos = text.find(END_MARKER) if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos): return process_partial_text(text, context_data) return process_marked_content(text, start_pos, end_pos, context_data) def process_partial_text(text, context_data): cutoff = find_safe_cutoff(text, START_MARKER) safe_part = text[:cutoff] if safe_part.strip(): yield emit_with_content(context_data["data"], safe_part) return {"remaining": text[cutoff:]} def process_marked_content(text, start_pos, end_pos, context_data): pre_text = text[:start_pos].strip() if pre_text: yield emit_with_content(context_data["data"], pre_text) json_block = text[start_pos + len(START_MARKER):end_pos] json_str = extract_json(json_block) if json_str: try: parsed = json.loads(json_str) if "functions" in parsed: async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]): logger.info(chunk) yield chunk + '\n' except json.JSONDecodeError: pass yield "data: [DONE]\n\n" def extract_json(block): start = block.find("{") end = block.rfind("}") return block[start:end + 1] if start != -1 and end != -1 else None def emit_with_content(base_json: dict, content: str) -> str: base = dict(base_json) base["choices"][0]["delta"] = {"role": "assistant", "content": content} return f"data: {json.dumps(base)}\n\n" # ======================== # Потоковый вывод функций # ======================== async def stream_tool_calls(base_json: dict, functions: list): for i, func in enumerate(functions): fc = func.get("function_call", {}) chunk = { "id": f"{base_json["id"]}", "object": "chat.completion.chunk", "model": f"{base_json["model"]}", "created": f"{base_json["created"]}", "choices": [ { "index": 0, "delta": { "tool_calls": [ { "id": f"call_{i}", "index": i, "type": "function", "function": { "name": fc.get("name", ""), "arguments": fc.get("arguments", "") } } ] }, "finish_reason": "tool_calls" if i == len(functions) - 1 else None, "native_finish_reason": "tool_calls" if i == len(functions) - 1 else None } ] } yield f"data: {json.dumps(chunk)}\n" # ======================== # Утилиты # ======================== def format_error_event(message): return f'data: {{"error": "{message}"}}\n\n' def find_safe_cutoff(buffer: str, marker: str) -> int: pos = buffer.find(marker) if pos != -1: return pos for i in range(len(marker) - 1, 0, -1): if buffer.endswith(marker[:i]): return len(buffer) - i return len(buffer) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=5000, log_level=f"{LOG_LEVEL.lower()}") This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -14,8 +14,54 @@ which occurs when using a custom Open Web UI model (pipe functions). Open Web UI - Proxies requests to Open Web UI - Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses - Supports streaming - Allows overriding system messages in requests (optional) - **Simulates OpenAI-style function calling (`tool_calls`) using special stream markers** controlled by the environment variable `EMULATE_TOOLS_CALLING` (default: enabled) --- ## Tool Calling Emulation If your model outputs function definitions inside the following block: ``` <tools_calling> { ... json ... } </tools_calling> ``` The adapter will: - Detect the block - Extract and parse the JSON - Convert it into OpenAI-compatible `tool_calls` chunks - Send those chunks to the client as if native function calling were supported ### Expected JSON Structure The JSON inside `<tools_calling>...</tools_calling>` must have the following structure: ```json { "model": "<string>", "functions": [ { "index": <integer>, "function_call": { "name": "<string>", "arguments": "<JSON string>" }, "finish_reason": "function_call" | null }, ... ] } ``` Notes: - The `arguments` field must be a valid **stringified JSON object** - The outer structure must contain a `functions` list - Each entry represents one tool call --- @@ -30,6 +76,7 @@ which occurs when using a custom Open Web UI model (pipe functions). Open Web UI - `default` (leave system messages unchanged) - `replace` (replace all system messages with the one from the file) - `disable` (remove all system messages) - (optional) `EMULATE_TOOLS_CALLING` — `true` or `false` to enable/disable tool calling emulation (default: `true`) 3. Install dependencies: ```bash @@ -39,90 +86,25 @@ pip install fastapi httpx uvicorn python-dotenv 4. Run the server: ```bash python Openwebui_tool_calling_proxy.py ``` 5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI --- ## Important notes - The invalid chunk issue only occurs with Open Web UI custom models (pipe functions) - Models served via OpenRouter are not affected - The tool calling emulation requires output in the `<tools_calling>...</tools_calling>` format - This is a temporary and pragmatic workaround to enable compatibility with OpenAI-like clients --- If you have any questions, feel free to reach out. --- Author: Savkin Vladimir Date: 2025 -
Vovanda revised this gist
Jul 4, 2025 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -17,7 +17,7 @@ app = FastAPI() OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_api_url_here") API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here") TIMEOUT = 30.0 -
Vovanda renamed this gist
Jul 4, 2025 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
Vovanda revised this gist
Jul 4, 2025 . 2 changed files with 183 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -39,7 +39,7 @@ pip install fastapi httpx uvicorn python-dotenv 4. Run the server: ```bash python openwebui_filter_proxy.py ``` 5. Configure Zed IDE to use this proxy instead of direct connection to Open Web UI @@ -105,7 +105,7 @@ pip install fastapi httpx uvicorn python-dotenv 4. Запустите сервер: ```bash python openwebui_filter_proxy.py ``` 5. Настройте Zed IDE использовать этот прокси вместо прямого подключения к Open Web UI This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,181 @@ import os import json import logging import re from urllib.parse import urljoin from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, JSONResponse import httpx from dotenv import load_dotenv # --- Конфигурация --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) load_dotenv() app = FastAPI() OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "https://chat.sawking.tech") API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here") TIMEOUT = 30.0 ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE") #replace или disable, или default ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower() def override_system_messages(messages: list[dict]) -> list[dict]: ZED_SYSTEM_PROMPT = None if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE): with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f: ZED_SYSTEM_PROMPT = f.read().strip() if ZED_SYSTEM_PROMPT_MODE == "disable": # Удалить все system-сообщения, ничего не добавлять return [m for m in messages if m.get("role") != "system"] if ZED_SYSTEM_PROMPT_MODE == "replace" and ZED_SYSTEM_PROMPT: # Удалить все system и вставить один кастомный messages = [m for m in messages if m.get("role") != "system"] messages.insert(0, {"role": "system", "content": ZED_SYSTEM_PROMPT}) return messages # default — не трогаем системный промт return messages @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def proxy_all(request: Request, path: str): if path == "chat/completions": return await openai_proxy(request) target_url = urljoin(f"{OPENWEBUI_URL}/", path) try: request_body = None if request.method in ["POST", "PUT"]: try: request_body = await request.json() except json.JSONDecodeError: request_body = None async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.request( method=request.method, url=target_url, headers={ "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json", }, json=request_body, params=dict(request.query_params), ) filtered_headers = { k: v for k, v in response.headers.items() if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"] } return JSONResponse( content=response.json(), status_code=response.status_code, headers=filtered_headers, ) except httpx.ReadTimeout: logger.error("Таймаут при обращении к Open WebUI") raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI") except Exception as e: logger.error(f"Ошибка проксирования: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/v1/chat/completions") async def openai_proxy(request: Request): body = await request.json() original_messages = body.get("messages", []) modified_messages = override_system_messages(original_messages) if modified_messages != original_messages: body["messages"] = modified_messages logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}") else: logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}") logger.info(f"Обработка chat/completions для модели: {body.get('model')}") headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json", "Accept": "text/event-stream" if body.get("stream") else "application/json", } if body.get("stream"): return StreamingResponse(event_generator(body, headers), media_type="text/event-stream") else: return await get_json_response(body, headers) async def get_json_response(body: dict, headers: dict): async with httpx.AsyncClient(timeout=TIMEOUT) as client: response = await client.post(f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) if response.status_code != 200: detail = response.json().get("detail", "Open WebUI error") logger.error(f"Open WebUI error: {detail}") raise HTTPException(status_code=response.status_code, detail=detail) data = response.json() return { "id": data.get("id"), "object": "chat.completion", "created": data.get("created"), "choices": [{ "index": 0, "message": data["choices"][0]["message"], "finish_reason": "stop" }] } async def event_generator(body: dict, headers: dict): max_log_chunk = 200 try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response: if response.status_code != 200: text = await response.aread() logger.error(f"OpenWebUI error: {text.decode()}") yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n" return async for line in response.aiter_lines(): try: if not line.strip(): continue if line.startswith("data: "): json_str = line[len("data: "):].strip() try: data = json.loads(json_str) if "sources" in data: snippet = json_str[:max_log_chunk].replace("\n", " ") logger.info(f"Пропущен чанк с 'sources': {snippet}...") continue except json.JSONDecodeError: pass logger.info(line) yield f"{line}\n" except Exception as inner_e: logger.error(f"Ошибка при обработке строки стрима: {inner_e}") # Не прерываем генератор, чтобы попытаться продолжить стрим continue except Exception as e: logger.error(f"Ошибка стриминга: {e}") yield f"data: {{\"error\": \"Internal server error\"}}\n\n" if __name__ == "__main__": import uvicorn uvicorn.run("openAI_adapter:app", host="127.0.0.1", port=5000, log_level="info") -
Vovanda created this gist
Jul 4, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,128 @@ # Open Web UI Stream Chunk Filter for Zed IDE This adapter script resolves the following error in Zed IDE: ``` data did not match any variant of untagged enum ResponseStreamResult ``` which occurs when using a custom Open Web UI model (pipe functions). Open Web UI sends invalid chunks in the stream, causing parsing exceptions. --- ## What the adapter does - Proxies requests to Open Web UI - Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses - Supports both streaming and regular JSON responses - Allows overriding system messages in requests (optional) --- ## Usage 1. Clone or download the script 2. Set environment variables: - `OPENWEBUI_URL` — URL of your Open Web UI server - `OPENWEBUI_API_KEY` — API key for access - (optional) `ZED_SYSTEM_PROMPT_FILE` — path to a file with a system prompt - (optional) `ZED_SYSTEM_PROMPT_MODE` — mode for handling system messages, one of: - `default` (leave system messages unchanged) - `replace` (replace all system messages with the one from the file) - `disable` (remove all system messages) 3. Install dependencies: ```bash pip install fastapi httpx uvicorn python-dotenv ``` 4. Run the server: ```bash python openAI_adapter.py ``` 5. Configure Zed IDE to use this proxy instead of direct connection to Open Web UI --- ## Important notes - The invalid chunk issue only occurs when using Open Web UI custom models (pipe functions) - No such problems occur when using models via OpenRouter - This script is intended as a temporary workaround for smooth operation with Zed IDE --- If you have any questions, feel free to reach out. --- Author: Savkin Vladimir Date: 2025 --- # Фильтр невалидных чанков Open Web UI для Zed IDE Этот скрипт-адаптер решает проблему с ошибкой в Zed IDE: ``` data did not match any variant of untagged enum ResponseStreamResult ``` Возникающую при использовании кастомной модели Open Web UI (pipe functions), когда Open Web UI отправляет невалидные чанки в stream, что вызывает исключения при парсинге. --- ## Что делает адаптер - Проксирует запросы к Open Web UI - Фильтрует и удаляет невалидные чанки из stream, позволяя Zed IDE корректно принимать ответы - Поддерживает как streaming, так и обычные JSON-ответы - Позволяет переопределять системные сообщения в запросах (опционально) --- ## Использование 1. Клонируйте или скачайте скрипт 2. Задайте переменные окружения: - `OPENWEBUI_URL` — URL вашего Open Web UI сервера - `OPENWEBUI_API_KEY` — API ключ для доступа - (опционально) `ZED_SYSTEM_PROMPT_FILE` — путь к файлу с системным сообщением - (опционально) `ZED_SYSTEM_PROMPT_MODE` — режим обработки системных сообщений, одно из: - `default` (оставить системные сообщения без изменений) - `replace` (заменить все системные сообщения на указанное в файле) - `disable` (удалить все системные сообщения) 3. Установите зависимости: ```bash pip install fastapi httpx uvicorn python-dotenv ``` 4. Запустите сервер: ```bash python openAI_adapter.py ``` 5. Настройте Zed IDE использовать этот прокси вместо прямого подключения к Open Web UI --- ## Важные замечания - Проблема с невалидными чанками возникает только при использовании кастомной модели Open Web UI (pipe functions) - При использовании моделей через OpenRouter подобных проблем не наблюдается - Скрипт сделан как временное решение для корректной работы с Zed IDE --- Если возникнут вопросы — обращайтесь. --- Автор: Владимир Савкин Дата: 2025