Skip to content

Instantly share code, notes, and snippets.

@sichacvah
Last active February 8, 2025 12:08
Show Gist options
  • Select an option

  • Save sichacvah/356dc9ba34cec8993a2bfe97f685f027 to your computer and use it in GitHub Desktop.

Select an option

Save sichacvah/356dc9ba34cec8993a2bfe97f685f027 to your computer and use it in GitHub Desktop.
Queue Part 2
#include <stdbool.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <stdlib.h>
int msleep(long msec)
{
struct timespec ts;
int res;
if (msec < 0)
{
errno = EINVAL;
return 0;
}
ts.tv_sec = msec / 1000;
ts.tv_nsec = (msec % 1000) * 1000000;
do {
res = nanosleep(&ts, &ts);
} while (res && errno == EINTR);
return res;
}
#define DMB __asm__ __volatile__("DMB ISH")
#define nil (void *)0
typedef char i8;
typedef unsigned char u8;
typedef unsigned char byte;
typedef short i16;
typedef unsigned short u16;
typedef int i32;
typedef unsigned int u32;
typedef long i64;
typedef unsigned long u64;
typedef float f32;
typedef double f64;
typedef unsigned long uintptr;
typedef void (*taskcb)(void*);
// ready to write, ready to read
typedef struct task task;
struct task {
taskcb Callback;
void* Userdata;
u64 Counter; // even - we can store, odd - read in proccess cant write
};
typedef struct queue queue;
struct queue {
volatile u64 MaxEntryCount;
volatile u64 Read;
volatile u64 Write;
task *Tasks;
};
void
InitEntryQueue(task* entries, u64 maxentrycount, queue* q)
{
q->MaxEntryCount = maxentrycount;
q->Tasks = entries;
q->Read = 0;
q->Write = 1;
}
bool
AddQueueEntry(queue* q, taskcb cb, void* userdata)
{
u64 nextwrite;
u64 counter;
task* t;
u64 write;
write = __atomic_load_n(&q->Write, __ATOMIC_RELAXED);
nextwrite = (write + 1) % q->MaxEntryCount;
t = q->Tasks + write;
counter = __atomic_load_n(&t->Counter, __ATOMIC_ACQUIRE);
if ((counter & 1) == 0) {
t->Callback = cb;
t->Userdata = userdata;
__atomic_store(&q->Write, &nextwrite, __ATOMIC_RELAXED);
__atomic_store_n(&t->Counter, counter + 1, __ATOMIC_RELEASE);
return true;
}
return false;
}
bool
PollQueueEntry(queue* q, u64 read, task* t)
{
u64 counter;
task* cur;
__atomic_thread_fence(__ATOMIC_SEQ_CST);
cur = q->Tasks + read;
__atomic_load(&cur->Counter, &counter, __ATOMIC_SEQ_CST);
if ((counter & 1) == 0) {
return false;
}
t->Callback = cur->Callback;
t->Userdata = cur->Userdata;
return __atomic_compare_exchange_n(
&cur->Counter,
&counter,
counter + 1,
true,
__ATOMIC_SEQ_CST,
__ATOMIC_RELAXED
);
}
bool
_AddQueueEntry(queue* q, taskcb cb, void* userdata)
{
u64 nextwrite;
u64 counter;
task* t;
u64 write;
u64 read;
write = __atomic_load_n(&q->Write, __ATOMIC_RELAXED);
read = __atomic_load_n(&q->Read, __ATOMIC_RELAXED);
nextwrite = (write + 1) % q->MaxEntryCount;
if (nextwrite == read) {
return false;
}
t = q->Tasks + write;
counter = __atomic_load_n(&t->Counter, __ATOMIC_ACQUIRE);
if ((counter & 1) == 0) {
t->Callback = cb;
t->Userdata = userdata;
__atomic_store(&q->Write, &nextwrite, __ATOMIC_RELAXED);
__atomic_store_n(&t->Counter, counter + 1, __ATOMIC_RELEASE);
return true;
}
return false;
}
bool
_PollQueueEntry(queue* q, u64 r, task* t) {
u64 nextread;
u64 counter;
task* cur;
u64 read = __atomic_load_n(&q->Read, __ATOMIC_RELAXED);
u64 write = __atomic_load_n(&q->Write, __ATOMIC_RELAXED);
nextread = (read + 1) % q->MaxEntryCount;
if (nextread == write) {
return false;
}
if (__atomic_compare_exchange(&q->Read, &read, &nextread, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
cur = q->Tasks + nextread;
__atomic_load(&cur->Counter, &counter, __ATOMIC_ACQUIRE);
while ((counter & 1) == 0) {
__atomic_load(&cur->Counter, &counter, __ATOMIC_ACQUIRE); // wait until store
}
t->Callback = cur->Callback;
t->Userdata = cur->Userdata;
counter++;
__atomic_store(&cur->Counter, &counter, __ATOMIC_RELEASE);
return true;
}
return false;
}
void
PrintQueue(queue* q)
{
assert(q != nil);
printf("Queue:\n\tWrite = %lu\n\tRead = %lu\n", q->Write, q->Read);
}
void
defaultcb(void* userdata)
{
//msleep(3);
long l = (uintptr)userdata;
printf("DATA = %lu\n", l);
}
#define QSIZE 64
typedef struct threadinfo threadinfo;
struct threadinfo {
pthread_t id;
int num;
queue* Queue;
};
void*
ThreadProc(void *arg)
{
threadinfo *info = (threadinfo *)arg;
queue* q = info->Queue;
task t;
u64 i = 0;
while (true) {
if (PollQueueEntry(q, i, &t)) {
taskcb cb = t.Callback;
assert(cb);
assert(t.Userdata);
cb(t.Userdata);
t.Callback = nil;
t.Userdata = nil;
} else {
}
i = (i + 1) % q->MaxEntryCount;
}
return nil;
}
#define THREADS_COUNT 256//(1024)
task entries[QSIZE] = {0};
int test_multiple_consumers() {
pthread_attr_t attr;
threadinfo threads[THREADS_COUNT];
queue q;
InitEntryQueue(entries, QSIZE, &q);
if (pthread_attr_init(&attr)) {
printf("Could not init pthread %s\n", strerror(errno));
return 2;
}
for (int j = 0; j < THREADS_COUNT; j++) {
threads[j].num = j;
threads[j].Queue = &q;
pthread_create(&threads[j].id, &attr, &ThreadProc, &threads[j]);
}
u32 k;
for (k = 0; k < QSIZE; k++) {
(q.Tasks + k)->Counter = 0;
}
u64 i = 1;
for (i = 1; i < QSIZE * 2; i += 1) {
AddQueueEntry(&q, defaultcb, (void*)i);
}
while (true) {
if (AddQueueEntry(&q, defaultcb, (void*)i)) {
i++;
}
}
return 0;
}
int main() {
return test_multiple_consumers();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment