Skip to content

Instantly share code, notes, and snippets.

@sichacvah
Created January 31, 2025 17:39
Show Gist options
  • Select an option

  • Save sichacvah/5999080998c2c10899f914c2cfd24f15 to your computer and use it in GitHub Desktop.

Select an option

Save sichacvah/5999080998c2c10899f914c2cfd24f15 to your computer and use it in GitHub Desktop.
queue 1 producer and multiple consumers
/* Untyped handle used for the queue's semaphore (presumably a platform
 * semaphore object -- confirm against the code that creates it). */
typedef void *sema_handle;

/* Pointer to a function that takes a single void* argument and returns nothing. */
typedef void (*queueentry_callback)(void *);

/* One slot of the queue: a callback together with an untyped user pointer. */
typedef struct queueentry {
    queueentry_callback Callback; /* function stored for this entry */
    void *userdata;               /* pointer stored alongside the callback */
} queueentry;
/*
* Queue with 1 producer and multiple consumers.
* The purpose of the queue is to offload HTTP requests and other `heavy` tasks.
*
* TODO(sichirc): need a second queue with 1 consumer (main thread) and multiple producers:
* when, for example, a background thread finishes a task, it puts the result into that queue
* and the main thread processes it. Seems like we only need to swap reads with writes to achieve that?
*/
/*
 * Ring-buffer state shared by the producer and the consumers.
 * Read and Write are slot indices accessed with __atomic builtins by the
 * queue functions; Read == Write is treated as "empty".
 */
typedef struct queue {
    u32 MaxEntryCount;           /* number of queueentry slots in Entries */
    u32 Read;                    /* index of the next slot to consume */
    u32 Write;                   /* index of the next slot to fill */
    sema_handle SemaphoreHandle; /* semaphore handle stored at init time */
    slice Entries;               /* backing memory holding the queueentry slots */
} queue;
/* Error tag reported when the backing memory is too small for the queue. */
#define ELOWMEMORY 2

/*
 * Evaluates to non-zero when the queue pointed to by `q` has no pending
 * entries (Read == Write). The argument is parenthesized so expressions such
 * as `&local_queue` or `queues + i` expand correctly. Note that `q` is
 * evaluated twice and the fields are read with plain (non-atomic) loads.
 */
#define is_queue_empty(q) ((q)->Read == (q)->Write)
void InitEntryQueue(slice mem, u32 maxentrycount, sema_handle sem, qeueue* q, error* err) {
if (sizeof(queueentry) * maxentrycount > slice.cap) {
err->message = UnsafeCString("backed memory slice too small");
err->tag = ELOWMEMORY;
return;
}
q->MaxEntryCount = maxentrycount;
q->Entries = mem;
q->SemaphoreHandle = sem;
q->Read = 0;
q->Write = 0;
}
/*
 * Producer side: appends one entry (callback + user pointer) to the queue.
 *
 * Must only be called from the single producer thread, since Write is
 * updated without a compare-exchange.
 *
 * Returns true when the entry was stored, false when the ring is full (one
 * slot is always kept free so that Read == Write unambiguously means empty)
 * or when the queue has no slots.
 *
 * NOTE(review): this function does not signal q->SemaphoreHandle; waking a
 * sleeping consumer is presumably the caller's job -- confirm.
 */
bool
AddQueueEntry(queue* q, queueentry_callback cb, void* userdata)
{
    u32 read;
    u32 write;
    u32 next;
    queueentry* slot;

    if (q->MaxEntryCount == 0) {
        return false;
    }

    /* Only the producer writes Write, so a relaxed load is enough. */
    __atomic_load(&q->Write, &write, __ATOMIC_RELAXED);
    /* Acquire pairs with the consumers' update of Read. */
    __atomic_load(&q->Read, &read, __ATOMIC_ACQUIRE);

    next = (write + 1) % q->MaxEntryCount;
    if (next == read) {
        /* Advancing Write would make a full queue look empty. */
        return false;
    }

    slot = (queueentry*)q->Entries.base + write;
    slot->Callback = cb;
    slot->userdata = userdata;

    /* Release publishes the slot contents before consumers can see the new Write. */
    __atomic_store(&q->Write, &next, __ATOMIC_RELEASE);
    return true;
}
/*
 * Consumer side: tries to take one entry from the queue without blocking.
 *
 * q     - queue to poll.
 * entry - receives a copy of the claimed entry; written only on success.
 *
 * Returns true when an entry was claimed and copied into *entry. Returns
 * false when the queue is empty or when another consumer claimed the slot
 * first; in the latter case the caller may simply poll again.
 */
bool
PollQueueEntry(queue* q, queueentry* entry) {
    u32 read;
    u32 write;
    u32 nextread;
    queueentry candidate;

    /* A stale Read is harmless: the compare-exchange below rejects it. */
    __atomic_load(&q->Read, &read, __ATOMIC_RELAXED);
    /* Acquire pairs with the producer's release store of Write. */
    __atomic_load(&q->Write, &write, __ATOMIC_ACQUIRE);
    if (read == write) {
        return false;
    }

    /*
     * Copy the slot BEFORE claiming it: once Read advances, the producer is
     * free to reuse the slot, so reading it afterwards could observe a newer
     * entry. The copy is only handed out if the claim succeeds.
     */
    candidate = *((queueentry*)q->Entries.base + read);

    nextread = (read + 1) % q->MaxEntryCount;
    /* Strong CAS so that failure always means another consumer won the slot. */
    if (!__atomic_compare_exchange(&q->Read, &read, &nextread, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
        return false;
    }

    *entry = candidate;
    return true;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment