Last active
November 3, 2019 14:20
-
-
Save moredure/ca2aefb8d11ab6a6708469684063b862 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type consumer struct { | |
| url string | |
| queue string | |
| stop chan struct{} | |
| wg sync.WaitGroup | |
| deliveries chan<- amqp.Delivery | |
| } | |
| func (e *consumer) Stop() { | |
| close(e.stop) | |
| e.wg.Wait() | |
| } | |
| func (e *consumer) Start() { | |
| e.wg.Add(1) | |
| defer e.wg.Done() | |
| for { | |
| conn, err := amqp.Dial(e.url) | |
| if err != nil { | |
| log.Println(err) | |
| continue | |
| } | |
| ch, err := conn.Channel() | |
| if err != nil { | |
| log.Println(err, conn.Close()) | |
| continue | |
| } | |
| if err := ch.Qos(1, 0, false); err != nil { | |
| log.Println(err, conn.Close()) | |
| continue | |
| } | |
| msgs, err := ch.Consume( | |
| e.queue, | |
| "", | |
| true, | |
| false, | |
| false, | |
| false, | |
| nil, | |
| ) | |
| if err != nil { | |
| log.Println(err, conn.Close()) | |
| continue | |
| } | |
| // on-close close channel, process last msg and finish | |
| next, stopped := make(chan struct{}), make(chan struct{}) | |
| var wg sync.WaitGroup | |
| wg.Add(1) | |
| go func() { | |
| defer wg.Done() | |
| select { | |
| case <-e.stop: | |
| conn.Close() | |
| close(stopped) | |
| case <-next: | |
| } | |
| }() | |
| for msg := range msgs { | |
| e.deliveries <- msg | |
| } | |
| close(next) | |
| wg.Wait() | |
| select { | |
| case <-stopped: | |
| return | |
| default: | |
| } | |
| } | |
| } | |
| func NewDeliveries(e models.Environment) (components.ConsumerAMQP, chan amqp.Delivery) { | |
| deliveries := make(chan amqp.Delivery) | |
| c := consumer{ | |
| url: e.RabbitURL, | |
| queue: e.TasksQueue, | |
| deliveries: deliveries, | |
| stop: make(chan struct{}), | |
| } | |
| go c.Start() | |
| return &c, deliveries | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment