Skip to content

Instantly share code, notes, and snippets.

@l0s
Last active April 29, 2019 01:13
Show Gist options
  • Select an option

  • Save l0s/d5e730feeb3ce533a0b05d01769049ee to your computer and use it in GitHub Desktop.

Select an option

Save l0s/d5e730feeb3ce533a0b05d01769049ee to your computer and use it in GitHub Desktop.
Distributed Queue Processing in C
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <errno.h>
/* One element of a singly linked list of strings (used as the work queue). */
struct StringNode {
/* Heap buffer holding the node's text: allocated by init_StringNode,
   released by destroy_StringNode. */
char* string;
/* Link to the following node; list walkers in this file stop at NULL. */
struct StringNode* next;
};
/*
 * Allocate a detached list node holding a private copy of `string`.
 *
 * string: NUL-terminated text to copy; the caller keeps ownership of it.
 *
 * Returns a node whose `next` is NULL, to be released with
 * destroy_StringNode, or NULL if `string` is NULL or memory is exhausted.
 */
struct StringNode* init_StringNode( const char* string ) {
    if( string == NULL ) {
        return NULL;
    }
    struct StringNode* retval = malloc( sizeof *retval );
    if( retval == NULL ) {
        return NULL;
    }
    const size_t size = strlen( string ) + 1; /* include the terminator */
    retval->string = malloc( size );
    if( retval->string == NULL ) {
        free( retval );
        return NULL;
    }
    memcpy( retval->string, string, size );
    /* malloc leaves memory indeterminate; the queue code follows `next`,
       so a fresh node must explicitly terminate the list. */
    retval->next = NULL;
    return retval;
}
/*
 * Release a single node and the string it owns.
 *
 * node: node to free; NULL is accepted and ignored, mirroring free().
 *
 * Only this node is released: `next` is neither followed nor freed.
 */
void destroy_StringNode( struct StringNode* node ) {
    if( node == NULL ) {
        return;
    }
    free( node->string );
    free( node );
}
/* State shared between main and the worker threads: a linked-list queue of
   strings together with the pthread objects used to coordinate access. */
struct WorkerContext {
/* First node waiting to be processed; NULL means the queue is empty. */
struct StringNode* queue_head;
/* Most recently appended node; main links new nodes after it. */
struct StringNode* queue_tail;
/* Mutex held by main and by the workers while they touch the queue. */
pthread_mutex_t queue_lock;
/* Condition variable workers wait on when the queue is empty; main signals
   it after each enqueue. */
pthread_cond_t queue_monitor;
/* Set to true by init_WorkerContext and to false by main once all input is
   queued; workers exit when the queue is empty and this is false. */
bool active;
};
/*
 * Tear down a fully initialised context: free every node still queued,
 * destroy the mutex and condition variable, then free the context itself.
 *
 * context: context to destroy; NULL is accepted and ignored. No thread may
 *          still be using the context when this is called.
 */
void destroy_WorkerContext( struct WorkerContext* context ) {
    if( context == NULL ) {
        return;
    }
    struct StringNode* node = context->queue_head;
    while( node != NULL ) {
        /* Save the link first: destroy_StringNode frees the node, so
           reading node->next afterwards would be a use-after-free. */
        struct StringNode* following = node->next;
        destroy_StringNode( node );
        node = following;
    }
    pthread_mutex_destroy( &context->queue_lock );
    pthread_cond_destroy( &context->queue_monitor );
    free( context );
}
/*
 * Allocate a worker context with an empty queue, an initialised mutex and
 * condition variable, and `active` set to true.
 *
 * Returns the new context, or NULL after printing a diagnostic when memory
 * or a pthread object could not be obtained. On failure, only what was
 * actually initialised is released.
 */
struct WorkerContext* init_WorkerContext( void ) {
    struct WorkerContext* retval = malloc( sizeof *retval );
    if( retval == NULL ) {
        printf( "no memory to create worker context\n" );
        return NULL;
    }
    retval->queue_head = NULL;
    retval->queue_tail = NULL;
    retval->active = true;

