Last active
October 7, 2015 09:57
Channels-driven concurrency with Clojure
| ;; Channels-driven concurrency with Clojure | |
| ;; Clojure variant for code examples from this gist: | |
| ;; https://gist.github.com/3124594 | |
| ;; Primarily taken from Rob Pike's talk at Google I/O 2012: | |
| ;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be | |
| ;; | |
| ;; Concurrency is the key to designing high-performance network services. | |
| ;; Clojure provides several concurrency primitives, such as futures, promises, atoms, and agents. | |
| ;; There is no implementation of "Go channels" in core, but we can use | |
| ;; the third-party library Lamina to do the same things. | |
| ;; | |
| ;; https://github.com/ztellman/lamina | |
| ;; | |
| ;; Note that this is not a line-by-line copy of Go's syntax and semantics. | |
| ;; I tried to write a "Clojure-like" variant of the same ideas, using a | |
| ;; functional approach that transforms data from its initial to its desired state. | |
| ;; (1) Generator: function that returns the channel | |
| (use 'lamina.core) | |
| (defn boring | |
| [name] | |
| (let [ch (channel)] | |
| ;; future will run separately from main control flow | |
| (future | |
| ;; emit string message five times with random delay | |
| (dotimes [_ 5] | |
| (let [after (int (rand 500))] | |
| (Thread/sleep after) | |
| (enqueue ch (str name ": I'm boring after " after))))) | |
| ;; return the channel to caller | |
| ch)) | |
| ;; With a single instance | |
| (let [joe (boring "Joe")] | |
| (doseq [msg (lazy-channel-seq (take* 5 joe))] (println msg))) | |
| (println "You're boring: I'm leaving.") | |
| ;; Process all messages from channel | |
| ;; Note that receive-all is asynchronous, so... | |
| (let [joe (boring "Joe")] (receive-all joe println)) | |
| ;; you will see this message first :) | |
| (println "You're boring: I'm leaving.") | |
| ;; More instances... | |
| ;; This is a little tricky, and it is a different mechanism | |
| ;; from the one the Go version uses for this example. It is closer | |
| ;; to what we do in the "(2) Fan-in" code examples. | |
| (let [joe (boring "Joe") ann (boring "Ann") chs (channel)] | |
| (doseq [ch [joe ann]] (join ch chs)) | |
| (receive-all chs println)) | |
| (println "You're boring: I'm leaving.") | |
| ;; More instances... | |
| ;; Read from one channel, then from the second. | |
| ;; Note that this loop has no exit condition; the next example adds one. | |
| (let [joe (boring "Joe") ann (boring "Ann")] | |
| (loop [] | |
| (doseq [ch [joe ann]] | |
| (when-not (closed? ch) (println @(read-channel ch)))) | |
| (recur))) | |
| ;; Read from one channel, then from the second. | |
| ;; Improved to stop once both channels are closed, without knowing | |
| ;; the total number of messages in advance. This relies on a generator | |
| ;; that closes its channel, like the one in the "(4) Timeouts" section. | |
| (let [joe (boring "Joe") ann (boring "Ann")] | |
| (loop [] | |
| (doseq [ch [joe ann]] | |
| (when-not (closed? ch) (println @(read-channel ch)))) | |
| ;; recur while at least one channel is still open | |
| (when-not (every? closed? [joe ann]) (recur)))) | |
| (println "You're boring: I'm leaving.") |
| ;; (2) Fan-in | |
| ;; "Hand-made" one | |
| (defn fan-in | |
| [input1 input2] | |
| (let [ch (channel) pusher (partial enqueue ch)] | |
| (doseq [x [input1 input2]] (receive-all x pusher)) ch)) | |
| ;; Or any number of inputs instead of just two | |
| (defn fan-in | |
| [& inputs] | |
| (let [ch (channel) pusher (partial enqueue ch)] | |
| (doseq [x inputs] (receive-all x pusher)) ch)) | |
| ;; Or a more idiomatic Clojure approach with join | |
| (defn fan-in | |
| [& inputs] | |
| (let [ch (channel)] (doseq [x inputs] (join x ch)) ch)) | |
| ;; Print only the first 10 messages | |
| (let [ch (apply fan-in (map boring ["Joe" "Ann"]))] | |
| (receive-all (take* 10 ch) println)) | |
| ;; Or print every message that is pushed to the channel | |
| (let [ch (apply fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println)) |
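The fan-in idea above can also be sketched without Lamina, using only clojure.core and java.util.concurrent. This is a rough illustration under stated assumptions, not how Lamina works internally: a `LinkedBlockingQueue` stands in for a channel, a `::done` sentinel stands in for closing it, and the names `boring-q`, `fan-in-q`, and `take-n` are hypothetical.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical stand-in for `boring`: a producer thread puts n messages
;; on a fresh queue, then a ::done sentinel to signal the end.
(defn boring-q
  [name n]
  (let [q (LinkedBlockingQueue.)]
    (future
      (dotimes [i n]
        (Thread/sleep (rand-int 50))
        (.put q (str name ": message " i)))
      (.put q ::done))
    q))

;; Fan-in: one forwarding thread per input moves messages onto a shared
;; output queue and stops when it sees the input's ::done sentinel.
(defn fan-in-q
  [& inputs]
  (let [out (LinkedBlockingQueue.)]
    (doseq [^LinkedBlockingQueue in inputs]
      (future
        (loop [msg (.take in)]
          (when-not (= msg ::done)
            (.put out msg)
            (recur (.take in))))))
    out))

;; Take exactly n messages from a queue, blocking until each is available.
(defn take-n
  [^LinkedBlockingQueue q n]
  (vec (repeatedly n #(.take q))))

;; Usage: merge two producers and print all ten messages.
(let [merged (fan-in-q (boring-q "Joe" 5) (boring-q "Ann" 5))]
  (doseq [msg (take-n merged 10)] (println msg)))
```

Messages from the two producers interleave in arrival order, while each producer's own messages stay in sequence. When running this as a script rather than in a REPL, calling `(shutdown-agents)` at the end lets the JVM exit promptly, because `future` uses a shared thread pool.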
| ;; (4) Timeouts | |
| ;; To test timeouts, let's add one line to the boring generator | |
| (defn boring | |
| [name] | |
| (let [ch (channel)] | |
| ;; future will run separately from main control flow | |
| (future | |
| ;; emit string message five times with random delay | |
| (dotimes [_ 5] | |
| (let [after (int (rand 100))] | |
| (Thread/sleep after) | |
| (enqueue ch (str name ": I'm boring after " after)))) | |
| (close ch)) ;; <--- Here. Close the channel after 5 messages. | |
| ;; return the channel to caller | |
| ch)) | |
| ;; Timeout for whole conversation | |
| (let [ch (apply fan-in (map boring ["Joe" "Ann"]))] | |
| ;; note the second argument to lazy-channel-seq: the timeout | |
| (doseq [msg (lazy-channel-seq (take* 10 ch) 500)] (println msg))) | |
| (println "You're boring: I'm leaving.") | |
| ;; Timeout for each message | |
| ;; There are several ways to implement this. | |
| ;; The approach here uses the (with-timeout _) wrapper for a ResultChannel. | |
| (let [ch (apply fan-in (map boring ["Joe" "Ann"]))] | |
| (loop [] | |
| (when-not (or (closed? ch) (drained? ch)) | |
| (println @(with-timeout 50 (read-channel ch))) (recur)))) |
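The per-message timeout idea can also be illustrated with clojure.core alone, using the three-argument form of `deref` on a future. This is a minimal sketch and not a replacement for Lamina's `with-timeout`; `slow-answer`, `read-with-timeout`, and the `:timed-out` fallback value are hypothetical names chosen for illustration.

```clojure
;; Hypothetical helper: yields `value` after `delay-ms` milliseconds,
;; computed on another thread.
(defn slow-answer
  [delay-ms value]
  (future
    (Thread/sleep delay-ms)
    value))

;; Wait at most `timeout-ms` for the result; if it is not ready in time,
;; return the fallback :timed-out instead of blocking further.
(defn read-with-timeout
  [fut timeout-ms]
  (deref fut timeout-ms :timed-out))

;; Usage: the first answer arrives well within its timeout, the second does not.
(println (read-with-timeout (slow-answer 10 "fast enough") 500))
(println (read-with-timeout (slow-answer 500 "too slow") 50))
```

Returning a fallback value keeps the reading loop simple: the caller can compare the result against `:timed-out` and decide whether to retry, skip the message, or stop.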