@ryansmith3136
Created April 13, 2012 06:40
Process Partitioning

The Problem

In large, high-volume, low-latency systems, processing data sequentially often becomes detrimental to the system's health. If we allow only one process to work on our data, we run into several challenges. The process may fall so far behind its backlog that it is mathematically impossible for it to catch up, which happens whenever work arrives faster than a single process can complete it. The singleton process could crash and leave the system in a degraded state. The mean latency of data processing could be dramatically affected by outliers. For these reasons, we wish to design a system that allows N processes to work on a single data set.
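
To make the catch-up problem concrete, here is a minimal sketch with made-up numbers. The rates and the helper names `backlog_after` and `min_processes` are hypothetical and serve only to illustrate the arithmetic: when items arrive faster than the processes can handle them, the backlog grows linearly with time.

```ruby
# Backlog size after `seconds`, starting from an empty backlog.
# Rates are in items per second; all values here are illustrative.
def backlog_after(seconds, processes:, arrival_rate:, service_rate:)
  growth = arrival_rate - processes * service_rate
  [growth * seconds, 0.0].max
end

# Smallest number of processes whose combined rate matches the arrival rate.
def min_processes(arrival_rate:, service_rate:)
  (arrival_rate / service_rate).ceil
end

# One process handling 100 items/s against 250 items/s of incoming work.
puts backlog_after(60, processes: 1, arrival_rate: 250.0, service_rate: 100.0)
puts min_processes(arrival_rate: 250.0, service_rate: 100.0)
```

With these numbers a single process never catches up, while three processes keep the backlog at zero.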

In order to arrive at a possible solution, let me outline some assumptions of the system.

  • A data store containing the items to be processed.
  • A data store that supports atomic updates or conditional puts.
  • A horizontally scalable platform in which every process sees the same environment variables.

Solution

Our approach distributes the workload over N processors. Each processor coordinates with a centralized data store to obtain an integer-based identity, so each processor is identified by an integer from 0 to N-1. The processor uses its identity to find work that belongs exclusively to it. We assign each item of work an integer value, and a processor selects an item only if the item's value modulo N equals the processor's identity. Let us explore the details of our approach.
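
The modulo rule can be sketched in a few lines of Ruby. The helper name `owned_by` and the sample ids are hypothetical. The point is only that every item lands in exactly one partition, so no two processors ever select the same item.

```ruby
# Returns the ids owned by the processor with the given identity (0..n-1).
def owned_by(item_ids, identity, n)
  item_ids.select { |id| id % n == identity }
end

n = 3
item_ids = (1..10).to_a

# Build one partition per identity and print it.
partitions = (0...n).map { |identity| owned_by(item_ids, identity, n) }
partitions.each_with_index do |ids, identity|
  puts "processor #{identity}: #{ids.inspect}"
end
```

Taken together, the partitions cover the full list of ids with no overlap.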

Identity Coordination

Each processor should have access to N, the maximum number of processors. N can be an environment variable available to each processor. Upon initialization, each processor successively tries the numbers from 0 to N-1 until it can globally lock one as its identity. To lock an identity, the processor must request a lock on that identity from a central data store. Tools like lock-smith provide a convenient way to acquire a global lock. The following code snippet is an example of identity coordination in Ruby:

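# Walks the identities 0..N-1 and yields an identity while its global lock is held.
def acquire_lock
  ENV["N"].to_i.times do |i|
    # The lock name encodes the identity, so at most one process holds identity i.
    Locksmith::Dynamodb.lock("my-process-#{i}") do
      yield(i) # critical section
    end
  end
end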
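
To show how a conditional put yields exclusive identities, here is a self-contained sketch that does not use lock-smith. `LockTable` is a hypothetical in-memory stand-in for the central data store, and `claim_identity` is an illustrative helper. A real deployment would need a store shared by all processors.

```ruby
# Hypothetical in-memory stand-in for a store with conditional puts.
class LockTable
  def initialize
    @owners = {}
    @mutex = Mutex.new
  end

  # Writes the owner only if the key is unclaimed; returns true on success.
  def put_if_absent(key, owner)
    @mutex.synchronize do
      return false if @owners.key?(key)
      @owners[key] = owner
      true
    end
  end

  # Frees the key, but only when called by its current owner.
  def release(key, owner)
    @mutex.synchronize { @owners.delete(key) if @owners[key] == owner }
  end
end

# Tries identities 0..n-1 in order and returns the first one claimed,
# or nil when every identity is already taken.
def claim_identity(table, owner, n)
  (0...n).find { |i| table.put_if_absent("my-process-#{i}", owner) }
end

table = LockTable.new
p claim_identity(table, "proc-a", 2) # first processor
p claim_identity(table, "proc-b", 2) # second processor
p claim_identity(table, "proc-c", 2) # no identity left
```

With N set to 2, the first two callers receive identities 0 and 1, and the third receives nil and has to wait for an identity to be released.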

Work Item Selection

Which data each processor selects depends on the data store that holds the items to be processed. Stores that support rich predicates (e.g. SQL) allow the processor to submit a query that filters on the predicate, whereas stores like DynamoDB require the processor to scan the data into memory and apply the predicate locally. Take the size of your data into consideration when choosing a store: scanning the whole table into memory may not be feasible. An example of both approaches in Ruby:

SQL

acquire_lock do |partition|
  sql = "select * from awaiting_processing where MOD(id, ?) = ?"
  DB.exec(sql, Integer(ENV["N"]), partition)
end

In-Memory Scan

acquire_lock do |partition|
  DB.scan.select do |item|
    item.id % Integer(ENV["N"]) == partition
  end
end

One caveat with the previous examples: it may not always be possible to have an integer-based identity on your items to be processed. In these cases we can use the CRC-32 algorithm to produce a checksum of the item's bytes (its id in this example) and use that checksum in our modulo computation.

require "zlib"

acquire_lock do |partition|
  DB.scan.select do |item|
    # Zlib.crc32 expects a String, so convert the id before hashing.
    Zlib.crc32(item.id.to_s) % Integer(ENV["N"]) == partition
  end
end
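
As a standalone illustration of the checksum approach, the sketch below maps string identifiers to partitions without a data store. The helper name `partition_for` and the sample ids are hypothetical. The mapping is deterministic, so every processor computes the same partition for a given id.

```ruby
require "zlib"

# Maps any id (converted to a String) to a partition in 0..n-1.
def partition_for(id, n)
  Zlib.crc32(id.to_s) % n
end

ids = %w[order-a order-b order-c order-d order-e order-f]

# Count how many of the sample ids fall into each of 3 partitions.
counts = ids.group_by { |id| partition_for(id, 3) }.transform_values(&:size)
p counts
```

How evenly the items spread across partitions depends on the ids themselves, so it is worth checking the distribution on real data before settling on a value for N.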

Fault Tolerance

To maximize the availability of our processors, we need only keep redundant processors online. If a processor fails, its lock will be released, allowing a redundant processor to acquire the lock in the identity coordination phase. For critical systems, keeping 2*N processors should be sufficient.
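
The takeover can be simulated in a single Ruby process. This sketch assumes that a failed processor's lock is released by lease expiry, which is one possible mechanism and not necessarily the one a given lock service uses. `LeaseTable` and `claim_with_lease` are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```ruby
# Hypothetical lease table: a lock is valid only until its expiry time.
# Not thread-safe; intended for a single-process simulation.
class LeaseTable
  def initialize
    @leases = {}
  end

  # Claims the key if it is free, expired, or already held by this owner
  # (the last case renews the lease). Returns true on success.
  def acquire(key, owner, ttl, now)
    holder = @leases[key]
    return false if holder && holder[:expires_at] > now && holder[:owner] != owner
    @leases[key] = { owner: owner, expires_at: now + ttl }
    true
  end
end

# Returns the first identity in 0..n-1 that the owner holds or can claim.
def claim_with_lease(table, owner, n, ttl, now)
  (0...n).find { |i| table.acquire("my-process-#{i}", owner, ttl, now) }
end

table = LeaseTable.new
p claim_with_lease(table, "proc-a", 2, 10, 0)   # claims an identity at t=0
p claim_with_lease(table, "proc-b", 2, 10, 0)   # claims the other identity
p claim_with_lease(table, "standby", 2, 10, 0)  # nothing free yet
p claim_with_lease(table, "proc-b", 2, 10, 5)   # proc-b renews; proc-a has crashed
p claim_with_lease(table, "standby", 2, 10, 12) # proc-a's lease expired
```

In this run the standby takes over the crashed processor's identity once its lease lapses, while the healthy processor keeps its identity by renewing.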

Updates

  • 2012-10-14 - Generalize article. Discuss the solution in more generality.
  • 2012-04-12 - First draft. Addresses only Ruby + PostgreSQL

Author

Ryan Smith

@ryandotsmith, ace hacker, builds distributed systems at Heroku.

This article was motivated by many successes and failures experienced with production systems at Heroku.

wuputah commented Oct 16, 2012

I think this is effectively N queues (instead of using a single shared queue). It has the benefit of not contending over jobs, instead you contend to process a particular queue. The downside is if any particular queue falls behind, there's no one to help it catch up.

As such, I would guess that it performs better overall than a single shared queue when N is large (>100?) since there's less contention, but it may cause larger perc95+ times.