Created
April 13, 2012 06:40
Revisions
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 3 deletions.

@@ -8,9 +8,7 @@ When working with large, high volume, low latency systems, it is often the case

* Our singleton process could crash and leave our system in a degraded state.
* The average latency of data processing could be dramatically affected by outlying cases.

For these reasons, we wish to design a system which allows N processes to work on a single data set. In order to arrive at a possible solution, let me outline some assumptions of the system.

* Data store containing items to be processed.
* Data store with atomic updates or conditional puts.
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 7 additions and 1 deletion.

@@ -2,7 +2,13 @@

### The Problem

When working with large, high volume, low latency systems, it is often the case that processing data sequentially becomes detrimental to the system's health. If we only allow one process to work on our data we run into several challenges:

* Our process may fall behind, resulting in a situation in which it is impossible for it to catch up.
* Our singleton process could crash and leave our system in a degraded state.
* The average latency of data processing could be dramatically affected by outlying cases.

For these reasons, we wish to design a system which allows N processes to work on a single data set. In order to arrive at a possible solution, let me outline some assumptions of the system.
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -2,7 +2,7 @@

### The Problem

When working with large, high volume, low latency systems, it is often the case that processing data sequentially becomes detrimental to the system's health. If we only allow one process to work on our data we run into several challenges. Our process may fall behind, resulting in a situation in which it is impossible for it to catch up. Our singleton process could crash and leave our system in a degraded state. The average latency of data processing could be dramatically affected by outlying cases.

For these reasons, we wish to design a system which allows N processes to work on a single data set. In order to arrive at a possible solution, let me outline some assumptions of the system.
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 4 additions and 0 deletions.

@@ -65,6 +65,10 @@ end

In order to maximize the availability of our processors, we need only keep redundant processors online. If a processor should fail, its lock will be released, allowing a redundant processor to acquire the lock in the identity coordination phase. For critical systems, keeping 2*N processors should be sufficient.

### Conclusion

Process partitioning provides a way to process great amounts of data in parallel. It offers a simple design that can be implemented in any language on a variety of data stores. This approach is a great alternative to commonly seen queue-based approaches. In fact, there are many cases in which this approach provides a greater level of concurrency, which allows for a more robust data processing system. The proof of the concurrency improvements will be an exercise left to the reader.

### Links

* [lock-smith](https://github.com/ryandotsmith/lock-smith) A locking toolkit for Ruby.
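
The failover behaviour described under fault tolerance can be sketched in memory. The article only says that a failed processor's lock "will be released"; the lease-with-expiry mechanism below is one assumed way that release could happen, and `LeaseTable`, `claim`, and the `ttl` value are hypothetical names chosen for illustration, not part of lock-smith or any data store mentioned here.

```ruby
# Hypothetical lease table: identity => [owner, expires_at].
# Assumption: a crashed processor stops renewing its lease, so the lease
# lapses and a redundant (standby) processor can claim the identity.
# A real system would do this with a conditional put on a shared store.
class LeaseTable
  def initialize
    @leases = {}
  end

  # Claim the identity if it is free, already ours, or its lease has expired.
  def claim(identity, owner, now, ttl)
    holder, expires_at = @leases[identity]
    return false if holder && holder != owner && expires_at > now

    @leases[identity] = [owner, now + ttl]
    true
  end
end

leases = LeaseTable.new
ttl = 10 # assumed lease length, in seconds

primary_ok    = leases.claim(0, "primary", 0, ttl)  # primary takes identity 0
standby_early = leases.claim(0, "standby", 5, ttl)  # lease is still live
standby_late  = leases.claim(0, "standby", 11, ttl) # primary never renewed
```

With N identities and 2*N processors running this loop, each identity has a standby ready to claim it once the original holder's lease lapses.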
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -51,7 +51,7 @@ acquire_lock do |partition|

```ruby
end
```

One caveat with the previous examples: it may not always be possible to have an integer-based identity on your items to be processed. In these cases we can use the CRC-32 algorithm to produce a checksum of the item's bytes and use the checksum in our modulo computation.

```ruby
acquire_lock do |partition|
```
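
As a minimal sketch of the CRC-32 idea, assuming string keys: `Zlib.crc32` from Ruby's standard library returns a non-negative integer checksum, which can then be reduced modulo N. The helper name `partition_for_key` and the sample keys are hypothetical.

```ruby
require "zlib"

# Hypothetical helper: map any key to a partition in 0...n.
# The key is converted to a string first, since Zlib.crc32 works on bytes.
def partition_for_key(key, n)
  Zlib.crc32(key.to_s) % n
end

n = 3 # assumed number of processors
keys = %w[order-a1 order-b2 order-c3 order-d4]

# Group the sample keys by the partition that would own them.
assignment = keys.group_by { |k| partition_for_key(k, n) }
```

The checksum is deterministic, so every processor computes the same owner for a given key. How evenly keys spread across partitions depends on the data, so it is worth measuring on a real key set.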
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -36,7 +36,7 @@ Selecting data for each processor to process will be determined by the data stor

```ruby
acquire_lock do |partition|
  sql = "select * from items_to_be_processed where MOD(id, ?) = ?"
  DB.exec(sql, Integer(ENV["N"]), partition)
end
```
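
To see why the modulo predicate gives each processor an exclusive slice of the work, here is a small in-memory sketch of the same rule. It assumes integer ids and a fixed number of processors; `partition_for` and `items_for` are hypothetical helper names.

```ruby
# The partition that owns an item is its id modulo n.
def partition_for(id, n)
  id % n
end

# Items a given processor would select; mirrors `WHERE MOD(id, n) = partition`.
def items_for(partition, ids, n)
  ids.select { |id| partition_for(id, n) == partition }
end

n = 4 # assumed number of processors
ids = (1..10).to_a

# One slice per processor identity 0..n-1.
slices = (0...n).map { |partition| items_for(partition, ids, n) }
```

Every id lands in exactly one slice, so no two processors select the same item and no item is left unowned.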
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -30,7 +30,7 @@ end

#### Work Item Selection

How each processor selects its data depends on the data store containing the items to be processed. Data stores supporting predicate analysis (e.g. SQL) will allow the processor to submit a query for data based on our modulo predicate. Data stores like DynamoDB will require the processor to scan data into memory and apply the predicates locally. You should take your data's size into consideration when choosing the store for your items to be processed. Scanning the table into memory may not be feasible. An example of both approaches in Ruby:

**SQL**
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -16,7 +16,7 @@ Our approach involves distributing the workload over N processors. Each processo

#### Identity Coordination

Each processor should have access to N, which is the maximum number of processors. N can be an environment variable defined in each processor's memory. Upon initialization, each processor will successively choose a number from 0 to N-1 until the processor can globally lock its identity. To lock an identity, each processor must request a lock on the identity with a central data store. Tools such as [lock-smith](https://github.com/ryandotsmith/lock-smith) provide a convenient way to acquire a global lock. The following code snippet is an example of identity coordination in Ruby:

```ruby
def acquire_lock
```
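
The identity coordination step can be illustrated without a real lock service. The sketch below uses an in-memory `LockTable` as a stand-in for the central data store; it is an assumption made for illustration and is not lock-smith's API. `acquire_identity` and the owner names are likewise hypothetical.

```ruby
# In-memory stand-in for a global lock store (NOT lock-smith's API).
# A real deployment would use a shared store with atomic updates or
# conditional puts so that processes on different hosts see the same locks.
class LockTable
  def initialize
    @held = {}
    @mutex = Mutex.new
  end

  # Atomically take the lock if it is free; returns true on success.
  def try_lock(name, owner)
    @mutex.synchronize do
      return false if @held.key?(name)

      @held[name] = owner
      true
    end
  end
end

# Try identities 0..n-1 in order and return the first one we manage to lock,
# or nil when all n identities are already taken.
def acquire_identity(locks, owner, n)
  (0...n).find { |i| locks.try_lock("my-process-#{i}", owner) }
end

locks = LockTable.new
n = 2 # assumed maximum number of processors

first  = acquire_identity(locks, "proc-a", n)
second = acquire_identity(locks, "proc-b", n)
third  = acquire_identity(locks, "proc-c", n) # no identity left
```

A processor that receives `nil` here is a redundant one: it has no identity yet and can retry later in case a lock is released.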
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -16,7 +16,7 @@ Our approach involves distributing the workload over N processors. Each processo

#### Identity Coordination

Each processor should have access to N, which is the maximum number of processors. N can be an environment variable defined in each processor's memory. Upon initialization, each processor will successively choose a number from 0 to N-1 until the processor can globally lock its identity. To lock an identity, each processor must request a lock on the identity with a central data store. Tools like [lock-smith](https://github.com/ryandotsmith/lock-smith) provide a convenient way to acquire a global lock. The following code snippet is an example of identity coordination in Ruby:

```ruby
def acquire_lock
```
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -12,7 +12,7 @@ In order to arrive at a possible solution, let me outline some assumptions of th

### Solution

Our approach involves distributing the workload over N processors. Each processor will coordinate with a centralized data store to obtain an integer-based identity. Thus each process will be identified by an integer from 0 to N-1. The processor will use its identity to exclusively find work. We will assign each item of work an integer value, and the processor will select the item of work if the item's value modulo N is equal to the processor's id. Let us explore the details of our approach.

#### Identity Coordination
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 1 addition and 1 deletion.

@@ -4,7 +4,7 @@

When working with large, high volume, low latency systems, it is often the case that processing data sequentially becomes detrimental to the system's health. If we only allow one process to work on our data we run into several challenges. Our process may fall behind in the backlog of work, which can produce a situation in which it is mathematically impossible for our process to catch up. Our singleton process could crash and leave our system in a degraded state. The mean latency of data processing could be dramatically affected by outlying situations. For these reasons, we wish to design a system which allows N processes to work on a single data set.

In order to arrive at a possible solution, let me outline some assumptions of the system.

* Data store containing items to be processed.
* Data store with atomic updates or conditional puts.
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 5 additions and 1 deletion.

@@ -4,7 +4,11 @@

When working with large, high volume, low latency systems, it is often the case that processing data sequentially becomes detrimental to the system's health. If we only allow one process to work on our data we run into several challenges. Our process may fall behind in the backlog of work, which can produce a situation in which it is mathematically impossible for our process to catch up. Our singleton process could crash and leave our system in a degraded state. The mean latency of data processing could be dramatically affected by outlying situations. For these reasons, we wish to design a system which allows N processes to work on a single data set.

In order to arrive at a possible solution, let me outline some assumptions of the system. There exists:

* Data store containing items to be processed.
* Data store with atomic updates or conditional puts.
* Horizontally scalable platform that contains homogeneous environment variables.

### Solution
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 0 additions and 2 deletions.

@@ -83,5 +83,3 @@ builds distributed systems at [heroku](https://www.heroku.com).

This article was motivated by many successes and failures experienced with production systems at Heroku.
♠ ace hacker revised this gist
Oct 15, 2012. 1 changed file with 50 additions and 45 deletions.

@@ -1,72 +1,77 @@

# Process Partitioning

### The Problem

When working with large, high volume, low latency systems, it is often the case that processing data sequentially becomes detrimental to the system's health. If we only allow one process to work on our data we run into several challenges. Our process may fall behind in the backlog of work, which can produce a situation in which it is mathematically impossible for our process to catch up. Our singleton process could crash and leave our system in a degraded state. The mean latency of data processing could be dramatically affected by outlying situations. For these reasons, we wish to design a system which allows N processes to work on a single data set.

In order to arrive at a possible solution, let me outline some assumptions of the system. There exists a network accessible collection of data. Inside the collection there exist atoms; these atoms could be rows, columns, or items in a relational or column oriented database. In addition to network access to the atoms, we also need a concurrency mechanism; this can be an atomic update or a conditional put. Finally, each atom must provide a key that we can access for our partitioning strategy. PostgreSQL, MySQL, Cassandra, DynamoDB, and MongoDB all satisfy these properties.

### Solution

Our approach involves distributing the workload over N processors. Each processor will coordinate with a centralized data store to obtain an integer-based identity. Thus each process will be identified by an integer from 0 to N-1. The processor will use its identity to exclusively find work. We will assign each item of work an integer value, and the processor will select the item of work if the item's value modulo N is equal to the processor's id. Now let us explore the details of our approach.

#### Identity Coordination

Each processor should have access to N. N can be an environment variable defined in each processor's memory. Upon initialization, each processor will successively choose a number from 0 to N-1 until the processor can globally lock its identity. To lock an identity, each processor must request a lock on the identity with a central data store. Tools like [lock-smith](https://github.com/ryandotsmith/lock-smith) provide a convenient way to acquire a global lock. The following code snippet is an example of identity coordination in Ruby:

```ruby
def acquire_lock
  # Walk the candidate identities 0..N-1.
  ENV["N"].to_i.times do |i|
    Locksmith::Dynamodb.lock("my-process-#{i}") do
      yield(i) # critical section: i is this processor's identity
    end
  end
end
```

#### Work Item Selection

How each processor selects its data depends on the data store containing the data to be processed. Stores supporting rich predicate analysis (e.g. SQL) will allow the processor to submit a query for data based on a predicate, whereas stores like DynamoDB will require the processor to scan data into memory and apply the predicates locally. You should take your data's size into consideration when choosing the store for your data to be processed. Scanning the table into memory may not be feasible. An example of both approaches in Ruby:

**SQL**

```ruby
acquire_lock do |partition|
  # Let the database apply the modulo predicate.
  sql = "select * from awaiting_processing where MOD(id, ?) = ?"
  DB.exec(sql, Integer(ENV["N"]), partition)
end
```

**In-Memory Scan**

```ruby
acquire_lock do |partition|
  # Scan everything, then keep only the items owned by this partition.
  DB.scan.select do |item|
    item.id % Integer(ENV["N"]) == partition
  end
end
```

One caveat with the previous examples: it may not always be possible to have an integer-based identity on your items to be processed. In these cases we can use the CRC-32 algorithm to produce a checksum of the item's bytes and use the checksum in our modulo computation.

```ruby
require "zlib"

acquire_lock do |partition|
  DB.scan.select do |item|
    # Checksum the (string) id, then apply the same modulo predicate.
    Zlib.crc32(item.id) % Integer(ENV["N"]) == partition
  end
end
```

#### Fault Tolerance

In order to maximize the availability of our processors, we need only keep redundant processors online. If a processor should fail, its lock will be released, allowing a redundant processor to acquire the lock in the identity coordination phase. For critical systems, keeping 2*N processors should be sufficient.

### Links

* [lock-smith](https://github.com/ryandotsmith/lock-smith) A locking toolkit for Ruby.
* [ddbsync](https://github.com/ryandotsmith/ddbsync) DynamoDB based locking mechanism for Go.
* [pg_advisory_lock](http://www.postgresql.org/docs/9.1/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS)
* [MySQL GET_LOCK](http://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_get-lock)

### Updates

* 2012-10-14 - Generalize article. Discuss the solution in more generality.
* 2012-04-12 - First draft. Addresses only Ruby + PostgreSQL

### Author
♠ ace hacker renamed this gist
Oct 15, 2012. 1 changed file with 0 additions and 0 deletions.
File renamed without changes.
Ryan Smith (ace hacker) revised this gist
Apr 13, 2012. 1 changed file with 14 additions and 13 deletions.

@@ -3,22 +3,22 @@

### The Problem

I have a table with millions of rows that needs processing. The type of processing is not really important for this article. We simply need to process some data with Ruby (or any other user-space programming language).

### A Solution

My environment already depends on Ruby & PostgreSQL, so I want a solution that leverages my existing technologies. Also, I don't want to create a table other than the one which I need to process. Consequently, solutions like [queue_classic](https://github.com/ryandotsmith/queue_classic) are off the table. Thus I have devised the following algorithm:

* Ensure the table has a serial integer column.
* Each Ruby process will take a unique integer.
* The process's integer will start at 0 and be less than the max number of running processes.
* The process will only work rows such that the id of the row mod max number of running processes equals the process's unique integer.

@@ -52,8 +52,8 @@ end

Make this file executable and you will be able to execute this file in 10 separate processes, all of which are working in parallel. The elegant feature of this code is the use of PostgreSQL's advisory lock (a lightweight locking utility). Each PostgreSQL session is eligible to lock an integer key. When the PostgreSQL session is disconnected the key is returned.

@@ -64,6 +64,7 @@ than one function.

### Links

* [pg_advisory_lock](http://www.postgresql.org/docs/9.1/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS)
* [MySQL GET_LOCK](http://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_get-lock)
* [Leader Election](http://en.wikipedia.org/wiki/Leader_election)
* [mfine](https://github.com/mfine)
Ryan Smith (ace hacker) renamed this gist
Apr 13, 2012. 1 changed file with 0 additions and 0 deletions.
File renamed without changes.
♠ ace hacker created this gist
Apr 13, 2012

@@ -0,0 +1,81 @@

# Leader Election With Ruby & PostgreSQL

### The Problem

I have a table with millions of rows that needs processing. The type of processing is not really important, with the exception that the processing needs to be done with Ruby (or any other type of user-space programming language).

### A Solution

I want the simplest possible solution using only Ruby and PostgreSQL as my production dependencies. I also don't want to create a table other than the one I need to process. Consequently, solutions like [queue_classic](https://github.com/ryandotsmith/queue_classic) are off the table. Thus I have devised the following algorithm:

* Ensure the table has a serial integer column.
* Each Ruby process will take a unique integer.
* The integer will start at 0 and be less than the max number of running processes.
* The process will only work rows such that the id of the row mod max number of running processes equals the process's unique integer.

This algorithm ensures that no two workers will attempt to process the same rows. This will reduce contention on our table and allow greater throughput than if we were only running a single process.

### The Code

```ruby
MAX_WORKERS = 10
worker = nil

# Claim the first worker number whose advisory lock is still free.
MAX_WORKERS.times do |i|
  if DB["SELECT pg_try_advisory_lock(?)", i].get
    worker = i
    break
  end
end

if worker
  # Work only the rows whose id maps to this worker's number.
  while(true)
    r = DB["SELECT * FROM t WHERE MOD(id, ?) = ?", MAX_WORKERS, worker].get
    process(r)
  end
else
  puts("unable to work. increase MAX_WORKERS")
end
```

Make this file executable and you will be able to execute this file in 10 separate processes, all of which are working in parallel. The elegant feature of this code is the use of PostgreSQL's advisory lock. This mechanism is a lightweight locking utility. Each PostgreSQL session is eligible to lock an integer key. When the PostgreSQL session is disconnected the key is returned.

*Note* that the key is shared with anyone connecting to the PostgreSQL backend. This can cause confusion if you are using the lock for more than one function.

### Links

* [pg_advisory_lock](http://www.postgresql.org/docs/9.1/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS)
* [Leader Election](http://en.wikipedia.org/wiki/Leader_election)
* [mfine](https://github.com/mfine)

### Author

**Ryan Smith** [@ryandotsmith](http://twitter.com/ryandotsmith), [ace hacker](http://ryandotsmith.heroku.com), builds distributed systems at [heroku](https://www.heroku.com).

This article was motivated by many successes and failures experienced with production systems at Heroku.

Last updated: 2012-04-12