Last active
February 8, 2025 12:08
-
-
Save sichacvah/356dc9ba34cec8993a2bfe97f685f027 to your computer and use it in GitHub Desktop.
Queue Part 2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <stdbool.h> | |
| #include <string.h> | |
| #include <stdio.h> | |
| #include <assert.h> | |
| #include <pthread.h> | |
| #include <unistd.h> | |
| #include <errno.h> | |
| #include <time.h> | |
| #include <stdlib.h> | |
// Sleep for `msec` milliseconds, resuming the sleep whenever it is cut short
// by a signal.
//
// Returns 0 once the full interval has elapsed. Returns -1 with errno set to
// EINVAL when `msec` is negative, or -1 with errno left as set by nanosleep()
// for any failure other than EINTR.
int msleep(long msec)
{
    struct timespec ts;
    int res;

    if (msec < 0)
    {
        errno = EINVAL;
        // Report the failure; returning 0 here would look like a successful sleep.
        return -1;
    }

    ts.tv_sec = msec / 1000;
    ts.tv_nsec = (msec % 1000) * 1000000L;

    // The same struct is passed as request and remainder, so after an
    // interruption the next iteration sleeps only for the time still left.
    do {
        res = nanosleep(&ts, &ts);
    } while (res && errno == EINTR);

    return res;
}
// Issues the `DMB ISH` instruction. The "memory" clobber also tells the
// compiler not to move memory accesses across the statement; without it only
// the instruction itself is emitted and the compiler may still reorder.
#define DMB __asm__ __volatile__("DMB ISH" ::: "memory")
// Null pointer constant. Fully parenthesized so the macro stays a single
// operand wherever it is expanded (e.g. `sizeof nil`).
#define nil ((void *)0)
// Short aliases for the scalar types used throughout this file.
//
// i8 is spelled `signed char`: plain `char` may be unsigned depending on the
// target, which would silently turn i8 into an unsigned type.
typedef signed char i8;
typedef unsigned char u8;
typedef unsigned char byte;
typedef short i16;
typedef unsigned short u16;
typedef int i32;
typedef unsigned int u32;
// i64/u64 are `long`-based, so they are 64 bits wide only where `long` is.
// They are kept as `long` because this file prints u64 values with "%lu".
typedef long i64;
typedef unsigned long u64;
typedef float f32;
typedef double f64;
// Unsigned integer used to carry a pointer value (see defaultcb).
typedef unsigned long uintptr;
// Signature of a queued task: called with the userdata stored next to it.
typedef void (*taskcb)(void*);
| // ready to write, ready to read | |
| typedef struct task task; | |
| struct task { | |
| taskcb Callback; | |
| void* Userdata; | |
| u64 Counter; // even - we can store, odd - read in proccess cant write | |
| }; | |
| typedef struct queue queue; | |
| struct queue { | |
| volatile u64 MaxEntryCount; | |
| volatile u64 Read; | |
| volatile u64 Write; | |
| task *Tasks; | |
| }; | |
| void | |
| InitEntryQueue(task* entries, u64 maxentrycount, queue* q) | |
| { | |
| q->MaxEntryCount = maxentrycount; | |
| q->Tasks = entries; | |
| q->Read = 0; | |
| q->Write = 1; | |
| } | |
| bool | |
| AddQueueEntry(queue* q, taskcb cb, void* userdata) | |
| { | |
| u64 nextwrite; | |
| u64 counter; | |
| task* t; | |
| u64 write; | |
| write = __atomic_load_n(&q->Write, __ATOMIC_RELAXED); | |
| nextwrite = (write + 1) % q->MaxEntryCount; | |
| t = q->Tasks + write; | |
| counter = __atomic_load_n(&t->Counter, __ATOMIC_ACQUIRE); | |
| if ((counter & 1) == 0) { | |
| t->Callback = cb; | |
| t->Userdata = userdata; | |
| __atomic_store(&q->Write, &nextwrite, __ATOMIC_RELAXED); | |
| __atomic_store_n(&t->Counter, counter + 1, __ATOMIC_RELEASE); | |
| return true; | |
| } | |
| return false; | |
| } | |
| bool | |
| PollQueueEntry(queue* q, u64 read, task* t) | |
| { | |
| u64 counter; | |
| task* cur; | |
| __atomic_thread_fence(__ATOMIC_SEQ_CST); | |
| cur = q->Tasks + read; | |
| __atomic_load(&cur->Counter, &counter, __ATOMIC_SEQ_CST); | |
| if ((counter & 1) == 0) { | |
| return false; | |
| } | |
| t->Callback = cur->Callback; | |
| t->Userdata = cur->Userdata; | |
| return __atomic_compare_exchange_n( | |
| &cur->Counter, | |
| &counter, | |
| counter + 1, | |
| true, | |
| __ATOMIC_SEQ_CST, | |
| __ATOMIC_RELAXED | |
| ); | |
| } | |
| bool | |
| _AddQueueEntry(queue* q, taskcb cb, void* userdata) | |
| { | |
| u64 nextwrite; | |
| u64 counter; | |
| task* t; | |
| u64 write; | |
| u64 read; | |
| write = __atomic_load_n(&q->Write, __ATOMIC_RELAXED); | |
| read = __atomic_load_n(&q->Read, __ATOMIC_RELAXED); | |
| nextwrite = (write + 1) % q->MaxEntryCount; | |
| if (nextwrite == read) { | |
| return false; | |
| } | |
| t = q->Tasks + write; | |
| counter = __atomic_load_n(&t->Counter, __ATOMIC_ACQUIRE); | |
| if ((counter & 1) == 0) { | |
| t->Callback = cb; | |
| t->Userdata = userdata; | |
| __atomic_store(&q->Write, &nextwrite, __ATOMIC_RELAXED); | |
| __atomic_store_n(&t->Counter, counter + 1, __ATOMIC_RELEASE); | |
| return true; | |
| } | |
| return false; | |
| } | |
| bool | |
| _PollQueueEntry(queue* q, u64 r, task* t) { | |
| u64 nextread; | |
| u64 counter; | |
| task* cur; | |
| u64 read = __atomic_load_n(&q->Read, __ATOMIC_RELAXED); | |
| u64 write = __atomic_load_n(&q->Write, __ATOMIC_RELAXED); | |
| nextread = (read + 1) % q->MaxEntryCount; | |
| if (nextread == write) { | |
| return false; | |
| } | |
| if (__atomic_compare_exchange(&q->Read, &read, &nextread, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) { | |
| cur = q->Tasks + nextread; | |
| __atomic_load(&cur->Counter, &counter, __ATOMIC_ACQUIRE); | |
| while ((counter & 1) == 0) { | |
| __atomic_load(&cur->Counter, &counter, __ATOMIC_ACQUIRE); // wait until store | |
| } | |
| t->Callback = cur->Callback; | |
| t->Userdata = cur->Userdata; | |
| counter++; | |
| __atomic_store(&cur->Counter, &counter, __ATOMIC_RELEASE); | |
| return true; | |
| } | |
| return false; | |
| } | |
| void | |
| PrintQueue(queue* q) | |
| { | |
| assert(q != nil); | |
| printf("Queue:\n\tWrite = %lu\n\tRead = %lu\n", q->Write, q->Read); | |
| } | |
// Task callback used by the test: treats `userdata` as an integer carried in
// the pointer value and prints it as "DATA = <n>".
void
defaultcb(void* userdata)
{
    // Unsigned, so the variable's type matches the "%lu" conversion exactly
    // (the previous signed `long` did not).
    unsigned long value = (unsigned long)userdata;
    printf("DATA = %lu\n", value);
}
| #define QSIZE 64 | |
| typedef struct threadinfo threadinfo; | |
| struct threadinfo { | |
| pthread_t id; | |
| int num; | |
| queue* Queue; | |
| }; | |
| void* | |
| ThreadProc(void *arg) | |
| { | |
| threadinfo *info = (threadinfo *)arg; | |
| queue* q = info->Queue; | |
| task t; | |
| u64 i = 0; | |
| while (true) { | |
| if (PollQueueEntry(q, i, &t)) { | |
| taskcb cb = t.Callback; | |
| assert(cb); | |
| assert(t.Userdata); | |
| cb(t.Userdata); | |
| t.Callback = nil; | |
| t.Userdata = nil; | |
| } else { | |
| } | |
| i = (i + 1) % q->MaxEntryCount; | |
| } | |
| return nil; | |
| } | |
| #define THREADS_COUNT 256//(1024) | |
| task entries[QSIZE] = {0}; | |
| int test_multiple_consumers() { | |
| pthread_attr_t attr; | |
| threadinfo threads[THREADS_COUNT]; | |
| queue q; | |
| InitEntryQueue(entries, QSIZE, &q); | |
| if (pthread_attr_init(&attr)) { | |
| printf("Could not init pthread %s\n", strerror(errno)); | |
| return 2; | |
| } | |
| for (int j = 0; j < THREADS_COUNT; j++) { | |
| threads[j].num = j; | |
| threads[j].Queue = &q; | |
| pthread_create(&threads[j].id, &attr, &ThreadProc, &threads[j]); | |
| } | |
| u32 k; | |
| for (k = 0; k < QSIZE; k++) { | |
| (q.Tasks + k)->Counter = 0; | |
| } | |
| u64 i = 1; | |
| for (i = 1; i < QSIZE * 2; i += 1) { | |
| AddQueueEntry(&q, defaultcb, (void*)i); | |
| } | |
| while (true) { | |
| if (AddQueueEntry(&q, defaultcb, (void*)i)) { | |
| i++; | |
| } | |
| } | |
| return 0; | |
| } | |
// Program entry point: runs the multi-consumer stress test and uses its
// result as the process exit status.
int main(void) {
    int status = test_multiple_consumers();
    return status;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment