Skip to content

Instantly share code, notes, and snippets.

@amphineko
Created April 12, 2026 22:41
Show Gist options
  • Select an option

  • Save amphineko/072e1efd5e5c569c531bc0973d3e1821 to your computer and use it in GitHub Desktop.

Select an option

Save amphineko/072e1efd5e5c569c531bc0973d3e1821 to your computer and use it in GitHub Desktop.
[amphineko@lab-asta]~/amcode/BangumiMCP% uv run ./test_query.py
Connecting to http://127.0.0.1:8000/sse...
Received session endpoint: /messages/?session_id=3b4299ba580d431780ca2cda194861e8
Sending initialize request...
Received initialize response. Sending initialized notification...
Sending tools/list request...
Total tools found: 55
Searching for '羊宫妃那'...
--- Search Result ---
Found 20 subjects (Total matched: 421).
[REAL] 羊宮妃那の声ラブ (ID: 506834)
Score: 7
Rank: 0
Summary: 文化放送「超!A&G+」隔週月・木曜日の17時から放送の「声ラブ」は、
未来を担う若手声優をラジオから応援する特別プログラムです。
木曜日のパーソナリティを担当するのは羊宮妃那さんです!
番組を通して、羊宮さんが様々なことに挑戦していきます。
ぜひお聞きください!
Image: https://lain.bgm.tv/r/400/pic/cover/l/a5/e7/506834_cV62c.jpg
---
[BOOK] 迷える羊の森 ~フィトセラピスト花宮の不思議なカルテ~ (ID: 574080)
Score: 0
Rank: 0
Summary: 物言わぬ不思議な植物が、人間の心を救う――。
 犬や猫は人間の感情を察して寄り添ってくれるが、植物も同じ。植物にも感情や思考がある。そんな植物の力を知り抜いた男、花宮が主人をつとめる植物療法店には、今日も悩める...
import httpx
import asyncio
import json
async def query_mcp():
url = "http://127.0.0.1:8000/sse"
print(f"Connecting to {url}...")
async with httpx.AsyncClient(timeout=30.0) as client:
async with client.stream("GET", url) as response:
session_url = None
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:].strip()
if data.startswith("/messages/"):
print(f"Received session endpoint: {data}")
session_url = f"http://127.0.0.1:8000{data}"
init_payload = {
"jsonrpc": "2.0",
"id": 0,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.0.0"}
}
}
print("Sending initialize request...")
asyncio.create_task(client.post(session_url, json=init_payload))
continue
try:
res = json.loads(data)
if res.get("id") == 0:
print("Received initialize response. Sending initialized notification...")
notif_payload = {"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
await client.post(session_url, json=notif_payload)
print("Sending tools/list request...")
list_payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
await client.post(session_url, json=list_payload)
elif res.get("id") == 1:
print(f"\nTotal tools found: {len(res.get('result', {}).get('tools', []))}")
search_payload = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "search_subjects",
"arguments": {"keyword": "羊宫妃那"}
}
}
print("\nSearching for '羊宫妃那'...")
await client.post(session_url, json=search_payload)
elif res.get("id") == 2:
print("\n--- Search Result ---")
result = res.get("result", {})
content = result.get("content", [])
if content:
print(content[0].get("text")[:500] + "...")
break
except json.JSONDecodeError:
pass
if __name__ == "__main__":
asyncio.run(query_mcp())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment