const amqplib = require('amqplib');

/**
 * Small convenience wrapper around a single AMQP connection.
 *
 * One channel is lazily created per queue name and cached, so repeated
 * publish/consume calls for the same queue share that channel.
 */
class MessageQueue {
  /**
   * Starts connecting right away. The connection is stored as a promise so
   * that callers may use the instance before the connection is established.
   * The broker URL comes from CLOUDAMQP_URL, falling back to a local broker.
   */
  constructor() {
    this.connection = amqplib.connect(process.env.CLOUDAMQP_URL || 'amqp://localhost');
    // Prototype-less map: a queue named e.g. "constructor" must not be
    // mistaken for an already-cached channel via Object.prototype.
    this.channels = Object.create(null);
  }

  /**
   * Returns the cached channel promise for `queue`, creating it on first use.
   * A new channel asserts the queue and sets prefetch to 1 before it is
   * handed out.
   *
   * @param {string} queue - Queue name; also used as the cache key.
   * @returns {Promise<object>} Promise resolving to the amqplib channel.
   */
  channel(queue) {
    if (!this.channels[queue]) {
      this.channels[queue] = this.connection
        .then(connection => connection.createChannel())
        // Plain `.then` steps instead of Bluebird's `.tap`, so this works
        // whether amqplib hands back native promises or Bluebird ones. The
        // channel is re-emitted at the end, which is what `.tap` provided.
        .then(channel => Promise.resolve()
          .then(() => channel.assertQueue(queue))
          .then(() => channel.prefetch(1))
          .then(() => channel));
    }
    return this.channels[queue];
  }

  /**
   * Serialises `message` as JSON and sends it to `queue`.
   *
   * @param {string} queue - Destination queue name.
   * @param {*} message - Any JSON-serialisable value.
   * @returns {Promise<*>} Resolves with whatever `channel.sendToQueue`
   *   returns; rejects if the channel could not be set up or if
   *   serialisation/sending throws.
   */
  publish(queue, message) {
    return this.channel(queue).then(channel =>
      channel.sendToQueue(queue, Buffer.from(JSON.stringify(message))));
  }

  /**
   * Subscribes `callback` to deliveries on `queue`.
   *
   * For every delivery the callback receives the JSON-decoded body and an
   * `ack` function that acknowledges that delivery. When the delivery is
   * `null` the callback receives `null` and `ack` does nothing, since there
   * is no message to acknowledge.
   *
   * A body that is not valid JSON makes `JSON.parse` throw inside the
   * delivery handler; that error is not caught here.
   *
   * @param {string} queue - Queue to consume from.
   * @param {function(*, function(): void): void} callback - Invoked as
   *   `callback(decodedBody, ack)`.
   * @returns {Promise<*>} Resolves with the result of `channel.consume`.
   */
  consume(queue, callback) {
    return this.channel(queue).then(channel =>
      channel.consume(queue, message => {
        if (message === null) {
          // Nothing was delivered, so acking would be invalid; hand the
          // callback a harmless no-op instead.
          callback(null, () => {});
          return;
        }
        const decoded = JSON.parse(message.content.toString());
        callback(decoded, () => {
          channel.ack(message);
        });
      }));
  }
}

module.exports = new MessageQueue();