Last active
April 29, 2019 01:13
-
-
Save l0s/d5e730feeb3ce533a0b05d01769049ee to your computer and use it in GitHub Desktop.
Distributed Queue Processing in C
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <pthread.h> | |
| #include <string.h> | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <stdbool.h> | |
| #include <unistd.h> | |
| #include <errno.h> | |
| struct StringNode { | |
| char* string; | |
| struct StringNode* next; | |
| }; | |
| struct StringNode* init_StringNode( const char* string ) { | |
| struct StringNode* retval = malloc( sizeof( struct StringNode ) ); | |
| retval->string = malloc( strlen( string ) + 1 ); | |
| retval->string = strcpy( retval->string, string ); | |
| return retval; | |
| } | |
| void destroy_StringNode( struct StringNode* node ) { | |
| free( node->string ); | |
| free( node ); | |
| } | |
| struct WorkerContext { | |
| struct StringNode* queue_head; | |
| struct StringNode* queue_tail; | |
| pthread_mutex_t queue_lock; | |
| pthread_cond_t queue_monitor; | |
| bool active; | |
| }; | |
| void destroy_WorkerContext( struct WorkerContext* context ) { | |
| struct StringNode* node = context->queue_head; | |
| while( node != NULL ) { | |
| destroy_StringNode( node ); | |
| node = node->next; | |
| } | |
| pthread_mutex_destroy( &context->queue_lock ); | |
| pthread_cond_destroy( &context->queue_monitor ); | |
| free( context ); | |
| } | |
| struct WorkerContext* init_WorkerContext() { | |
| struct WorkerContext* retval = malloc( sizeof( struct WorkerContext ) ); | |
| retval->queue_head = NULL; | |
| retval->queue_tail = NULL; | |
| const int mutex_status = pthread_mutex_init( &retval->queue_lock, NULL ); | |
| if( mutex_status == EINVAL ) { | |
| printf( "invalid mutex attr\n"); | |
| destroy_WorkerContext( retval ); | |
| return NULL; | |
| } else if( mutex_status == ENOMEM ) { | |
| printf( "no memory to create mutex\n" ); | |
| destroy_WorkerContext( retval ); | |
| return NULL; | |
| } | |
| const int cond_status = pthread_cond_init( &retval->queue_monitor, NULL ); | |
| if( cond_status == EINVAL ) { | |
| printf( "invalid cond attr\n" ); | |
| destroy_WorkerContext( retval ); | |
| return NULL; | |
| } else if( cond_status == ENOMEM ) { | |
| printf( "no memory to create condition\n" ); | |
| destroy_WorkerContext( retval ); | |
| return NULL; | |
| } else if( cond_status == EAGAIN ) { | |
| printf( "temporary lack of condition resources\n" ); | |
| destroy_WorkerContext( retval ); | |
| return NULL; | |
| } | |
| retval->active = true; | |
| return retval; | |
| } | |
| int get_tid() { | |
| return ( int )( pthread_self() % 10l ); | |
| } | |
| void* process_queue( void* argument ) { | |
| struct WorkerContext* context = ( struct WorkerContext* )argument; | |
| while( true ) { | |
| printf( "T%d: locking the queue\n", get_tid() ); | |
| const int lock_status = pthread_mutex_lock( &context->queue_lock ); | |
| if( lock_status == EINVAL ) { | |
| printf( "T%d: invalid mutex, bailing\n", get_tid() ); | |
| return NULL; | |
| } else if( lock_status == EDEADLK ) { | |
| printf( "T%d: deadlock detected, bailing\n", get_tid() ); | |
| return NULL; | |
| } | |
| struct StringNode* node = context->queue_head; | |
| if( node == NULL ) { | |
| if( !context->active ) { | |
| printf( "T%d: queue is empty and context inactive, exiting\n", get_tid() ); | |
| pthread_mutex_unlock( &context->queue_lock ); | |
| return NULL; | |
| } | |
| printf( "T%d: queue is empty, waiting...\n", get_tid() ); | |
| pthread_cond_wait( &context->queue_monitor, &context->queue_lock ); | |
| node = context->queue_head; | |
| if( node == NULL ) { | |
| printf( "T%d: queue is still empty after waking\n", get_tid() ); | |
| pthread_mutex_unlock( &context->queue_lock ); | |
| if( !context->active ) { | |
| printf( "T%d: context was deactivated while waiting, exiting\n", get_tid() ); | |
| return NULL; | |
| } | |
| continue; | |
| } | |
| } | |
| context->queue_head = node->next; | |
| pthread_mutex_unlock( &context->queue_lock ); | |
| // process the record | |
| printf( "T%d: processing %s\n", get_tid(), node->string ); | |
| sleep( 1 ); | |
| destroy_StringNode( node ); | |
| } | |
| } | |
| int main( const int argc, const char* argv[] ) { | |
| struct WorkerContext* context = init_WorkerContext(); | |
| const int concurrency = 2; | |
| pthread_t thread_pool[ concurrency ]; | |
| for( int i = concurrency; --i>= 0; ) { | |
| pthread_create( &thread_pool[ i ], | |
| NULL, | |
| process_queue, | |
| ( void* )context ); | |
| } | |
| printf( "main: threads created\n" ); | |
| const int length = 6; | |
| char* input[ length ]; | |
| input[ 0 ] = "foo"; | |
| input[ 1 ] = "bar"; | |
| input[ 2 ] = "baz"; | |
| input[ 3 ] = "alice"; | |
| input[ 4 ] = "bob"; | |
| input[ 5 ] = "mallory"; | |
| for( int i = 0; i < length; i++ ) { | |
| pthread_mutex_lock( &context->queue_lock ); | |
| struct StringNode* node = init_StringNode( input[ i ] ); | |
| printf( "main: queuing %s\n", node->string ); | |
| if( context->queue_head == NULL ) { | |
| context->queue_head = node; | |
| } else { | |
| context->queue_tail->next = node; | |
| } | |
| context->queue_tail = node; | |
| // wake up one worker | |
| printf( "main: waking up one worker\n" ); | |
| pthread_cond_signal( &context->queue_monitor ); | |
| pthread_mutex_unlock( &context->queue_lock ); | |
| } | |
| context->active = false; | |
| printf( "main: waiting for the workers to complete\n" ); | |
| for( int i = concurrency; --i >= 0; ) { | |
| pthread_join( thread_pool[ i ], NULL ); | |
| } | |
| printf( "-- main: exiting\n"); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| main: threads created | |
| main: queuing foo | |
| main: waking up one worker | |
| T2: locking the queue | |
| main: queuing bar | |
| main: waking up one worker | |
| T6: locking the queue | |
| T6: processing foo | |
| main: queuing baz | |
| main: waking up one worker | |
| main: queuing alice | |
| main: waking up one worker | |
| main: queuing bob | |
| main: waking up one worker | |
| main: queuing mallory | |
| main: waking up one worker | |
| main: waiting for the workers to complete | |
| T2: processing bar | |
| T6: locking the queue | |
| T6: processing baz | |
| T2: locking the queue | |
| T2: processing alice | |
| T6: locking the queue | |
| T6: processing bob | |
| T2: locking the queue | |
| T2: processing mallory | |
| T6: locking the queue | |
| T6: queue is empty and context inactive, exiting | |
| T2: locking the queue | |
| T2: queue is empty and context inactive, exiting | |
| -- main: exiting |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Linux 615e3740dda0 4.9.125-linuxkit #1 SMP Fri Sep 7 08:20:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux | |
| Compiled with: | |
| gcc-8 -g -Wno-unused-result -Wreturn-type -Wmain -Werror=return-type -Werror=main -pipe -O2 -std=c11 mutex.c -o output/mutex -lpthread -lm |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment