Skip to content

Instantly share code, notes, and snippets.

@moredure
Last active November 3, 2019 14:20
Show Gist options
  • Select an option

  • Save moredure/ca2aefb8d11ab6a6708469684063b862 to your computer and use it in GitHub Desktop.

Select an option

Save moredure/ca2aefb8d11ab6a6708469684063b862 to your computer and use it in GitHub Desktop.
type consumer struct {
url string
queue string
stop chan struct{}
wg sync.WaitGroup
deliveries chan<- amqp.Delivery
}
func (e *consumer) Stop() {
close(e.stop)
e.wg.Wait()
}
func (e *consumer) Start() {
e.wg.Add(1)
defer e.wg.Done()
for {
conn, err := amqp.Dial(e.url)
if err != nil {
log.Println(err)
continue
}
ch, err := conn.Channel()
if err != nil {
log.Println(err, conn.Close())
continue
}
if err := ch.Qos(1, 0, false); err != nil {
log.Println(err, conn.Close())
continue
}
msgs, err := ch.Consume(
e.queue,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Println(err, conn.Close())
continue
}
// on-close close channel, process last msg and finish
next, stopped := make(chan struct{}), make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-e.stop:
conn.Close()
close(stopped)
case <-next:
}
}()
for msg := range msgs {
e.deliveries <- msg
}
close(next)
wg.Wait()
select {
case <-stopped:
return
default:
}
}
}
func NewDeliveries(e models.Environment) (components.ConsumerAMQP, chan amqp.Delivery) {
deliveries := make(chan amqp.Delivery)
c := consumer{
url: e.RabbitURL,
queue: e.TasksQueue,
deliveries: deliveries,
stop: make(chan struct{}),
}
go c.Start()
return &c, deliveries
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment