I have a table with millions of rows that needs processing. The type of processing is not really important, except that it needs to be done with Ruby (or any other user-space programming language).
I want the simplest possible solution, using only Ruby and PostgreSQL as my production dependencies. I also don't want to create any tables other than the one containing the rows I need to process. Consequently, solutions like queue_classic are off the table. Thus I have devised the following algorithm:
- Ensure table has a serial integer column.
- Each Ruby process will take a unique integer.
- The integer will be greater than or equal to 0 and less than the max number of running processes.
- The process will only work rows whose id, mod the max number of running processes, equals the process's unique integer.
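The steps above can be sketched in plain Ruby. Here `ids` stands in for the table's serial id column (the values are made up for illustration), and the check at the end confirms the key property: the partitions are disjoint and together cover every row.

```ruby
# Sketch of mod-based partitioning: each worker claims exactly the ids
# whose remainder modulo the worker count equals its own number.
MAX_WORKERS = 10
ids = (1..100).to_a # stand-in for the table's serial id column

partitions = (0...MAX_WORKERS).map do |worker|
  ids.select { |id| id % MAX_WORKERS == worker }
end

# No id appears in two partitions, and every id is covered.
all = partitions.flatten
raise "overlap!" unless all.uniq.length == all.length
raise "missing ids!" unless all.sort == ids.sort
```

Because each id lands in exactly one partition, no coordination between workers is needed beyond agreeing on MAX_WORKERS and their own worker numbers.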
This algorithm ensures that no two workers will attempt to process the same rows. It reduces contention on our table and allows greater throughput than running a single process.
MAX_WORKERS = 10

# Claim a unique worker number by taking an advisory lock on it.
worker = nil
MAX_WORKERS.times do |i|
  if DB["SELECT pg_try_advisory_lock(?)", i].get
    worker = i
    break
  end
end

if worker
  loop do
    # Only touch rows assigned to this worker's partition.
    r = DB["SELECT * FROM t WHERE MOD(id, ?) = ?", MAX_WORKERS, worker].get
    process(r)
  end
else
  puts("unable to work. increase MAX_WORKERS")
end

Make this file executable and you will be able to run it in 10 separate processes, all of which work in parallel.
The elegant feature of this code is its use of PostgreSQL's advisory locks, a lightweight locking mechanism. Each PostgreSQL session is eligible to lock an integer key. When the session disconnects, the lock is released.
Note that the advisory lock key space is shared by everyone connecting to the PostgreSQL backend. This can cause confusion if you use advisory locks for more than one purpose.
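One way to reduce the chance of such collisions (a sketch, not part of the original article) is to derive each lock key from a namespaced string, so different features map to different integers. The worker_key helper below is hypothetical; its result fits PostgreSQL's bigint advisory-lock key form.

```ruby
require "zlib"

# Hypothetical helper: derive an advisory-lock key from a namespace
# string plus a worker number. CRC32 yields a value in 0..2**32-1,
# which fits comfortably in the bigint key accepted by
# pg_try_advisory_lock(bigint).
def worker_key(namespace, n)
  Zlib.crc32("#{namespace}:#{n}")
end

key = worker_key("row-processor", 3)
# DB["SELECT pg_try_advisory_lock(?)", key].get
```

The key is deterministic, so every process computes the same key for the same (namespace, worker) pair, while a different feature using its own namespace is very unlikely to land on the same integer.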
Ryan Smith
@ryandotsmith, ace hacker, builds distributed systems at heroku.
This article was motivated by many successes and failures experienced with production systems at Heroku.
Last updated: 2012-04-12
I think this is effectively N queues (instead of using a single shared queue). It has the benefit of not contending over jobs, instead you contend to process a particular queue. The downside is if any particular queue falls behind, there's no one to help it catch up.
As such, I would guess that it performs better overall than a single shared queue when N is large (>100?), since there's less contention, but it may cause larger 95th-percentile and above latencies.
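The straggler point can be illustrated with a toy simulation (all numbers are made up): if one worker processes at half the arrival rate, only its own partition's backlog grows, because no other worker is allowed to touch those rows.

```ruby
# Toy model: each tick, one row arrives per partition, and each worker
# drains its own partition at its rate. Worker 3 runs at half speed.
MAX_WORKERS = 4
ARRIVALS_PER_TICK = 1.0
rates = [1.0, 1.0, 1.0, 0.5]

backlog = Array.new(MAX_WORKERS, 0.0)
100.times do
  MAX_WORKERS.times do |w|
    backlog[w] += ARRIVALS_PER_TICK
    backlog[w] = [backlog[w] - rates[w], 0.0].max
  end
end
# Full-speed workers stay at zero backlog; the slow worker's
# partition accumulates 0.5 rows per tick with no one to help.
```

In a single shared queue, the other three workers would absorb the slow worker's share; here the skew is permanent until that worker catches up.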