Skip to content

Instantly share code, notes, and snippets.

@ezeql
Created March 31, 2016 16:14
Show Gist options
  • Select an option

  • Save ezeql/07d182c43c110fc2577a15f4ad4ff665 to your computer and use it in GitHub Desktop.

Select an option

Save ezeql/07d182c43c110fc2577a15f4ad4ff665 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := generateList(done, "http://slowsite.com", "http://facebook.com", "http://google.com", "http://twitter.com")
// Distribute the sq work across two goroutines that both read from in.
c1 := processURLs(done, in)
c2 := processURLs(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
// fmt.Println("a")
for content := range out {
fmt.Println(content)
}
// done will be closed by the deferred call.
}
func generateList(done <-chan struct{}, nums ...string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}
func getContent(done <-chan struct{}, url string) chan string {
out := make(chan string)
go func() {
//just as an example
if url == "http://slowsite.com" {
time.Sleep(time.Second)
}
out <- url
}()
return out
}
func processURLs(done <-chan struct{}, urlc <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for url := range urlc {
contentC := getContent(done, url)
select {
case p := <-contentC:
out <- p
case <-done:
return
}
}
}()
return out
}
func merge(done <-chan struct{}, cs ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan string) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment