Skip to content

Instantly share code, notes, and snippets.

@icholy
Last active February 20, 2017 01:44
Show Gist options
  • Select an option

  • Save icholy/5449954 to your computer and use it in GitHub Desktop.

Select an option

Save icholy/5449954 to your computer and use it in GitHub Desktop.

Revisions

  1. icholy revised this gist Apr 24, 2013. 1 changed file with 104 additions and 0 deletions.
    104 changes: 104 additions & 0 deletions stream_error_handling.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,104 @@
    package main

    import (
    "errors"
    "log"
    )

    type Stream struct {
    in chan string
    close chan bool
    err chan error
    }

    func (s *Stream) Close() { s.close <- true }
    func (s *Stream) Fail(err error) { s.Close(); s.Drain(); s.err <- err }
    func (s *Stream) Drain() {
    for _ = range s.in {
    }
    }

    func NewWordStream(words []string) *Stream {
    s := &Stream{make(chan string), make(chan bool), make(chan error)}
    go func() {
    loop:
    for _, w := range words {
    select {
    case <-s.close:
    break loop
    case s.in <- w:
    continue
    }
    }
    close(s.in)
    }()
    return s
    }

    func (s *Stream) Map(fn func(string) (string, error)) *Stream {
    out := make(chan string)
    go func() {
    for x := range s.in {
    x, err := fn(x)
    if err != nil {
    s.Fail(err)
    } else {
    out <- x
    }
    }
    close(out)
    }()
    return &Stream{out, s.close, s.err}
    }

    func (s *Stream) Filter(fn func(string) (bool, error)) *Stream {
    out := make(chan string)
    go func() {
    for x := range s.in {
    ok, err := fn(x)
    if err != nil {
    s.Fail(err)
    } else if ok {
    out <- x
    }
    }
    close(out)
    }()
    return &Stream{out, s.close, s.err}
    }

    func (s *Stream) Do(fn func(string)) error {
    var err error
    loop:
    for {
    select {
    case x, ok := <-s.in:
    if !ok {
    break loop
    }
    fn(x)
    case err = <-s.err:
    }
    }
    return err
    }

    func main() {

    words := []string{"this", "is", "pretty", "cool", "1", "2", "3"}

    longerThan3 := func(s string) (bool, error) { return len(s) > 3, nil }

    wrap := func(s string) (string, error) {
    if s == "cool" {
    return "", errors.New("not cool")
    }
    return "<" + s + ">", nil
    }

    err := NewWordStream(words).Map(wrap).Filter(longerThan3).Do(func(w string) { log.Print(w, " ") })

    if err != nil {
    log.Fatalf("Error: %s", err)
    }
    }
  2. icholy revised this gist Apr 24, 2013. 1 changed file with 16 additions and 16 deletions.
    32 changes: 16 additions & 16 deletions map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -58,22 +58,6 @@ func (in Stream) Reduce(fn func(Acc, Item) (Acc, Acc)) chan Acc {
    return out
    }

    func (in Stream) ReduceAll(fn func(Acc, Item) Acc) Acc {
    var acc Acc
    for x := range in {
    acc = fn(acc, x)
    }
    return acc
    }

    func (in Stream) Collect() []Item {
    a := make([]Item, 0, 5)
    for x := range in {
    a = append(a, x)
    }
    return a
    }

    func (in Stream) Sniff(fn func(Item)) Stream {
    out := make(Stream)
    go func() {
    @@ -99,6 +83,22 @@ func (in Stream) Split() (Stream, Stream) {
    return out1, out2
    }

    func (in Stream) ReduceAll(fn func(Acc, Item) Acc) Acc {
    var acc Acc
    for x := range in {
    acc = fn(acc, x)
    }
    return acc
    }

    func (in Stream) Collect() []Item {
    a := make([]Item, 0, 5)
    for x := range in {
    a = append(a, x)
    }
    return a
    }

    func (in Stream) Do(fn func(Item)) {
    for x := range in {
    fn(x)
  3. icholy revised this gist Apr 24, 2013. 1 changed file with 18 additions and 10 deletions.
    28 changes: 18 additions & 10 deletions map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -58,6 +58,14 @@ func (in Stream) Reduce(fn func(Acc, Item) (Acc, Acc)) chan Acc {
    return out
    }

    func (in Stream) ReduceAll(fn func(Acc, Item) Acc) Acc {
    var acc Acc
    for x := range in {
    acc = fn(acc, x)
    }
    return acc
    }

    func (in Stream) Collect() []Item {
    a := make([]Item, 0, 5)
    for x := range in {
    @@ -79,16 +87,16 @@ func (in Stream) Sniff(fn func(Item)) Stream {
    }

    func (in Stream) Split() (Stream, Stream) {
    out1, out2 := make(Stream), make(Stream)
    go func() {
    for x := range in {
    out1 <- x
    out2 <- x
    }
    close(out1)
    close(out2)
    }()
    return out1, out2
    out1, out2 := make(Stream), make(Stream)
    go func() {
    for x := range in {
    out1 <- x
    out2 <- x
    }
    close(out1)
    close(out2)
    }()
    return out1, out2
    }

    func (in Stream) Do(fn func(Item)) {
  4. icholy revised this gist Apr 24, 2013. 1 changed file with 25 additions and 0 deletions.
    25 changes: 25 additions & 0 deletions map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -66,6 +66,31 @@ func (in Stream) Collect() []Item {
    return a
    }

    func (in Stream) Sniff(fn func(Item)) Stream {
    out := make(Stream)
    go func() {
    for x := range in {
    fn(x)
    out <- x
    }
    close(out)
    }()
    return out
    }

    func (in Stream) Split() (Stream, Stream) {
    out1, out2 := make(Stream), make(Stream)
    go func() {
    for x := range in {
    out1 <- x
    out2 <- x
    }
    close(out1)
    close(out2)
    }()
    return out1, out2
    }

    func (in Stream) Do(fn func(Item)) {
    for x := range in {
    fn(x)
  5. icholy revised this gist Apr 24, 2013. 1 changed file with 20 additions and 6 deletions.
    26 changes: 20 additions & 6 deletions map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -58,6 +58,20 @@ func (in Stream) Reduce(fn func(Acc, Item) (Acc, Acc)) chan Acc {
    return out
    }

    func (in Stream) Collect() []Item {
    a := make([]Item, 0, 5)
    for x := range in {
    a = append(a, x)
    }
    return a
    }

    func (in Stream) Do(fn func(Item)) {
    for x := range in {
    fn(x)
    }
    }

    func main() {

    reverse := func(word Item) Item {
    @@ -78,15 +92,15 @@ func main() {
    }

    not := func(s string) func(Item) bool {
    item := Item(s)
    return func(w Item) bool {
    return s != string(w)
    return item != w
    }
    }

    words := []Item{"this", "is", "pretty", "cool"}
    stream := NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this"))
    for w := range stream {
    println(w)
    }
    words := []Item{"this", "is", "pretty", "cool", "1", "2", "3"}

    NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this")).Do(func(w Item) {
    println(w)
    })
    }
  6. icholy revised this gist Apr 24, 2013. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -84,7 +84,7 @@ func main() {
    }

    words := []Item{"this", "is", "pretty", "cool"}
    stream :=NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this"))
    stream := NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this"))
    for w := range stream {
    println(w)
    }
  7. icholy revised this gist Apr 24, 2013. 1 changed file with 14 additions and 12 deletions.
    26 changes: 14 additions & 12 deletions map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -8,6 +8,17 @@ type Item string
    type Stream chan Item
    type Acc string

    func NewWordStream(words []Item) Stream {
    stream := make(Stream)
    go func() {
    for _, w := range words {
    stream <- w
    }
    close(stream)
    }()
    return stream
    }

    func (in Stream) Map(fn func(Item) Item) Stream {
    out := make(Stream)
    go func() {
    @@ -72,18 +83,9 @@ func main() {
    }
    }

    stream := make(Stream)

    go func() {
    words := []Item{"this", "is", "pretty", "cool"}
    for _, w := range words {
    stream <- w
    }
    close(stream)
    }()

    out := stream.Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this"))
    for w := range out {
    words := []Item{"this", "is", "pretty", "cool"}
    stream :=NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this"))
    for w := range stream {
    println(w)
    }

  8. icholy created this gist Apr 24, 2013.
    90 changes: 90 additions & 0 deletions map_filter_reduce.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,90 @@
    package main

    import (
    "unicode/utf8"
    )

    type Item string
    type Stream chan Item
    type Acc string

    func (in Stream) Map(fn func(Item) Item) Stream {
    out := make(Stream)
    go func() {
    for x := range in {
    out <- fn(x)
    }
    close(out)
    }()
    return out
    }

    func (in Stream) Filter(fn func(Item) bool) Stream {
    out := make(Stream)
    go func() {
    for x := range in {
    if fn(x) {
    out <- x
    }
    }
    close(out)
    }()
    return out
    }

    func (in Stream) Reduce(fn func(Acc, Item) (Acc, Acc)) chan Acc {
    out := make(chan Acc)
    go func() {
    var val, acc Acc
    for x := range in {
    acc, val = fn(acc, x)
    if val != "" {
    out <- val
    }
    }
    close(out)
    }()
    return out
    }

    func main() {

    reverse := func(word Item) Item {
    s := string(word)
    o := make([]rune, utf8.RuneCountInString(s))
    i := len(o)
    for _, c := range s {
    i--
    o[i] = c
    }
    return Item(o)
    }

    longerThan := func(n int) func(Item) bool {
    return func(w Item) bool {
    return len(w) > n
    }
    }

    not := func(s string) func(Item) bool {
    return func(w Item) bool {
    return s != string(w)
    }
    }

    stream := make(Stream)

    go func() {
    words := []Item{"this", "is", "pretty", "cool"}
    for _, w := range words {
    stream <- w
    }
    close(stream)
    }()

    out := stream.Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this"))
    for w := range out {
    println(w)
    }

    }