Skip to content

Instantly share code, notes, and snippets.

@moredure
Created November 2, 2019 22:24
Show Gist options
  • Select an option

  • Save moredure/2fae97baa713d449633dc4b4d8bcf5f1 to your computer and use it in GitHub Desktop.

Select an option

Save moredure/2fae97baa713d449633dc4b4d8bcf5f1 to your computer and use it in GitHub Desktop.
type publisher struct {
url string
chPool chan *amqp.Channel
}
func (c *publisher) init() error {
conn, err := amqp.Dial(c.url)
if err != nil {
return err
}
for i := 0; i < cap(c.chPool); i += 1 {
ch, err := conn.Channel()
if err != nil {
return multierr.Combine(err, conn.Close())
}
c.chPool <- ch
}
go func(conn *amqp.Connection) {
errCh := make(chan *amqp.Error, 1)
<-conn.NotifyClose(errCh)
for i := 0; i < cap(c.chPool); i += 1 {
select {
case <-c.chPool:
default:
}
}
for {
if err := c.init(); err != nil {
time.Sleep(1 * time.Second)
log.Println(err)
continue
}
return
}
}(conn)
return nil
}
func (c *publisher) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
for {
ch := <-c.chPool
err := ch.Publish(exchange, key, mandatory, immediate, msg)
if err == amqp.ErrClosed {
continue
}
c.chPool <- ch
return err
}
}
func NewPublisher(environment models.Environment) (components.Publisher, error) {
p := &publisher{
url: environment.RabbitURL,
chPool: make(chan *amqp.Channel, environment.ChannelsPoolSize),
}
return p, p.init()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment