Skip to content

Instantly share code, notes, and snippets.

@kachayev
Last active October 7, 2015 09:57
Show Gist options
  • Select an option

  • Save kachayev/3146759 to your computer and use it in GitHub Desktop.

Select an option

Save kachayev/3146759 to your computer and use it in GitHub Desktop.

Revisions

  1. kachayev revised this gist Jun 1, 2013. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion clojure-channels-7-search.clj
    Original file line number Diff line number Diff line change
    @@ -33,7 +33,8 @@

    (defn fastest
    [query & replicas]
    (let [ch (apply fan-in (map #(% query (channel)) replicas))]
    (let [chs (map #(% query (channel)) replicas)
    ch (apply fan-in chs)]
    @(read-channel ch)))

    (defn google
  2. kachayev revised this gist Jun 1, 2013. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion clojure-channels-7-search.clj
    Original file line number Diff line number Diff line change
    @@ -28,7 +28,8 @@

    (defn fan-in
    [& inputs]
    (let [ch (channel)] (doseq [x inputs] (join x ch)) ch))
    (let [ch (channel)]
    (doseq [x inputs] (join x ch)) ch))

    (defn fastest
    [query & replicas]
  3. kachayev revised this gist Jun 1, 2013. 1 changed file with 3 additions and 1 deletion.
    4 changes: 3 additions & 1 deletion clojure-channels-7-search.clj
    Original file line number Diff line number Diff line change
    @@ -39,7 +39,9 @@
    [query]
    (println "Start searching")
    (let [ch (channel)]
    (doseq [s '(["Web1" "Web2"] ["Image1" "Image2"] ["Video1" "Video2"])]
    (doseq [s '(["Web1" "Web2"]
    ["Image1" "Image2"]
    ["Video1" "Video2"])]
    (future
    (enqueue ch
    (apply fastest (conj (map fake-search s) query)))))
  4. kachayev revised this gist Jul 23, 2012. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion clojure-channels-7-search.clj
    Original file line number Diff line number Diff line change
    @@ -38,7 +38,7 @@
    (defn google
    [query]
    (println "Start searching")
    (let [ch (channel) query "Go & Clojure channels"]
    (let [ch (channel)]
    (doseq [s '(["Web1" "Web2"] ["Image1" "Image2"] ["Video1" "Video2"])]
    (future
    (enqueue ch
  5. kachayev revised this gist Jul 22, 2012. 3 changed files with 4 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -57,6 +57,7 @@
    (let [joe (boring "Joe") ann (boring "Ann")]
    (loop []
    (doseq [ch [joe ann]]
    ;; TODO: Fix checking for channel closing (this is wrong way)
    (when-not (closed? ch) (println @(read-channel ch))))
    (recur)))

    @@ -67,6 +68,7 @@
    (let [joe (boring "Joe") ann (boring "Ann") run (atom 2)]
    (loop []
    (doseq [ch [joe ann]]
    ;; TODO: Fix checking for channel closing (this is wrong way)
    (if (closed? ch)
    (swap! run dec)
    (println @(read-channel ch))))
    1 change: 1 addition & 0 deletions clojure-channels-3-select.clj
    Original file line number Diff line number Diff line change
    @@ -27,6 +27,7 @@
    (doseq
    [[t ch] [["joe" joe] ["timer" timer]]]
    ;; Map message to struct [type message]
    ;; TODO: Check if I can you (named-channel) for this
    (join (map* (partial conj [t]) ch) select))
    ;; Read from channel until it's not closed (in blocking mode)
    (receive-all select
    1 change: 1 addition & 0 deletions clojure-channels-4-timeouts.clj
    Original file line number Diff line number Diff line change
    @@ -45,5 +45,6 @@
    ;; Here you can find approach using (with-timeout _) wrapper for ResultChannel.
    (let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
    (loop []
    ;; TODO: Fix checking for channel closing (this is wrong way)
    (when-not (closed? ch)
    (println @(with-timeout 50 (read-channel ch))) (recur))))
  6. kachayev revised this gist Jul 22, 2012. 1 changed file with 5 additions and 5 deletions.
    10 changes: 5 additions & 5 deletions clojure-channels-3-select.clj
    Original file line number Diff line number Diff line change
    @@ -21,17 +21,17 @@
    ;; but we can simulate it using map* and case calls
    (let [joe (boring "Joe")
    ;; Will generate messages each 60 ms
    t (periodically 60 (fn [] "You're too slow!"))
    timer (periodically 60 (fn [] "You're too slow!"))
    ;; All channels will be joined with this one
    select (channel)]
    (doseq
    [[t ch] [["joe" joe] ["time" t]]]
    [[t ch] [["joe" joe] ["timer" timer]]]
    ;; Map message to struct [type message]
    (join (map* (partial conj [t]) ch) select))
    ;; Read from channel until it's not closed (in blocking mode)
    (receive-all select
    (fn [[type msg]]
    (fn [[name msg]]
    (println (str msg
    (case type
    (case name
    "joe" " <== Message from Joe"
    "time" " <== Timeout"))))))
    "timer" " <== Timeout"))))))
  7. kachayev revised this gist Jul 21, 2012. 1 changed file with 37 additions and 0 deletions.
    37 changes: 37 additions & 0 deletions clojure-channels-3-select.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,37 @@
    ;; Channels-driven concurrency with Clojure
    ;; Clojure variant for code examples from this gist:
    ;; https://gist.github.com/3124594
    ;; Primarily taken from Rob Pike's talk on Google I/O 2012:
    ;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
    ;;
    ;; Concurrency is the key to designing high performance network services.
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;;
    ;; https://github.com/ztellman/lamina
    ;;
    ;; I should also mention, that this is not a simple copy of syntax/semantic notations
    ;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
    ;; functional approach of data transformation from initial to desired state).

    ;; (3) Select

    ;; Clojure doesn't have "select" (mostly cause of functional approach),
    ;; but we can simulate it using map* and case calls
    (let [joe (boring "Joe")
    ;; Will generate messages each 60 ms
    t (periodically 60 (fn [] "You're too slow!"))
    ;; All channels will be joined with this one
    select (channel)]
    (doseq
    [[t ch] [["joe" joe] ["time" t]]]
    ;; Map message to struct [type message]
    (join (map* (partial conj [t]) ch) select))
    ;; Read from channel until it's not closed (in blocking mode)
    (receive-all select
    (fn [[type msg]]
    (println (str msg
    (case type
    "joe" " <== Message from Joe"
    "time" " <== Timeout"))))))
  8. kachayev revised this gist Jul 21, 2012. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion clojure-channels-4-timeouts.clj
    Original file line number Diff line number Diff line change
    @@ -45,5 +45,5 @@
    ;; Here you can find approach using (with-timeout _) wrapper for ResultChannel.
    (let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
    (loop []
    (when-not (or (closed? ch) (drained? ch))
    (when-not (closed? ch)
    (println @(with-timeout 50 (read-channel ch))) (recur))))
  9. kachayev revised this gist Jul 21, 2012. 1 changed file with 57 additions and 0 deletions.
    57 changes: 57 additions & 0 deletions clojure-channels-7-search.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,57 @@
    ;; Channels-driven concurrency with Clojure
    ;; Clojure variant for code examples from this gist:
    ;; https://gist.github.com/3124594
    ;; Primarily taken from Rob Pike's talk on Google I/O 2012:
    ;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
    ;;
    ;; Concurrency is the key to designing high performance network services.
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;;
    ;; https://github.com/ztellman/lamina
    ;;
    ;; I should also mention, that this is not a simple copy of syntax/semantic notations
    ;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
    ;; functional approach of data transformation from initial to desired state).

    ;; (7) Fake Google search

    (defn fake-search
    [kind]
    (fn [query ch]
    ;; Fake seach will work in async mode and will push result to channel
    (future
    (Thread/sleep (int (rand 100)))
    (enqueue ch (str kind " result for " query)) (close ch))
    ch))

    (defn fan-in
    [& inputs]
    (let [ch (channel)] (doseq [x inputs] (join x ch)) ch))

    (defn fastest
    [query & replicas]
    (let [ch (apply fan-in (map #(% query (channel)) replicas))]
    @(read-channel ch)))

    (defn google
    [query]
    (println "Start searching")
    (let [ch (channel) query "Go & Clojure channels"]
    (doseq [s '(["Web1" "Web2"] ["Image1" "Image2"] ["Video1" "Video2"])]
    (future
    (enqueue ch
    (apply fastest (conj (map fake-search s) query)))))
    (time ;; This macro will check elapsed time for calculations
    (doseq [msg (lazy-channel-seq (take* 3 ch) 80)]
    (println msg)))))

    ;; Output result for calling
    ;; (google "Go & Clojure channels")
    ;;
    ;; Start searching
    ;; Video1 result for Go & Clojure channels
    ;; Image2 result for Go & Clojure channels
    ;; Web2 result for Go & Clojure channels
    ;; "Elapsed time: 74.172 msecs"
  10. kachayev revised this gist Jul 20, 2012. 1 changed file with 14 additions and 2 deletions.
    16 changes: 14 additions & 2 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -57,6 +57,18 @@
    (let [joe (boring "Joe") ann (boring "Ann")]
    (loop []
    (doseq [ch [joe ann]]
    (when-not (or (drained? ch) (closed? ch))
    (println @(read-channel ch))))
    (when-not (closed? ch) (println @(read-channel ch))))
    (recur)))

    ;; Read from one channel, than - from second
    ;; Several improvements in order to stop execution,
    ;; when both channels are closed (without any information
    ;; about total count of messages)
    (let [joe (boring "Joe") ann (boring "Ann") run (atom 2)]
    (loop []
    (doseq [ch [joe ann]]
    (if (closed? ch)
    (swap! run dec)
    (println @(read-channel ch))))
    (if (> @run 0) (recur))))
    (println "You're boring: I'm leaving.")
  11. kachayev revised this gist Jul 20, 2012. 1 changed file with 9 additions and 0 deletions.
    9 changes: 9 additions & 0 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -51,3 +51,12 @@
    (doseq [ch [joe ann]] (join ch chs))
    (receive-all chs println))
    (println "You're boring: I'm leaving.")

    ;; More instances...
    ;; Read from one channel, than - from second
    (let [joe (boring "Joe") ann (boring "Ann")]
    (loop []
    (doseq [ch [joe ann]]
    (when-not (or (drained? ch) (closed? ch))
    (println @(read-channel ch))))
    (recur)))
  12. kachayev revised this gist Jul 20, 2012. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion clojure-channels-4-timeouts.clj
    Original file line number Diff line number Diff line change
    @@ -15,7 +15,7 @@
    ;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
    ;; functional approach of data transformation from initial to desired state).

    ;; (3) Timeouts
    ;; (4) Timeouts

    ;; To test timeouts let add one line into boring generator

  13. kachayev renamed this gist Jul 20, 2012. 1 changed file with 0 additions and 0 deletions.
  14. kachayev revised this gist Jul 20, 2012. 1 changed file with 49 additions and 0 deletions.
    49 changes: 49 additions & 0 deletions clj-channels-4-timeouts.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,49 @@
    ;; Channels-driven concurrency with Clojure
    ;; Clojure variant for code examples from this gist:
    ;; https://gist.github.com/3124594
    ;; Primarily taken from Rob Pike's talk on Google I/O 2012:
    ;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
    ;;
    ;; Concurrency is the key to designing high performance network services.
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;;
    ;; https://github.com/ztellman/lamina
    ;;
    ;; I should also mention, that this is not a simple copy of syntax/semantic notations
    ;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
    ;; functional approach of data transformation from initial to desired state).

    ;; (3) Timeouts

    ;; To test timeouts let add one line into boring generator

    (defn boring
    [name]
    (let [ch (channel)]
    ;; future will run separately from main control flow
    (future
    ;; emit string message five times with random delay
    (dotimes [_ 5]
    (let [after (int (rand 100))]
    (Thread/sleep after)
    (enqueue ch (str name ": I'm boring after " after))))
    (close ch)) ;; <--- Here. Let's close channel after 5 messages.
    ;; return the channel to caller
    ch))


    ;; Timeout for whole conversation
    (let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
    ;; note 3rd param for lazy-channel-seq function
    (doseq [msg (lazy-channel-seq (take* 10 ch) 500)] (println msg)))
    (println "You're boring: I'm leaving.")

    ;; Timeout for each message
    ;; There are multiple ways of how to perform such functionality.
    ;; Here you can find approach using (with-timeout _) wrapper for ResultChannel.
    (let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
    (loop []
    (when-not (or (closed? ch) (drained? ch))
    (println @(with-timeout 50 (read-channel ch))) (recur))))
  15. kachayev revised this gist Jul 20, 2012. 1 changed file with 11 additions and 5 deletions.
    16 changes: 11 additions & 5 deletions clojure-channels-2-multiplexing.clj
    Original file line number Diff line number Diff line change
    @@ -17,20 +17,26 @@

    ;; (2) Fan-in

    ;; "Hand-made" one
    (defn fan-in
    [input1 input2]
    (let [ch (channel) pusher (partial enqueue ch)]
    (apply #(receive-all % pusher) [input1 input2]) ch))
    (doseq [x [input1 input2]] (receive-all x pusher)) ch))

    ;; Or any count of inputs instead of just 2
    (defn fan-in
    [& inputs]
    (let [ch (channel) pusher (partial enqueue ch)]
    (apply #(receive-all % pusher) inputs) ch))
    (doseq [x inputs] (receive-all x pusher)) ch))

    ;; Or more "clojurian" approach with join
    (defn fan-in
    [& inputs]
    (let [ch (channel)] (doseq [x inputs] (join x ch)) ch))

    ;; Do printing only 10 times
    (let [ch (fan-in (map boring ["Joe" "Ann"]))]
    (dotimes [_ 10] (receive ch println)))
    (let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
    (receive-all (take* 10 ch) println))

    ;; Or any times something will be pushed to channel
    (let [ch (fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println))
    (let [ch (apply fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println))
  16. kachayev revised this gist Jul 20, 2012. 1 changed file with 9 additions and 0 deletions.
    9 changes: 9 additions & 0 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -42,3 +42,12 @@
    (let [joe (boring "Joe")] (receive-all joe println))
    ;; you will see this message first :)
    (println "You're boring: I'm leaving.")

    ;; More instances...
    ;; Actually, this is little bit tricky and it's definitely other
    ;; mechanism than we use in Go for this example. It's more
    ;; likely what we do in "#2 Fan-in" code examples.
    (let [joe (boring "Joe") ann (boring "Ann") chs (channel)]
    (doseq [ch [joe ann]] (join ch chs))
    (receive-all chs println))
    (println "You're boring: I'm leaving.")
  17. kachayev revised this gist Jul 20, 2012. 2 changed files with 23 additions and 9 deletions.
    27 changes: 18 additions & 9 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -8,7 +8,12 @@
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;;
    ;; https://github.com/ztellman/lamina
    ;;
    ;; I should also mention, that this is not a simple copy of syntax/semantic notations
    ;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
    ;; functional approach of data transformation from initial to desired state).

    ;; (1) Generator: function that returns the channel

    @@ -19,17 +24,21 @@
    (let [ch (channel)]
    ;; future will run separately from main control flow
    (future
    (let [after (int (rand 500))]
    (Thread/sleep after)
    (enqueue ch (str name ": I'm boring after " after)))
    ;; emit string message five times with random delay
    (dotimes [_ 5]
    (let [after (int (rand 500))]
    (Thread/sleep after)
    (enqueue ch (str name ": I'm boring after " after)))))
    ;; return the channel to caller
    ch))

    ;; With single instance
    (let [joe (boring "Joe")] (dotimes [_ 5] (receive joe println)))
    (let [joe (boring "Joe")]
    (doseq [msg (lazy-channel-seq (take* 5 joe))] (println msg)))
    (println "You're boring: I'm leaving.")

    ;; Process all messages from channel
    ;; Please, note this notation is asynchronous, so...
    (let [joe (boring "Joe")] (receive-all joe println))
    ;; you will see this message first :)
    (println "You're boring: I'm leaving.")

    ;; More instances...
    (let [joe (boring "Joe") ann (boring "Ann")]
    (dotimes [_ 10] (apply #(receive % println) [joe ann])))
    (println "You're boring: I'm leaving.")
    5 changes: 5 additions & 0 deletions clojure-channels-2-multiplexing.clj
    Original file line number Diff line number Diff line change
    @@ -8,7 +8,12 @@
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;;
    ;; https://github.com/ztellman/lamina
    ;;
    ;; I should also mention, that this is not a simple copy of syntax/semantic notations
    ;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
    ;; functional approach of data transformation from initial to desired state).

    ;; (2) Fan-in

  18. kachayev revised this gist Jul 20, 2012. 2 changed files with 2 additions and 0 deletions.
    1 change: 1 addition & 0 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -8,6 +8,7 @@
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;; https://github.com/ztellman/lamina

    ;; (1) Generator: function that returns the channel

    1 change: 1 addition & 0 deletions clojure-channels-2-multiplexing.clj
    Original file line number Diff line number Diff line change
    @@ -8,6 +8,7 @@
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.
    ;; https://github.com/ztellman/lamina

    ;; (2) Fan-in

  19. kachayev revised this gist Jul 19, 2012. 1 changed file with 30 additions and 0 deletions.
    30 changes: 30 additions & 0 deletions clojure-channels-2-multiplexing.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    ;; Channels-driven concurrency with Clojure
    ;; Clojure variant for code examples from this gist:
    ;; https://gist.github.com/3124594
    ;; Primarily taken from Rob Pike's talk on Google I/O 2012:
    ;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
    ;;
    ;; Concurrency is the key to designing high performance network services.
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.

    ;; (2) Fan-in

    (defn fan-in
    [input1 input2]
    (let [ch (channel) pusher (partial enqueue ch)]
    (apply #(receive-all % pusher) [input1 input2]) ch))

    ;; Or any count of inputs instead of just 2
    (defn fan-in
    [& inputs]
    (let [ch (channel) pusher (partial enqueue ch)]
    (apply #(receive-all % pusher) inputs) ch))

    ;; Do printing only 10 times
    (let [ch (fan-in (map boring ["Joe" "Ann"]))]
    (dotimes [_ 10] (receive ch println)))

    ;; Or any times something will be pushed to channel
    (let [ch (fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println))
  20. kachayev created this gist Jul 19, 2012.
    34 changes: 34 additions & 0 deletions clojure-channels-1-generator.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,34 @@
    ;; Channels-driven concurrency with Clojure
    ;; Clojure variant for code examples from this gist:
    ;; https://gist.github.com/3124594
    ;; Primarily taken from Rob Pike's talk on Google I/O 2012:
    ;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
    ;;
    ;; Concurrency is the key to designing high performance network services.
    ;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
    ;; There is no implementation for "Go channels" in core, but we can use
    ;; 3rd-party library Lamina to do the same things.

    ;; (1) Generator: function that returns the channel

    (use 'lamina.core)

    (defn boring
    [name]
    (let [ch (channel)]
    ;; future will run separately from main control flow
    (future
    (let [after (int (rand 500))]
    (Thread/sleep after)
    (enqueue ch (str name ": I'm boring after " after)))
    ;; return the channel to caller
    ch))

    ;; With single instance
    (let [joe (boring "Joe")] (dotimes [_ 5] (receive joe println)))
    (println "You're boring: I'm leaving.")

    ;; More instances...
    (let [joe (boring "Joe") ann (boring "Ann")]
    (dotimes [_ 10] (apply #(receive % println) [joe ann])))
    (println "You're boring: I'm leaving.")