diff --git a/lib/sensu/client.rb b/lib/sensu/client.rb index 479c12b..49872f2 100644 --- a/lib/sensu/client.rb +++ b/lib/sensu/client.rb @@ -233,12 +233,14 @@ module Sensu socket.logger = @logger socket.settings = @settings socket.amq = @amq + socket.queue = @check_request_queue diff --git a/lib/sensu/client.rb b/lib/sensu/client.rb index 479c12b..49872f2 100644 --- a/lib/sensu/client.rb +++ b/lib/sensu/client.rb @@ -233,12 +233,14 @@ module Sensu socket.logger = @logger socket.settings = @settings socket.amq = @amq + socket.queue = @check_request_queue diff --git a/lib/sensu/client.rb b/lib/sensu/client.rb index 479c12b..49872f2 100644 --- a/lib/sensu/client.rb +++ b/lib/sensu/client.rb @@ -233,12 +233,14 @@ module Sensu socket.logger = @logger socket.settings = @settings socket.amq = @amq + socket.queue = @check_request_queue end @logger.debug('binding client udp socket') EM::open_datagram_socket('127.0.0.1', 3030, Socket) do |socket| socket.logger = @logger socket.settings = @settings socket.amq = @amq + socket.queue = @check_request_queue socket.reply = false end end diff --git a/lib/sensu/socket.rb b/lib/sensu/socket.rb index 136dc98..cd2addf 100644 --- a/lib/sensu/socket.rb +++ b/lib/sensu/socket.rb @@ -1,6 +1,6 @@ module Sensu class Socket < EM::Connection - attr_accessor :logger, :settings, :amq, :reply + attr_accessor :logger, :settings, :amq, :reply, :queue def respond(data) unless @reply == false @@ -15,6 +15,24 @@ module Sensu elsif data.strip == 'ping' @logger.debug('socket received ping') respond('pong') + elsif data.strip =~ /^subscribe\s/ + @logger.debug('socket received subscribe command') + subscriptions = data.strip.split(/\s/) + subscriptions.shift + subscriptions.each do |subscription| + @logger.debug("subscribing to #{subscription}") + @queue.bind(@amq.fanout(subscription)) + respond('ok') + end + elsif data.strip =~ /^unsubscribe\s/ + @logger.debug('socket received subscribe command') + subscriptions = data.strip.split(/\s/) + subscriptions.shift + subscriptions.each do |subscription| + @logger.debug("unsubscribing from #{subscription}") + @queue.unbind(@amq.fanout(subscription)) + respond('ok') + end else @logger.debug('socket received data', { :data => data