Skip to content

Instantly share code, notes, and snippets.

@bcambel
Forked from octplane/mongo_pubsub.rb
Created November 16, 2011 14:08
Show Gist options
  • Select an option

  • Save bcambel/1370138 to your computer and use it in GitHub Desktop.

Select an option

Save bcambel/1370138 to your computer and use it in GitHub Desktop.

Revisions

  1. @octplane octplane created this gist Nov 9, 2010.
    86 changes: 86 additions & 0 deletions mongo_pubsub.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,86 @@
    require 'rubygems'
    require 'mongo'

    module MongoPubSub
    QUEUES_COLLECTION = 'queues'
    class EndSubscriptionException < Exception; end
    class Publisher
    def initialize(queue_name, mongo_connection)
    # Initialize queue collection as a capped collection
    if not mongo_connection[QUEUES_COLLECTION].collection_names.include?(queue_name)
    mongo_connection[QUEUES_COLLECTION].create_collection(queue_name, { :capped => true, :max => 20 })
    end
    @publish_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
    end

    def push(message)
    document = { '_id' => (Time.now.to_f * 1000000 ).to_i, 'message' => message }
    @publish_collection.save(document)
    end
    end
    class Subscriber
    def initialize(queue_name, mongo_connection)
    @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
    # Find latest event in collection, we will ignore all earlier events
    @earliest = @subscribed_collection.find_one({}, :sort => [[ '$natural', -1 ]])['_id']
    end
    def each
    tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]])
    while true do
    doc = tail.next_document
    if doc != nil && doc["_id"].to_i > @earliest
    begin
    yield doc['message']
    rescue EndSubscriptionException
    break
    end
    elsif doc != nil
    # This event is too old, ignore it
    else
    # No event to process. Wait a bit.
    sleep(1)
    end
    end
    end
    end
    end


    if __FILE__ == $0
    # Simple Pub/Sub test
    # Create a subscriber and published on the test queue
    # send a message that's ignored and send 4 valid message
    # Ends with a special meaning message that stops properly
    # the subcriber

    m = Mongo::Connection.new('127.0.0.1', 27017)
    puts "Starting Publisher"
    p = MongoPubSub::Publisher.new('test', m)

    # this message should not display
    p.push("This message is too old, ignore me.")

    puts "Starting Subscriber"
    n = Mongo::Connection.new('127.0.0.1', 27017)
    s = MongoPubSub::Subscriber.new('test', n)


    subcriber = Thread.new do
    puts "Now waiting for messages"
    s.each do |message|
    puts "Subscriber: received \"#{message}\""
    # handle exit message correctly
    if message == "Exit"
    raise MongoPubSub::EndSubscriptionException.new
    end
    end
    puts "Ending Subscriber !"
    end

    ['first event', 'second event', 'third evend','fourth event','Exit'].each do |mes|
    puts "Publisher #{mes}"
    p.push(mes)
    end

    subcriber.join
    end