|
|
@@ -0,0 +1,86 @@ |
|
|
require 'rubygems' |
|
|
require 'mongo' |
|
|
|
|
|
module MongoPubSub |
|
|
QUEUES_COLLECTION = 'queues' |
|
|
class EndSubscriptionException < Exception; end |
|
|
class Publisher |
|
|
def initialize(queue_name, mongo_connection) |
|
|
# Initialize queue collection as a capped collection |
|
|
if not mongo_connection[QUEUES_COLLECTION].collection_names.include?(queue_name) |
|
|
mongo_connection[QUEUES_COLLECTION].create_collection(queue_name, { :capped => true, :max => 20 }) |
|
|
end |
|
|
@publish_collection = mongo_connection[QUEUES_COLLECTION][queue_name] |
|
|
end |
|
|
|
|
|
def push(message) |
|
|
document = { '_id' => (Time.now.to_f * 1000000 ).to_i, 'message' => message } |
|
|
@publish_collection.save(document) |
|
|
end |
|
|
end |
|
|
class Subscriber |
|
|
def initialize(queue_name, mongo_connection) |
|
|
@subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name] |
|
|
# Find latest event in collection, we will ignore all earlier events |
|
|
@earliest = @subscribed_collection.find_one({}, :sort => [[ '$natural', -1 ]])['_id'] |
|
|
end |
|
|
def each |
|
|
tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]]) |
|
|
while true do |
|
|
doc = tail.next_document |
|
|
if doc != nil && doc["_id"].to_i > @earliest |
|
|
begin |
|
|
yield doc['message'] |
|
|
rescue EndSubscriptionException |
|
|
break |
|
|
end |
|
|
elsif doc != nil |
|
|
# This event is too old, ignore it |
|
|
else |
|
|
# No event to process. Wait a bit. |
|
|
sleep(1) |
|
|
end |
|
|
end |
|
|
end |
|
|
end |
|
|
end |
|
|
|
|
|
|
|
|
if __FILE__ == $0 |
|
|
# Simple Pub/Sub test |
|
|
# Create a subscriber and published on the test queue |
|
|
# send a message that's ignored and send 4 valid message |
|
|
# Ends with a special meaning message that stops properly |
|
|
# the subcriber |
|
|
|
|
|
m = Mongo::Connection.new('127.0.0.1', 27017) |
|
|
puts "Starting Publisher" |
|
|
p = MongoPubSub::Publisher.new('test', m) |
|
|
|
|
|
# this message should not display |
|
|
p.push("This message is too old, ignore me.") |
|
|
|
|
|
puts "Starting Subscriber" |
|
|
n = Mongo::Connection.new('127.0.0.1', 27017) |
|
|
s = MongoPubSub::Subscriber.new('test', n) |
|
|
|
|
|
|
|
|
subcriber = Thread.new do |
|
|
puts "Now waiting for messages" |
|
|
s.each do |message| |
|
|
puts "Subscriber: received \"#{message}\"" |
|
|
# handle exit message correctly |
|
|
if message == "Exit" |
|
|
raise MongoPubSub::EndSubscriptionException.new |
|
|
end |
|
|
end |
|
|
puts "Ending Subscriber !" |
|
|
end |
|
|
|
|
|
['first event', 'second event', 'third evend','fourth event','Exit'].each do |mes| |
|
|
puts "Publisher #{mes}" |
|
|
p.push(mes) |
|
|
end |
|
|
|
|
|
subcriber.join |
|
|
end |