    /* Any non-zero status is a failure, not just the codes with a
       dedicated message. */
    const int mutex_status = pthread_mutex_init( &retval->queue_lock, NULL );
    if( mutex_status != 0 ) {
        if( mutex_status == EINVAL ) {
            printf( "invalid mutex attr\n" );
        } else if( mutex_status == ENOMEM ) {
            printf( "no memory to create mutex\n" );
        } else {
            printf( "unable to create mutex: %s\n", strerror( mutex_status ) );
        }
        /* Neither pthread object exists yet, so only the memory is freed. */
        free( retval );
        return NULL;
    }

    const int cond_status = pthread_cond_init( &retval->queue_monitor, NULL );
    if( cond_status != 0 ) {
        if( cond_status == EINVAL ) {
            printf( "invalid cond attr\n" );
        } else if( cond_status == ENOMEM ) {
            printf( "no memory to create condition\n" );
        } else if( cond_status == EAGAIN ) {
            printf( "temporary lack of condition resources\n" );
        } else {
            printf( "unable to create condition: %s\n", strerror( cond_status ) );
        }
        /* The mutex was created successfully; the condition was not. */
        pthread_mutex_destroy( &retval->queue_lock );
        free( retval );
        return NULL;
    }
    return retval;
}
/*
 * Produce a short tag for the calling thread, used to label log lines.
 *
 * Returns pthread_self() reduced modulo 10. The tag is not unique: two
 * threads can map to the same value. Relies on pthread_t being an
 * arithmetic type on the build platform.
 */
int get_tid() {
    const pthread_t self = pthread_self();
    const long bucket = ( long )( self % 10l );
    return ( int )bucket;
}
/*
 * Worker thread entry point: repeatedly take the node at the head of the
 * shared queue and process it (print it, sleep one second, free it).
 *
 * argument: the struct WorkerContext shared with the producer.
 *
 * Returns NULL in all cases. The thread exits once the queue is empty and
 * `active` is false, or when a mutex/condition operation fails.
 *
 * All reads of the queue and of `active` happen with queue_lock held. A
 * worker blocked here only re-checks `active` when queue_monitor is
 * signalled, so whoever clears the flag must also wake the waiters.
 */
void* process_queue( void* argument ) {
    struct WorkerContext* context = argument;
    while( true ) {
        printf( "T%d: locking the queue\n", get_tid() );
        const int lock_status = pthread_mutex_lock( &context->queue_lock );
        if( lock_status != 0 ) {
            if( lock_status == EINVAL ) {
                printf( "T%d: invalid mutex, bailing\n", get_tid() );
            } else if( lock_status == EDEADLK ) {
                printf( "T%d: deadlock detected, bailing\n", get_tid() );
            } else {
                printf( "T%d: unable to lock the queue (%d), bailing\n", get_tid(), lock_status );
            }
            return NULL;
        }

        /* Predicate loop: condition waits may wake spuriously, and another
           worker may have taken the node before this one reacquired the
           lock. */
        while( context->queue_head == NULL && context->active ) {
            printf( "T%d: queue is empty, waiting...\n", get_tid() );
            const int wait_status = pthread_cond_wait( &context->queue_monitor, &context->queue_lock );
            if( wait_status != 0 ) {
                /* On an error return the mutex is still held by the caller. */
                printf( "T%d: waiting on the queue failed (%d), bailing\n", get_tid(), wait_status );
                pthread_mutex_unlock( &context->queue_lock );
                return NULL;
            }
        }

        struct StringNode* node = context->queue_head;
        if( node == NULL ) {
            /* The loop above only ends with an empty queue when inactive. */
            printf( "T%d: queue is empty and context inactive, exiting\n", get_tid() );
            pthread_mutex_unlock( &context->queue_lock );
            return NULL;
        }

        /* Detach the head; do not leave the tail pointing at a node that is
           about to be freed. */
        context->queue_head = node->next;
        if( context->queue_head == NULL ) {
            context->queue_tail = NULL;
        }
        pthread_mutex_unlock( &context->queue_lock );

        // process the record outside the lock so other workers can proceed
        printf( "T%d: processing %s\n", get_tid(), node->string );
        sleep( 1 );
        destroy_StringNode( node );
    }
}
/*
 * Demo driver: start a small pool of workers, queue six strings for them,
 * then shut the pool down and wait for it to drain.
 *
 * argc, argv: unused.
 *
 * Returns EXIT_SUCCESS when every worker started and every string was
 * queued, EXIT_FAILURE otherwise.
 */
int main( const int argc, const char* argv[] ) {
    ( void )argc;
    ( void )argv;

    /* Compile-time sizes so the arrays below are ordinary arrays, not VLAs. */
    enum { concurrency = 2 };
    static const char* const input[] = {
        "foo", "bar", "baz", "alice", "bob", "mallory"
    };
    const size_t length = sizeof input / sizeof input[ 0 ];
    int exit_code = EXIT_SUCCESS;

    struct WorkerContext* context = init_WorkerContext();
    if( context == NULL ) {
        printf( "main: unable to create the worker context\n" );
        return EXIT_FAILURE;
    }

    pthread_t thread_pool[ concurrency ];
    int started = 0;
    while( started < concurrency ) {
        const int create_status = pthread_create( &thread_pool[ started ],
                                                  NULL,
                                                  process_queue,
                                                  context );
        if( create_status != 0 ) {
            printf( "main: unable to create worker %d: %s\n", started, strerror( create_status ) );
            exit_code = EXIT_FAILURE;
            break;
        }
        started++;
    }
    if( started == 0 ) {
        /* Nobody would ever drain the queue. */
        destroy_WorkerContext( context );
        return EXIT_FAILURE;
    }
    printf( "main: threads created\n" );

    for( size_t i = 0; i < length; i++ ) {
        /* Allocate before taking the lock to keep the critical section short. */
        struct StringNode* node = init_StringNode( input[ i ] );
        if( node == NULL ) {
            printf( "main: unable to allocate a node for %s\n", input[ i ] );
            exit_code = EXIT_FAILURE;
            break;
        }
        node->next = NULL; /* the appended node is the new end of the list */

        pthread_mutex_lock( &context->queue_lock );
        printf( "main: queuing %s\n", node->string );
        if( context->queue_head == NULL ) {
            context->queue_head = node;
        } else {
            context->queue_tail->next = node;
        }
        context->queue_tail = node;
        // wake up one worker
        printf( "main: waking up one worker\n" );
        pthread_cond_signal( &context->queue_monitor );
        pthread_mutex_unlock( &context->queue_lock );
    }

    /* Clear the flag under the lock and wake every waiter: a worker blocked
       on the condition would otherwise never see the change and the joins
       below would hang. */
    pthread_mutex_lock( &context->queue_lock );
    context->active = false;
    pthread_cond_broadcast( &context->queue_monitor );
    pthread_mutex_unlock( &context->queue_lock );

    printf( "main: waiting for the workers to complete\n" );
    for( int i = started; --i >= 0; ) {
        pthread_join( thread_pool[ i ], NULL );
    }

    /* All workers have exited, so the context is no longer shared. */
    destroy_WorkerContext( context );
    printf( "-- main: exiting\n");
    return exit_code;
}
main: threads created
main: queuing foo
main: waking up one worker
T2: locking the queue
main: queuing bar
main: waking up one worker
T6: locking the queue
T6: processing foo
main: queuing baz
main: waking up one worker
main: queuing alice
main: waking up one worker
main: queuing bob
main: waking up one worker
main: queuing mallory
main: waking up one worker
main: waiting for the workers to complete
T2: processing bar
T6: locking the queue
T6: processing baz
T2: locking the queue
T2: processing alice
T6: locking the queue
T6: processing bob
T2: locking the queue
T2: processing mallory
T6: locking the queue
T6: queue is empty and context inactive, exiting
T2: locking the queue
T2: queue is empty and context inactive, exiting
-- main: exiting
Linux 615e3740dda0 4.9.125-linuxkit #1 SMP Fri Sep 7 08:20:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
Compiled with:
gcc-8 -g -Wno-unused-result -Wreturn-type -Wmain -Werror=return-type -Werror=main -pipe -O2 -std=c11 mutex.c -o output/mutex -lpthread -lm
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment