// Command main demonstrates a small pull-based string pipeline. Values are
// queued on a Stream and, on every Read, the oldest one is passed through a
// chain of Map/Filter/Each stages before being handed to the caller.
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	fmt.Println("Hello, playground")

	s := (&Stream{}).
		Map(strings.ToUpper).
		Filter(func(string) bool { return true }).
		Each(func(v string) { fmt.Println("writer got", v) })

	// Queue one value on the buffer.
	s.Push("one")

	// Read it back through the pipeline.
	{
		v, err := s.Read()
		fmt.Printf("reader got %q %v\n", v, err)
	}

	// Read once more: the buffer is empty, so this reports io.EOF.
	{
		v, err := s.Read()
		fmt.Printf("reader got %q %v\n", v, err)
	}

	// Queue another value; the stream is readable again after an EOF.
	s.Push("two")

	// Read the new value.
	{
		v, err := s.Read()
		fmt.Printf("reader got %q %v\n", v, err)
	}

	// Write queues a value and reads in one call.
	{
		v, err := s.Write("uppercase")
		fmt.Printf("reader got %q %v\n", v, err)
	}
}

// Stream is a FIFO buffer of strings with an ordered chain of processing
// stages. The zero value is an empty stream with no stages and is ready to use.
type Stream struct {
	bridges []*StreamBridge // stages, in the order they were added
	buffer  []string        // values queued by Push and not yet read
}

// SkipErr is the error reported when a Filter stage rejects a value.
type SkipErr struct{ error }

// add appends bridge to the end of the chain, linking it to the previous stage
// (or to the stream's buffer when it is the first stage), and returns s so
// that calls can be chained.
func (s *Stream) add(bridge *StreamBridge) *Stream {
	if n := len(s.bridges); n > 0 {
		prev := s.bridges[n-1]
		bridge.from = prev
		prev.to = bridge
	} else {
		bridge.from = &StreamPull{s}
	}
	s.bridges = append(s.bridges, bridge)
	return s
}

// Filter appends a stage that lets a value continue down the chain only when
// f reports true. A rejected value stops the chain and is returned together
// with a SkipErr. The stage prints each value it receives to stdout.
func (s *Stream) Filter(f func(string) bool) *Stream {
	return s.add(&StreamBridge{
		handle: func(in string) (string, error) {
			fmt.Println("Filter got", in)
			if !f(in) {
				return in, SkipErr{errors.New("skip")}
			}
			return in, nil
		},
	})
}

// Each appends a stage that calls f with every value and forwards the value
// unchanged. The stage prints each value it receives to stdout.
func (s *Stream) Each(f func(string)) *Stream {
	return s.add(&StreamBridge{
		handle: func(in string) (string, error) {
			fmt.Println("Each got", in)
			f(in)
			return in, nil
		},
	})
}

// Map appends a stage that replaces every value with f(value). The stage
// prints each value it receives to stdout.
func (s *Stream) Map(f func(string) string) *Stream {
	return s.add(&StreamBridge{
		handle: func(in string) (string, error) {
			fmt.Println("Map got", in)
			return f(in), nil
		},
	})
}

// Push queues one value at the back of the buffer and returns s for chaining.
// The value is not processed until it is read.
func (s *Stream) Push(one string) *Stream {
	s.buffer = append(s.buffer, one)
	return s
}

// Read removes the oldest buffered value, runs it through every stage in order
// and returns the result. It returns io.EOF when the buffer is empty, or the
// error produced by a stage (for example a SkipErr from Filter); in the latter
// case the value has still been consumed from the buffer.
func (s *Stream) Read() (string, error) {
	n := len(s.bridges)
	if n == 0 {
		return (&StreamPull{s}).Pull()
	}
	// Pull from the last stage; the request travels back to the buffer.
	return s.bridges[n-1].Pull()
}

// Write queues in and then performs a Read. Because the buffer is FIFO, the
// value returned is the oldest pending one, which is in only when the buffer
// was empty beforehand.
func (s *Stream) Write(in string) (string, error) {
	return s.Push(in).Read()
}

// StreamPuller is one link of the pipeline: Pull requests the next value from
// upstream, Push sends a value downstream.
type StreamPuller interface {
	Pull() (string, error)
	Push(string) (string, error)
}

// Compile-time checks that both link types satisfy StreamPuller.
var (
	_ StreamPuller = (*StreamPull)(nil)
	_ StreamPuller = (*StreamBridge)(nil)
)

// StreamPull is the head of the pipeline; it feeds values from the embedded
// Stream's buffer into the first stage.
type StreamPull struct {
	*Stream
}

// Pull pops the oldest buffered value and pushes it through the stages,
// returning whatever the chain yields. It returns io.EOF when the buffer is
// empty.
func (p *StreamPull) Pull() (string, error) {
	if len(p.buffer) == 0 {
		return "", io.EOF
	}
	v := p.buffer[0]
	p.buffer[0] = "" // drop the reference so the backing array does not pin the consumed string
	p.buffer = p.buffer[1:]
	return p.Push(v)
}

// Push hands in to the first stage. With no stages configured the value is
// returned unchanged with a nil error, matching what StreamBridge.Push reports
// at the end of a chain; io.EOF is reserved for an empty buffer.
func (p *StreamPull) Push(in string) (string, error) {
	if len(p.bridges) == 0 {
		return in, nil
	}
	return p.bridges[0].Push(in)
}

// StreamBridge is one processing stage, linked to its upstream (from) and
// downstream (to) neighbours.
type StreamBridge struct {
	from   StreamPuller
	to     StreamPuller // nil for the last stage
	handle func(string) (string, error)
}

// Pull forwards the request to the upstream link.
func (b *StreamBridge) Pull() (string, error) {
	return b.from.Pull()
}

// Push applies this stage's handler to in. If the handler fails, or this is
// the last stage, the handler's result is returned; otherwise the result is
// pushed to the next stage.
func (b *StreamBridge) Push(in string) (string, error) {
	out, err := b.handle(in)
	if err != nil || b.to == nil {
		return out, err
	}
	return b.to.Push(out)
}

// TODO: asyncBridge