Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created October 18, 2022 20:03
Show Gist options
  • Select an option

  • Save hiredman/19e01f112b52e3a967af597262de3a81 to your computer and use it in GitHub Desktop.

Select an option

Save hiredman/19e01f112b52e3a967af597262de3a81 to your computer and use it in GitHub Desktop.

Revisions

  1. hiredman created this gist Oct 18, 2022.
    644 changes: 644 additions & 0 deletions cml.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,644 @@
    (import '(java.util ArrayList
    List)
    '(java.util.concurrent.locks ReentrantLock)
    '(java.util.concurrent.atomic AtomicReference
    AtomicReferenceArray))

    (set! *warn-on-reflection* true)
    (set! *unchecked-math* :warn-on-boxed)

    ;; TODO: should you be able to sync multiple times on the same event
    ;; object, seems like no?.
    (defprotocol Event
    (try-event [event resume control nack-group]
    "try to sync this event")
    ;; TODO split this out
    (check-nack-group [event nack-group]))

    (defprotocol Syncable
    (-sync [event]))

    (defprotocol QuasiEvent
    (push-down [event ctor lst]))

    (extend-type nil
    Event
    (try-event [event resume control nack-group])
    (check-nack-group [event nack-group])
    QuasiEvent
    (push-down [event ctor lst]
    (when-let [evt (ctor event)]
    (.add ^java.util.List lst evt)))
    Syncable
    (-sync [event]))

    (extend-type Object
    QuasiEvent
    (push-down [event ctor lst]
    (.add ^List lst (ctor event)))
    Syncable
    (-sync [event]
    ;; wrap list access in a volatile to ensure ordering, may not be
    ;; required.
    (let [lst (volatile! (ArrayList.))
    control (atom :waiting)]
    (push-down event identity @lst)
    (vreset! lst @lst)
    (letfn [(resume-f [event value nack-group]
    ;; resume-f could end up invoked on another thread,
    ;; hence the volatile
    (doseq [e @lst]
    (check-nack-group e nack-group)))]
    (doseq [evt @lst
    :when evt]
    (try-event evt resume-f control #{}))))))


    (def always
    (reify
    Event
    (try-event [event resume control nack-group]
    (let [k (Object.)
    f (fn [& _]
    (when (= @control :synced)
    (remove-watch control k))
    (when (compare-and-set! control :waiting :synced)
    (resume event true nack-group)))]
    (add-watch control k f)
    (f)))
    (check-nack-group [event nack-group])))

    (extend-type java.util.concurrent.CompletionStage
    Event
    (try-event [^java.util.concurrent.CompletionStage event resume control nack-group]
    (.handle event
    (reify
    java.util.function.BiFunction
    (apply [_ result exception]
    (let [k (Object.)
    f (fn [& _]
    (when (= @control :synced)
    (remove-watch control k))
    (when (compare-and-set! control :waiting :synced)
    (resume event (or exception result) nack-group)))]
    (add-watch control k f)
    (f))))))
    (check-nack-group [event nack-group]))

    (declare middleware)

    ;; Middleware wraps an event, passing protocols callls down with the
    ;; opportunity to intercept and make changes. The middleware smart
    ;; constructor can collapse multiple middlewares into a single
    ;; middleware via function composaition.
    (defrecord Middleware [try-event-transform
    check-nack-group-transform
    push-down-transform
    target]
    Event
    (try-event [_ resume control nack-group]
    ((try-event-transform try-event) target resume control nack-group))
    (check-nack-group [_ nack-group]
    ((check-nack-group-transform check-nack-group) target nack-group))
    QuasiEvent
    (push-down [event ctor lst]
    ((push-down-transform push-down)
    target
    (comp ctor #(middleware try-event-transform
    check-nack-group-transform
    push-down-transform
    %))
    lst)))

    (defn middleware
    [try-event-transform check-nack-group-transform push-down-transform target]
    (if (instance? Middleware target)
    (let [^Middleware target target]
    (middleware (comp try-event-transform (.-try-event-transform target))
    (comp check-nack-group-transform (.-check-nack-group-transform target))
    (comp push-down-transform (.-push-down-transform target))
    (.-target target)))
    (->Middleware try-event-transform check-nack-group-transform push-down-transform target)))

    (defn barrier
    "Given a function f, applies f to a function g and an event e. e becomes
    enabled when g is invoked. returns the result of invoking f."
    [f]
    (let [root (AtomicReference.)
    done (Object.)
    event (middleware
    (fn [event-transform]
    (fn [event resume control nack-group]
    (add-watch control (Object.) (fn [k r os ns]
    (when (= ns :synced)
    (loop [^AtomicReferenceArray r (.get root)]
    (when r
    (when-not (identical? r done)
    (if (identical? control (.get r 1))
    (doto r
    (.set 0 nil)
    (.set 1 nil)
    (.set 2 nil))
    (recur (.get r 3))))))
    (remove-watch r k))))
    (let [cell (doto (AtomicReferenceArray. 4)
    (.set 0 resume)
    (.set 1 control)
    (.set 2 nack-group))]
    (loop [r (.get root)]
    (if (identical? r done)
    (loop []
    (if (compare-and-set! control :waiting :synced)
    (resume event nil nack-group)
    (when-not (= @control :synced)
    (recur))))
    (do
    (.set cell 3 r)
    (when-not (.compareAndSet root r cell)
    (recur (.get root)))))))))
    (fn [check-nack-group]
    (fn [event nack-group]))
    identity
    nil)]
    (f event
    (fn []
    (loop [^AtomicReferenceArray r (.get root)]
    (when-not (identical? r done)
    (if-not (.compareAndSet root r done)
    (recur (.get root))
    (loop [r r]
    (when r
    (let [resume (.get r 0)
    control (.get r 1)
    nack-group (.get r 2)]
    (when (and resume control nack-group)
    (loop []
    (if (compare-and-set! control :waiting :synced)
    (resume event nil nack-group)
    (when-not (= @control :synced)
    (recur)))))
    (recur (.get r 3))))))))))))

    (defn choose
    "Takes a list of events, and non-deterministically selects one on synchronization"
    [evts]
    (middleware
    identity
    identity
    (fn [push-down]
    (fn [event ctor ^List lsg]
    (doseq [e (shuffle evts)]
    (push-down e ctor lsg))))
    nil))

    (defn nack
    "Like guard, but f is passed an event G. G will be enabled if on
    synchronization, the chosen event is not the one returned by f."
    [f]
    (middleware
    identity
    identity
    (fn [push-down]
    (fn [event ctor ^List lsg]
    (barrier
    (fn [b break]
    (let [e (f b)
    id (Object.)]
    (push-down
    e
    (comp ctor
    (fn g [evt]
    (middleware
    (fn [try-event]
    (fn [event resume control nack-group]
    (try-event event resume control (conj nack-group id))))
    (fn [check-nack-group]
    (fn [event nack-group]
    (when-not (contains? nack-group id)
    (break))
    (check-nack-group event nack-group)))
    identity
    evt)))
    lsg))))))
    nil))

    (defn guard
    "Takes a function of no arguments f, returns an event E, before E is
    synchronized on, f will be invoked, and E replaced with the event
    returned by f."
    [f]
    (middleware
    identity
    identity
    (fn [push-down]
    (fn [event ctor ^List lst]
    (push-down (f) ctor lst)))
    nil))

    (defn wrap
    "Wraps the post synchronization action f around the event evt"
    [evt f]
    (middleware
    (fn [try-event]
    (fn [evt resume control nack-group]
    (try-event evt
    (fn [event value nack-group]
    (resume event (f value) nack-group))
    control
    nack-group)))
    identity
    identity
    evt))

    (defonce scheduler
    (delay
    (java.util.concurrent.Executors/newScheduledThreadPool
    1
    (reify
    java.util.concurrent.ThreadFactory
    (newThread [_ r]
    ;; TODO name
    (doto (Thread. r)
    (.setDaemon true)))))))

    (defn timeout [delay]
    (let [now (System/nanoTime)
    n (* 1000000 (long delay))
    deadline (+ n now)]
    (barrier
    (fn [b break]
    (nack
    (fn [neg]
    (let [now (System/nanoTime)
    delay (- deadline now)]
    (if (pos? delay)
    (let [f (.schedule ^java.util.concurrent.ScheduledExecutorService @scheduler
    ^Runnable break
    delay
    java.util.concurrent.TimeUnit/NANOSECONDS)]
    (-sync (wrap neg (fn [_] (future-cancel f))))
    b)
    always))))))))

    (deftype Channel [fields])

    (defn channel []
    (->Channel
    (doto (object-array 7)
    (aset 0 (ReentrantLock.))
    (aset 1 false)
    (aset 2 (java.util.HashMap.))
    (aset 3 nil) ; writers head
    (aset 4 nil) ; writers tail
    (aset 5 nil) ; readers head
    (aset 6 nil) ; readers tail
    )))

    (defn close! [^Channel channel]
    (let [^objects channel (.-fields channel)
    ^ReentrantLock lock (aget channel 0)
    _ (.lock lock)
    ^java.util.HashMap controls (aget channel 2)]
    (if-not (aget channel 1)
    (do
    (aset channel 1 true)
    (let [items (into [] (vals controls))]
    (.unlock lock)
    (doseq [^objects item items
    :let [r (aget item 0)
    c (aget item 1)
    n (aget item 2)
    v (aget item 3)]
    :when (compare-and-set! c :waiting :claimed)]
    (compare-and-set! c :claimed :synced)
    (r nil nil n))))
    (.unlock lock))))

    (defn exchange [^Channel channel writer-index reader-index value]
    (let [^objects channel (.-fields channel)
    writer-index (long writer-index)
    reader-index (long reader-index)
    ^ReentrantLock lock (aget channel 0)
    ^java.util.HashMap controls (aget channel 2)]
    (middleware
    (fn [try-event]
    (fn [event resume control nack-group]
    ;; TODO rename clean-up
    (letfn [(clean-up [k r os ns]
    (when (= ns :waiting)
    (.lock lock)
    (let [^objects item (aget channel reader-index)]
    (if item
    (let [r (aget item 0)
    c (aget item 1)
    n (aget item 2)
    v (aget item 3)]
    (if (identical? c control)
    (do
    (.unlock lock)
    ;; TODO figure out how this should really work
    (throw (ex-info "Same operation on same channel?" {})))
    (if (compare-and-set! c :waiting :claimed)
    (if (compare-and-set! control :waiting :claimed)
    (do
    (.unlock lock)
    (compare-and-set! c :claimed :synced)
    (compare-and-set! control :claimed :synced)
    (r event value n)
    (resume event v nack-group))
    (do
    (compare-and-set! c :claimed :waiting)
    (.unlock lock)))
    (.unlock lock))))
    (if (aget channel 1)
    (do
    (.unlock lock)
    (when (compare-and-set! control :waiting :synced)
    ;; ?
    (resume event nil nack-group)))
    (.unlock lock)))))
    (when (= ns :synced)
    (.lock lock)
    (loop [^objects item (.get controls control)]
    (when item
    (when (aget item 4)
    (aset ^objects (aget item 4) 5 (aget item 5)))
    (when (aget item 5)
    (aset ^objects (aget item 5) 4 (aget item 4)))
    (when (aget item 6)
    (aset ^objects (aget item 6) 7 (aget item 7)))
    (when (aget item 7)
    (aset ^objects (aget item 7) 6 (aget item 6)))
    (when (identical? (.get controls control) item)
    (let [n (aget item 6)]
    (if n
    (.put controls control n)
    (.remove controls control))))
    (doseq [writer-index [writer-index reader-index]]
    (when (identical? (aget channel writer-index) item)
    (aset channel writer-index (aget item 4)))
    (when (identical? (aget channel (inc (long writer-index))) item)
    (aset channel writer-index (aget item 5)))
    (when (nil? (aget channel writer-index))
    (aset channel (inc (long writer-index)) nil)))
    (recur (aget item 6))))
    (.unlock lock)
    (when k
    (remove-watch r k))))]
    (.lock lock)
    (let [k (Object.)
    entry (doto (object-array 8)
    (aset 0 resume)
    (aset 1 control)
    (aset 2 nack-group)
    (aset 3 value)
    (aset 4 nil) ; next
    (aset 5 (aget channel (inc writer-index))) ; prev
    (aset 6 (.get controls control)) ; lst next
    (aset 7 nil) ; lst prev
    )]
    (when (nil? (aget channel writer-index))
    (aset channel writer-index entry))
    (when (aget channel (inc writer-index))
    (aset ^objects (aget channel (inc writer-index)) 4 entry))
    (aset channel (inc writer-index) entry)
    (when (some? (.get controls control))
    (aset ^objects (.get controls control) 7 entry))
    (.put controls control entry)
    (add-watch control k clean-up)
    (.unlock lock)
    (clean-up k control :waiting @control)))))
    identity
    identity
    nil)))

    (defn tx [channel value]
    (assert (some? value))
    (exchange channel 3 5 value))

    (defn rx [channel]
    (exchange channel 5 3 true))

    (defn sync!
    ([evt]
    (-sync evt))
    ([evt callback]
    (-sync (wrap evt callback))))

    (defn sync!! [evt]
    (let [p (promise)]
    (sync! evt p)
    @p))

    (defn poll! [evt success failure]
    (barrier
    (fn [b break]
    (sync!
    (choose (wrap evt success)
    (wrap b (fn [_] (failure)))))
    (break))))

    (defprotocol Consumer
    (consume [_ ch])
    (unconsume [_ ch])
    (unconsume-all [_]))

    (defprotocol Producer
    (subscribe [_ value ch])
    (unsubscribe [_ value ch])
    (unsubscribe-all [_]))

    (defmacro let-macro
    ([bindings body]
    (let [names (map first bindings)]
    `(let-macro ~(eval `(letfn ~bindings ~(into {} (map (fn [n] [(list 'quote n) n])) names)))
    ~identity
    ~body)))
    ([bindings k body]
    (cond (and (seq? body)
    (symbol? (first body))
    (contains? bindings (first body)))
    `(let-macro ~bindings ~k ~(apply (get bindings (first body)) (rest body)))
    (and (seq? body)
    (symbol? (first body))
    (= "quote" (name (first body))))
    body
    (and (or (vector? body) (seq? body)) (seq body))
    `(let-macro ~bindings
    ~(partial (fn f [i o v]
    (if (seq i)
    `(let-macro ~bindings
    ~(partial f (rest i) (conj o v))
    ~(first i))
    (k ((if (vector? body) identity seq) (conj o v)))))
    (rest body)
    [])
    ~(first body))
    (and (coll? body) (not (record? body)))
    `(let-macro ~bindings ~(fn [v]
    (k (into (empty body) v))) ~(seq body))
    :else
    (k body))))

    (defn pubsub [selector]
    (let [command (channel)]
    (let-macro [(let% [bindings body]
    (if (seq bindings)
    (let [[p v & bindings] bindings]
    `(~'bind% ~v (fn [~p] (~'let% ~bindings ~body))))
    body))
    (do% [a & things]
    (if (seq things)
    `(~'let% [_# ~a] (~'do% ~@things))
    a))]
    (letfn [(bind% [m b] (fn [k s] (m (fn [x s] ((b x) k s)) s)))
    (return% [v] (fn [k s] (k v s)))
    (call-cc% [f] (fn [k s] ((f (fn [value] (fn [_ s] (k value s)))) k s)))
    (sync% [evt] (fn d [k s] (sync! (wrap evt (fn [v] (k v s))))))
    (alter% [f & args] (fn [k s] (apply k (repeat 2 (apply f s args)))))
    (send-loop []
    (let% [state (alter% identity)
    _ (reduce
    (fn [x c]
    (let% [^long i x
    sent (sync% (tx c (:value-to-send state)))]
    (if sent
    (return% (inc i))
    (alter% update-in [:outputs (:selection-value state)] disj c))))
    (return% 0)
    (:output-chans state))]
    (do% (if (seq (get-in state [:outputs (:selection-value state)]))
    (return% nil)
    (alter% update-in [:outputs] dissoc (:selection-value state)))
    (main-loop))))
    (main-loop []
    (let% [state (alter% assoc
    :value-to-send nil
    :selection-value nil
    :output-chans nil
    :new-output-chans nil)
    [the-chan the-val] (sync%
    (choose
    (cons
    (wrap (rx command) (partial vector command))
    (for [i (:inputs state)]
    (wrap (rx i) (partial vector i))))))]
    (cond (= the-chan command)
    (let [[op & args] the-val]
    (case op
    :consume (do% (alter% update-in [:inputs] conj (first args))
    (main-loop))
    :unconsume (do% (alter% update-in [:inputs] disj (first args))
    (main-loop))
    :unconsume-all (do% (alter% update-in [:inputs] (constantly #{}))
    (main-loop))
    :subscribe (do% (alter% update-in [:ouputs (first args)]
    (fnil conj #{}) (second args))
    (main-loop))
    ;; TODO dissoc empty
    :unsubscribe (do% (alter% update-in [:ouputs (first args)] disj (second args))
    (main-loop))
    :unsubscribe-all (do% (alter% update-in [:ouputs] (constantly {}))
    (main-loop))
    :close-outputs (do% (return%
    (doseq [[_ outputs] (get state :outputs)
    output outputs]
    (close! output)))
    (alter% update-in [:ouputs] (constantly {}))
    (main-loop))))
    (some? the-val)
    (let% [d (return% (selector the-val))
    _ (alter% assoc
    :value-to-send the-val
    :selection-value d
    :output-chans (get-in state [:ouputs d])
    :new-output-chans #{})]
    (send-loop))
    :else
    (do% (alter% update-in [:inputs] disj the-chan)
    (main-loop)))))]
    ((main-loop)
    (fn [v _] v)
    {})))
    (reify
    Consumer
    (consume [_ ch]
    (tx command [:consume ch]))
    (unconsume [_ ch]
    (tx command [:unconsume ch]))
    (unconsume-all [_]
    (tx command [:unconsume-all]))
    Producer
    ;; TODO on close?
    (subscribe [_ value ch]
    (tx command [:subscribe value ch]))
    (unsubscribe [_ value ch]
    (tx command [:unsubscribe value ch]))
    (unsubscribe-all [_]
    (tx command [:unsubscribe-all]))
    java.io.Closeable
    (close [_]
    (tx command [:close-outputs])))))

    (defn pipe [in out]
    (let [p (pubsub (constantly nil))]
    (consume p in)
    (subscribe p nil out)))

    (assert (= 1
    (sync!!
    (choose
    [nil
    (choose [(barrier (fn [e _] e))])
    (barrier (fn [e _] e))
    (nack (fn [s]
    (sync! s (fn [_] (prn "not chosen")))
    (barrier (fn [e _] e))))
    (nack (fn [s]
    (sync! s (fn [_] (prn "A")))
    (choose
    [(nack (fn [s]
    (sync! s (fn [_] (prn "B")))
    (barrier (fn [e _] e))))
    (wrap (guard (fn [] (barrier (fn [e s] (s) e))))
    (constantly 1))])))]))))

    (println "here")
    (let [c (channel)]
    (sync! (tx c "Hello World") prn)
    (sync! (rx c) (fn [v] (prn v))))

    (let [to (timeout 1000)]
    (time (sync!! to))
    (time (sync!! to)))

    (println "===============")
    (let [p (pubsub (constantly nil))
    c1 (channel)
    c2 (channel)
    c3 (channel)]
    (sync! (consume p c1))
    (sync! (subscribe p nil c2))
    (sync! (subscribe p nil c3))
    (sync!! (tx c1 "Hello World"))
    (assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
    (assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
    )

    (sync!! (wrap always (fn [_] (println "Here"))))

    ;; (defn proposer [input acceptors]
    ;; (letfn [(main-loop [events ballot]
    ;; (sync!
    ;; (wrap
    ;; (choose (vals events))
    ;; (fn [msg]
    ;; (case (:op msg)
    ;; :propose (let [events (dissoc events :input)]
    ;; (for [a acceptors]
    ;; [a (wrap (tx a ...)
    ;; (fn []))])))))))]
    ;; (main-loop
    ;; {:input (rx input)}
    ;; 0)))