Created
November 9, 2010 16:17
-
-
Save octplane/669309 to your computer and use it in GitHub Desktop.
Revisions
-
octplane created this gist
Nov 9, 2010 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,86 @@ require 'rubygems' require 'mongo' module MongoPubSub QUEUES_COLLECTION = 'queues' class EndSubscriptionException < Exception; end class Publisher def initialize(queue_name, mongo_connection) # Initialize queue collection as a capped collection if not mongo_connection[QUEUES_COLLECTION].collection_names.include?(queue_name) mongo_connection[QUEUES_COLLECTION].create_collection(queue_name, { :capped => true, :max => 20 }) end @publish_collection = mongo_connection[QUEUES_COLLECTION][queue_name] end def push(message) document = { '_id' => (Time.now.to_f * 1000000 ).to_i, 'message' => message } @publish_collection.save(document) end end class Subscriber def initialize(queue_name, mongo_connection) @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name] # Find latest event in collection, we will ignore all earlier events @earliest = @subscribed_collection.find_one({}, :sort => [[ '$natural', -1 ]])['_id'] end def each tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]]) while true do doc = tail.next_document if doc != nil && doc["_id"].to_i > @earliest begin yield doc['message'] rescue EndSubscriptionException break end elsif doc != nil # This event is too old, ignore it else # No event to process. Wait a bit. sleep(1) end end end end end if __FILE__ == $0 # Simple Pub/Sub test # Create a subscriber and published on the test queue # send a message that's ignored and send 4 valid message # Ends with a special meaning message that stops properly # the subcriber m = Mongo::Connection.new('127.0.0.1', 27017) puts "Starting Publisher" p = MongoPubSub::Publisher.new('test', m) # this message should not display p.push("This message is too old, ignore me.") puts "Starting Subscriber" n = Mongo::Connection.new('127.0.0.1', 27017) s = MongoPubSub::Subscriber.new('test', n) subcriber = Thread.new do puts "Now waiting for messages" s.each do |message| puts "Subscriber: received \"#{message}\"" # handle exit message correctly if message == "Exit" raise MongoPubSub::EndSubscriptionException.new end end puts "Ending Subscriber !" end ['first event', 'second event', 'third evend','fourth event','Exit'].each do |mes| puts "Publisher #{mes}" p.push(mes) end subcriber.join end