Skip to content

Instantly share code, notes, and snippets.

@bradleat
Created October 6, 2025 17:52
Show Gist options
  • Select an option

  • Save bradleat/0d13d6da86601d567dd0ffeb15cd4b68 to your computer and use it in GitHub Desktop.

Select an option

Save bradleat/0d13d6da86601d567dd0ffeb15cd4b68 to your computer and use it in GitHub Desktop.
/**
 * Producer-side control surface returned by `TypedEventSource.create`.
 * The consumer receives the `TypedEventSource`; whoever holds this handle is
 * the one who publishes events into it.
 *
 * M is the payload type of published messages; E is the payload type of
 * published errors.
 */
export interface TypedEventSourceHandle<M, E> {
/**
 * One-shot: transitions CONNECTING -> OPEN and fires a single "open" event.
 * The implementation in `create` throws an `Error` if the stream is already
 * closed or if `open()` has been called before.
 */
open(): void;
/**
 * Publish a typed message; consumers receive it as the `data` of a "message"
 * MessageEvent. Ignored (no event, no throw) once the stream is closed.
 */
message(data: M): void;
/**
 * Publish a typed error (also fires the spec "error" Event).
 * Two "error" events are dispatched per call: a plain `Event` first, then a
 * `CustomEvent` whose `detail` is `err`, so "error" listeners run twice.
 * Ignored once the stream is closed.
 */
error(err: E): void;
/** Close the stream; no "close" event per spec. Delegates to the stream's own `close()`. */
close(): void;
}
/**
 * An `EventSource`-shaped stream with typed payloads.
 *
 * `M` is the type of `MessageEvent.data` seen by consumers; `E` is the type of
 * the `detail` carried by errors published through the creator handle.
 *
 * Instances are only obtainable through {@link TypedEventSource.create}, which
 * returns the consumer-facing stream together with a private handle that can
 * publish into it. The stream is either purely local (driven by the handle) or
 * a wrapper around an existing `EventSource` whose `open` / `error` /
 * `message` events are re-published here.
 *
 * Note: listeners are registered on an internal `EventTarget`, so that target
 * (not this object) is what listeners observe as `event.target`.
 */
export class TypedEventSource<M = unknown, E = unknown> implements EventSource {
  // EventSource readyState constants, exposed on the instance like the real API.
  readonly CONNECTING = 0 as const;
  readonly OPEN = 1 as const;
  readonly CLOSED = 2 as const;

  readonly url: string;
  readonly withCredentials: boolean;
  readyState: 0 | 1 | 2 = this.CONNECTING;

  onopen: ((this: EventSource, ev: Event) => any) | null = null;
  onerror: ((this: EventSource, ev: Event) => any) | null = null;
  onmessage: ((this: EventSource, ev: MessageEvent<M>) => any) | null = null;

  /** Internal target that actually holds the listeners. */
  #et = new EventTarget();
  /** Cleanup callbacks run (once) by `close()`. */
  #teardowns: Array<() => void> = [];

  /**
   * @param opts.source          Existing EventSource to wrap and re-publish.
   * @param opts.url             Reported `url`; defaults to "typed://local".
   * @param opts.withCredentials Reported `withCredentials`; defaults to false.
   * @param opts.startOpen       Local streams only: auto-open on the next
   *                             microtask. Ignored when `source` is given.
   */
  private constructor(opts: {
    source?: EventSource;
    url?: string;
    withCredentials?: boolean;
    startOpen?: boolean;
  }) {
    this.url = opts.url ?? "typed://local";
    this.withCredentials = opts.withCredentials ?? false;

    // Bridge the on* property handlers. They are looked up at dispatch time,
    // so assigning or clearing a handler later takes effect immediately.
    this.#et.addEventListener("open", (ev) => this.onopen?.call(this, ev));
    this.#et.addEventListener("error", (ev) => this.onerror?.call(this, ev));
    this.#et.addEventListener("message", (ev) =>
      this.onmessage?.call(this, ev as MessageEvent<M>)
    );

    if (opts.source) {
      this.#bridge(opts.source);
    } else if (opts.startOpen) {
      // Deferred so the consumer can attach handlers before "open" fires.
      queueMicrotask(() => {
        if (this.readyState === this.CONNECTING) {
          this.readyState = this.OPEN;
          this.#dispatch(new Event("open"));
        }
      });
    }
  }

  /** Factory: a sealed consumer SSE plus a private creator handle. */
  static create<M = unknown, E = unknown>(opts: {
    source?: EventSource;
    url?: string;
    withCredentials?: boolean;
    startOpen?: boolean;
  } = {}): { sse: TypedEventSource<M, E>; handle: TypedEventSourceHandle<M, E> } {
    const sse = new TypedEventSource<M, E>(opts);
    let opened = false; // enforce one-shot open()

    const handle: TypedEventSourceHandle<M, E> = {
      open: () => {
        if (sse.readyState === sse.CLOSED) throw new Error("EventSource is closed");
        // Also reject when the stream was opened by `startOpen` or by the
        // wrapped source; otherwise consumers would see a duplicate "open".
        if (opened || sse.readyState !== sse.CONNECTING) {
          throw new Error("open() may only be called once");
        }
        opened = true;
        sse.readyState = sse.OPEN;
        sse.#dispatch(new Event("open"));
      },
      message: (data: M) => {
        if (sse.readyState === sse.CLOSED) return;
        sse.#dispatch(new MessageEvent<M>("message", { data }));
      },
      error: (err: E) => {
        if (sse.readyState === sse.CLOSED) return;
        // spec-compatible error Event
        sse.#dispatch(new Event("error"));
        // typed companion (same "error" type, accessible via CustomEvent<E>.detail);
        // listeners therefore run twice per published error.
        sse.#dispatch(new CustomEvent<E>("error", { detail: err }));
      },
      close: () => sse.close(),
    };
    return { sse, handle };
  }

  // Typed listener overloads (message is MessageEvent<M>, error/open are Event)
  addEventListener(
    type: "message",
    listener: (this: EventSource, ev: MessageEvent<M>) => any,
    options?: boolean | AddEventListenerOptions
  ): void;
  addEventListener(
    type: "open" | "error",
    listener: (this: EventSource, ev: Event) => any,
    options?: boolean | AddEventListenerOptions
  ): void;
  addEventListener(
    type: string,
    listener: EventListenerOrEventListenerObject,
    options?: boolean | AddEventListenerOptions
  ): void {
    this.#et.addEventListener(type, listener as EventListener, options);
  }

  removeEventListener(
    type: "message",
    listener: (this: EventSource, ev: MessageEvent<M>) => any,
    options?: boolean | EventListenerOptions
  ): void;
  removeEventListener(
    type: "open" | "error",
    listener: (this: EventSource, ev: Event) => any,
    options?: boolean | EventListenerOptions
  ): void;
  removeEventListener(
    type: string,
    listener: EventListenerOrEventListenerObject,
    options?: boolean | EventListenerOptions
  ): void {
    this.#et.removeEventListener(type, listener as EventListener, options);
  }

  /**
   * Present only to satisfy the `EventSource` / `EventTarget` contract.
   * The stream is sealed: events are published through the creator handle.
   *
   * @throws Error always.
   */
  dispatchEvent(_event: Event): boolean {
    throw new Error("TypedEventSource is sealed; publish through the creator handle");
  }

  /**
   * Marks the stream CLOSED and releases any wrapped source. Idempotent:
   * teardowns are drained on the first call, so later calls are no-ops.
   * No event is fired.
   */
  close(): void {
    this.readyState = this.CLOSED;
    for (const teardown of this.#teardowns.splice(0)) {
      // Best-effort: one failing teardown must not block the others.
      try { teardown(); } catch {}
    }
  }

  /** Re-publishes `open` / `error` / `message` from a wrapped source. */
  #bridge(src: EventSource): void {
    // The source may already be open (or closed) by the time it is wrapped.
    this.#mirrorReadyState(src);

    // Fresh Event objects are dispatched below: an event that is still being
    // dispatched by `src` cannot be passed to another dispatchEvent call.
    const onOpen = (): void => {
      this.readyState = this.OPEN;
      this.#dispatch(new Event("open"));
    };
    const onError = (): void => {
      // The source is either reconnecting (CONNECTING) or has failed (CLOSED).
      this.#mirrorReadyState(src);
      this.#dispatch(new Event("error"));
      // A permanently failed source will emit nothing more; release it.
      if (this.readyState === this.CLOSED) this.close();
    };
    const onMessage = (ev: MessageEvent): void => {
      this.#dispatch(
        new MessageEvent<M>("message", {
          data: ev.data as M,
          lastEventId: ev.lastEventId,
          origin: ev.origin,
        })
      );
    };

    src.addEventListener("open", onOpen);
    src.addEventListener("error", onError);
    src.addEventListener("message", onMessage);

    this.#teardowns.push(() => {
      src.removeEventListener("open", onOpen);
      src.removeEventListener("error", onError);
      src.removeEventListener("message", onMessage);
      // Best-effort: tolerate duck-typed sources whose close() is missing or throws.
      try { src.close(); } catch {}
    });
  }

  /** Copies the source's readyState when it is one of the three valid states. */
  #mirrorReadyState(src: EventSource): void {
    const state = src.readyState;
    if (state === 0 || state === 1 || state === 2) {
      this.readyState = state;
    }
  }

  // sealed dispatcher (callers cannot dispatch)
  #dispatch(ev: Event): void {
    this.#et.dispatchEvent(ev);
  }
}
/* ---------------- Example ---------------- */
type Msg = { text: string; ts: number };
type Err = { code: string; reason?: string };

// No `startOpen` here: the creator opens the stream explicitly below, so the
// one-shot behavior of handle.open() can be demonstrated deterministically.
// (`startOpen` opens on a later microtask, i.e. after this synchronous script.)
const { sse, handle } = TypedEventSource.create<Msg, Err>();

sse.onopen = () => console.log("OPEN");
sse.onmessage = (ev) => console.log("MSG:", ev.data.text);
sse.addEventListener("error", (ev) => {
  // handle.error() dispatches a plain "error" Event followed by a CustomEvent
  // carrying the typed payload; only the latter has `detail`.
  if (ev instanceof CustomEvent) {
    const detail = ev.detail as Err;
    console.warn("typed error:", detail.code);
  }
});

handle.open(); // CONNECTING -> OPEN, logs "OPEN"
handle.message({ text: "hello", ts: Date.now() });
handle.error({ code: "E_TIMEOUT", reason: "stall" });

// Second open will throw:
try { handle.open(); } catch (e) { console.warn(String(e)); }

handle.close();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment