# Dual State Engine Architecture A template for building deterministic, verifiable systems with parallel Go/JavaScript implementations. ## Core Concept Build complex stateful systems where **state is a function of events**: ``` State(t) = fold(apply, initialState, events[0..t]) ``` By implementing identical logic in Go (server) and JavaScript (client), you get: - **Determinism**: Same seed + same actions = same outcome - **Verification**: Server replays client-claimed outcomes - **Offline-first**: Client runs full logic locally, syncs events later - **Testability**: Run thousands of simulations to find edge cases ## Quick Start ```bash make build # Build Go binary make run # Server at http://localhost:8080 make test # Verify Go/JS parity ``` ## Project Structure ``` project/ ├── engine/ # Go state engine (authoritative) │ ├── state.go # Core state, action processing │ ├── events.go # Event types and logging │ ├── processor.go # Action → Event pipeline │ └── projections.go # Derived views from events ├── server/ # HTTP/WebSocket server ├── streams/ # Reactive event bus └── cmd/ ├── main.go # Entry point └── client/ # Browser client (JS mirror) ├── state.js # State logic (mirrors state.go) ├── events.js # Event handling └── app.js # UI, WebSocket, rendering ``` ## Architecture ### Event Flow ``` Action → ProcessAction() → State Mutation → Event Logged → Broadcast ↓ SQLite Store ↓ Projections (analytics, reports, replays) ``` ### Key Principles 1. **Actions are the source of truth** — Not state snapshots. Reconstruct from `(seed, actions[])`. 2. **Separate RNG streams** — Deterministic random with isolated seeds for different subsystems. 3. **Monadic pipelines** — Clean decision logic using Option/Maybe patterns. 4. **Reactive event bus** — Pub/sub decouples subsystems. ## Seeded Determinism Use the same PRNG algorithm in both languages. Mulberry32 works well: ```go // Go - engine/random.go func (s *State) deterministicRandom() float64 { s.RandSeed += 0x6D2B79F5 t := s.RandSeed t = (t ^ (t >> 15)) * (t | 1) t ^= t + (t^(t>>7))*(t|61) return float64((t ^ (t >> 14)) & 0x7FFFFFFF) / float64(0x7FFFFFFF) } ``` ```javascript // JavaScript - client/state.js deterministicRandom() { this.randSeed = (this.randSeed + 0x6D2B79F5) >>> 0; let t = this.randSeed; t = Math.imul(t ^ (t >>> 15), t | 1); t = (t ^ (t + Math.imul(t ^ (t >>> 7), t | 61))) >>> 0; return ((t ^ (t >>> 14)) & 0x7FFFFFFF) / 0x7FFFFFFF; } ``` ## Monadic Design Patterns Use `samber/mo` for Option types and `samber/lo` for functional pipelines: ```go import ( "github.com/samber/lo" "github.com/samber/mo" ) // Option types for clean control flow type Result = mo.Option[ActionType] // tryAction creates a Result from a condition func tryAction(cond bool, action ActionType) Result { if cond { return mo.Some(action) } return mo.None[ActionType]() } // firstAction returns first Some from lazy thunks (short-circuit evaluation) func firstAction(thunks ...func() Result) Result { for _, thunk := range thunks { if result := thunk(); result.IsPresent() { return result } } return mo.None[ActionType]() } // Functional pipelines with lo func processEntities(entities []*Entity, px, py int) []entityView { return lo.Map(entities, func(e *Entity, _ int) entityView { return entityView{entity: e, dist: manhattan(e.X, e.Y, px, py)} }) } // Filter and transform chains func findTargets(entities []*Entity, criteria func(*Entity) bool) []*Entity { return lo.Filter(entities, func(e *Entity, _ int) bool { return criteria(e) }) } // Decision pipeline using monadic composition func decideAction(state *State) ActionType { return firstAction( func() Result { return tryHandleUrgent(state) }, func() Result { return tryHandleNormal(state) }, func() Result { return tryHandleFallback(state) }, ).OrElse(ActionWait) } ``` ### JavaScript Equivalent ```javascript // Option monad in JS const some = (value) => ({ isPresent: true, value }); const none = () => ({ isPresent: false }); const tryAction = (cond, action) => cond ? some(action) : none(); const firstAction = (...thunks) => { for (const thunk of thunks) { const result = thunk(); if (result.isPresent) return result; } return none(); }; // Functional pipelines const withDistance = (entities, px, py) => entities.map(e => ({ entity: e, dist: manhattan(e.x, e.y, px, py) })); const filterBy = (arr, predicate) => arr.filter(predicate); // Decision pipeline const decideAction = (state) => firstAction( () => tryHandleUrgent(state), () => tryHandleNormal(state), () => tryHandleFallback(state), ).value ?? 'wait'; ``` ## Reactive Streams Use pub/sub for event distribution. This decouples subsystems and enables spectators, logging, and analytics. ```go // streams/streams.go package streams import ( "context" "sync" ) // Event represents a domain event type Event struct { Type string SessionID string Timestamp int64 Data map[string]interface{} } // EventStream provides reactive event streaming type EventStream struct { mu sync.RWMutex subscribers []chan Event ctx context.Context cancel context.CancelFunc } func NewEventStream() *EventStream { ctx, cancel := context.WithCancel(context.Background()) return &EventStream{ subscribers: make([]chan Event, 0), ctx: ctx, cancel: cancel, } } // Publish sends an event to all subscribers func (s *EventStream) Publish(event Event) { s.mu.RLock() defer s.mu.RUnlock() for _, ch := range s.subscribers { select { case ch <- event: default: // Drop if subscriber is slow (backpressure) } } } // Subscribe returns a channel for receiving events func (s *EventStream) Subscribe() chan Event { ch := make(chan Event, 100) s.mu.Lock() s.subscribers = append(s.subscribers, ch) s.mu.Unlock() return ch } // Unsubscribe removes a channel from subscribers func (s *EventStream) Unsubscribe(ch chan Event) { s.mu.Lock() defer s.mu.Unlock() for i, c := range s.subscribers { if c == ch { s.subscribers = append(s.subscribers[:i], s.subscribers[i+1:]...) close(ch) return } } } func (s *EventStream) Close() { s.cancel() s.mu.Lock() defer s.mu.Unlock() for _, ch := range s.subscribers { close(ch) } s.subscribers = nil } ``` ### Reactive Operators ```go import "github.com/samber/lo" // FilterBySession returns events for a specific session func FilterBySession(sessionID string) func(Event) bool { return func(e Event) bool { return e.SessionID == sessionID } } // FilterByTypes returns events matching any of the given types func FilterByTypes(types ...string) func(Event) bool { typeSet := lo.SliceToMap(types, func(t string) (string, bool) { return t, true }) return func(e Event) bool { _, ok := typeSet[e.Type] return ok } } // Usage ch := eventStream.Subscribe() for event := range ch { if FilterBySession("session-123")(event) { // Handle session-specific event } } ``` ### Event Bus (Centralized Hub) ```go // EventBus is the central hub for all events type EventBus struct { events *EventStream wsStreams map[string]*WSMessageStream mu sync.RWMutex } func NewEventBus() *EventBus { return &EventBus{ events: NewEventStream(), wsStreams: make(map[string]*WSMessageStream), } } func (b *EventBus) Events() *EventStream { return b.events } func (b *EventBus) BroadcastToSession(sessionID string, msg WSMessage) { b.mu.RLock() defer b.mu.RUnlock() for _, stream := range b.wsStreams { stream.Send(msg) } } ``` ## Reactive WebSocket Handler Use the **pump pattern** for WebSocket connections: separate goroutines for different concerns, unified by context-based lifecycle management. ```go // server/ws_handler.go package server import ( "context" "encoding/json" "sync" "sync/atomic" "github.com/gorilla/websocket" ) var connectionID atomic.Int64 // ReactiveWSHandler handles a websocket connection with reactive streams type ReactiveWSHandler struct { server *Server conn *websocket.Conn connID string session *Session ctx context.Context cancel context.CancelFunc writeMu sync.Mutex // Protects websocket writes } func NewReactiveWSHandler(s *Server, conn *websocket.Conn) *ReactiveWSHandler { ctx, cancel := context.WithCancel(context.Background()) id := connectionID.Add(1) return &ReactiveWSHandler{ server: s, conn: conn, connID: fmt.Sprintf("ws-%d", id), ctx: ctx, cancel: cancel, } } ``` ### The Pump Pattern Each WebSocket connection runs multiple goroutines ("pumps") for different responsibilities: ```go // Handle starts all pumps and manages connection lifecycle func (h *ReactiveWSHandler) Handle(r *http.Request) { defer h.cleanup() // Restore session from cookie if exists if cookie, err := r.Cookie("session"); err == nil { h.session = h.server.restoreSession(cookie.Value) if h.session != nil { h.sendState() } } // Start pumps concurrently go h.eventSubscriptionPump() // Subscribe to domain events go h.broadcastPump() // Forward messages to client // Blocking: process incoming messages h.incomingPump() } ``` ### Incoming Pump (Client → Server) Reads messages from WebSocket, dispatches to handlers: ```go func (h *ReactiveWSHandler) incomingPump() { for { select { case <-h.ctx.Done(): return default: _, data, err := h.conn.ReadMessage() if err != nil { return // Connection closed } var msg Message if err := json.Unmarshal(data, &msg); err != nil { h.sendError("invalid_json", "Invalid message format") continue } h.handleMessage(msg) } } } func (h *ReactiveWSHandler) handleMessage(msg Message) { switch msg.Type { case "ping": h.sendPong() case "join": var payload JoinPayload json.Unmarshal(msg.Payload, &payload) h.session = h.server.createSession(payload.Seed) h.sendState() case "action": if h.session == nil { h.sendError("no_session", "Join first") return } var payload ActionPayload json.Unmarshal(msg.Payload, &payload) h.processAction(payload.Action) case "spectate": var payload SpectatePayload json.Unmarshal(msg.Payload, &payload) h.spectateSession(payload.SessionID) } } ``` ### Event Subscription Pump (Domain Events → Client) Subscribes to the event bus and forwards relevant events: ```go func (h *ReactiveWSHandler) eventSubscriptionPump() { // Subscribe to domain events eventCh := h.server.eventBus.Events().Subscribe() defer h.server.eventBus.Events().Unsubscribe(eventCh) for { select { case <-h.ctx.Done(): return case event, ok := <-eventCh: if !ok { return } // Filter: only events for our session if h.session == nil || event.SessionID != h.session.ID { continue } // Broadcast state on significant events (for spectators) switch event.Type { case "state_change", "action_processed": h.sendState() } } } } ``` ### Thread-Safe Writes WebSocket writes must be serialized: ```go func (h *ReactiveWSHandler) writeMessage(msg Message) { h.writeMu.Lock() defer h.writeMu.Unlock() data, err := json.Marshal(msg) if err != nil { return } h.conn.WriteMessage(websocket.TextMessage, data) } func (h *ReactiveWSHandler) sendState() { if h.session == nil { return } h.session.mu.Lock() state := h.session.State.GetSnapshot() h.session.mu.Unlock() payload, _ := json.Marshal(state) h.writeMessage(Message{ Type: "state", Payload: payload, Timestamp: time.Now().UnixMilli(), }) } func (h *ReactiveWSHandler) sendError(code, message string) { payload, _ := json.Marshal(map[string]string{ "code": code, "message": message, }) h.writeMessage(Message{Type: "error", Payload: payload}) } ``` ### Action Processing with Event Publishing Actions mutate state and publish events: ```go func (h *ReactiveWSHandler) processAction(action string) { h.session.mu.Lock() h.session.State.ProcessAction(ActionType(action)) h.session.mu.Unlock() // Publish to event bus for reactive broadcasts h.server.eventBus.Events().Publish(Event{ Type: "action_processed", SessionID: h.session.ID, Timestamp: time.Now().UnixMilli(), Data: map[string]interface{}{ "action": action, }, }) h.sendState() } ``` ### Graceful Cleanup Context cancellation triggers coordinated shutdown: ```go func (h *ReactiveWSHandler) cleanup() { h.cancel() // Signals all pumps to exit h.conn.Close() } ``` ### WebSocket Protocol ```json // Client → Server {"type": "join", "payload": {"seed": 12345}} {"type": "action", "payload": {"action": "submit"}} {"type": "spectate", "payload": {"session_id": "abc123"}} {"type": "ping"} // Server → Client {"type": "state", "payload": {...}, "timestamp": 1704067200000} {"type": "error", "payload": {"code": "no_session", "message": "Join first"}} {"type": "pong", "timestamp": 1704067200000} {"type": "config", "payload": {"version": "1.0.0"}} ``` ### Session Restoration Sessions survive page reloads via cookies + persistence: ```go func (h *ReactiveWSHandler) Handle(r *http.Request) { // Check for existing session from HttpOnly cookie if cookie, err := r.Cookie("session_id"); err == nil { // Try memory first if session := h.server.getSession(cookie.Value); session != nil { h.session = session } else { // Fall back to SQLite (survives server restart) h.session = h.server.restoreFromDB(cookie.Value) } if h.session != nil { h.sendConfig() h.sendState() } } // ... start pumps } ``` ### Spectator Support Multiple clients can watch the same session: ```go func (h *ReactiveWSHandler) spectateSession(sessionID string) { session := h.server.getSession(sessionID) if session == nil { // Find any active session session = h.server.getAnyActiveSession() } if session != nil { h.session = session h.sendState() } else { h.sendError("no_session", "No active session to spectate") } } ``` ## Event Sourcing ### Event Store Schema ```sql CREATE TABLE events ( id INTEGER PRIMARY KEY, time DATETIME, session_id TEXT, sequence INTEGER, event_type TEXT, entity_id TEXT, data JSON, version TEXT ); CREATE TABLE sessions ( session_id TEXT PRIMARY KEY, seed INTEGER, created_at DATETIME, last_activity DATETIME ); -- Projections (derived from events) CREATE TABLE projections ( session_id TEXT, projection_type TEXT, data JSON, updated_at DATETIME ); ``` ### Verification Protocol Clients claim outcomes, servers verify by replay: ```go func VerifyAndRecord(seed int64, actions []ActionType, claim Claim) (*Result, error) { state := NewState(seed) for _, action := range actions { state.ProcessAction(action) } if state.Outcome != claim.Outcome { return nil, ErrClaimMismatch } // Record verified result return &Result{Verified: true, State: state}, nil } ``` ### Projections Derive multiple views from the same event stream: ```go // Aggregate events into analytics type Stats struct { TotalActions int ActionCounts map[string]int } func AggregateStats(events []Event) Stats { stats := Stats{ActionCounts: make(map[string]int)} for _, e := range events { stats.TotalActions++ stats.ActionCounts[e.Type]++ } return stats } ``` ## Working With This Architecture ### Adding Features 1. Add to Go engine first (`engine/`) 2. Mirror in JavaScript client (`cmd/client/`) 3. Ensure determinism: same inputs → same outputs 4. Add event types if needed ### Testing Determinism ```go func TestDeterminism(t *testing.T) { seed := int64(12345) actions := []ActionType{ActionA, ActionB, ActionC} // Run twice with same inputs state1 := NewState(seed) state2 := NewState(seed) for _, action := range actions { state1.ProcessAction(action) state2.ProcessAction(action) } if state1.Hash() != state2.Hash() { t.Error("Determinism violation: same inputs produced different states") } } ``` ### Cross-Language Parity Tests ```bash # Generate reference outputs from Go go run ./cmd/testgen > testcases.json # Verify JavaScript produces same outputs node cmd/client/test.js testcases.json ``` ## Dependencies ### Go ```go require ( github.com/samber/lo v1.47.0 // Functional utilities github.com/samber/mo v1.13.0 // Monads (Option, Result, Either) github.com/gorilla/websocket v1.5.3 github.com/mattn/go-sqlite3 v1.14.24 ) ``` ### JavaScript No external dependencies required. Implement monadic helpers inline or use: - `lodash/fp` for functional utilities - Custom Option implementation (shown above) ## Design Lessons 1. **Dual implementation catches bugs** — Differences between Go and JS reveal hidden assumptions. 2. **Seeded RNG is essential** — Without determinism, replay and verification are impossible. 3. **Events decouple subsystems** — Components don't know about each other; they just emit events. 4. **Functional pipelines are debuggable** — Monadic composition makes decision logic traceable. 5. **Projections are cheap** — Derive multiple views (stats, replays, heatmaps) from one event stream. 6. **Actions, not snapshots** — Store the minimal source of truth; reconstruct state on demand.