Skip to content

Instantly share code, notes, and snippets.

@StefanoLusardi
Created January 21, 2025 23:36
Show Gist options
  • Select an option

  • Save StefanoLusardi/d267dcf7c29fbbab48c5f27b5ff35efd to your computer and use it in GitHub Desktop.

Select an option

Save StefanoLusardi/d267dcf7c29fbbab48c5f27b5ff35efd to your computer and use it in GitHub Desktop.
RabbitMQ Client
#include "rabbitmq_client.hpp"

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <uvw.hpp>
// Stores the broker endpoint and credentials, grabs the default uvw loop and
// creates the TCP handle used for the connection. No network activity happens
// here; call connect() to start it.
RabbitMQClient::RabbitMQClient(const std::string& host, int port, const std::string& user, const std::string& password)
    : _host(host),
      _port(port),
      _user(user),
      _password(password),
      _loop(uvw::Loop::getDefault()),
      // _loop is declared before _tcp in the class, so it is already
      // initialised when this initialiser runs.
      _tcp(_loop->resource<uvw::TCPHandle>())
{
}
// Detaches every listener and closes the TCP handle.
//
// The listeners registered in connect() capture `this`, and closing a handle
// is asynchronous: the handle can outlive this object and still emit events.
// Dropping the listeners first guarantees none of them runs against a
// destroyed client.
RabbitMQClient::~RabbitMQClient() {
    if (!_tcp) {
        return;
    }
    _tcp->clear();
    _tcp->close();
}
// Registers the connect/data/error listeners on the TCP handle and starts
// connecting to `_host:_port`.
//
// On connect: notifies `_on_connected`, starts reading from the stream and
// sends the AMQP protocol header.
// On data: appends the bytes to `_buffer` and dispatches every complete frame
// found at the front of the buffer, removing each one once it was handled.
// On error: forwards the error text to `_on_error`.
void RabbitMQClient::connect() {
    _tcp->on<uvw::ConnectEvent>([this](const uvw::ConnectEvent&, uvw::TCPHandle& handle) {
        if (_on_connected) _on_connected();
        // A uvw stream only emits DataEvent after read() was requested;
        // without this the broker's replies are never delivered.
        handle.read();
        send_amqp_header();
    });

    _tcp->on<uvw::DataEvent>([this](const uvw::DataEvent& event, uvw::TCPHandle&) {
        _buffer.insert(_buffer.end(), event.data.get(), event.data.get() + event.length);

        // 7-byte frame header + 1 frame-end octet surround every payload.
        constexpr size_t kFrameOverhead = 8;
        while (_buffer.size() >= kFrameOverhead) {
            // Payload size: bytes 3..6 of the header, big-endian.
            const size_t payload_size = (static_cast<size_t>(_buffer[3]) << 24) |
                                        (static_cast<size_t>(_buffer[4]) << 16) |
                                        (static_cast<size_t>(_buffer[5]) << 8) |
                                        static_cast<size_t>(_buffer[6]);
            if (_buffer.size() - kFrameOverhead < payload_size) {
                break;  // Frame not fully received yet; wait for more data.
            }
            // handle_amqp_frame() processes the frame at the front of the buffer.
            handle_amqp_frame(_buffer);
            // Drop the consumed frame so it is not dispatched again and the
            // buffer does not grow without bound.
            _buffer.erase(_buffer.begin(), _buffer.begin() + (kFrameOverhead + payload_size));
        }
    });

    _tcp->on<uvw::ErrorEvent>([this](const uvw::ErrorEvent& event, uvw::TCPHandle&) {
        if (_on_error) _on_error(event.what());
    });

    _tcp->connect(_host, _port);
}
// Builds the publish frame sequence for `message` addressed to
// `exchange`/`routing_key` and writes it to the socket.
void RabbitMQClient::publish(const std::string& exchange, const std::string& routing_key, const std::string& message) {
    send_amqp_frame(construct_publish_frame(exchange, routing_key, message));
}
// Registers `callback` as the consumer for `queue` (replacing any previous
// one) and sends a Basic.Consume request for that queue.
void RabbitMQClient::consume(const std::string& queue, MessageCallback callback) {
    // `callback` is a by-value sink parameter: move it instead of copying.
    _consumers[queue] = std::move(callback);
    send_amqp_frame(construct_consume_frame(queue));
}
// Sends a Queue.Bind request tying `queue` to `exchange` under `routing_key`.
void RabbitMQClient::bind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key) {
    send_amqp_frame(construct_bind_queue_frame(queue, exchange, routing_key));
}
// Sends a Queue.Unbind request removing the `routing_key` binding between
// `queue` and `exchange`.
void RabbitMQClient::unbind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key) {
    send_amqp_frame(construct_unbind_queue_frame(queue, exchange, routing_key));
}
// Sends a Queue.Declare request for `queue` with the given flags, then
// invokes the queue-declared callback (if one is set) with the queue name.
// The callback runs right after the frame is handed to the socket; it does
// not wait for a reply from the broker.
void RabbitMQClient::declare_queue(const std::string& queue, bool durable, bool exclusive, bool auto_delete) {
    send_amqp_frame(construct_declare_queue_frame(queue, durable, exclusive, auto_delete));
    if (!_on_queue_declared) {
        return;
    }
    _on_queue_declared(queue);
}
// Sends an Exchange.Declare request for `exchange` of the given `type`, then
// invokes the exchange-declared callback (if one is set) with the exchange
// name. The callback runs right after the frame is handed to the socket; it
// does not wait for a reply from the broker.
void RabbitMQClient::declare_exchange(const std::string& exchange, const std::string& type, bool durable, bool auto_delete) {
    send_amqp_frame(construct_declare_exchange_frame(exchange, type, durable, auto_delete));
    if (!_on_exchange_declared) {
        return;
    }
    _on_exchange_declared(exchange);
}
// Installs the callback that receives error descriptions. Replaces any
// previously installed callback; pass an empty function to disable.
void RabbitMQClient::set_on_error(ErrorCallback callback) {
    // By-value sink parameter: move it into place instead of copying.
    _on_error = std::move(callback);
}
// Installs the callback invoked when the TCP connection is established.
// Replaces any previously installed callback.
void RabbitMQClient::set_on_connected(ConnectedCallback callback) {
    // By-value sink parameter: move it into place instead of copying.
    _on_connected = std::move(callback);
}
// Stores the disconnected callback. Replaces any previously stored callback.
void RabbitMQClient::set_on_disconnected(DisconnectedCallback callback) {
    // By-value sink parameter: move it into place instead of copying.
    _on_disconnected = std::move(callback);
}
// Stores the reconnected callback. Replaces any previously stored callback.
void RabbitMQClient::set_on_reconnected(ReconnectedCallback callback) {
    // By-value sink parameter: move it into place instead of copying.
    _on_reconnected = std::move(callback);
}
// Installs the callback invoked by declare_queue() with the queue name.
// Replaces any previously installed callback.
void RabbitMQClient::set_on_queue_declared(QueueDeclaredCallback callback) {
    // By-value sink parameter: move it into place instead of copying.
    _on_queue_declared = std::move(callback);
}
// Installs the callback invoked by declare_exchange() with the exchange name.
// Replaces any previously installed callback.
void RabbitMQClient::set_on_exchange_declared(ExchangeDeclaredCallback callback) {
    // By-value sink parameter: move it into place instead of copying.
    _on_exchange_declared = std::move(callback);
}
// Sends the 8-byte AMQP 0-9-1 protocol header ("AMQP" 0 0 9 1) that opens the
// handshake with the broker.
void RabbitMQClient::send_amqp_header() {
    static constexpr char kProtocolHeader[] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    constexpr unsigned int kLength = sizeof(kProtocolHeader);

    // The write is asynchronous. Hand uvw an owned copy so the bytes stay
    // alive until the request completes (a local buffer would be destroyed
    // when this function returns).
    auto data = std::make_unique<char[]>(kLength);
    std::memcpy(data.get(), kProtocolHeader, kLength);
    _tcp->write(std::move(data), kLength);
}
// Writes the raw bytes in `frame` to the TCP stream. `frame` may hold one or
// several already-encoded AMQP frames. An empty vector is ignored.
void RabbitMQClient::send_amqp_frame(const std::vector<uint8_t>& frame) {
    if (frame.empty()) {
        return;
    }
    // The write is asynchronous and callers pass short-lived vectors. Hand
    // uvw an owned copy so the bytes stay alive until the request completes.
    auto data = std::make_unique<char[]>(frame.size());
    std::memcpy(data.get(), frame.data(), frame.size());
    _tcp->write(std::move(data), static_cast<unsigned int>(frame.size()));
}
// Returns the payload size declared by the frame header at the front of
// `_buffer`, or 0 when fewer than 7 header bytes have been received.
//
// An AMQP frame header is: type (1 byte), channel (2 bytes), payload size
// (4 bytes, big-endian). The size therefore lives in bytes 3..6.
size_t RabbitMQClient::parse_frame_size() {
    constexpr size_t kHeaderSize = 7;
    if (_buffer.size() < kHeaderSize) {
        return 0;
    }
    return (static_cast<size_t>(_buffer[3]) << 24) |
           (static_cast<size_t>(_buffer[4]) << 16) |
           (static_cast<size_t>(_buffer[5]) << 8) |
           static_cast<size_t>(_buffer[6]);
}
void RabbitMQClient::handle_amqp_frame(const std::vector<uint8_t>& frame) {
if (frame.size() < 7) return; // Invalid frame
uint8_t type = frame[0];
uint16_t channel = (frame[1] << 8) | frame[2];
uint32_t size = (frame[3] << 24) | (frame[4] << 16) | (frame[5] << 8) | frame[6];
if (frame.size() < size + 8) return; // Incomplete frame
std::vector<uint8_t> payload(frame.begin() + 7, frame.begin() + 7 + size);
switch (type) {
case 1: // Method frame
handle_method_frame(channel, payload);
break;
case 2: // Header frame
handle_header_frame(channel, payload);
break;
case 3: // Body frame
handle_body_frame(channel, payload);
break;
default:
std::cerr << "Unknown frame type: " << static_cast<int>(type) << std::endl;
break;
}
}
void RabbitMQClient::handle_method_frame(uint16_t channel, const std::vector<uint8_t>& payload) {
if (payload.size() < 4) return; // Invalid method frame
uint16_t class_id = (payload[0] << 8) | payload[1];
uint16_t method_id = (payload[2] << 8) | payload[3];
if (class_id == 10) { // Class 10 = Connection
if (method_id == 10) { // Method 10 = Start
handle_connection_start(payload);
} else if (method_id == 31) { // Method 31 = Tune
handle_connection_tune(payload);
} else if (method_id == 41) { // Method 41 = Open-Ok
std::cout << "Logged in successfully!" << std::endl;
}
} else if (class_id == 60 && method_id == 60) { // Class 60 = Basic, Method 60 = Deliver
handle_basic_deliver(payload);
}
}
void RabbitMQClient::handle_connection_start(const std::vector<uint8_t>& payload) {
size_t offset = 4;
uint8_t version_major = payload[offset++];
uint8_t version_minor = payload[offset++];
std::string server_properties = extract_long_string(payload, offset);
std::string mechanisms = extract_long_string(payload, offset);
std::string locales = extract_long_string(payload, offset);
// Send connection.start-ok
std::vector<uint8_t> start_ok_args;
start_ok_args.push_back(0); // Client properties (empty)
start_ok_args.push_back(0); // End of table
start_ok_args.push_back(static_cast<uint8_t>(mechanisms.find("PLAIN") != std::string::npos ? 5 : 0)); // Mechanism length
start_ok_args.insert(start_ok_args.end(), "PLAIN", "PLAIN" + 5); // Mechanism
start_ok_args.push_back(0); // End of string
start_ok_args.push_back(static_cast<uint8_t>(locales.find("en_US") != std::string::npos ? 5 : 0)); // Locale length
start_ok_args.insert(start_ok_args.end(), "en_US", "en_US" + 5); // Locale
start_ok_args.push_back(0); // End of string
// Add PLAIN authentication response
std::string auth_response = "\0" + _user + "\0" + _password;
start_ok_args.insert(start_ok_args.end(), auth_response.begin(), auth_response.end());
auto start_ok_frame = construct_method_frame(0, 10, 11, start_ok_args); // Class 10 = Connection, Method 11 = Start-Ok
send_amqp_frame(start_ok_frame);
}
// Answers Connection.Tune by accepting the broker's limits unchanged
// (Connection.Tune-Ok) and then requests the default virtual host "/"
// (Connection.Open).
//
// `payload` is the full method payload: class id (2), method id (2),
// channel-max (2), frame-max (4), heartbeat (2).
void RabbitMQClient::handle_connection_tune(const std::vector<uint8_t>& payload) {
    constexpr size_t kArgsOffset = 4;  // after class id + method id
    constexpr size_t kArgsSize = 8;    // channel-max + frame-max + heartbeat
    if (payload.size() < kArgsOffset + kArgsSize) return;  // Truncated Tune

    // Tune-Ok carries the same three fields in the same encoding, so echoing
    // the broker's bytes accepts its proposal as-is.
    // NOTE(review): a non-zero heartbeat is accepted here, but no heartbeat
    // sender is visible in this file — confirm the broker will not time the
    // connection out.
    const auto args_begin = payload.begin() + kArgsOffset;
    const std::vector<uint8_t> tune_ok_args(args_begin, args_begin + kArgsSize);
    // Frame type 1 = method, channel 0, Class 10 = Connection, Method 31 = Tune-Ok
    send_amqp_frame(construct_method_frame(1, 0, 10, 31, tune_ok_args));

    // Connection.Open arguments:
    //   virtual-host: short string "/" (the broker's default virtual host)
    //   reserved-1:   empty short string
    //   reserved-2:   one octet holding a single zero bit
    const std::vector<uint8_t> open_args = {1, '/', 0, 0};
    // Frame type 1 = method, channel 0, Class 10 = Connection, Method 40 = Open
    send_amqp_frame(construct_method_frame(1, 0, 10, 40, open_args));
}
void RabbitMQClient::handle_basic_deliver(const std::vector<uint8_t>& payload) {
size_t offset = 4;
std::string consumer_tag = extract_short_string(payload, offset);
std::string delivery_tag = extract_long_long_string(payload, offset);
std::string exchange = extract_short_string(payload, offset);
std::string routing_key = extract_short_string(payload, offset);
if (_consumers.find(routing_key) != _consumers.end()) {
_consumers[routing_key](routing_key, "Message received");
}
}
// Logs that a content-header frame arrived. Neither the channel nor the
// payload is inspected.
void RabbitMQClient::handle_header_frame(uint16_t channel, const std::vector<uint8_t>& payload) {
    (void)channel;
    (void)payload;
    std::cout << "Header frame received";
    std::cout << std::endl;
}
// Logs the payload of a content-body frame as text. The channel is not
// inspected.
void RabbitMQClient::handle_body_frame(uint16_t channel, const std::vector<uint8_t>& payload) {
    (void)channel;
    std::string text;
    text.assign(payload.begin(), payload.end());
    std::cout << "Body frame received: " << text;
    std::cout << std::endl;
}
// Reads an AMQP short string (1-byte length followed by that many bytes) from
// `data` at `offset` and advances `offset` past it.
//
// Returns an empty string when `offset` is already at or past the end of
// `data` (offset unchanged), or when the declared length runs past the end
// of `data` (offset then points just after the length byte).
std::string RabbitMQClient::extract_short_string(const std::vector<uint8_t>& data, size_t& offset) {
    if (offset >= data.size()) {
        return "";
    }

    const size_t declared = data[offset];
    ++offset;

    const size_t stop = offset + declared;
    if (stop > data.size()) {
        return "";
    }

    std::string text;
    text.assign(data.begin() + offset, data.begin() + stop);
    offset = stop;
    return text;
}
// Reads an AMQP long string (4-byte big-endian length followed by that many
// bytes) from `data` at `offset` and advances `offset` past it.
//
// Returns an empty string when fewer than 4 bytes remain (offset unchanged),
// or when the declared length runs past the end of `data` (offset then
// points just after the 4 length bytes).
std::string RabbitMQClient::extract_long_string(const std::vector<uint8_t>& data, size_t& offset) {
    constexpr size_t kLengthBytes = 4;
    if (offset + kLengthBytes > data.size()) return "";

    // Read the bytes by explicit index: several `offset++` in one expression
    // are unsequenced, so their order is not defined. Widen before shifting
    // so a top byte >= 0x80 is not shifted into the sign bit of an int.
    const uint32_t length = (static_cast<uint32_t>(data[offset]) << 24) |
                            (static_cast<uint32_t>(data[offset + 1]) << 16) |
                            (static_cast<uint32_t>(data[offset + 2]) << 8) |
                            static_cast<uint32_t>(data[offset + 3]);
    offset += kLengthBytes;

    // Compare against the remaining byte count so the check cannot wrap.
    if (length > data.size() - offset) return "";

    std::string result(data.begin() + offset, data.begin() + offset + length);
    offset += length;
    return result;
}
// Copies the next 8 raw bytes of `data` (starting at `offset`) into a string
// and advances `offset` by 8. Returns an empty string, leaving `offset`
// untouched, when fewer than 8 bytes remain.
std::string RabbitMQClient::extract_long_long_string(const std::vector<uint8_t>& data, size_t& offset) {
    const size_t width = 8;
    if (data.size() < offset + width) {
        return "";
    }

    const auto first = data.begin() + offset;
    const auto last = first + width;
    offset += width;
    return std::string(first, last);
}
// Builds the byte sequence that publishes `message` on channel 1:
//   1. a Basic.Publish method frame,
//   2. a content-header frame announcing the body size (no properties),
//   3. one content-body frame holding `message` (omitted when it is empty).
//
// The body is not split by the negotiated frame-max; the whole message goes
// into a single body frame. Exchange and routing-key names longer than 255
// bytes are truncated to the short-string limit.
std::vector<uint8_t> RabbitMQClient::construct_publish_frame(const std::string& exchange, const std::string& routing_key, const std::string& message) {
    // Appends an AMQP short string: 1-byte length followed by the bytes.
    const auto append_shortstr = [](std::vector<uint8_t>& out, const std::string& text) {
        const size_t length = text.size() < 255 ? text.size() : 255;
        out.push_back(static_cast<uint8_t>(length));
        out.insert(out.end(), text.begin(), text.begin() + length);
    };

    std::vector<uint8_t> basic_publish_args;
    basic_publish_args.push_back(0);  // reserved-1 (16-bit), high byte
    basic_publish_args.push_back(0);  // reserved-1 (16-bit), low byte
    append_shortstr(basic_publish_args, exchange);
    append_shortstr(basic_publish_args, routing_key);
    basic_publish_args.push_back(0);  // bit 0 = mandatory, bit 1 = immediate (both off)
    // Frame type 1 = method, channel 1, Class 60 = Basic, Method 40 = Publish
    const std::vector<uint8_t> method_frame = construct_method_frame(1, 1, 60, 40, basic_publish_args);

    // Content header: class id (2), weight (2, always 0), body size (8),
    // property flags (2, none set).
    std::vector<uint8_t> header_payload = {0, 60, 0, 0};
    const uint64_t body_size = message.size();
    for (int shift = 56; shift >= 0; shift -= 8) {
        header_payload.push_back(static_cast<uint8_t>((body_size >> shift) & 0xFF));
    }
    header_payload.push_back(0);
    header_payload.push_back(0);
    const std::vector<uint8_t> header_frame = construct_frame(2, 1, header_payload);  // Type 2 = Header frame

    // Combine frames
    std::vector<uint8_t> frame;
    frame.insert(frame.end(), method_frame.begin(), method_frame.end());
    frame.insert(frame.end(), header_frame.begin(), header_frame.end());
    if (!message.empty()) {
        // A zero-length body is announced by the header alone.
        const std::vector<uint8_t> body_frame =
            construct_frame(3, 1, std::vector<uint8_t>(message.begin(), message.end()));  // Type 3 = Body frame
        frame.insert(frame.end(), body_frame.begin(), body_frame.end());
    }
    return frame;
}
// Builds a Basic.Consume method frame on channel 1 for `queue`, with an empty
// consumer tag (the broker generates one), all flags off and no arguments.
// A queue name longer than 255 bytes is truncated to the short-string limit.
std::vector<uint8_t> RabbitMQClient::construct_consume_frame(const std::string& queue) {
    // Appends an AMQP short string: 1-byte length followed by the bytes.
    const auto append_shortstr = [](std::vector<uint8_t>& out, const std::string& text) {
        const size_t length = text.size() < 255 ? text.size() : 255;
        out.push_back(static_cast<uint8_t>(length));
        out.insert(out.end(), text.begin(), text.begin() + length);
    };

    std::vector<uint8_t> basic_consume_args;
    basic_consume_args.push_back(0);  // reserved-1 (16-bit), high byte
    basic_consume_args.push_back(0);  // reserved-1 (16-bit), low byte
    append_shortstr(basic_consume_args, queue);
    basic_consume_args.push_back(0);  // consumer-tag: empty short string
    // One octet of packed bits: no-local, no-ack, exclusive, no-wait (all off)
    basic_consume_args.push_back(0);
    // arguments: empty field table (4-byte length of zero)
    for (int i = 0; i < 4; ++i) basic_consume_args.push_back(0);

    // Frame type 1 = method, channel 1, Class 60 = Basic, Method 20 = Consume
    return construct_method_frame(1, 1, 60, 20, basic_consume_args);
}
// Builds a Queue.Bind method frame on channel 1 binding `queue` to `exchange`
// under `routing_key`, with no-wait off and no arguments. Names longer than
// 255 bytes are truncated to the short-string limit.
std::vector<uint8_t> RabbitMQClient::construct_bind_queue_frame(const std::string& queue, const std::string& exchange, const std::string& routing_key) {
    // Appends an AMQP short string: 1-byte length followed by the bytes.
    const auto append_shortstr = [](std::vector<uint8_t>& out, const std::string& text) {
        const size_t length = text.size() < 255 ? text.size() : 255;
        out.push_back(static_cast<uint8_t>(length));
        out.insert(out.end(), text.begin(), text.begin() + length);
    };

    std::vector<uint8_t> bind_args;
    bind_args.push_back(0);  // reserved-1 (16-bit), high byte
    bind_args.push_back(0);  // reserved-1 (16-bit), low byte
    append_shortstr(bind_args, queue);
    append_shortstr(bind_args, exchange);
    append_shortstr(bind_args, routing_key);
    bind_args.push_back(0);  // One octet of packed bits: no-wait (off)
    // arguments: empty field table (4-byte length of zero)
    for (int i = 0; i < 4; ++i) bind_args.push_back(0);

    // Frame type 1 = method, channel 1, Class 50 = Queue, Method 20 = Bind
    return construct_method_frame(1, 1, 50, 20, bind_args);
}
// Builds a Queue.Unbind method frame on channel 1 removing the `routing_key`
// binding between `queue` and `exchange`, with no arguments. Names longer
// than 255 bytes are truncated to the short-string limit.
std::vector<uint8_t> RabbitMQClient::construct_unbind_queue_frame(const std::string& queue, const std::string& exchange, const std::string& routing_key) {
    // Appends an AMQP short string: 1-byte length followed by the bytes.
    const auto append_shortstr = [](std::vector<uint8_t>& out, const std::string& text) {
        const size_t length = text.size() < 255 ? text.size() : 255;
        out.push_back(static_cast<uint8_t>(length));
        out.insert(out.end(), text.begin(), text.begin() + length);
    };

    std::vector<uint8_t> unbind_args;
    unbind_args.push_back(0);  // reserved-1 (16-bit), high byte
    unbind_args.push_back(0);  // reserved-1 (16-bit), low byte
    append_shortstr(unbind_args, queue);
    append_shortstr(unbind_args, exchange);
    append_shortstr(unbind_args, routing_key);
    // Unlike Queue.Bind, Queue.Unbind has no no-wait bit: the argument table
    // follows the routing key directly.
    // arguments: empty field table (4-byte length of zero)
    for (int i = 0; i < 4; ++i) unbind_args.push_back(0);

    // Frame type 1 = method, channel 1, Class 50 = Queue, Method 50 = Unbind
    return construct_method_frame(1, 1, 50, 50, unbind_args);
}
// Builds a Queue.Declare method frame on channel 1 for `queue` with the given
// durable/exclusive/auto-delete flags; passive and no-wait are off and no
// arguments are sent. A queue name longer than 255 bytes is truncated to the
// short-string limit.
std::vector<uint8_t> RabbitMQClient::construct_declare_queue_frame(const std::string& queue, bool durable, bool exclusive, bool auto_delete) {
    // Appends an AMQP short string: 1-byte length followed by the bytes.
    const auto append_shortstr = [](std::vector<uint8_t>& out, const std::string& text) {
        const size_t length = text.size() < 255 ? text.size() : 255;
        out.push_back(static_cast<uint8_t>(length));
        out.insert(out.end(), text.begin(), text.begin() + length);
    };

    // Consecutive bit fields share one octet, first field in the lowest bit:
    // bit 0 passive, bit 1 durable, bit 2 exclusive, bit 3 auto-delete,
    // bit 4 no-wait.
    uint8_t flags = 0;
    if (durable) flags |= 1u << 1;
    if (exclusive) flags |= 1u << 2;
    if (auto_delete) flags |= 1u << 3;

    std::vector<uint8_t> declare_args;
    declare_args.push_back(0);  // reserved-1 (16-bit), high byte
    declare_args.push_back(0);  // reserved-1 (16-bit), low byte
    append_shortstr(declare_args, queue);
    declare_args.push_back(flags);
    // arguments: empty field table (4-byte length of zero)
    for (int i = 0; i < 4; ++i) declare_args.push_back(0);

    // Frame type 1 = method, channel 1, Class 50 = Queue, Method 10 = Declare
    return construct_method_frame(1, 1, 50, 10, declare_args);
}
// Builds an Exchange.Declare method frame on channel 1 for `exchange` of the
// given `type` with the given durable/auto-delete flags; passive, internal
// and no-wait are off and no arguments are sent. Names longer than 255 bytes
// are truncated to the short-string limit.
std::vector<uint8_t> RabbitMQClient::construct_declare_exchange_frame(const std::string& exchange, const std::string& type, bool durable, bool auto_delete) {
    // Appends an AMQP short string: 1-byte length followed by the bytes.
    const auto append_shortstr = [](std::vector<uint8_t>& out, const std::string& text) {
        const size_t length = text.size() < 255 ? text.size() : 255;
        out.push_back(static_cast<uint8_t>(length));
        out.insert(out.end(), text.begin(), text.begin() + length);
    };

    // Consecutive bit fields share one octet, first field in the lowest bit:
    // bit 0 passive, bit 1 durable, bit 2 auto-delete, bit 3 internal,
    // bit 4 no-wait.
    uint8_t flags = 0;
    if (durable) flags |= 1u << 1;
    if (auto_delete) flags |= 1u << 2;

    std::vector<uint8_t> declare_args;
    declare_args.push_back(0);  // reserved-1 (16-bit), high byte
    declare_args.push_back(0);  // reserved-1 (16-bit), low byte
    append_shortstr(declare_args, exchange);
    append_shortstr(declare_args, type);
    declare_args.push_back(flags);
    // arguments: empty field table (4-byte length of zero)
    for (int i = 0; i < 4; ++i) declare_args.push_back(0);

    // Frame type 1 = method, channel 1, Class 40 = Exchange, Method 10 = Declare
    return construct_method_frame(1, 1, 40, 10, declare_args);
}
// Wraps a method call into a frame: the payload is the big-endian class id,
// the big-endian method id and then the already-encoded `arguments`. The
// payload is framed by construct_frame() with the given `type` and `channel`.
std::vector<uint8_t> RabbitMQClient::construct_method_frame(uint8_t type, uint16_t channel, uint16_t class_id, uint16_t method_id, const std::vector<uint8_t>& arguments) {
    std::vector<uint8_t> body;
    body.reserve(arguments.size() + 4);

    // Both ids are written high byte first.
    const uint16_t ids[] = {class_id, method_id};
    for (const uint16_t id : ids) {
        body.push_back(static_cast<uint8_t>(id >> 8));
        body.push_back(static_cast<uint8_t>(id & 0xFF));
    }

    body.insert(body.end(), arguments.begin(), arguments.end());
    return construct_frame(type, channel, body);
}
// Serialises one frame: type (1 byte), channel (2 bytes, big-endian), payload
// size (4 bytes, big-endian), the payload itself and the 0xCE end marker.
std::vector<uint8_t> RabbitMQClient::construct_frame(uint8_t type, uint16_t channel, const std::vector<uint8_t>& payload) {
    const uint32_t length = static_cast<uint32_t>(payload.size());

    std::vector<uint8_t> out;
    out.reserve(payload.size() + 8);

    // Header: type, then channel high/low byte.
    out.push_back(type);
    out.push_back(static_cast<uint8_t>(channel >> 8));
    out.push_back(static_cast<uint8_t>(channel & 0xFF));

    // Header: payload size, most significant byte first.
    for (int shift = 24; shift >= 0; shift -= 8) {
        out.push_back(static_cast<uint8_t>((length >> shift) & 0xFF));
    }

    out.insert(out.end(), payload.begin(), payload.end());

    // Every frame is terminated by the frame-end octet.
    out.push_back(0xCE);
    return out;
}
#ifndef RABBITMQ_CLIENT_HPP
#define RABBITMQ_CLIENT_HPP
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <unordered_map>
// Forward declarations for uvw
namespace uvw {
class Loop;
class TCPHandle;
} // namespace uvw
// Minimal AMQP client for RabbitMQ built on a uvw TCP handle.
//
// The client encodes and decodes AMQP frames itself and reports progress
// through optional std::function callbacks.
//
// NOTE(review): the listeners registered by connect() capture `this`, so a
// copied or moved instance would leave them pointing at the original object.
// Copy/move are not disabled here — confirm instances are never copied.
class RabbitMQClient {
public:
// Consumer callback; see consume() for how it is registered and invoked.
using MessageCallback = std::function<void(const std::string& queue, const std::string& message)>;
// Receives a textual description of an error.
using ErrorCallback = std::function<void(const std::string& error)>;
// Invoked when the TCP connection has been established.
using ConnectedCallback = std::function<void()>;
// Callback type for disconnect notifications.
using DisconnectedCallback = std::function<void()>;
// Callback type for reconnect notifications.
using ReconnectedCallback = std::function<void()>;
// Receives the name passed to declare_queue().
using QueueDeclaredCallback = std::function<void(const std::string& queue)>;
// Receives the name passed to declare_exchange().
using ExchangeDeclaredCallback = std::function<void(const std::string& exchange)>;
// Stores the endpoint and credentials and creates the TCP handle on the
// default uvw loop. Does not connect.
RabbitMQClient(const std::string& host, int port, const std::string& user, const std::string& password);
// Closes the TCP handle if one exists.
~RabbitMQClient();
// Registers the connect/data/error listeners and starts the TCP connection.
void connect();
// Sends `message` to `exchange` with `routing_key`.
void publish(const std::string& exchange, const std::string& routing_key, const std::string& message);
// Registers `callback` for `queue` and sends a consume request for it.
void consume(const std::string& queue, MessageCallback callback);
// Sends a request binding `queue` to `exchange` under `routing_key`.
void bind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key);
// Sends a request removing the binding of `queue` to `exchange`.
void unbind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key);
// Sends a queue declaration, then invokes the queue-declared callback.
void declare_queue(const std::string& queue, bool durable, bool exclusive, bool auto_delete);
// Sends an exchange declaration, then invokes the exchange-declared callback.
void declare_exchange(const std::string& exchange, const std::string& type, bool durable, bool auto_delete);
// Callback setters: each replaces the previously stored callback.
void set_on_error(ErrorCallback callback);
void set_on_connected(ConnectedCallback callback);
void set_on_disconnected(DisconnectedCallback callback);
void set_on_reconnected(ReconnectedCallback callback);
void set_on_queue_declared(QueueDeclaredCallback callback);
void set_on_exchange_declared(ExchangeDeclaredCallback callback);
private:
// Broker endpoint and credentials supplied to the constructor.
std::string _host;
int _port;
std::string _user;
std::string _password;
// Event loop and TCP handle. _loop is declared before _tcp because the
// handle is created from the loop.
std::shared_ptr<uvw::Loop> _loop;
std::shared_ptr<uvw::TCPHandle> _tcp;
// Consumer callbacks, keyed by the queue name given to consume().
std::unordered_map<std::string, MessageCallback> _consumers;
// Bytes received from the socket that are still to be decoded.
std::vector<uint8_t> _buffer;
// User callbacks; an empty std::function means "not set".
ErrorCallback _on_error;
ConnectedCallback _on_connected;
DisconnectedCallback _on_disconnected;
ReconnectedCallback _on_reconnected;
QueueDeclaredCallback _on_queue_declared;
ExchangeDeclaredCallback _on_exchange_declared;
// Writes the AMQP protocol header to the socket.
void send_amqp_header();
// Writes already-encoded frame bytes to the socket.
void send_amqp_frame(const std::vector<uint8_t>& frame);
// Decodes a size value from the first bytes of _buffer.
size_t parse_frame_size();
// Decodes the frame at the start of `frame` and dispatches it by type.
void handle_amqp_frame(const std::vector<uint8_t>& frame);
// Dispatches a method payload by class id and method id.
void handle_method_frame(uint16_t channel, const std::vector<uint8_t>& payload);
// Replies to Connection.Start with the login data.
void handle_connection_start(const std::vector<uint8_t>& payload);
// Replies to Connection.Tune and then sends Connection.Open.
void handle_connection_tune(const std::vector<uint8_t>& payload);
// Parses Basic.Deliver and invokes the matching consumer callback.
void handle_basic_deliver(const std::vector<uint8_t>& payload);
// Logs the arrival of a content-header frame.
void handle_header_frame(uint16_t channel, const std::vector<uint8_t>& payload);
// Logs the content of a body frame.
void handle_body_frame(uint16_t channel, const std::vector<uint8_t>& payload);
// Field readers: decode one value at `offset` and advance `offset` past it.
// They return an empty string when `data` is too short.
std::string extract_short_string(const std::vector<uint8_t>& data, size_t& offset);
std::string extract_long_string(const std::vector<uint8_t>& data, size_t& offset);
std::string extract_long_long_string(const std::vector<uint8_t>& data, size_t& offset);
// Frame builders: return the encoded bytes for the corresponding request.
std::vector<uint8_t> construct_publish_frame(const std::string& exchange, const std::string& routing_key, const std::string& message);
std::vector<uint8_t> construct_consume_frame(const std::string& queue);
std::vector<uint8_t> construct_bind_queue_frame(const std::string& queue, const std::string& exchange, const std::string& routing_key);
std::vector<uint8_t> construct_unbind_queue_frame(const std::string& queue, const std::string& exchange, const std::string& routing_key);
std::vector<uint8_t> construct_declare_queue_frame(const std::string& queue, bool durable, bool exclusive, bool auto_delete);
std::vector<uint8_t> construct_declare_exchange_frame(const std::string& exchange, const std::string& type, bool durable, bool auto_delete);
// Prefixes `arguments` with the class and method ids and frames the result.
std::vector<uint8_t> construct_method_frame(uint8_t type, uint16_t channel, uint16_t class_id, uint16_t method_id, const std::vector<uint8_t>& arguments);
// Encodes one frame: header (type, channel, size), payload, 0xCE end marker.
std::vector<uint8_t> construct_frame(uint8_t type, uint16_t channel, const std::vector<uint8_t>& payload);
};
#endif // RABBITMQ_CLIENT_HPP
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment