Skip to content

Instantly share code, notes, and snippets.

@kachayev
Last active October 7, 2015 09:57
Show Gist options
  • Select an option

  • Save kachayev/3146759 to your computer and use it in GitHub Desktop.

Select an option

Save kachayev/3146759 to your computer and use it in GitHub Desktop.
Channels-driven concurrency with Clojure
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (1) Generator: function that returns the channel
(use 'lamina.core)
(defn boring
[name]
(let [ch (channel)]
;; future will run separately from main control flow
(future
;; emit string message five times with random delay
(dotimes [_ 5]
(let [after (int (rand 500))]
(Thread/sleep after)
(enqueue ch (str name ": I'm boring after " after)))))
;; return the channel to caller
ch))
;; With single instance
(let [joe (boring "Joe")]
(doseq [msg (lazy-channel-seq (take* 5 joe))] (println msg)))
(println "You're boring: I'm leaving.")
;; Process all messages from channel
;; Please, note this notation is asynchronous, so...
(let [joe (boring "Joe")] (receive-all joe println))
;; you will see this message first :)
(println "You're boring: I'm leaving.")
;; More instances...
;; Actually, this is little bit tricky and it's definitely other
;; mechanism than we use in Go for this example. It's more
;; likely what we do in "#2 Fan-in" code examples.
(let [joe (boring "Joe") ann (boring "Ann") chs (channel)]
(doseq [ch [joe ann]] (join ch chs))
(receive-all chs println))
(println "You're boring: I'm leaving.")
;; More instances...
;; Read from one channel, than - from second
(let [joe (boring "Joe") ann (boring "Ann")]
(loop []
(doseq [ch [joe ann]]
(when-not (or (drained? ch) (closed? ch))
(println @(read-channel ch))))
(recur)))
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (2) Fan-in
;; "Hand-made" one
(defn fan-in
[input1 input2]
(let [ch (channel) pusher (partial enqueue ch)]
(doseq [x [input1 input2]] (receive-all x pusher)) ch))
;; Or any count of inputs instead of just 2
(defn fan-in
[& inputs]
(let [ch (channel) pusher (partial enqueue ch)]
(doseq [x inputs] (receive-all x pusher)) ch))
;; Or more "clojurian" approach with join
(defn fan-in
[& inputs]
(let [ch (channel)] (doseq [x inputs] (join x ch)) ch))
;; Do printing only 10 times
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
(receive-all (take* 10 ch) println))
;; Or any times something will be pushed to channel
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println))
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (4) Timeouts
;; To test timeouts let add one line into boring generator
(defn boring
[name]
(let [ch (channel)]
;; future will run separately from main control flow
(future
;; emit string message five times with random delay
(dotimes [_ 5]
(let [after (int (rand 100))]
(Thread/sleep after)
(enqueue ch (str name ": I'm boring after " after))))
(close ch)) ;; <--- Here. Let's close channel after 5 messages.
;; return the channel to caller
ch))
;; Timeout for whole conversation
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
;; note 3rd param for lazy-channel-seq function
(doseq [msg (lazy-channel-seq (take* 10 ch) 500)] (println msg)))
(println "You're boring: I'm leaving.")
;; Timeout for each message
;; There are multiple ways of how to perform such functionality.
;; Here you can find approach using (with-timeout _) wrapper for ResultChannel.
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
(loop []
(when-not (or (closed? ch) (drained? ch))
(println @(with-timeout 50 (read-channel ch))) (recur))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment