Skip to content

Instantly share code, notes, and snippets.

@mtio
Last active May 2, 2017 14:42
Show Gist options
  • Select an option

  • Save mtio/e5a9c4a039358c3d1c1ef26a35318451 to your computer and use it in GitHub Desktop.

Select an option

Save mtio/e5a9c4a039358c3d1c1ef26a35318451 to your computer and use it in GitHub Desktop.
package main
import (
"container/heap"
"fmt"
"time"
"math/rand"
)
func main() {
work := make(chan Request)
bal := new(Balancer)
bal.done = make(chan *Worker)
for i := 0; i < 10; i++ {
worker := new(Worker)
worker.requests = make(chan Request)
worker.index = i
bal.pool = append(bal.pool, worker)
}
go requester(work)
bal.balance(work)
}
// requester generates load forever. On every iteration it waits 10ms,
// submits a doWork request on work, and hands the request's reply channel to
// a goroutine that prints the result once it arrives.
func requester(work chan Request) {
	for {
		reply := make(chan string)

		// Kill some time (fake load).
		time.Sleep(10 * time.Millisecond)

		// Submit the request to the balancer.
		work <- Request{fn: doWork, response: reply}

		// Print asynchronously so that waiting for this result does not
		// hold up the next request.
		go func() {
			fmt.Println(<-reply)
		}()
	}
}
// balance is the balancer's event loop. It runs forever, reacting to
// whichever happens next: a worker reporting completion on b.done, or a new
// request arriving on work.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case finished := <-b.done:
			// A worker has finished a request; update its bookkeeping.
			b.completed(finished)
		case incoming := <-work:
			// A new request arrived; hand it to a worker.
			b.dispatch(incoming)
		}
	}
}
// dispatch hands req to the worker at the root of the pool heap, i.e. the
// one the heap ordering reports as having the fewest pending requests.
//
// A goroutine running the worker's work method is started on every call,
// before the request is sent, so that a receiver exists for the send on the
// worker's requests channel.
func (b *Balancer) dispatch(req Request) {
	pool := &b.pool

	// Take the least loaded worker out of the heap while it is updated.
	least := heap.Pop(pool).(*Worker)

	go least.work(b.done)

	// Hand over the task, then account for it.
	least.requests <- req
	least.pending++

	// Re-insert the worker so it settles at the position matching its load.
	heap.Push(pool, least)
}
// completed records that w has finished one request and restores the heap
// ordering of the pool for w's reduced load.
//
// w.index is expected to be w's current position in b.pool.
func (b *Balancer) completed(w *Worker) {
	// One fewer in the queue.
	w.pending--

	// Re-establish the heap ordering in place. heap.Fix is the documented,
	// cheaper equivalent of heap.Remove followed by heap.Push, and because
	// it only reorders elements it can never drop a worker from the pool or
	// insert one twice, which Remove+Push does when the index is stale.
	heap.Fix(&b.pool, w.index)
}
// work serves exactly one request on behalf of w and then returns.
//
// It receives a request from w.requests, runs the request's fn, sends the
// result on the request's response channel and finally reports w on done.
//
// dispatch starts a fresh goroutine running work for every request it sends.
// Looping here forever would therefore leave one extra goroutine parked on
// w.requests per request served, growing without bound. Returning after a
// single request keeps the number of live goroutines proportional to the
// number of requests in flight.
//
// NOTE(review): fn receives a copy of *w taken without synchronization,
// while the balancer goroutine updates w.pending; the values fn observes are
// only a best-effort snapshot.
func (w *Worker) work(done chan *Worker) {
	req := <-w.requests        // get Request from balancer
	req.response <- req.fn(*w) // call fn and send result
	done <- w                  // we've finished this request
}
// doWork simulates a task of unpredictable length: it sleeps for a random
// duration of less than one second and then returns a line describing the
// index and pending count of the worker value it was given.
func doWork(worker Worker) string {
	delay := time.Duration(rand.Intn(1000)) * time.Millisecond
	time.Sleep(delay)

	return fmt.Sprintf("index: %d pending:%d", worker.index, worker.pending)
}
// Request is a unit of work submitted to the balancer: a function to run
// and a channel on which its result is delivered.
type Request struct {
fn func(worker Worker) string // operation to run; it is passed a copy of the Worker executing it
response chan string // the worker sends fn's return value here
}
// Worker is one member of the balancer's pool. It receives requests on its
// own channel and carries the bookkeeping the pool heap orders by.
type Worker struct {
requests chan Request // work to do (created unbuffered in main)
pending int // count of pending tasks; the pool heap orders workers by this
index int // index in heap (assigned in main; read by completed to locate the worker)
}
// Balancer hands incoming requests to workers from its pool and updates the
// pool as workers report completion.
type Balancer struct {
pool Pool // the workers, used as a heap ordered by pending count
done chan *Worker // a worker sends itself here after finishing a request
}
// Pool is a min-heap of workers ordered by their number of pending requests.
// It implements heap.Interface and must only be modified through the
// container/heap functions once it is in use.
//
// Pool keeps each Worker's index field equal to the worker's current
// position in the slice. Code that calls heap.Remove or heap.Fix with
// w.index depends on this; without it the index goes stale as soon as the
// heap reorders its elements and the wrong worker is touched.
type Pool []*Worker

// Len reports the number of workers in the pool.
func (p Pool) Len() int { return len(p) }

// Less reports whether the worker at i has fewer pending requests than the
// worker at j.
func (p Pool) Less(i, j int) bool { return p[i].pending < p[j].pending }

// Swap exchanges the workers at i and j and records their new positions.
func (p Pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index = i
	p[j].index = j
}

// Push appends x, which must be a *Worker, and records its position. It is
// meant to be called by package heap, not directly.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	w.index = len(*p)
	*p = append(*p, w)
}

// Pop removes and returns the last worker. It is meant to be called by
// package heap, not directly. The returned worker's index is set to -1 to
// mark that it is no longer in the pool.
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	w := old[n-1]
	old[n-1] = nil // do not keep the worker reachable through the backing array
	w.index = -1
	*p = old[:n-1]
	return w
}
@mtio
Copy link
Copy Markdown
Author

mtio commented May 2, 2017

Updated with fixes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment