package main import ( "fmt" "math/rand" "sync" "time" ) type Message struct { Content string Id string } type MessageBatch struct { mu sync.Mutex Messages []Message } func (mb *MessageBatch) push(message Message){ mb.mu.Lock() mb.Messages = append(mb.Messages, message) mb.mu.Unlock() } func (mb *MessageBatch) empty(){ mb.mu.Lock() mb.Messages = []Message{} mb.mu.Unlock() } func (mb *MessageBatch) mLength() int { mb.mu.Lock() length := len(mb.Messages) mb.mu.Unlock() return length } func (mb *MessageBatch) String() string { mb.mu.Lock() defer mb.mu.Unlock() result := fmt.Sprint(mb.Messages) return result } const ( wait = time.Duration(10)*time.Second ) func main(){ var messageBucket MessageBatch names := []string{"sara","matt","nicole","chris","elena","bob","seven","grizzly","stu","gomez"} filled := make(chan bool) fmt.Println("start") // Simulate 10 agents for x:=0; x<9; x++ { //use a name to easier identify threads name := names[x] go func(id string) { // seed start time for thread rand.Seed(time.Now().UnixNano()) var mess Message for { // give me a random letter of the alphabet var singleRune = string(rand.Intn(26) + 97) mess = Message{Content: singleRune, Id: id} fmt.Print(mess, " ") // send the message to the out bound pipe messageBucket.push(mess) // have thread sleep some random amount of time time.Sleep(time.Duration(rand.Intn(15))*time.Second) } }(name) } // have go rountine check if the message buffer is filled, if so, send message to publish batch of messages go func(fill chan bool, b *MessageBatch) { for { if b.mLength() > 10 { filled <- true } time.Sleep(time.Duration(1 * time.Second)) } }(filled, &messageBucket) // this is used to show how much time is passed since the last batch publishing var elapsed time.Duration // timer will be used to make sure too much time doesnt pass before records are published timer := time.NewTimer(wait) now := time.Now() for { select { // the timer has timed out case <-timer.C: elapsed = time.Since(now) fmt.Printf("\n\n %fs ===== Ticked %d \n", elapsed.Seconds(), messageBucket.mLength()) fmt.Println(messageBucket) fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") messageBucket.empty() timer.Reset(wait) now = time.Now() // bucket is full case <-filled: timer.Reset(wait) elapsed = time.Since(now) fmt.Printf("\n\n %fs ===== Filled %d \n", elapsed.Seconds(), messageBucket.mLength()) fmt.Println(messageBucket) fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") messageBucket.empty() timer.Reset(wait) now = time.Now() } } }