require 'rubygems' require 'mongo' module MongoPubSub QUEUES_COLLECTION = 'queues' class EndSubscriptionException < Exception; end class Publisher def initialize(queue_name, mongo_connection) # Initialize queue collection as a capped collection if not mongo_connection[QUEUES_COLLECTION].collection_names.include?(queue_name) mongo_connection[QUEUES_COLLECTION].create_collection(queue_name, { :capped => true, :max => 20 }) end @publish_collection = mongo_connection[QUEUES_COLLECTION][queue_name] end def push(message) document = { '_id' => (Time.now.to_f * 1000000 ).to_i, 'message' => message } @publish_collection.save(document) end end class Subscriber def initialize(queue_name, mongo_connection) @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name] # Find latest event in collection, we will ignore all earlier events @earliest = @subscribed_collection.find_one({}, :sort => [[ '$natural', -1 ]])['_id'] end def each tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]]) while true do doc = tail.next_document if doc != nil && doc["_id"].to_i > @earliest begin yield doc['message'] rescue EndSubscriptionException break end elsif doc != nil # This event is too old, ignore it else # No event to process. Wait a bit. sleep(1) end end end end end if __FILE__ == $0 # Simple Pub/Sub test # Create a subscriber and published on the test queue # send a message that's ignored and send 4 valid message # Ends with a special meaning message that stops properly # the subcriber m = Mongo::Connection.new('127.0.0.1', 27017) puts "Starting Publisher" p = MongoPubSub::Publisher.new('test', m) # this message should not display p.push("This message is too old, ignore me.") puts "Starting Subscriber" n = Mongo::Connection.new('127.0.0.1', 27017) s = MongoPubSub::Subscriber.new('test', n) subcriber = Thread.new do puts "Now waiting for messages" s.each do |message| puts "Subscriber: received \"#{message}\"" # handle exit message correctly if message == "Exit" raise MongoPubSub::EndSubscriptionException.new end end puts "Ending Subscriber !" end ['first event', 'second event', 'third evend','fourth event','Exit'].each do |mes| puts "Publisher #{mes}" p.push(mes) end subcriber.join end