Created
January 31, 2025 17:39
-
-
Save sichacvah/5999080998c2c10899f914c2cfd24f15 to your computer and use it in GitHub Desktop.
queue 1 producer and multiple consumers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Opaque handle to a semaphore object; this file only stores and passes it around. */
typedef void* sema_handle;

/* Signature of a queued unit of work. It takes a single untyped pointer argument,
 * presumably the `userdata` stored next to it in the entry (confirm with the consumer code). */
typedef void (*queueentry_callback)(void*);

typedef struct queueentry queueentry;

/* One slot of the work queue: a callback paired with the pointer supplied when it was queued. */
struct queueentry {
    queueentry_callback Callback;  /* function to run for this entry */
    void* userdata;                /* caller-supplied pointer stored alongside Callback */
};
/*
 * Queue with one producer and multiple consumers.
 * The purpose of the queue is to offload HTTP requests and other `heavy` tasks.
 *
 * TODO(sichirc): we need a second queue with one consumer (the main thread) and multiple producers:
 * when, for example, a background thread finishes a task, it puts the result into that queue
 * and the main thread processes it. It seems we would only need to swap reads with writes to get that?
 */
| typedef struct queue queue; | |
| struct queue { | |
| u32 MaxEntryCount; | |
| u32 Read; | |
| u32 Write; | |
| sema_handle SemaphoreHandle; | |
| slice Entries; | |
| }; | |
/* Error tag reported when the backing memory cannot hold the requested number of entries. */
#define ELOWMEMORY 2

/*
 * True when the queue pointed to by `q` holds no entries (read and write indices are equal).
 * Plain, non-atomic reads: the result is only a snapshot when other threads are active.
 * The argument is parenthesized so expressions such as `&some_queue` expand correctly.
 */
#define is_queue_empty(q) ((q)->Read == (q)->Write)
| void InitEntryQueue(slice mem, u32 maxentrycount, sema_handle sem, qeueue* q, error* err) { | |
| if (sizeof(queueentry) * maxentrycount > slice.cap) { | |
| err->message = UnsafeCString("backed memory slice too small"); | |
| err->tag = ELOWMEMORY; | |
| return; | |
| } | |
| q->MaxEntryCount = maxentrycount; | |
| q->Entries = mem; | |
| q->SemaphoreHandle = sem; | |
| q->Read = 0; | |
| q->Write = 0; | |
| } | |
| bool | |
| AddQueueEntry(queue* q, queueentry_callback cb, void* userdata) | |
| { | |
| u32 read; | |
| u32 write; | |
| __atomic_load(&q->Write, &write, __ATOMIC_RELAXED); | |
| __atomic_load(&q->Read, &read, __ATOMIC_ACQUIRE); | |
| assert(write != read); | |
| queueentry* Entry = q->Entries + i; | |
| Entry->Callback = cb; | |
| Entry->Data = userdata; | |
| i++; | |
| __atomic_store(&q->Write, &i, __ATOMIC_RELEASE); | |
| return true; | |
| } | |
| bool | |
| PollQueueEntry(queue* q, queueentry* entry) { | |
| u32 read; | |
| u32 write; | |
| u32 nextread; | |
| // will correct in compare exchange | |
| __atomic_load(&q->Read, &read, __ATOMIC_RELAXED); | |
| __atomic_load(&q->Write, &write, __ATOMIC_ACQUIRE); | |
| if (read != write) { | |
| nextread = (read + 1) % q->MaxEntryCount; | |
| if (__atomic_compare_exchange(&q->Read, &read, &nextread, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) { | |
| entry = q->Entries.base + read; | |
| return true | |
| } | |
| } | |
| return false; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment