Skip to content

Instantly share code, notes, and snippets.

@nileshtrivedi
Created November 12, 2025 03:13
Show Gist options
  • Select an option

  • Save nileshtrivedi/8b2899c9e4b09c86a9efbb5d3a804135 to your computer and use it in GitHub Desktop.

Select an option

Save nileshtrivedi/8b2899c9e4b09c86a9efbb5d3a804135 to your computer and use it in GitHub Desktop.

Revisions

  1. nileshtrivedi created this gist Nov 12, 2025.
    502 changes: 502 additions & 0 deletions crdt-distsys-simulator.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,502 @@
    // A CRDT is a wrapper data-structure for a value that knows
    // how to synchronize with other replicas to achieve eventual consistency
    // A single special method "apply" is expected to incorporate messages received from replicas.
    // Local mutations are applied immediately ("optimistically")
    // Eventual consistency is achieved with the apply method being commutative, associative and sometimes idempotent
    // Mutators must produce a new local CRDT instance (that includes the changes requested) and a Message/Effect to broadcast
    // For state-based CRDT, the whole internal state gets broadcasted
    // For op-based CRDT, the operation itself gets broadcasted
    // message = null is treated as a no-op (for eg: adding an existing item to a set) and skips broadcasting when not needed

    // Result of a local mutation on a CRDT:
    //  - updatedCrdt: the CRDT instance that includes the requested change
    //  - message: the state/delta/op to broadcast to other replicas, or null
    //    when the mutation was a no-op and nothing needs to be sent
    type MutationResult<C, MessageType> = {updatedCrdt: C, message: MessageType | null};

    // Base contract shared by every CRDT flavour in this file.
    // ValueType is the plain value the CRDT wraps; MessageType is what replicas
    // exchange (a full state, a delta or an operation, depending on the flavour).
    interface CRDT<ValueType, MessageType> {
    // Incorporates a message received from another replica and returns the
    // resulting CRDT (typed as `this` so implementations return their own type).
    apply(remoteMessage: MessageType): this
    }

    // A state-based CRDT receives full states from other replicas and "merges" them into its own
    // It demands only a weak communication channel: messages may be lost, arrive out of order, or arrive multiple times
    // Updates are propagated reliably even if the network partitions, as long as eventually connectivity is restored
    // Might be inefficient if state is large
    interface StateBasedCRDT<ValueType, StateMessageType> extends CRDT<ValueType, StateMessageType> {
    readonly state: StateMessageType; // Full serializable state that gets propagated to other replicas.
    // apply for StateBasedCRDT is a "merging" of a remote state into the local state.
    // This merging must be commutative, associative and idempotent
    }

    // A delta-based CRDT propagates deltas (changes) instead of full states.
    // The "apply" method merges a remote delta.
    // The merge function must be commutative, associative, and idempotent (a "join").
    interface DeltaBasedCRDT<ValueType, DeltaMessageType> extends CRDT<ValueType, DeltaMessageType> {
    // apply(remoteDelta: DeltaMessageType): DeltaBasedCRDT<ValueType, DeltaMessageType>
    }

    // Op-based CRDT receives operations from other replicas and applies them to its local state
    // Applies a remote operation to the local state. Must be commutative.
    // Needs a more reliable network: All updates must be delivered at every replica either in the right order or be commutative
    interface OperationBasedCRDT<ValueType, OpMessageType> extends CRDT<ValueType, OpMessageType> {
    // No extensions are necessary, but the expectation from apply are weaker, and from channel are stronger
    // apply(remoteMessage) must be commutative? associative? idempotent?
    }

    // A basic CRDT primitive and example
    // A Last-Writer-Wins CRDT's "merge" of a remote state simply keeps whichever state has the latest timestamp
    type LWWRegisterState<VT> = {timestamp: number, nodeId: string; value: VT};
    /**
     * Last-Writer-Wins register: a state-based CRDT holding a single value.
     *
     * Every state carries the timestamp of the write and the id of the node that
     * performed it. Merging two states keeps the one with the larger timestamp;
     * equal timestamps are resolved by comparing the *writers'* node ids, so every
     * replica picks the same winner no matter which replica performs the merge or
     * in which order the states arrive.
     */
    class LWWRegister<VT> implements StateBasedCRDT<VT, LWWRegisterState<VT>> {
      /** Id of the replica owning this instance; stamped onto states produced by setValue. */
      readonly nodeId: string;
      /** The currently winning state (value plus the timestamp/writer id used for ordering). */
      state: LWWRegisterState<VT>;

      /**
       * @param nodeId id of the local replica
       * @param state  initial state; it may have been written by another node
       */
      constructor(nodeId: string, state: LWWRegisterState<VT>) {
        this.nodeId = nodeId;
        this.state = state;
      }

      /** Returns the value of the currently winning state. */
      getValue(): VT {
        return this.state.value;
      }

      /**
       * Performs a local write.
       *
       * @param newValue value to store
       * @returns a new register holding the value, plus the state to broadcast
       */
      setValue(newValue: VT): MutationResult<LWWRegister<VT>, LWWRegisterState<VT>> {
        // Wall-clock time alone is not enough: two writes within one millisecond,
        // or a write made after merging a state from a node with a faster clock,
        // would produce a timestamp that does not beat the state being replaced,
        // and the write would be discarded by other replicas. Always move past
        // the timestamp we currently hold.
        const now = Date.now();
        const previous = this.state.timestamp;
        const newState: LWWRegisterState<VT> = {
          timestamp: previous >= now ? previous + 1 : now,
          nodeId: this.nodeId,
          value: newValue,
        };
        return { updatedCrdt: new LWWRegister(this.nodeId, newState), message: newState };
      }

      /**
       * Merges a state received from another replica.
       *
       * @param remoteState state to merge
       * @returns this instance when the local state wins, otherwise a new register
       *          (owned by the same local node) that holds the remote state
       */
      apply(remoteState: LWWRegisterState<VT>): this {
        const local = this.state;
        if (local.timestamp > remoteState.timestamp) return this; // discard the older incoming value
        // Tie-break on the id of the node that WROTE each state, not on the id of
        // the replica that happens to hold it; only the former is identical on
        // every replica, which is what makes the merge commutative.
        if (local.timestamp === remoteState.timestamp && local.nodeId > remoteState.nodeId) return this;
        return new LWWRegister<VT>(this.nodeId, remoteState) as this;
      }
    }

    /* LWWMap - a container CRDT for named values which are themselves of type LWWRegister */
    // As a map, it will be best implemented as a delta-based CRDT
    type LWWMapState<VT> = Map<string, LWWRegisterState<VT | null | undefined>>;
    /**
     * LWWMap: a delta-based container CRDT mapping string keys to values, where
     * every key is backed by its own LWWRegister.
     *
     * Removing a key does not delete its register: the register is overwritten
     * with `null` (a tombstone) so the removal can be propagated and can win
     * against older writes. Keys whose register holds `null`/`undefined` are
     * treated as absent by hasKey, countKeys and getValue.
     *
     * Instances are immutable: mutators and apply return a new LWWMap.
     */
    class LWWMap<VT> implements DeltaBasedCRDT<Map<string, VT>, LWWMapState<VT>> {
      /** Id of the local replica; used for registers created by this map. */
      readonly nodeId: string;
      /** One register per key ever written, including tombstoned keys. */
      readonly #data: Map<string, LWWRegister<VT | null | undefined>>;

      private constructor(nodeId: string, data: Map<string, LWWRegister<VT | null | undefined>>) {
        this.nodeId = nodeId;
        this.#data = data;
      }

      /** Public factory for creating an empty map. */
      static startEmpty<VT>(nodeId: string): LWWMap<VT> {
        return new LWWMap<VT>(nodeId, new Map());
      }

      /** Public factory for creating a map from a full state (key -> register state). */
      static fromState<VT>(nodeId: string, state: LWWMapState<VT>): LWWMap<VT> {
        const data = new Map<string, LWWRegister<VT | null | undefined>>();
        for (const [key, registerState] of state) {
          data.set(key, new LWWRegister(nodeId, registerState));
        }
        return new LWWMap<VT>(nodeId, data);
      }

      /** Returns the value stored under `key`, or undefined when the key is absent or removed. */
      getValueForKey(key: string): VT | null | undefined {
        return this.#data.get(key)?.getValue() ?? undefined;
      }

      /** True when `key` currently holds a value (i.e. it was set and not removed). */
      hasKey(key: string): boolean {
        const val = this.#data.get(key)?.getValue();
        return val !== null && val !== undefined;
      }

      /**
       * Number of keys that currently hold a value.
       * Tombstoned keys are not counted, so the result always equals
       * `getValue().size` and agrees with hasKey.
       */
      countKeys(): number {
        let live = 0;
        for (const register of this.#data.values()) {
          if (register.getValue() != null) live += 1;
        }
        return live;
      }

      /** Returns a plain Map snapshot of all live (non-removed) entries. */
      getValue(): Map<string, VT> {
        const value = new Map<string, VT>();
        for (const [key, register] of this.#data) {
          const registerValue = register.getValue();
          if (registerValue !== null && registerValue !== undefined) {
            value.set(key, registerValue as VT);
          }
        }
        return value;
      }

      /**
       * Returns the register instance for `key`, or a fresh, empty one when the
       * key has never been written. The fresh register is not stored here.
       */
      private getOrCreateRegister(key: string): LWWRegister<VT | null | undefined> {
        const existing = this.#data.get(key);
        if (existing) return existing;
        return new LWWRegister<VT | null | undefined>(
          this.nodeId,
          { timestamp: 0, nodeId: this.nodeId, value: undefined } // timestamp=0 so any real write wins
        );
      }

      /**
       * Removes `key` by writing a `null` tombstone into its register.
       * @returns the updated map and a one-entry delta carrying the tombstone
       */
      removeKey(key: string): MutationResult<LWWMap<VT>, LWWMapState<VT>> {
        // A removal is exactly a write of null; share the single code path.
        return this.setValueForKey(key, null);
      }

      /**
       * Writes `value` under `key` (`null` removes the key).
       * @returns the updated map and a one-entry delta to broadcast
       */
      setValueForKey(key: string, value: VT | null): MutationResult<LWWMap<VT>, LWWMapState<VT>> {
        const { updatedCrdt: newRegister, message: newRegisterState } =
          this.getOrCreateRegister(key).setValue(value);
        const newData = new Map(this.#data);
        newData.set(key, newRegister);
        // A register that reports "nothing to broadcast" makes the map mutation a no-op message too.
        const deltaMessage: LWWMapState<VT> | null = newRegisterState === null
          ? null
          : new Map<string, LWWRegisterState<VT | null | undefined>>([[key, newRegisterState]]);
        return { updatedCrdt: new LWWMap<VT>(this.nodeId, newData), message: deltaMessage };
      }

      /** Full serializable state: the register state of every key, tombstones included. */
      get state(): LWWMapState<VT> {
        const state: LWWMapState<VT> = new Map();
        for (const [key, register] of this.#data) {
          state.set(key, register.state);
        }
        return state;
      }

      /**
       * Merges a remote delta (which may be a full state) into this map.
       * Keys absent from the delta are left untouched.
       * @returns a new map instance containing the merged registers
       */
      apply(remoteDelta: LWWMapState<VT>): this {
        // We need a new data map for the new immutable instance
        const newData = new Map(this.#data);

        for (const [key, remoteRegisterState] of remoteDelta) {
          const localRegister = this.#data.get(key);
          newData.set(
            key,
            localRegister
              ? localRegister.apply(remoteRegisterState) // existing key: let the register decide the winner
              : new LWWRegister(this.nodeId, remoteRegisterState) // new key: adopt the remote state
          );
        }
        return new LWWMap<VT>(this.nodeId, newData) as this;
      }
    }



    // Grow-only Set as an op-based CRDT
    // The only operation on GSet is 'add', which carries the value.
    type GSetOp<VT> = {type: 'add'; value: VT;};

    /**
     * Grow-only set implemented as an op-based CRDT.
     * Elements can only be added; instances are immutable and every successful
     * add yields a new GSet plus the 'add' operation to broadcast.
     */
    class GSet<VT> implements OperationBasedCRDT<Set<VT>, GSetOp<VT>> {
      // Backing store: a private copy of the elements
      readonly #data: Set<VT>;

      /** @param initialSet optional seed; it is copied, never aliased */
      constructor(initialSet?: Set<VT>) {
        this.#data = new Set(initialSet);
      }

      /** Returns a defensive copy of the current elements. */
      getValue(): Set<VT> {
        return new Set(this.#data);
      }

      /** Membership test. */
      has(item: VT): boolean {
        return this.#data.has(item);
      }

      /**
       * Adds an element.
       * @returns this very instance with a null message when the element is
       *          already present (nothing to update or broadcast); otherwise a
       *          new GSet containing the element and the op to broadcast
       */
      add(newItem: VT): MutationResult<this, GSetOp<VT>> {
        if (!this.#data.has(newItem)) {
          // Set insertion is order-independent and repeat-safe, which is what
          // lets the same code serve as the remote-op handler.
          const grown = new GSet<VT>(new Set([...this.#data, newItem]));
          return { updatedCrdt: grown as this, message: { type: 'add', value: newItem } };
        }
        return { updatedCrdt: this, message: null };
      }

      /** Applies an 'add' received from another replica; the resulting message is dropped on purpose. */
      apply(remoteOp: GSetOp<VT>): this {
        const { updatedCrdt } = this.add(remoteOp.value);
        return updatedCrdt;
      }
    }

    // Now we define types to specify a Distributed System
    // Each network zone gets multiple nodes. Each node can have local state and computations.
    // Some states are the source of truth. Others are derived values from other states via computations.
    // The AppSpec gets used to instantiate an actual runtime App (which is a network of nodes)
    // all computations can only depend on the local state
    // all local state must be declared with the proper crdt type which determines how updates are generated and applied
    // all derived state explicitly specifies the function that compute updates, and its dependencies
    // all computations receive an additional event parameter which includes session info that makes access-control possible


    type Path = string; // A zone-prefixed path to a state variable or a computation, e.g., "/client/my_todos"
    type CrdtType = "LWWRegister" | "LWWMap" | "GSet";
    type ValueType = "object" | "number" | "string" | "boolean" | "array";

    // Declaration of a state variable that is a source of truth (not computed).
    type SourceState = {
    // Which CRDT implementation wraps the value
    crdtType: CrdtType;
    // Kind of plain value stored inside the CRDT
    valueType: ValueType;
    // Initializer. Typed `any`, but nodeSpecToState hands it to initState, which
    // calls it with no arguments — so in practice a zero-argument function
    // returning the initial value (e.g. `() => ""` in the todo spec).
    init: any;
    };

    // Declaration of a state variable whose value is produced by a computation.
    type DerivedState = {
    // Which CRDT implementation wraps the value
    crdtType: CrdtType;
    // Kind of plain value stored inside the CRDT
    valueType: ValueType;
    // Path of the computation that produces this value, e.g. '/db/calculate_user_count'
    computer: Path;
    // Paths of the state variables this value is derived from.
    // NOTE(review): nothing in this file consumes depends_on yet (startApp has a
    // TODO for building the dependency graph).
    depends_on: Path[]
    };

    // The value a Computation returns: a CRDT message addressed to a state variable.
    type Effect<VT> = {
    // Node id attached to the effect; the example computations copy it from
    // event.nodeId — presumably the node the effect is destined for (TODO confirm)
    nodeId: string;
    // Zone-prefixed path of the target state variable, e.g. '/client/my_todos'
    path: Path;
    // Payload: one of the message types accepted by the CRDTs defined above
    msg: LWWRegisterState<VT> | LWWMapState<VT> | GSetOp<VT>
    };

    type Session = { claims: any; signature: string; }
    type ZEvent = {nodeId: string; session: Session, payload: any};

    /*
    Computation, even reactive, always gets the event as an argument.
    This is mostly useful for getting session info so that access-control can work.
    Explicit events also make it easy to replay and debug things.
    For reactive computations, the framework will provide the event.
    */
    type Computation = (node: RunningNode, event: ZEvent) => Effect<any>;

    type ZoneName = string;
    type NodeSpec = { [varName: string]: SourceState | DerivedState | Computation; }

    // Every zone must have at least one channel to another zone so that the entire system is connected
    type ChannelType = "websocket";
    type Channel = { type: ChannelType; fromZone: ZoneName; toZone: ZoneName; }

    type AppSpec = {
    zones: { [zoneName: ZoneName]: NodeSpec; };
    channels: Channel[]
    }

    // A link between two running nodes, as produced by establishConnection.
    type Connection = {
    // Endpoint the connection starts from
    fromNodeId: string;
    fromZone: ZoneName;
    // Endpoint the connection leads to
    toNodeId: string;
    toZone: ZoneName;
    channelType: ChannelType;
    // establishConnection creates connections with isActive = true
    isActive: boolean;
    // establishConnection uses 0.1; the unit is not stated anywhere in this file (TODO confirm)
    latency: number;
    }

    type MsgQueue<VT> = Effect<VT>[];

    // A node instantiated from a NodeSpec (see initNode).
    type RunningNode = {
    // Unique node id; initNode generates it with crypto.randomUUID()
    id: string;
    // The spec this node was created from (state declarations and computations)
    spec: NodeSpec;
    // Live state keyed by variable name; nodeSpecToState fills it with CRDT instances
    state: {[name: string]: any};
    // Name of the zone the node belongs to
    zone: ZoneName;
    // Per-node secret; initNode generates it with crypto.randomUUID().
    // NOTE(review): not read anywhere in this file yet.
    signingSecret: string;
    // Message queues; created empty by initNode, to be processed by tick (still a TODO)
    incomingQueue: MsgQueue<any>;
    outgoingQueue: MsgQueue<any>;
    // Clock used by computations to timestamp the messages they emit; initNode uses Date.now()
    getTimestamp: () => number;
    }

    /* This is the type for an actual distributed system instantiated from an AppSpec */
    type RunningApp = {
    zones: { [zoneName: ZoneName]: RunningNode[]; };
    connections: Connection[];
    }

    /**
     * Builds the initial CRDT instance for one declared state variable.
     *
     * @param nodeId    id of the node that will own the CRDT
     * @param crdtType  which CRDT implementation to instantiate
     * @param valueType declared kind of the wrapped value (selects the generic argument)
     * @param init      optional zero-argument initializer; when missing/falsy (derived
     *                  state) a logging placeholder that returns undefined is used
     * @returns the new CRDT, or undefined when no branch matches
     */
    const initState = (nodeId: string, crdtType: CrdtType, valueType: ValueType, init?: Function) => {
      // Placeholder initializer for derived state: logs and yields undefined.
      const derivedStateFallback = (name: string) => { console.log("init_for_derived_state", name); };
      const initializer: Function = init || derivedStateFallback;

      // Initial register state; the timestamp is read before the initializer runs.
      const registerState = <T>(): LWWRegisterState<T> => ({
        nodeId,
        timestamp: Date.now(),
        value: initializer(),
      });

      switch (crdtType) {
        case "LWWRegister":
          switch (valueType) {
            case "boolean": return new LWWRegister<boolean>(nodeId, registerState<boolean>());
            case "string": return new LWWRegister<string>(nodeId, registerState<string>());
            case "number": return new LWWRegister<number>(nodeId, registerState<number>());
            case "object": return new LWWRegister<object>(nodeId, registerState<object>());
            case "array": return new LWWRegister<any[]>(nodeId, registerState<any[]>());
          }
          break;
        case "LWWMap":
          // Maps always start empty; the initializer is not consulted here.
          return LWWMap.fromState<any>(nodeId, new Map());
        case "GSet":
          switch (valueType) {
            case "boolean": return new GSet<boolean>(new Set(initializer()));
            case "string": return new GSet<string>(new Set(initializer()));
            case "number": return new GSet<number>(new Set(initializer()));
            case "object": return new GSet<object>(new Set(initializer()));
            case "array": return new GSet<any[]>(new Set(initializer()));
          }
          break;
      }
      return undefined;
    }

    /**
     * Creates the initial state object of a node: one CRDT instance per state
     * variable declared in the spec. Computations (function entries) are skipped.
     *
     * @param nodeId   id of the node that will own the state
     * @param nodeSpec the node's spec
     * @returns an object mapping each state variable name to its CRDT
     */
    const nodeSpecToState = (nodeId: string, nodeSpec: NodeSpec) => {
      const stateObject: {[name: string]: any} = {};
      for (const [varName, declaration] of Object.entries(nodeSpec)) {
        if (typeof declaration === "function") continue; // computations hold no state
        // Only source state has an `init`; derived state passes `false`, which
        // makes initState fall back to its placeholder initializer.
        const init = 'init' in declaration && declaration.init;
        stateObject[varName] = initState(nodeId, declaration.crdtType, declaration.valueType, init);
      }
      return stateObject;
    }

    /**
     * Instantiates a running node for a zone.
     *
     * @param zoneName zone the node belongs to
     * @param nodeSpec spec describing the node's state variables and computations
     * @returns a node with a random id and signing secret, freshly initialized
     *          state, empty message queues and a Date.now()-based clock
     */
    const initNode = (zoneName: ZoneName, nodeSpec: NodeSpec) : RunningNode => {
      const { crypto } = globalThis;
      const id = crypto.randomUUID();
      const node: RunningNode = {
        spec: nodeSpec,
        state: nodeSpecToState(id, nodeSpec),
        zone: zoneName,
        id,
        signingSecret: crypto.randomUUID(),
        incomingQueue: [],
        outgoingQueue: [],
        getTimestamp: () => Date.now(),
      };
      return node;
    }

    /**
     * Describes a connection between two running nodes.
     *
     * @param fromNode    node the connection starts from
     * @param toNode      node the connection leads to
     * @param channelType transport used by the connection
     * @returns an active connection record with a latency of 0.1
     */
    const establishConnection = (fromNode: RunningNode, toNode: RunningNode, channelType: ChannelType) : Connection => {
      const { id: fromNodeId, zone: fromZone } = fromNode;
      const { id: toNodeId, zone: toZone } = toNode;
      return { fromNodeId, fromZone, toNodeId, toZone, channelType, isActive: true, latency: 0.1 };
    };

    /**
     * Instantiates a running app from a spec.
     *
     * Currently hard-coded for the todo example: it expects the zones "db",
     * "server" and "client", starts one db node, two server nodes and two client
     * nodes, and connects db -> first server and first server -> each client.
     * The spec's `channels` list is not consulted.
     *
     * @param app the application spec
     * @returns the started nodes grouped by zone, plus the established connections
     */
    const startApp = (app: AppSpec) : RunningApp => {
      // TODO: Remove hard-coding for todoApp
      // start at least one node in each zone
      const { db, server, client } = app.zones;
      const dbNode = initNode("db", db);
      const serverNodes = [initNode("server", server), initNode("server", server)];
      const clientNodes = [initNode("client", client), initNode("client", client)];

      // establish channels to setup sessions (only the first server node is wired up)
      const [primaryServer] = serverNodes;
      const connections = [
        establishConnection(dbNode, primaryServer, "websocket"),
        ...clientNodes.map((clientNode) => establishConnection(primaryServer, clientNode, "websocket")),
      ];

      // TODO: Build dependency graph for auto-pushing reactive updates

      return {
        zones: { db: [dbNode], server: serverNodes, client: clientNodes },
        connections,
      };
    }

    // Example: Spec for a 3-tier todo app
    // db is responsible for persistence
    // server is needed for access-control and expensive computations
    // client is where user-interaction happens which generates effects
    // The framework will implement compatibility of effects with the destination CRDT types

    // challenge exercise:
    // (0) Show that computational work is done only where necessary, eg: for connected clients only
    // (1) model multi-layer client storage (RAM -> localStorage/SQLite)
    // (2) model database read replicas that take the burden off the master instance which still handles all write queries
    // (3) Show computations on server using a different programming language than the client
    // (4) Show example of long-running operations which send progress updates to the client
    // (5) Show that incompatibility of effects and target data type is detected and raises errors
    // (6) Show that access-control works as expected and no data leakage occurs

    /**
     * Example AppSpec: a 3-tier todo application with the zones db, server and client.
     * Each zone lists its state variables (source or derived) and its computations.
     */
    const todo: AppSpec = {
      zones: {
        db: {
          // Source of truth: all todos, keyed by user id (see filter_todos_for_user)
          all_todos: {
            crdtType: "LWWMap",
            valueType: "object",
            init: () => (new Map())
          },
          // Emits the number of keys in all_todos as a register state for /server/user_count
          calculate_user_count: (dbNode: RunningNode, event: ZEvent) => ({
            nodeId: event.nodeId,
            path: '/server/user_count',
            msg: {
              nodeId: dbNode.id,
              timestamp: dbNode.getTimestamp(),
              value: dbNode.state.all_todos.countKeys()
            }
          })
        },
        server: {
          user_count: {
            crdtType: "LWWRegister",
            valueType: "number",
            computer: '/db/calculate_user_count',
            depends_on: ['/db/all_todos']
          },
          // Emits the todos of the session's user as a register state for /client/my_todos
          filter_todos_for_user: (serverNode: RunningNode, event: ZEvent) => ({
            nodeId: event.nodeId,
            path: '/client/my_todos',
            msg: {
              nodeId: serverNode.id,
              timestamp: serverNode.getTimestamp(),
              // The server zone declares no `all_todos` variable of its own, so
              // server nodes built from this spec have no such state entry.
              // Guard the lookup so the computation yields an empty list instead
              // of throwing a TypeError.
              // NOTE(review): the server presumably needs a replica of /db/all_todos
              // for this to return real data — confirm the intended design.
              value: serverNode.state.all_todos?.getValueForKey(event.session.claims.user_id) || []
            }
          }),
        },
        client: {
          my_todos: {
            crdtType: "LWWRegister",
            valueType: "array",
            computer: '/server/filter_todos_for_user',
            depends_on: ['/db/all_todos']
          },
          task_input: {
            crdtType: "LWWRegister",
            valueType: "string",
            init: () => ""
          },
          ui: {
            crdtType: "LWWMap",
            valueType: "object",
            computer: '/client/render',
            depends_on: ['/client/my_todos']
          },
          // Placeholder renderer: emits an empty LWWMap delta for /client/ui
          render: (clientNode: RunningNode, event: ZEvent) => ({
            nodeId: event.nodeId,
            path: '/client/ui',
            msg: new Map<string, LWWRegisterState<any>>()
          })
        }
      },
      channels: [
        {type: "websocket", fromZone: "server", toZone: "client"},
        {type: "websocket", fromZone: "db", toZone: "server"},
      ]
    }

    // Instantiate the example todo spec: startApp creates the nodes of every zone
    // and the connections between them. No messages are processed at this point —
    // tick() and simulate() further down are still stubs.
    let app = startApp(todo); // starts all the nodes and sets up the network to start ticking

    // Dump the freshly started app (zones and connections) for inspection.
    console.log({app});

    /**
     * Locates the node a local mutation should be applied to.
     * Work in progress: the node is resolved, but the mutation itself is not
     * applied yet (see TODO).
     *
     * @param app    the running app
     * @param nodeId id of the node whose state should be mutated
     * @param path   zone-prefixed path of the state variable, e.g. "/client/my_todos"
     *               (a missing leading slash is tolerated)
     * @param fn     mutation to run against the CRDT stored at `path`
     * @throws Error when `path` does not name a zone of the running app
     */
    const injectLocalMutation = (app: RunningApp, nodeId: string, path: string, fn: (crdt: CRDT<any, any>) => void) => {
      // Paths start with "/", so splitting on "/" yields an empty first segment;
      // the zone is the first NON-empty segment.
      const [zone] = path.split("/").filter((segment) => segment.length > 0);
      const zoneNodes = zone === undefined ? undefined : app.zones[zone];
      if (zoneNodes === undefined) {
        throw new Error(`injectLocalMutation: no zone found for path "${path}"`);
      }
      const node = zoneNodes.find((n) => n.id === nodeId);
      // TODO: Complete this
    }

    // Advances the simulation by one step. Currently a stub with no effect; per
    // the TODO it is meant to process the queued messages (presumably each node's
    // incomingQueue/outgoingQueue — not implemented yet).
    const tick = (app: RunningApp) => {
    // TODO: Process msgs in the queues
    }

    // Entry point for driving a simulation run. Currently a stub with no effect;
    // per the TODO it is meant to generate user interactions and system ticks.
    const simulate = (app: RunningApp) => {
    // TODO: Simulate user-interaction and system ticks here
    }