
@Vovanda
Last active July 5, 2025 23:49
Revisions

  1. Vovanda revised this gist Jul 5, 2025. 1 changed file with 5 additions and 5 deletions.
    10 changes: 5 additions & 5 deletions OpenWEBUI_tool_calling_proxy.py
    Original file line number Diff line number Diff line change
    @@ -225,7 +225,7 @@ async def func_calling_event_generator(body: dict, headers: dict):
    continue

    if "sources" in data_json:
    # Skip system service chunks
    # Skip OpenWEBUI system service chunks
    continue

    # Append text to the accumulator
    @@ -289,10 +289,10 @@ def find_safe_cutoff(buffer: str, marker: str) -> int:
    safe_part = accumulated[:safe_idx]
    remaining_tail = accumulated[safe_idx:]

    if safe_part.strip():
    chunk = emit_with_content(data_json, safe_part)
    logger.info(chunk[:-2])
    yield chunk

    chunk = emit_with_content(data_json, safe_part)
    logger.info(chunk[:-2])
    yield chunk

    text_accumulator = [remaining_tail]

  2. Vovanda revised this gist Jul 5, 2025. 3 changed files with 99 additions and 101 deletions.
    10 changes: 5 additions & 5 deletions OpenWEBUI_tool_calling_proxy.py
    Original file line number Diff line number Diff line change
    @@ -30,8 +30,8 @@
    ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()
    EMULATE_TOOLS_CALLING = os.getenv("EMULATE_TOOLS_CALLING", True)

    START_MARKER = "<tools_calling>"
    END_MARKER = "</tools_calling>"
    START_MARKER = "<|tools▁calls▁start|>"
    END_MARKER = "<|tools▁calls▁end|>"
    MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER))

    # ========================
    @@ -49,7 +49,7 @@ def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: s
    return [m for m in messages if m.get("role") != "system"]
    if mode == "replace" and custom_prompt:
    filtered = [m for m in messages if m.get("role") != "system"]
    filtered.insert(0, {"role": "system", "content": custom_prompt})
    filtered.append({"role": "system", "content": custom_prompt})
    return filtered
    return messages

    @@ -60,14 +60,14 @@ def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    tools_text = json.dumps(tools, indent=2, ensure_ascii=False)
    tools_message = {
    "role": "system",
    "content": f"Built-in tools (function calling) are listed below:\n{tools_text}"
    "content": f"Available tools:\n{tools_text}"
    }

    for i, msg in enumerate(messages):
    if msg.get("role") == "system":
    messages.insert(i + 1, tools_message)
    return
    messages.insert(0, tools_message)
    messages.append(tools_message)

    # ========================
    # Main endpoint
    96 changes: 0 additions & 96 deletions tools_calling_system_promt_en.md
    Original file line number Diff line number Diff line change
    @@ -1,96 +0,0 @@
    VERY IMPORTANT!
    You are a concise assistant agent integrated into the Zed IDE. Communicate in the user's language.

    ---

    # Operating Mode:

    - You are currently working in **function calling emulation mode**.
    - Instead of actually executing function calls, you generate valid JSON function call blocks according to the described protocol, but do not execute them.
    - Always generate a `<tools_calling>` block with valid JSON at the end of the response when a function call is needed.
    - Do not simulate function call results — return only the prepared calls.
    - Try to use the functions available for the task at hand to get fresh data. The context may contain irrelevant information.
    ---

    # General Rules:

    1. If a function is needed — call it via a `<tools_calling>` block with valid JSON *at the end of your response*.
    2. Use only existing functions.
    3. Do not call the same function with identical arguments repeatedly without necessity.
    4. Maximum of 3 calls per response.
    5. Do not insert `<tools_calling>` if no functions are called.
    6. Do not simulate function call results — wait for real response (in this mode — do not return results, only calls).

    ---

    # Rules for terminal calls:

    1. Do not call commands with infinite or interactive output.
    2. Always limit output: for Git, use `--no-pager` and `-nXX`; for other commands, use `| head -n30` or equivalent.
    3. If the environment is Windows (PowerShell or CMD), **do NOT add** `timeout`, as it behaves differently there and causes errors.
    4. Never insert example command output before receiving real results.

    ---

    # Function call format:

    <tools_calling>
    ```json
    {
    "model": "avox-assistant",
    "functions": [
    {
    "index": 0,
    "function_call": {
    "name": "function_name",
    "arguments": "{\"key\": \"value\"}"
    }
    }
    ]
    }
    ```
    </tools_calling>

    ---

    # Important:

    * JSON inside `<tools_calling>` must be strictly valid and contain only the `functions` array. Do not deviate from the described tool call format!
    * Function arguments are passed as an **escaped JSON string**.
    * Maximum of 3 functions per call.
    * Any deviations from the format are forbidden — if you cannot call a function, write text without the `<tools_calling>` block.
    * You must not simulate or fabricate call results.

    ---

    # Example of a correct response with a function call:

    Requesting the last 10 Git commits.

    <tools_calling>
    ```json
    {
    "model": "avox-assistant",
    "functions": [
    {
    "index": 0,
    "function_call": {
    "name": "terminal",
    "arguments": "{\"command\": \"git --no-pager log --oneline -n10\", \"cd\": \".\"}"
    }
    }
    ]
    }
    ```
    </tools_calling>

    ---

    # Model comment on its actions (mandatory when calling functions):

    Executing git log command to retrieve commit history.


    ---

    Follow these rules strictly to avoid errors and ensure stable operation across different environments.
    94 changes: 94 additions & 0 deletions tools_calling_system_promt_en.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,94 @@
    # VERY IMPORTANT!

    You are a concise assistant agent integrated into the Zed IDE. Communicate in the user's language.

    ---

    ## Operating mode:

    * You are currently working in **function calling emulation mode**.
    * Instead of real function calls, you generate correct JSON blocks for function calls according to the described protocol, but do not actually execute them.
    * Always generate the block `<|tools▁calls▁start|><|tools▁calls▁end|>` with valid JSON at the end of your response when a function call is needed.
    * Do not simulate function call results — return only the prepared requests.
    * Try to use available functions to get fresh data, as context may contain outdated information.

    ---

    ## General rules:

    1. If a function call is required — call it at the end of the response via the block `<|tools▁calls▁start|><|tools▁calls▁end|>` with valid JSON!
    2. Use only existing functions.
    3. Do not call the same function with identical arguments repeatedly without necessity.
    4. Maximum of 3 calls per response.
    5. Do not insert `<|tools▁calls▁start|><|tools▁calls▁end|>` if you are not calling functions.
    6. Do not simulate function call results — wait for the real response (in this mode — do not return results, only calls).

    ---

    ## Rules for calling terminal:

    1. Do not call commands with infinite or interactive output.
    2. Always limit output: for Git, use `--no-pager` and `-nXX`; for other commands, use `| head -n30` or equivalent.
    3. If the environment is Windows (PowerShell or CMD), **do not add** `timeout` to the command, as it behaves differently there and causes errors.
    4. Never insert example command output before getting the real result.

    ---

    ## Function call format:

    <|tools▁calls▁start|>
    {
    "model": "avox-assistant",
    "functions": [
    {
    "index": 0,
    "function_call": {
    "name": "function_name",
    "arguments": "{\"key\": \"value\"}"
    }
    }
    ]
    }
    <|tools▁calls▁end|>

    ---

    # Important:

    * JSON inside `<|tools▁calls▁start|><|tools▁calls▁end|>` must be strictly valid and contain only the `functions` array. Do not deviate from the described tool call format!
    * Function arguments are passed as an **escaped JSON string**.
    * Maximum of 3 functions per call.
    * Any deviations from the format are forbidden — if you cannot call a function, write text without the `<|tools▁calls▁start|><|tools▁calls▁end|>` block.
    * You must not simulate or fabricate call results.

    ---

    # Example of a correct response with a function call:

    Requesting the last 10 Git commits.

    <|tools▁calls▁start|>
    {
    "model": "avox-assistant",
    "functions": [
    {
    "index": 0,
    "function_call": {
    "name": "terminal",
    "arguments": "{\"command\": \"git --no-pager log --oneline -n10\", \"cd\": \".\"}"
    }
    }
    ]
    }
    <|tools▁calls▁end|>

    ---

    # Model comment on its actions (mandatory when calling functions):

    Executing git log command to retrieve commit history.


    ---

    Follow these rules strictly to avoid errors and ensure stable operation across different environments.
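The marker protocol described above can be parsed on the receiving side with a few lines of Python. This is a minimal sketch (the `extract_tool_calls` helper and the sample reply are illustrative, not part of the gist), mirroring the first-brace/last-brace extraction the proxy itself uses:

```python
import json

START_MARKER = "<|tools▁calls▁start|>"
END_MARKER = "<|tools▁calls▁end|>"

def extract_tool_calls(text: str) -> list:
    """Return the `functions` array from the first marker-delimited block, or []."""
    start = text.find(START_MARKER)
    end = text.find(END_MARKER)
    if start == -1 or end == -1 or end <= start:
        return []
    block = text[start + len(START_MARKER):end]
    # Tolerate stray text around the JSON object: keep only {...}
    first, last = block.find("{"), block.rfind("}")
    if first == -1 or last == -1:
        return []
    try:
        parsed = json.loads(block[first:last + 1])
    except json.JSONDecodeError:
        return []
    return parsed.get("functions", [])

reply = (
    "Requesting the last 10 Git commits.\n"
    + START_MARKER
    + json.dumps({
        "model": "avox-assistant",
        "functions": [{
            "index": 0,
            "function_call": {
                "name": "terminal",
                "arguments": json.dumps({"command": "git --no-pager log --oneline -n10", "cd": "."}),
            },
        }],
    })
    + END_MARKER
)
calls = extract_tool_calls(reply)
assert calls[0]["function_call"]["name"] == "terminal"
```

Plain text without markers simply yields an empty list, which matches rule 5 above (no block means no calls).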
  3. Vovanda renamed this gist Jul 5, 2025. 1 changed file with 0 additions and 0 deletions.
  4. Vovanda revised this gist Jul 5, 2025. 2 changed files with 5 additions and 3 deletions.
    4 changes: 1 addition & 3 deletions Readme.md
    Original file line number Diff line number Diff line change
    @@ -51,9 +51,7 @@ The JSON inside `<tools_calling>...</tools_calling>` must have the following str
    "name": "<string>",
    "arguments": "<JSON string>"
    },
    "finish_reason": "function_call" | null
    },
    ...
    }
    ]
    }
    ```
    4 changes: 4 additions & 0 deletions tools_calling_en.md
    Original file line number Diff line number Diff line change
    @@ -35,6 +35,7 @@ You are a concise assistant-agent integrated into Zed IDE. Communicate in the us
    # Function call format:

    <tools_calling>
    ```json
    {
    "model": "avox-assistant",
    "functions": [
    @@ -47,6 +48,7 @@ You are a concise assistant-agent integrated into Zed IDE. Communicate in the us
    }
    ]
    }
    ```
    </tools_calling>

    ---
    @@ -66,6 +68,7 @@ You are a concise assistant-agent integrated into Zed IDE. Communicate in the us
    Requesting the last 10 Git commits.

    <tools_calling>
    ```json
    {
    "model": "avox-assistant",
    "functions": [
    @@ -78,6 +81,7 @@ Requesting the last 10 Git commits.
    }
    ]
    }
    ```
    </tools_calling>

    ---
  5. Vovanda revised this gist Jul 5, 2025. 1 changed file with 92 additions and 0 deletions.
    92 changes: 92 additions & 0 deletions tools_calling_en.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,92 @@
    VERY IMPORTANT!
    You are a concise assistant agent integrated into the Zed IDE. Communicate in the user's language.

    ---

    # Operating Mode:

    - You are currently working in **function calling emulation mode**.
    - Instead of actually executing function calls, you generate valid JSON function call blocks according to the described protocol, but do not execute them.
    - Always generate a `<tools_calling>` block with valid JSON at the end of the response when a function call is needed.
    - Do not simulate function call results — return only the prepared calls.
    - Try to use the functions available for the task at hand to get fresh data. The context may contain irrelevant information.
    ---

    # General Rules:

    1. If a function is needed — call it via a `<tools_calling>` block with valid JSON *at the end of your response*.
    2. Use only existing functions.
    3. Do not call the same function with identical arguments repeatedly without necessity.
    4. Maximum of 3 calls per response.
    5. Do not insert `<tools_calling>` if no functions are called.
    6. Do not simulate function call results — wait for real response (in this mode — do not return results, only calls).

    ---

    # Rules for terminal calls:

    1. Do not call commands with infinite or interactive output.
    2. Always limit output: for Git, use `--no-pager` and `-nXX`; for other commands, use `| head -n30` or equivalent.
    3. If the environment is Windows (PowerShell or CMD), **do NOT add** `timeout`, as it behaves differently there and causes errors.
    4. Never insert example command output before receiving real results.

    ---

    # Function call format:

    <tools_calling>
    {
    "model": "avox-assistant",
    "functions": [
    {
    "index": 0,
    "function_call": {
    "name": "function_name",
    "arguments": "{\"key\": \"value\"}"
    }
    }
    ]
    }
    </tools_calling>

    ---

    # Important:

    * JSON inside `<tools_calling>` must be strictly valid and contain only the `functions` array. Do not deviate from the described tool call format!
    * Function arguments are passed as an **escaped JSON string**.
    * Maximum of 3 functions per call.
    * Any deviations from the format are forbidden — if you cannot call a function, write text without the `<tools_calling>` block.
    * You must not simulate or fabricate call results.

    ---

    # Example of a correct response with a function call:

    Requesting the last 10 Git commits.

    <tools_calling>
    {
    "model": "avox-assistant",
    "functions": [
    {
    "index": 0,
    "function_call": {
    "name": "terminal",
    "arguments": "{\"command\": \"git --no-pager log --oneline -n10\", \"cd\": \".\"}"
    }
    }
    ]
    }
    </tools_calling>

    ---

    # Model comment on its actions (mandatory when calling functions):

    Executing git log command to retrieve commit history.


    ---

    Follow these rules strictly to avoid errors and ensure stable operation across different environments.
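The "escaped JSON string" rule for `arguments` amounts to double encoding: serialize the arguments object to a string, then embed that string in the outer call JSON. A small sketch (variable names are illustrative):

```python
import json

# Inner object: the actual arguments for the call
args = {"command": "git --no-pager log --oneline -n10", "cd": "."}

# Outer object: `arguments` holds the inner object serialized *as a string*
payload = json.dumps({
    "model": "avox-assistant",
    "functions": [{
        "index": 0,
        "function_call": {"name": "terminal", "arguments": json.dumps(args)},
    }],
})

# The receiver decodes twice: outer JSON first, then the `arguments` string
outer = json.loads(payload)
inner = json.loads(outer["functions"][0]["function_call"]["arguments"])
assert inner["cd"] == "."
```

Using `json.dumps` for the inner string avoids the hand-escaping mistakes that would otherwise break the strict-validity rule above.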
  6. Vovanda revised this gist Jul 5, 2025. No changes.
  7. Vovanda revised this gist Jul 5, 2025. 1 changed file with 199 additions and 151 deletions.
    350 changes: 199 additions & 151 deletions OpenWEBUI_tool_calling_proxy.py
    Original file line number Diff line number Diff line change
    @@ -2,39 +2,42 @@
    import os
    import json
    import logging
    import re
    import uuid
    from urllib.parse import urljoin
    from collections import defaultdict, deque

    import httpx
    from fastapi import FastAPI, Request, HTTPException
    from fastapi.responses import StreamingResponse, JSONResponse
    import httpx
    from dotenv import load_dotenv

    # --- Configuration ---
    # ========================
    # Configuration and initialization
    # ========================

    load_dotenv()

    LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper()
    logging.basicConfig(level=LOG_LEVEL)
    logger = logging.getLogger(__name__)

    load_dotenv()

    app = FastAPI()

    OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here")
    API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
    TIMEOUT = 30.0

    ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
    # replace, disable, or default
    ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()

    EMULATE_FUNCTION_CALLING = os.getenv("ZED_SYSTEM_PROMPT_MODE", True)
    EMULATE_TOOLS_CALLING = os.getenv("EMULATE_TOOLS_CALLING", True)

    START_MARKER = "<tools_calling>"
    END_MARKER = "</tools_calling>"
    MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER))

    # ========================
    # Load and process the system prompt
    # ========================

    def load_system_prompt() -> str | None:
    if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
    with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
    @@ -44,13 +47,11 @@ def load_system_prompt() -> str | None:
    def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]:
    if mode == "disable":
    return [m for m in messages if m.get("role") != "system"]

    if mode == "replace" and custom_prompt:
    filtered = [m for m in messages if m.get("role") != "system"]
    filtered.insert(0, {"role": "system", "content": custom_prompt})
    return filtered

    return messages # default: no changes
    return messages

    def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    if not tools:
    @@ -65,48 +66,97 @@ def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    for i, msg in enumerate(messages):
    if msg.get("role") == "system":
    messages.insert(i + 1, tools_message)
    break
    else:
    messages.insert(0, tools_message)
    return
    messages.insert(0, tools_message)

    # ========================
    # Main endpoint
    # ========================

    @app.post("/v1/chat/completions")
    async def openai_proxy(request: Request):
    logger.info(">>> openai_proxy called")
    body = await request.json()

    # Copy the body for comparison
    original_body = copy.deepcopy(body)

    # Load and apply the system prompt policy
    # System prompt
    system_prompt = load_system_prompt()
    body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt)

    # Function calling emulation: inject tools into messages
    if EMULATE_FUNCTION_CALLING:
    # Integrate tools into messages
    if EMULATE_TOOLS_CALLING:
    tools = body.pop("tools", None)
    if tools:
    inject_tools_as_prompt(tools, body.get("messages", []))
    logger.info("Tools embedded in messages, 'tools' key removed")

    # Log request body changes
    if body != original_body:
    logger.info(f"Request body modified: {json.dumps(body, ensure_ascii=False)}")
    else:
    logger.info(f"Request body unchanged: {json.dumps(body, ensure_ascii=False)}")

    logger.info(f"Processing chat/completions for model: {body.get('model')}")

    # Take Authorization from the incoming request, if present
    auth_header = request.headers.get("Authorization", f"Bearer {API_KEY}")
    headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Authorization": auth_header,
    "Content-Type": "application/json",
    "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }

    event_generator = default_event_generator
    if EMULATE_FUNCTION_CALLING:
    event_generator = func_calling_event_generator()
    generator = func_calling_event_generator if EMULATE_TOOLS_CALLING else default_event_generator
    return StreamingResponse(generator(body, headers), media_type="text/event-stream")

    return StreamingResponse(event_generator(body, headers), media_type="text/event-stream")
    # ========================
    # Proxy for all /v1/* paths
    # ========================

    @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
    async def proxy_all(request: Request, path: str):
    if path == "chat/completions":
    return await openai_proxy(request)

    target_url = urljoin(f"{OPENWEBUI_URL}/", path)
    try:
    request_body = None
    if request.method in ["POST", "PUT"]:
    try:
    request_body = await request.json()
    except json.JSONDecodeError:
    request_body = None

    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.request(
    method=request.method,
    url=target_url,
    headers={
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    },
    json=request_body,
    params=dict(request.query_params),
    )

    filtered_headers = {
    k: v for k, v in response.headers.items()
    if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
    }

    return JSONResponse(
    content=response.json(),
    status_code=response.status_code,
    headers=filtered_headers,
    )

    except httpx.ReadTimeout:
    logger.error("Timeout while contacting Open WebUI")
    raise HTTPException(status_code=504, detail="Open WebUI connection timeout")
    except Exception as e:
    logger.error(f"Proxy error: {str(e)}")
    raise HTTPException(status_code=500, detail=str(e))

    # ========================
    # Event generators
    # ========================

    async def default_event_generator(body: dict, headers: dict):
    max_log_chunk = 200
    @@ -116,155 +166,151 @@ async def default_event_generator(body: dict, headers: dict):
    if response.status_code != 200:
    text = await response.aread()
    logger.error(f"OpenWebUI error: {text.decode()}")
    yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n"
    yield format_error_event(text.decode())
    return

    async for line in response.aiter_lines():
    try:
    if not line.strip():
    continue

    if line.startswith("data: "):
    json_str = line[len("data: "):].strip()
    try:
    data = json.loads(json_str)
    if "sources" in data:
    snippet = json_str[:max_log_chunk].replace("\n", " ")
    logger.info(f"Skipped chunk with 'sources': {snippet}...")
    continue
    except json.JSONDecodeError:
    pass

    logger.info(line)
    yield f"{line}\n"

    except Exception as inner_e:
    logger.error(f"Error processing stream line: {inner_e}")
    # Don't abort the generator; try to continue the stream
    if not line.strip():
    continue

    except Exception as e:
    logger.error(f"Streaming error: {e}")
    yield f"data: {{\"error\": \"Internal server error\"}}\n\n"


    async def func_calling_event_generator(body: dict, headers: dict):
    """Event generator for processing a streaming response with JSON markers."""
    text_accumulator = []
    try:
    async with httpx.AsyncClient(timeout=60) as client:
    response = await client.stream(
    "POST",
    f"{OPENWEBUI_URL}/api/chat/completions",
    json=body,
    headers=headers
    )

    async for event in process_response_stream(response):
    yield event

    if line.startswith("data: "):
    json_str = line[len("data: "):].strip()
    try:
    data = json.loads(json_str)
    if "sources" in data:
    snippet = json_str[:max_log_chunk].replace("\n", " ")
    logger.info(f"Skipped chunk with 'sources': {snippet}...")
    continue
    except json.JSONDecodeError:
    pass
    logger.info(line)
    yield f"{line}\n"
    except Exception as e:
    logger.error(f"Streaming error: {e}")
    yield format_error_event("Internal server error")

    async def process_response_stream(response):
    """Processes the streaming response and yields events."""
    if response.status_code != 200:
    error_text = (await response.aread()).decode()
    logger.error(f"API error: {error_text}")
    yield format_error_event(error_text)
    return

    async def func_calling_event_generator(body: dict, headers: dict):
    text_accumulator = []
    async for line in response.aiter_lines():
    processed = await process_stream_line(line, text_accumulator)
    if processed:
    yield processed
    if processed.endswith("[DONE]\n\n"):
    return
    async def process_stream_line(line, text_accumulator):
    """Processes a single line from the stream."""
    if not line.startswith("data: "):
    return None
    ignore_rest = False

    try:
    data = parse_stream_data(line)
    if not data or "sources" in data:
    return None

    text_accumulator.append(data["content"])
    return process_accumulated_text("".join(text_accumulator), data)

    except Exception:
    return None


    def parse_stream_data(line):
    """Parses data from a stream line."""
    data_part = line[len("data: "):].strip()
    if not data_part:
    return None
    async with httpx.AsyncClient(timeout=60) as client:
    async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
    if response.status_code != 200:
    text = await response.aread()
    logger.error(f"API error: {text.decode()}")
    yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n"
    return

    data = json.loads(data_part)
    choice = data.get("choices", [{}])[0]
    content = choice.get("delta", {}).get("content", "")
    return {"data": data, "content": content} if content else None
    async for line in response.aiter_lines():
    if ignore_rest:
    if line.strip() == "data: [DONE]":
    yield line + "\n"
    return
    continue

    if not line.startswith("data: "):
    continue

    def process_accumulated_text(text, context_data):
    """Processes the accumulated text and yields events."""
    start_pos = text.find(START_MARKER)
    end_pos = text.find(END_MARKER)
    data_part = line[len("data: "):].strip()
    if not data_part:
    continue

    if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos):
    return process_partial_text(text, context_data)
    try:
    data_json = json.loads(data_part)
    choice = data_json.get("choices", [{}])[0]
    delta = choice.get("delta", {})
    content = delta.get("content", "")
    if not content:
    continue
    except Exception:
    continue

    return process_marked_content(text, start_pos, end_pos, context_data)
    if "sources" in data_json:
    # Skip system service chunks
    continue

    def process_partial_text(text, context_data):
    """Processes partial data without complete markers."""
    cutoff = find_safe_cutoff(text, START_MARKER)
    safe_part = text[:cutoff]
    # Append text to the accumulator
    text_accumulator.append(content)
    accumulated = "".join(text_accumulator)

    if safe_part.strip():
    yield emit_with_content(context_data["data"], safe_part)
    start_idx = accumulated.find(START_MARKER)
    end_idx = accumulated.find(END_MARKER)

    return {"remaining": text[cutoff:]}
    def process_marked_content(text, start_pos, end_pos, context_data):
    """Processes the text between markers."""
    pre_text = text[:start_pos].strip()
    if pre_text:
    yield emit_with_content(context_data["data"], pre_text)
    if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
    pre_text = accumulated[:start_idx].strip()
    json_block = accumulated[start_idx + len(START_MARKER):end_idx]

    json_block = text[start_pos + len(START_MARKER):end_pos]
    json_str = extract_json(json_block)
    if pre_text:
    yield emit_with_content(data_json, pre_text)

    if json_str:
    try:
    parsed = json.loads(json_str)
    if "functions" in parsed:
    async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]):
    yield chunk
    except json.JSONDecodeError:
    pass
    first_brace = json_block.find("{")
    last_brace = json_block.rfind("}")

    yield "data: [DONE]\n\n"
    if first_brace != -1 and last_brace != -1:
    json_str = json_block[first_brace : last_brace + 1]
    try:
    parsed = json.loads(json_str)
    functions = parsed.get("functions", [])
    if functions:
    # Emit all function calls
    async for tool_chunk in stream_tool_calls(data_json, functions):
    logger.info(tool_chunk[:-2])
    yield tool_chunk
    except json.JSONDecodeError:
    pass
    logger.info("data: [DONE]\n")
    yield "data: [DONE]\n\n"
    return
    else:
    # No markers found yet; find a safe portion to emit
    accumulated = ''.join(text_accumulator)

    def find_safe_cutoff(buffer: str, marker: str) -> int:
    """
    Returns the start position of the marker `ABC` (or its prefix `A`, `AB`) anywhere in the string.
    - If the full marker is found → the index of its start.
    - If a marker prefix is found at the end → its index.
    - If nothing is found → the buffer length.
    """
    # First, check for the full marker
    pos = buffer.find(marker)
    if pos != -1:
    return pos

    # Check for marker prefixes at the end of the string
    for i in range(len(marker) - 1, 0, -1):
    prefix = marker[:i]
    if buffer.endswith(prefix):
    return len(buffer) - len(prefix)

    return len(buffer) # Marker not found

    safe_idx = find_safe_cutoff(accumulated, START_MARKER)

    safe_part = accumulated[:safe_idx]
    remaining_tail = accumulated[safe_idx:]

    if safe_part.strip():
    chunk = emit_with_content(data_json, safe_part)
    logger.info(chunk[:-2])
    yield chunk

    text_accumulator = [remaining_tail]

    yield "data: [DONE]\n\n"

    def extract_json(block):
    """Extracts a JSON string from the block."""
    start = block.find("{")
    end = block.rfind("}")
    return block[start:end + 1] if start != -1 and end != -1 else None
    except Exception as e:
    logger.error(f"Streaming error: {e}")
    yield f"data: {{\"error\": \"Internal server error\"}}\n\n"

    # ========================
    # Streaming function output
    # ========================
    def emit_with_content(base_json: dict, content: str) -> str:
    base = dict(base_json)
    base["choices"][0]["delta"] = {"role": "assistant", "content": content}
    return f"data: {json.dumps(base)}\n\n"

    async def stream_tool_calls(base_json: dict, functions: list):
    """
    Asynchronously yields the listed function calls as stream-format chunks.
    """
    for i, func in enumerate(functions):
    fc = func.get("function_call", {})
    chunk = {
    @@ -295,12 +341,14 @@ async def stream_tool_calls(base_json: dict, functions: list):
    }
    yield f"data: {json.dumps(chunk)}\n\n"

    # ========================
    # Utilities
    # ========================

    def format_error_event(message):
    """Formats an error message."""
    return f'data: {{"error": "{message}"}}\n\n'

    def find_safe_cutoff(buffer: str, marker: str) -> int:
    """Finds a safe split position in the text, accounting for the marker."""
    pos = buffer.find(marker)
    if pos != -1:
    return pos
    @@ -311,4 +359,4 @@ def find_safe_cutoff(buffer: str, marker: str) -> int:

    if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level=LOG_LEVEL.lower())
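The key piece of revision 7's streaming logic is `find_safe_cutoff`: the accumulator flushes everything up to a possible marker start, but holds back any trailing marker prefix until the next chunk resolves it. A self-contained sketch of the same algorithm:

```python
def find_safe_cutoff(buffer: str, marker: str) -> int:
    """Index up to which `buffer` can be emitted without cutting `marker` in half."""
    pos = buffer.find(marker)
    if pos != -1:
        return pos  # Full marker present: everything before it is safe
    # A marker prefix at the very end might still grow into the full marker
    for i in range(len(marker) - 1, 0, -1):
        if buffer.endswith(marker[:i]):
            return len(buffer) - i
    return len(buffer)  # No marker, no prefix: the whole buffer is safe

MARKER = "<tools_calling>"
assert find_safe_cutoff("hello <tools_calling>", MARKER) == 6  # cut before the marker
assert find_safe_cutoff("hello <tools_ca", MARKER) == 6        # hold back the prefix
assert find_safe_cutoff("hello", MARKER) == 5                  # nothing to hold back
```

Checking prefixes longest-first matters: a shorter prefix could match while a longer one is the true candidate, and holding back too little would let a partial marker leak into the stream.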
  8. Vovanda revised this gist Jul 5, 2025. 1 changed file with 94 additions and 114 deletions.
    208 changes: 94 additions & 114 deletions OpenWEBUI_tool_calling_proxy.py
    Original file line number Diff line number Diff line change
    @@ -2,41 +2,39 @@
    import os
    import json
    import logging
    import re
    import uuid
    from urllib.parse import urljoin
    from collections import defaultdict, deque

    import httpx
    from fastapi import FastAPI, Request, HTTPException
    from fastapi.responses import StreamingResponse, JSONResponse
    import httpx
    from dotenv import load_dotenv

    # ========================
    # Configuration and initialization
    # ========================

    load_dotenv()

    # --- Configuration ---
    LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper()
    logging.basicConfig(level=LOG_LEVEL)
    logger = logging.getLogger(__name__)

    load_dotenv()

    app = FastAPI()

    OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here")
    API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
    TIMEOUT = 30.0

    ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
    # replace, disable, or default
    ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()

    EMULATE_FUNCTION_CALLING = os.getenv("ZED_SYSTEM_PROMPT_MODE", True)

    START_MARKER = "<tools_calling>"
    END_MARKER = "</tools_calling>"
    MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER))

    # ========================
    # Load and process the system prompt
    # ========================

    def load_system_prompt() -> str | None:
    if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
    with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
    @@ -46,11 +44,13 @@ def load_system_prompt() -> str | None:
    def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]:
    if mode == "disable":
    return [m for m in messages if m.get("role") != "system"]

    if mode == "replace" and custom_prompt:
    filtered = [m for m in messages if m.get("role") != "system"]
    filtered.insert(0, {"role": "system", "content": custom_prompt})
    return filtered
    return messages

    return messages # default — без изменений

    def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    if not tools:
    @@ -65,97 +65,48 @@ def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    for i, msg in enumerate(messages):
    if msg.get("role") == "system":
    messages.insert(i + 1, tools_message)
    return
    messages.insert(0, tools_message)

    # ========================
    # Основной endpoint
    # ========================
    break
    else:
    messages.insert(0, tools_message)

    @app.post("/v1/chat/completions")
    async def openai_proxy(request: Request):
    logger.info(">>> Вызван openai_proxy")
    body = await request.json()

    # Копируем тело для сравнения
    original_body = copy.deepcopy(body)

    # Системный промт
    # Загрузка и применение политики системного промта
    system_prompt = load_system_prompt()
    body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt)

    # Интеграция tools в messages
    # Эмуляция function calling — вставка tools в messages
    if EMULATE_FUNCTION_CALLING:
    tools = body.pop("tools", None)
    if tools:
    inject_tools_as_prompt(tools, body.get("messages", []))
    logger.info("Инструменты встроены в messages, ключ 'tools' удалён")

    # Логируем изменения тела запроса
    if body != original_body:
    logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}")
    else:
    logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}")

    # Извлекаем Authorization из исходного запроса, если есть
    auth_header = request.headers.get("Authorization", f"Bearer {API_KEY}")
    logger.info(f"Обработка chat/completions для модели: {body.get('model')}")

    headers = {
    "Authorization": auth_header,
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }

    generator = func_calling_event_generator if EMULATE_FUNCTION_CALLING else default_event_generator
    return StreamingResponse(generator(body, headers), media_type="text/event-stream")

    # ========================
    # Прокси для всех /v1/* путей
    # ========================

    @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
    async def proxy_all(request: Request, path: str):
    if path == "chat/completions":
    return await openai_proxy(request)

    target_url = urljoin(f"{OPENWEBUI_URL}/", path)
    try:
    request_body = None
    if request.method in ["POST", "PUT"]:
    try:
    request_body = await request.json()
    except json.JSONDecodeError:
    request_body = None

    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.request(
    method=request.method,
    url=target_url,
    headers={
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    },
    json=request_body,
    params=dict(request.query_params),
    )

    filtered_headers = {
    k: v for k, v in response.headers.items()
    if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
    }

    return JSONResponse(
    content=response.json(),
    status_code=response.status_code,
    headers=filtered_headers,
    )

    except httpx.ReadTimeout:
    logger.error("Таймаут при обращении к Open WebUI")
    raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI")
    except Exception as e:
    logger.error(f"Ошибка проксирования: {str(e)}")
    raise HTTPException(status_code=500, detail=str(e))
    event_generator = default_event_generator
    if EMULATE_FUNCTION_CALLING:
    event_generator = func_calling_event_generator()

    # ========================
    # Генераторы событий
    # ========================
    return StreamingResponse(event_generator(body, headers), media_type="text/event-stream")

    async def default_event_generator(body: dict, headers: dict):
    max_log_chunk = 200
    @@ -165,43 +116,59 @@ async def default_event_generator(body: dict, headers: dict):
    if response.status_code != 200:
    text = await response.aread()
    logger.error(f"OpenWebUI error: {text.decode()}")
    yield format_error_event(text.decode())
    yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n"
    return

    async for line in response.aiter_lines():
    if not line.strip():
    try:
    if not line.strip():
    continue

    if line.startswith("data: "):
    json_str = line[len("data: "):].strip()
    try:
    data = json.loads(json_str)
    if "sources" in data:
    snippet = json_str[:max_log_chunk].replace("\n", " ")
    logger.info(f"Пропущен чанк с 'sources': {snippet}...")
    continue
    except json.JSONDecodeError:
    pass

    logger.info(line)
    yield f"{line}\n"

    except Exception as inner_e:
    logger.error(f"Ошибка при обработке строки стрима: {inner_e}")
    # Не прерываем генератор, чтобы попытаться продолжить стрим
    continue
    if line.startswith("data: "):
    json_str = line[len("data: "):].strip()
    try:
    data = json.loads(json_str)
    if "sources" in data:
    snippet = json_str[:max_log_chunk].replace("\n", " ")
    logger.info(f"Пропущен чанк с 'sources': {snippet}...")
    continue
    except json.JSONDecodeError:
    pass
    logger.info(line)
    yield f"{line}\n"

    except Exception as e:
    logger.error(f"Ошибка стриминга: {e}")
    yield format_error_event("Internal server error")
    yield f"data: {{\"error\": \"Internal server error\"}}\n\n"


    async def func_calling_event_generator(body: dict, headers: dict):
    """Генератор событий для обработки потокового ответа с маркерами JSON."""
    text_accumulator = []
    try:
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers)
    async with httpx.AsyncClient(timeout=60) as client:
    response = await client.stream(
    "POST",
    f"{OPENWEBUI_URL}/api/chat/completions",
    json=body,
    headers=headers
    )

    async for event in process_response_stream(response):
    yield event

    except Exception as e:
    logger.error(f"Ошибка стриминга: {e}")
    yield format_error_event("Internal server error")

    # ========================
    # Обработка стрима
    # ========================

    async def process_response_stream(response):
    """Обрабатывает потоковый ответ и генерирует события."""
    if response.status_code != 200:
    error_text = (await response.aread()).decode()
    logger.error(f"Ошибка от API: {error_text}")
    @@ -215,74 +182,89 @@ async def process_response_stream(response):
    yield processed
    if processed.endswith("[DONE]\n\n"):
    return

    async def process_stream_line(line, text_accumulator):
    """Обрабатывает одну строку из потока."""
    if not line.startswith("data: "):
    return None

    try:
    data = parse_stream_data(line)
    if not data or "sources" in data:
    return None

    text_accumulator.append(data["content"])
    return process_accumulated_text("".join(text_accumulator), data)

    except Exception:
    return None


    def parse_stream_data(line):
    """Парсит данные из строки потока."""
    data_part = line[len("data: "):].strip()
    if not data_part:
    return None

    data = json.loads(data_part)
    choice = data.get("choices", [{}])[0]
    content = choice.get("delta", {}).get("content", "")
    return {"data": data, "content": content} if content else None


    def process_accumulated_text(text, context_data):
    """Обрабатывает накопленный текст и генерирует события."""
    start_pos = text.find(START_MARKER)
    end_pos = text.find(END_MARKER)

    if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos):
    return process_partial_text(text, context_data)

    return process_marked_content(text, start_pos, end_pos, context_data)

    def process_partial_text(text, context_data):
    """Обрабатывает частичные данные без полных маркеров."""
    cutoff = find_safe_cutoff(text, START_MARKER)
    safe_part = text[:cutoff]

    if safe_part.strip():
    yield emit_with_content(context_data["data"], safe_part)
    return {"remaining": text[cutoff:]}

    return {"remaining": text[cutoff:]}
    def process_marked_content(text, start_pos, end_pos, context_data):
    """Обрабатывает текст между маркерами."""
    pre_text = text[:start_pos].strip()
    if pre_text:
    yield emit_with_content(context_data["data"], pre_text)

    json_block = text[start_pos + len(START_MARKER):end_pos]
    json_str = extract_json(json_block)

    if json_str:
    try:
    parsed = json.loads(json_str)
    if "functions" in parsed:
    async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]):
    logger.info(chunk)
    yield chunk + '\n'
    yield chunk
    except json.JSONDecodeError:
    pass

    yield "data: [DONE]\n\n"

    def extract_json(block):
    start = block.find("{")
    end = block.rfind("}")
    return block[start:end + 1] if start != -1 and end != -1 else None
    """Извлекает JSON строку из блока."""
    start = block.find("{")
    end = block.rfind("}")
    return block[start:end + 1] if start != -1 and end != -1 else None

    def emit_with_content(base_json: dict, content: str) -> str:
    base = dict(base_json)
    base["choices"][0]["delta"] = {"role": "assistant", "content": content}
    return f"data: {json.dumps(base)}\n\n"

    # ========================
    # Потоковый вывод функций
    # ========================

    async def stream_tool_calls(base_json: dict, functions: list):
    """
    Асинхронно отдаёт функции из списка по чанкам в формате stream.
    """
    for i, func in enumerate(functions):
    fc = func.get("function_call", {})
    chunk = {
    @@ -311,16 +293,14 @@ async def stream_tool_calls(base_json: dict, functions: list):
    }
    ]
    }
    yield f"data: {json.dumps(chunk)}\n"

    # ========================
    # Утилиты
    # ========================
    yield f"data: {json.dumps(chunk)}\n\n"

    def format_error_event(message):
    """Форматирует сообщение об ошибке."""
    return f'data: {{"error": "{message}"}}\n\n'

    def find_safe_cutoff(buffer: str, marker: str) -> int:
    """Находит безопасную позицию для разделения текста с учетом маркера."""
    pos = buffer.find(marker)
    if pos != -1:
    return pos
    @@ -331,4 +311,4 @@ def find_safe_cutoff(buffer: str, marker: str) -> int:

    if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level=f"{LOG_LEVEL.lower()}")
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
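The safe-cutoff buffering refactored in this revision is the subtle part of the stream splitter: a chunk may end in the middle of `START_MARKER`, so the tail has to be held back until the next chunk arrives. A standalone sketch of the same idea, with the marker value taken from the diff above:

```python
START_MARKER = "<tools_calling>"

def find_safe_cutoff(buffer: str, marker: str = START_MARKER) -> int:
    """Largest prefix length of `buffer` that is safe to flush to the client."""
    pos = buffer.find(marker)
    if pos != -1:
        # A complete marker is present: everything before it is safe.
        return pos
    # No complete marker: hold back any suffix that could be the
    # beginning of a marker still arriving in the next chunk.
    for i in range(len(marker) - 1, 0, -1):
        if buffer.endswith(marker[:i]):
            return len(buffer) - i
    return len(buffer)

# "<tool" could be the start of "<tools_calling>", so only "Hello " is flushed.
print(find_safe_cutoff("Hello <tool"))  # 6
print(find_safe_cutoff("plain text"))   # 10
```

Because the loop checks progressively shorter marker prefixes, at most `len(marker) - 1` characters are ever delayed.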
  9. Vovanda revised this gist Jul 5, 2025. 2 changed files with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion Readme.md
    @@ -86,7 +86,7 @@ pip install fastapi httpx uvicorn python-dotenv
    4. Run the server:

    ```bash
    python Openwebui_tool_calling_proxy.py
    python OpenWEBUI_tool_calling_proxy.py
    ```

    5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI
  10. Vovanda revised this gist Jul 5, 2025. 3 changed files with 390 additions and 255 deletions.
    181 changes: 0 additions & 181 deletions Openwebui_filter_proxy.py
    @@ -1,181 +0,0 @@
    import os
    import json
    import logging
    import re
    from urllib.parse import urljoin

    from fastapi import FastAPI, Request, HTTPException
    from fastapi.responses import StreamingResponse, JSONResponse
    import httpx
    from dotenv import load_dotenv

    # --- Конфигурация ---
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    load_dotenv()

    app = FastAPI()

    OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_api_url_here")
    API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
    TIMEOUT = 30.0

    ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
    #replace или disable, или default
    ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()

    def override_system_messages(messages: list[dict]) -> list[dict]:
    ZED_SYSTEM_PROMPT = None
    if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
    with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
    ZED_SYSTEM_PROMPT = f.read().strip()

    if ZED_SYSTEM_PROMPT_MODE == "disable":
    # Удалить все system-сообщения, ничего не добавлять
    return [m for m in messages if m.get("role") != "system"]

    if ZED_SYSTEM_PROMPT_MODE == "replace" and ZED_SYSTEM_PROMPT:
    # Удалить все system и вставить один кастомный
    messages = [m for m in messages if m.get("role") != "system"]
    messages.insert(0, {"role": "system", "content": ZED_SYSTEM_PROMPT})
    return messages

    # default — не трогаем системный промт
    return messages

    @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
    async def proxy_all(request: Request, path: str):
    if path == "chat/completions":
    return await openai_proxy(request)

    target_url = urljoin(f"{OPENWEBUI_URL}/", path)
    try:
    request_body = None
    if request.method in ["POST", "PUT"]:
    try:
    request_body = await request.json()
    except json.JSONDecodeError:
    request_body = None

    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.request(
    method=request.method,
    url=target_url,
    headers={
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    },
    json=request_body,
    params=dict(request.query_params),
    )

    filtered_headers = {
    k: v for k, v in response.headers.items()
    if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
    }

    return JSONResponse(
    content=response.json(),
    status_code=response.status_code,
    headers=filtered_headers,
    )

    except httpx.ReadTimeout:
    logger.error("Таймаут при обращении к Open WebUI")
    raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI")
    except Exception as e:
    logger.error(f"Ошибка проксирования: {str(e)}")
    raise HTTPException(status_code=500, detail=str(e))

    @app.post("/v1/chat/completions")
    async def openai_proxy(request: Request):
    body = await request.json()

    original_messages = body.get("messages", [])
    modified_messages = override_system_messages(original_messages)

    if modified_messages != original_messages:
    body["messages"] = modified_messages
    logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}")
    else:
    logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}")

    logger.info(f"Обработка chat/completions для модели: {body.get('model')}")

    headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }

    if body.get("stream"):
    return StreamingResponse(event_generator(body, headers), media_type="text/event-stream")
    else:
    return await get_json_response(body, headers)

    async def get_json_response(body: dict, headers: dict):
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.post(f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers)

    if response.status_code != 200:
    detail = response.json().get("detail", "Open WebUI error")
    logger.error(f"Open WebUI error: {detail}")
    raise HTTPException(status_code=response.status_code, detail=detail)

    data = response.json()
    return {
    "id": data.get("id"),
    "object": "chat.completion",
    "created": data.get("created"),
    "choices": [{
    "index": 0,
    "message": data["choices"][0]["message"],
    "finish_reason": "stop"
    }]
    }


    async def event_generator(body: dict, headers: dict):
    max_log_chunk = 200
    try:
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
    if response.status_code != 200:
    text = await response.aread()
    logger.error(f"OpenWebUI error: {text.decode()}")
    yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n"
    return

    async for line in response.aiter_lines():
    try:
    if not line.strip():
    continue

    if line.startswith("data: "):
    json_str = line[len("data: "):].strip()
    try:
    data = json.loads(json_str)
    if "sources" in data:
    snippet = json_str[:max_log_chunk].replace("\n", " ")
    logger.info(f"Пропущен чанк с 'sources': {snippet}...")
    continue
    except json.JSONDecodeError:
    pass

    logger.info(line)
    yield f"{line}\n"

    except Exception as inner_e:
    logger.error(f"Ошибка при обработке строки стрима: {inner_e}")
    # Не прерываем генератор, чтобы попытаться продолжить стрим
    continue

    except Exception as e:
    logger.error(f"Ошибка стриминга: {e}")
    yield f"data: {{\"error\": \"Internal server error\"}}\n\n"


    if __name__ == "__main__":
    import uvicorn
    uvicorn.run("openAI_adapter:app", host="127.0.0.1", port=5000, log_level="info")
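The essential behaviour of the deleted filter proxy — dropping the Open WebUI service chunks whose JSON carries a `sources` key, which Zed cannot parse — fits in a few lines. A simplified sketch (the helper name is mine, not from the file):

```python
import json

def filter_sse_lines(lines):
    """Yield SSE lines, skipping Open WebUI service chunks with a 'sources' key."""
    for line in lines:
        if not line.strip():
            continue  # skip keep-alive blank lines
        if line.startswith("data: "):
            payload = line[len("data: "):].strip()
            try:
                if "sources" in json.loads(payload):
                    continue  # invalid chunk for Zed: drop it
            except json.JSONDecodeError:
                pass  # e.g. "data: [DONE]" is not JSON; pass it through
        yield line

stream = [
    'data: {"choices": [{"delta": {"content": "Hi"}}]}',
    'data: {"sources": []}',
    "data: [DONE]",
]
print(list(filter_sse_lines(stream)))  # the "sources" chunk is gone
```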
    334 changes: 334 additions & 0 deletions Openwebui_tool_calling_proxy.py
    @@ -0,0 +1,334 @@
    import copy
    import os
    import json
    import logging
    from urllib.parse import urljoin

    import httpx
    from fastapi import FastAPI, Request, HTTPException
    from fastapi.responses import StreamingResponse, JSONResponse
    from dotenv import load_dotenv

    # ========================
    # Конфигурация и инициализация
    # ========================

    load_dotenv()

    LOG_LEVEL = os.getenv("LOG_LEVEL", "info").upper()
    logging.basicConfig(level=LOG_LEVEL)
    logger = logging.getLogger(__name__)

    app = FastAPI()

    OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_endpoint_here")
    API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
    TIMEOUT = 30.0

    ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
    ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()
EMULATE_FUNCTION_CALLING = os.getenv("EMULATE_FUNCTION_CALLING", True)

    START_MARKER = "<tools_calling>"
    END_MARKER = "</tools_calling>"
    MARKER_MAX_LEN = max(len(START_MARKER), len(END_MARKER))

    # ========================
    # Загрузка и обработка системного промта
    # ========================

    def load_system_prompt() -> str | None:
    if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
    with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
    return f.read().strip()
    return None

    def apply_system_prompt_policy(messages: list[dict], mode: str, custom_prompt: str | None) -> list[dict]:
    if mode == "disable":
    return [m for m in messages if m.get("role") != "system"]
    if mode == "replace" and custom_prompt:
    filtered = [m for m in messages if m.get("role") != "system"]
    filtered.insert(0, {"role": "system", "content": custom_prompt})
    return filtered
    return messages

    def inject_tools_as_prompt(tools: dict, messages: list[dict]) -> None:
    if not tools:
    return

    tools_text = json.dumps(tools, indent=2, ensure_ascii=False)
    tools_message = {
    "role": "system",
    "content": f"Ниже перечислены встроенные инструменты (function calling):\n{tools_text}"
    }

    for i, msg in enumerate(messages):
    if msg.get("role") == "system":
    messages.insert(i + 1, tools_message)
    return
    messages.insert(0, tools_message)

    # ========================
    # Основной endpoint
    # ========================

    @app.post("/v1/chat/completions")
    async def openai_proxy(request: Request):
    logger.info(">>> Вызван openai_proxy")
    body = await request.json()
    original_body = copy.deepcopy(body)

    # Системный промт
    system_prompt = load_system_prompt()
    body["messages"] = apply_system_prompt_policy(body.get("messages", []), ZED_SYSTEM_PROMPT_MODE, system_prompt)

    # Интеграция tools в messages
    if EMULATE_FUNCTION_CALLING:
    tools = body.pop("tools", None)
    if tools:
    inject_tools_as_prompt(tools, body.get("messages", []))
    logger.info("Инструменты встроены в messages, ключ 'tools' удалён")

    if body != original_body:
    logger.info(f"Тело запроса изменено: {json.dumps(body, ensure_ascii=False)}")
    else:
    logger.info(f"Тело запроса без изменений: {json.dumps(body, ensure_ascii=False)}")

    # Извлекаем Authorization из исходного запроса, если есть
    auth_header = request.headers.get("Authorization", f"Bearer {API_KEY}")
    headers = {
    "Authorization": auth_header,
    "Content-Type": "application/json",
    "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }

    generator = func_calling_event_generator if EMULATE_FUNCTION_CALLING else default_event_generator
    return StreamingResponse(generator(body, headers), media_type="text/event-stream")

    # ========================
    # Прокси для всех /v1/* путей
    # ========================

    @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
    async def proxy_all(request: Request, path: str):
    if path == "chat/completions":
    return await openai_proxy(request)

    target_url = urljoin(f"{OPENWEBUI_URL}/", path)
    try:
    request_body = None
    if request.method in ["POST", "PUT"]:
    try:
    request_body = await request.json()
    except json.JSONDecodeError:
    request_body = None

    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.request(
    method=request.method,
    url=target_url,
    headers={
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    },
    json=request_body,
    params=dict(request.query_params),
    )

    filtered_headers = {
    k: v for k, v in response.headers.items()
    if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
    }

    return JSONResponse(
    content=response.json(),
    status_code=response.status_code,
    headers=filtered_headers,
    )

    except httpx.ReadTimeout:
    logger.error("Таймаут при обращении к Open WebUI")
    raise HTTPException(status_code=504, detail="Таймаут соединения с Open WebUI")
    except Exception as e:
    logger.error(f"Ошибка проксирования: {str(e)}")
    raise HTTPException(status_code=500, detail=str(e))

    # ========================
    # Генераторы событий
    # ========================

    async def default_event_generator(body: dict, headers: dict):
    max_log_chunk = 200
    try:
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
    if response.status_code != 200:
    text = await response.aread()
    logger.error(f"OpenWebUI error: {text.decode()}")
    yield format_error_event(text.decode())
    return

    async for line in response.aiter_lines():
    if not line.strip():
    continue
    if line.startswith("data: "):
    json_str = line[len("data: "):].strip()
    try:
    data = json.loads(json_str)
    if "sources" in data:
    snippet = json_str[:max_log_chunk].replace("\n", " ")
    logger.info(f"Пропущен чанк с 'sources': {snippet}...")
    continue
    except json.JSONDecodeError:
    pass
    logger.info(line)
    yield f"{line}\n"
    except Exception as e:
    logger.error(f"Ошибка стриминга: {e}")
    yield format_error_event("Internal server error")

    async def func_calling_event_generator(body: dict, headers: dict):
    try:
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
    response = await client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers)
    async for event in process_response_stream(response):
    yield event
    except Exception as e:
    logger.error(f"Ошибка стриминга: {e}")
    yield format_error_event("Internal server error")

    # ========================
    # Обработка стрима
    # ========================

    async def process_response_stream(response):
    if response.status_code != 200:
    error_text = (await response.aread()).decode()
    logger.error(f"Ошибка от API: {error_text}")
    yield format_error_event(error_text)
    return

    text_accumulator = []
    async for line in response.aiter_lines():
    processed = await process_stream_line(line, text_accumulator)
    if processed:
    yield processed
    if processed.endswith("[DONE]\n\n"):
    return

    async def process_stream_line(line, text_accumulator):
    if not line.startswith("data: "):
    return None
    try:
    data = parse_stream_data(line)
    if not data or "sources" in data:
    return None
    text_accumulator.append(data["content"])
    return process_accumulated_text("".join(text_accumulator), data)
    except Exception:
    return None

    def parse_stream_data(line):
    data_part = line[len("data: "):].strip()
    if not data_part:
    return None
    data = json.loads(data_part)
    choice = data.get("choices", [{}])[0]
    content = choice.get("delta", {}).get("content", "")
    return {"data": data, "content": content} if content else None

    def process_accumulated_text(text, context_data):
    start_pos = text.find(START_MARKER)
    end_pos = text.find(END_MARKER)
    if not (start_pos != -1 and end_pos != -1 and end_pos > start_pos):
    return process_partial_text(text, context_data)
    return process_marked_content(text, start_pos, end_pos, context_data)

    def process_partial_text(text, context_data):
    cutoff = find_safe_cutoff(text, START_MARKER)
    safe_part = text[:cutoff]
    if safe_part.strip():
    yield emit_with_content(context_data["data"], safe_part)
    return {"remaining": text[cutoff:]}

    def process_marked_content(text, start_pos, end_pos, context_data):
    pre_text = text[:start_pos].strip()
    if pre_text:
    yield emit_with_content(context_data["data"], pre_text)
    json_block = text[start_pos + len(START_MARKER):end_pos]
    json_str = extract_json(json_block)
    if json_str:
    try:
    parsed = json.loads(json_str)
    if "functions" in parsed:
    async for chunk in stream_tool_calls(context_data["data"], parsed["functions"]):
    logger.info(chunk)
    yield chunk + '\n'
    except json.JSONDecodeError:
    pass
    yield "data: [DONE]\n\n"

    def extract_json(block):
    start = block.find("{")
    end = block.rfind("}")
    return block[start:end + 1] if start != -1 and end != -1 else None

    def emit_with_content(base_json: dict, content: str) -> str:
    base = dict(base_json)
    base["choices"][0]["delta"] = {"role": "assistant", "content": content}
    return f"data: {json.dumps(base)}\n\n"

    # ========================
    # Потоковый вывод функций
    # ========================

    async def stream_tool_calls(base_json: dict, functions: list):
    for i, func in enumerate(functions):
    fc = func.get("function_call", {})
    chunk = {
"id": base_json["id"],
"object": "chat.completion.chunk",
"model": base_json["model"],
"created": base_json["created"],
    "choices": [
    {
    "index": 0,
    "delta": {
    "tool_calls": [
    {
    "id": f"call_{i}",
    "index": i,
    "type": "function",
    "function": {
    "name": fc.get("name", ""),
    "arguments": fc.get("arguments", "")
    }
    }
    ]
    },
    "finish_reason": "tool_calls" if i == len(functions) - 1 else None,
    "native_finish_reason": "tool_calls" if i == len(functions) - 1 else None
    }
    ]
    }
    yield f"data: {json.dumps(chunk)}\n"

    # ========================
    # Утилиты
    # ========================

    def format_error_event(message):
    return f'data: {{"error": "{message}"}}\n\n'

    def find_safe_cutoff(buffer: str, marker: str) -> int:
    pos = buffer.find(marker)
    if pos != -1:
    return pos
    for i in range(len(marker) - 1, 0, -1):
    if buffer.endswith(marker[:i]):
    return len(buffer) - i
    return len(buffer)

    if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level=f"{LOG_LEVEL.lower()}")
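The `stream_tool_calls` helper in the file above can be checked without a running server. The synchronous sketch below (the helper name `build_tool_call_chunks` is mine) builds the same chunk layout; note that only the final chunk carries `finish_reason: "tool_calls"`:

```python
import json

def build_tool_call_chunks(base: dict, functions: list) -> list:
    """Mirror stream_tool_calls(): one OpenAI-style chunk per tool call."""
    chunks = []
    for i, func in enumerate(functions):
        fc = func.get("function_call", {})
        last = i == len(functions) - 1
        chunk = {
            "id": base["id"],
            "object": "chat.completion.chunk",
            "model": base["model"],
            "created": base["created"],
            "choices": [{
                "index": 0,
                "delta": {"tool_calls": [{
                    "id": f"call_{i}",
                    "index": i,
                    "type": "function",
                    "function": {
                        "name": fc.get("name", ""),
                        "arguments": fc.get("arguments", ""),
                    },
                }]},
                # Only the final chunk signals completion of the tool calls.
                "finish_reason": "tool_calls" if last else None,
            }],
        }
        chunks.append(f"data: {json.dumps(chunk)}\n\n")
    return chunks

base = {"id": "chatcmpl-1", "model": "demo", "created": 0}
funcs = [{"function_call": {"name": "get_weather",
                            "arguments": '{"city": "Berlin"}'}}]
print(build_tool_call_chunks(base, funcs)[0])
```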
    130 changes: 56 additions & 74 deletions Readme.md
    @@ -14,8 +14,54 @@ which occurs when using a custom Open Web UI model (pipe functions). Open Web UI

    - Proxies requests to Open Web UI
    - Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses
    - Supports both streaming and regular JSON responses
    - Supports streaming
    - Allows overriding system messages in requests (optional)
    - **Simulates OpenAI-style function calling (`tool_calls`) using special stream markers** controlled by the environment variable `EMULATE_TOOLS_CALLING` (default: enabled)

    ---

    ## Tool Calling Emulation

    If your model outputs function definitions inside the following block:

    ```
    <tools_calling>
    { ... json ... }
    </tools_calling>
    ```

    The adapter will:

    - Detect the block
    - Extract and parse the JSON
    - Convert it into OpenAI-compatible `tool_calls` chunks
    - Send those chunks to the client as if native function calling were supported

    ### Expected JSON Structure

    The JSON inside `<tools_calling>...</tools_calling>` must have the following structure:

    ```json
    {
    "model": "<string>",
    "functions": [
    {
    "index": <integer>,
    "function_call": {
    "name": "<string>",
    "arguments": "<JSON string>"
    },
    "finish_reason": "function_call" | null
    },
    ...
    ]
    }
    ```

    Notes:
    - The `arguments` field must be a valid **stringified JSON object**
    - The outer structure must contain a `functions` list
    - Each entry represents one tool call

    ---

    @@ -30,6 +76,7 @@ which occurs when using a custom Open Web UI model (pipe functions). Open Web UI
    - `default` (leave system messages unchanged)
    - `replace` (replace all system messages with the one from the file)
    - `disable` (remove all system messages)
    - (optional) `EMULATE_TOOLS_CALLING``true` or `false` to enable/disable tool calling emulation (default: `true`)
    3. Install dependencies:

    ```bash
    @@ -39,90 +86,25 @@ pip install fastapi httpx uvicorn python-dotenv
    4. Run the server:

    ```bash
    python openwebui_filter_proxy.py
    python Openwebui_tool_calling_proxy.py
    ```

    5. Configure Zed IDE to use this proxy instead of direct connection to Open Web UI
    5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI

    ---

    ## Important notes

    - The invalid chunk issue only occurs when using Open Web UI custom models (pipe functions)
    - No such problems occur when using models via OpenRouter
    - This script is intended as a temporary workaround for smooth operation with Zed IDE
    - The invalid chunk issue only occurs with Open Web UI custom models (pipe functions)
    - Models served via OpenRouter are not affected
    - The tool calling emulation requires output in the `<tools_calling>...</tools_calling>` format
    - This is a temporary and pragmatic workaround to enable compatibility with OpenAI-like clients

    ---

    If you have any questions, feel free to reach out.

    ---

    Author: Savkin Vladimir
    Date: 2025


    ---

# Open Web UI Invalid Chunk Filter for Zed IDE

This adapter script resolves the following error in Zed IDE:

```
data did not match any variant of untagged enum ResponseStreamResult
```

which occurs when using a custom Open Web UI model (pipe functions): Open Web UI sends invalid chunks in the stream, causing parsing exceptions.

---

## What the adapter does

- Proxies requests to Open Web UI
- Filters invalid chunks out of the stream so that Zed IDE can receive responses correctly
- Supports both streaming and regular JSON responses
- Allows overriding system messages in requests (optional)

---

## Usage

1. Clone or download the script
2. Set environment variables:
   - `OPENWEBUI_URL` — URL of your Open Web UI server
   - `OPENWEBUI_API_KEY` — API key for access
   - (optional) `ZED_SYSTEM_PROMPT_FILE` — path to a file with a system prompt
   - (optional) `ZED_SYSTEM_PROMPT_MODE` — mode for handling system messages, one of:
     - `default` (leave system messages unchanged)
     - `replace` (replace all system messages with the one from the file)
     - `disable` (remove all system messages)
3. Install dependencies:

```bash
pip install fastapi httpx uvicorn python-dotenv
```

4. Run the server:

```bash
python openwebui_filter_proxy.py
```

5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI

---

## Important notes

- The invalid chunk issue only occurs when using Open Web UI custom models (pipe functions)
- No such problems occur when using models via OpenRouter
- The script is intended as a temporary workaround for smooth operation with Zed IDE

---

If you have any questions, feel free to reach out.

---
    Author: Savkin Vladimir
    Date: 2025
  11. Vovanda revised this gist Jul 4, 2025. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion Openwebui_filter_proxy.py
    @@ -17,7 +17,7 @@

    app = FastAPI()

    OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "https://chat.sawking.tech")
    OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "your_api_url_here")
    API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
    TIMEOUT = 30.0

  12. Vovanda renamed this gist Jul 4, 2025. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  13. Vovanda revised this gist Jul 4, 2025. 2 changed files with 183 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions Readme.md
    @@ -39,7 +39,7 @@ pip install fastapi httpx uvicorn python-dotenv
    4. Run the server:

    ```bash
    python openAI_adapter.py
    python openwebui_filter_proxy.py
    ```

    5. Configure Zed IDE to use this proxy instead of direct connection to Open Web UI
    @@ -105,7 +105,7 @@ pip install fastapi httpx uvicorn python-dotenv
    4. Запустите сервер:

    ```bash
    python openAI_adapter.py
    python openwebui_filter_proxy.py
    ```

    5. Настройте Zed IDE использовать этот прокси вместо прямого подключения к Open Web UI
    181 changes: 181 additions & 0 deletions openwebui_filter_proxy.py
    @@ -0,0 +1,181 @@
import os
import json
import logging
from urllib.parse import urljoin

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
import httpx
from dotenv import load_dotenv

# --- Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()

app = FastAPI()

OPENWEBUI_URL = os.getenv("OPENWEBUI_URL", "https://chat.sawking.tech")
API_KEY = os.getenv("OPENWEBUI_API_KEY", "your_api_key_here")
TIMEOUT = 30.0

ZED_SYSTEM_PROMPT_FILE = os.getenv("ZED_SYSTEM_PROMPT_FILE")
# "replace", "disable", or "default"
ZED_SYSTEM_PROMPT_MODE = os.getenv("ZED_SYSTEM_PROMPT_MODE", "default").lower()

def override_system_messages(messages: list[dict]) -> list[dict]:
    ZED_SYSTEM_PROMPT = None
    if ZED_SYSTEM_PROMPT_FILE and os.path.exists(ZED_SYSTEM_PROMPT_FILE):
        with open(ZED_SYSTEM_PROMPT_FILE, encoding="utf-8") as f:
            ZED_SYSTEM_PROMPT = f.read().strip()

    if ZED_SYSTEM_PROMPT_MODE == "disable":
        # Remove all system messages and add nothing
        return [m for m in messages if m.get("role") != "system"]

    if ZED_SYSTEM_PROMPT_MODE == "replace" and ZED_SYSTEM_PROMPT:
        # Remove all system messages and insert a single custom one
        messages = [m for m in messages if m.get("role") != "system"]
        messages.insert(0, {"role": "system", "content": ZED_SYSTEM_PROMPT})
        return messages

    # default: leave the system prompt untouched
    return messages

@app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_all(request: Request, path: str):
    if path == "chat/completions":
        return await openai_proxy(request)

    target_url = urljoin(f"{OPENWEBUI_URL}/", path)
    try:
        request_body = None
        if request.method in ["POST", "PUT"]:
            try:
                request_body = await request.json()
            except json.JSONDecodeError:
                request_body = None

        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            response = await client.request(
                method=request.method,
                url=target_url,
                headers={
                    "Authorization": f"Bearer {API_KEY}",
                    "Content-Type": "application/json",
                },
                json=request_body,
                params=dict(request.query_params),
            )

        filtered_headers = {
            k: v for k, v in response.headers.items()
            if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
        }

        return JSONResponse(
            content=response.json(),
            status_code=response.status_code,
            headers=filtered_headers,
        )

    except httpx.ReadTimeout:
        logger.error("Timeout while contacting Open WebUI")
        raise HTTPException(status_code=504, detail="Connection to Open WebUI timed out")
    except Exception as e:
        logger.error(f"Proxy error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/v1/chat/completions")
async def openai_proxy(request: Request):
    body = await request.json()

    original_messages = body.get("messages", [])
    modified_messages = override_system_messages(original_messages)

    if modified_messages != original_messages:
        body["messages"] = modified_messages
        logger.info(f"Request body modified: {json.dumps(body, ensure_ascii=False)}")
    else:
        logger.info(f"Request body unchanged: {json.dumps(body, ensure_ascii=False)}")

    logger.info(f"Handling chat/completions for model: {body.get('model')}")

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream" if body.get("stream") else "application/json",
    }

    if body.get("stream"):
        return StreamingResponse(event_generator(body, headers), media_type="text/event-stream")
    else:
        return await get_json_response(body, headers)

async def get_json_response(body: dict, headers: dict):
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        response = await client.post(f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers)

        if response.status_code != 200:
            detail = response.json().get("detail", "Open WebUI error")
            logger.error(f"Open WebUI error: {detail}")
            raise HTTPException(status_code=response.status_code, detail=detail)

        data = response.json()
        return {
            "id": data.get("id"),
            "object": "chat.completion",
            "created": data.get("created"),
            "choices": [{
                "index": 0,
                "message": data["choices"][0]["message"],
                "finish_reason": "stop"
            }]
        }


async def event_generator(body: dict, headers: dict):
    max_log_chunk = 200
    try:
        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
            async with client.stream("POST", f"{OPENWEBUI_URL}/api/chat/completions", json=body, headers=headers) as response:
                if response.status_code != 200:
                    text = await response.aread()
                    logger.error(f"OpenWebUI error: {text.decode()}")
                    yield f"data: {{\"error\": \"{text.decode()}\"}}\n\n"
                    return

                async for line in response.aiter_lines():
                    try:
                        if not line.strip():
                            continue

                        if line.startswith("data: "):
                            json_str = line[len("data: "):].strip()
                            try:
                                data = json.loads(json_str)
                                if "sources" in data:
                                    snippet = json_str[:max_log_chunk].replace("\n", " ")
                                    logger.info(f"Skipped chunk with 'sources': {snippet}...")
                                    continue
                            except json.JSONDecodeError:
                                pass

                        logger.info(line)
                        yield f"{line}\n"

                    except Exception as inner_e:
                        logger.error(f"Error while processing a stream line: {inner_e}")
                        # Do not abort the generator; try to keep the stream going
                        continue

    except Exception as e:
        logger.error(f"Streaming error: {e}")
        yield f"data: {{\"error\": \"Internal server error\"}}\n\n"


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("openwebui_filter_proxy:app", host="127.0.0.1", port=5000, log_level="info")
  14. Vovanda created this gist Jul 4, 2025.
    128 changes: 128 additions & 0 deletions Readme.md
    @@ -0,0 +1,128 @@
    # Open Web UI Stream Chunk Filter for Zed IDE

    This adapter script resolves the following error in Zed IDE:

    ```
    data did not match any variant of untagged enum ResponseStreamResult
    ```

    which occurs when using a custom Open Web UI model (pipe functions). Open Web UI sends invalid chunks in the stream, causing parsing exceptions.

    ---

    ## What the adapter does

    - Proxies requests to Open Web UI
    - Filters out and removes invalid chunks from the stream, allowing Zed IDE to properly receive responses
    - Supports both streaming and regular JSON responses
    - Allows overriding system messages in requests (optional)
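
The filtering decision itself is simple. A simplified sketch of the rule applied to each SSE line (the actual script also logs skipped chunks and handles non-`data:` lines slightly differently):

```python
import json

def should_forward(line: str) -> bool:
    # Forward everything except Open Web UI service chunks carrying a
    # "sources" field, which Zed's stream parser rejects
    if not line.startswith("data: "):
        return bool(line.strip())  # simplification: forward any non-empty line
    payload = line[len("data: "):].strip()
    if payload == "[DONE]":
        return True
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return True  # non-JSON data lines are passed through as-is
    return "sources" not in data
```

Everything that survives this check is re-emitted to the client unchanged, so valid delta chunks reach Zed IDE intact.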

    ---

    ## Usage

    1. Clone or download the script
2. Set environment variables:
   - `OPENWEBUI_URL` — URL of your Open Web UI server
   - `OPENWEBUI_API_KEY` — API key for access
   - (optional) `ZED_SYSTEM_PROMPT_FILE` — path to a file with a system prompt
   - (optional) `ZED_SYSTEM_PROMPT_MODE` — mode for handling system messages, one of:
     - `default` (leave system messages unchanged)
     - `replace` (replace all system messages with the one from the file)
     - `disable` (remove all system messages)
    3. Install dependencies:

    ```bash
    pip install fastapi httpx uvicorn python-dotenv
    ```

    4. Run the server:

    ```bash
    python openAI_adapter.py
    ```

5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI
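
Since the script calls `load_dotenv()`, the settings above can also be collected in a `.env` file next to it. A sample sketch (all values are placeholders):

```bash
# .env - all values below are placeholders
OPENWEBUI_URL=https://openwebui.example.com
OPENWEBUI_API_KEY=your_api_key_here
# Optional system-prompt overrides
ZED_SYSTEM_PROMPT_FILE=./zed_system_prompt.txt
ZED_SYSTEM_PROMPT_MODE=replace
```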

    ---

    ## Important notes

    - The invalid chunk issue only occurs when using Open Web UI custom models (pipe functions)
    - No such problems occur when using models via OpenRouter
    - This script is intended as a temporary workaround for smooth operation with Zed IDE

    ---

    If you have any questions, feel free to reach out.

    ---

    Author: Savkin Vladimir
    Date: 2025


    ---

# Open Web UI Invalid Chunk Filter for Zed IDE

This adapter script resolves the following error in Zed IDE:

```
data did not match any variant of untagged enum ResponseStreamResult
```

which occurs when using a custom Open Web UI model (pipe functions): Open Web UI sends invalid chunks in the stream, causing parsing exceptions.

---

## What the adapter does

- Proxies requests to Open Web UI
- Filters invalid chunks out of the stream so that Zed IDE can receive responses correctly
- Supports both streaming and regular JSON responses
- Allows overriding system messages in requests (optional)

---

## Usage

1. Clone or download the script
2. Set environment variables:
   - `OPENWEBUI_URL` — URL of your Open Web UI server
   - `OPENWEBUI_API_KEY` — API key for access
   - (optional) `ZED_SYSTEM_PROMPT_FILE` — path to a file with a system prompt
   - (optional) `ZED_SYSTEM_PROMPT_MODE` — mode for handling system messages, one of:
     - `default` (leave system messages unchanged)
     - `replace` (replace all system messages with the one from the file)
     - `disable` (remove all system messages)
3. Install dependencies:

```bash
pip install fastapi httpx uvicorn python-dotenv
```

4. Run the server:

```bash
python openAI_adapter.py
```

5. Configure Zed IDE to use this proxy instead of a direct connection to Open Web UI

---

## Important notes

- The invalid chunk issue only occurs when using Open Web UI custom models (pipe functions)
- No such problems occur when using models via OpenRouter
- The script is intended as a temporary workaround for smooth operation with Zed IDE

---

If you have any questions, feel free to reach out.

---

Author: Vladimir Savkin
Date: 2025