Created
March 27, 2026 14:45
-
-
Save grahamking/c22f9d5c220b9d40c9a4a971caea9704 to your computer and use it in GitHub Desktop.
Rust Linux no_std, lock free queue / channel. Well tested. Originally part of https://github.com/grahamking/ort
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //! MIT License | |
| //! Copyright (c) 2025 Graham King | |
| //! | |
| //! A queue, multi-producer and multi-consumer, without the standard library. Is Send and Sync. | |
| //! Linux x86_64 only. | |
| //! | |
| //! Replaces `std::sync::channel` and friends. | |
| //! | |
| //! Built around a circular buffer. The consumers must never fall behind by more than the buffer | |
| //! length, otherwise they miss items. | |
| //! | |
| //! If a producer tries to get an item but there aren't any yet, it is parked on a `futex`. | |
| //! | |
| //! The modulus operation to access the circular buffer is done right at the end, meaning the | |
| //! consumer and producer positions do not wrap. This makes the math a lot simpler, but means it | |
| //! won't work past u32::MAX items. That's fine for `ort`. | |
| //! | |
| //! A Queue is always wrapped in an Arc - `new` returns `Arc<Queue>`, so it is freely cloneable. | |
| //! The consumers also clone a copy to keep internally. | |
| //! | |
| //! When the queue will not be sent any more items close it, which unblocks any waiting producers. | |
| //! | |
| //! Usage: | |
| //! | |
| //! // A buffer with space for 32 items | |
| //! let producer_1 = Queue::<32>::new(); | |
| //! let producer_2 = producer_1.clone(); | |
| //! | |
| //! // Two consumers. It doesn't matter which producer. | |
| //! // The consumer will start reading from the current producer position, so if you create it | |
| //! after adding to the queue, those items will not be read. | |
| //! let consumer1 = producer_1.consumer(); | |
| //! let consumer2 = producer_1.consumer(); | |
| //! | |
| //! // Add an item | |
| //! producer_1.add(item); | |
| //! // Receive it | |
| //! let x = consumer_1.get_next().unwrap(); | |
| //! let y = consumer_2.get_next().unwrap(); | |
| //! | |
| //! // No more items. Doesn't matter which producer. | |
| //! producer_1.close(); | |
| //! | |
| //! assert!(consumer1.get_next().is_none()); | |
| use core::ffi::c_int; | |
| use core::ptr::null; | |
| use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering}; | |
| extern crate alloc; | |
| use alloc::sync::Arc; | |
| use alloc::vec::Vec; | |
| const SYS_FUTEX: c_long = 202; | |
| const FUTEX_WAIT: c_int = 0; | |
| const FUTEX_WAKE: c_int = 1; | |
| pub struct Queue<T: Clone + Default, const N: usize> { | |
| data: [T; N], | |
| // The next empty position | |
| insert_pos: AtomicU32, | |
| // The read_end is one past the last visible item, it's the full stop for reads. | |
| read_end: AtomicU32, | |
| wait: AtomicI32, | |
| is_closed: AtomicBool, | |
| } | |
| pub struct Consumer<T: Clone + Default, const N: usize> { | |
| queue: Arc<Queue<T, N>>, | |
| current: usize, | |
| } | |
| impl<T: Clone + Default, const N: usize> Consumer<T, N> { | |
| pub fn get_next(&mut self) -> Option<T> { | |
| let item = self.queue.get(self.current); | |
| self.current += 1; | |
| item | |
| } | |
| } | |
| impl<T: Clone + Default, const N: usize> Queue<T, N> { | |
| pub fn new() -> Arc<Self> { | |
| let data: [T; N] = unsafe { | |
| (0..N) | |
| .map(|_| T::default()) | |
| .collect::<Vec<_>>() | |
| .try_into() | |
| .unwrap_unchecked() | |
| }; | |
| Arc::new(Queue { | |
| data, | |
| insert_pos: AtomicU32::new(0), | |
| read_end: AtomicU32::new(0), | |
| wait: AtomicI32::new(0), | |
| is_closed: AtomicBool::new(false), | |
| }) | |
| } | |
| pub fn last(&self) -> usize { | |
| self.read_end.load(Ordering::Relaxed) as usize | |
| } | |
| pub fn consumer(self: &Arc<Self>) -> Consumer<T, N> { | |
| Consumer { | |
| queue: Arc::clone(self), | |
| current: self.read_end.load(Ordering::Relaxed) as usize, | |
| } | |
| } | |
| // Why the two phase commit? | |
| // We move the insert_pos indicator forward before inserting the item. | |
| // That "reserves" the slot for us, no other thread will write to it. | |
| // But it isn't written yet, so we shouldn't read from it. | |
| pub fn add(&self, item: T) { | |
| let insert_at = self.insert_pos.fetch_add(1, Ordering::Relaxed); | |
| unsafe { | |
| let ptr = self.data.as_ptr().add(insert_at as usize % N) as *mut T; | |
| *ptr = item; | |
| } | |
| // If the current read end is at our position, we can commit our item by | |
| // moving the read end forward one, which exposes this item. | |
| // If not, there must be other threads before us that haven't commited, so | |
| // wait a bit. | |
| loop { | |
| let new_commit_pos = self.read_end.compare_exchange( | |
| insert_at, | |
| insert_at + 1, | |
| Ordering::Relaxed, | |
| Ordering::Relaxed, | |
| ); | |
| if new_commit_pos == Ok(insert_at) { | |
| // Success, our item is visible | |
| break; | |
| } | |
| // Othewise wait for other items to commit and retry | |
| core::hint::spin_loop(); | |
| } | |
| self.wake_threads(); | |
| } | |
| pub fn get(&self, idx: usize) -> Option<T> { | |
| while idx == self.last() { | |
| if self.is_closed.load(Ordering::Relaxed) { | |
| return None; | |
| } | |
| // no values, park until there are | |
| self.park_thread(); | |
| } | |
| Some(self.data[idx % N].clone()) | |
| } | |
| pub fn close(&self) { | |
| self.is_closed.store(true, Ordering::Relaxed); | |
| } | |
| /* | |
| pub fn dump(&self) { | |
| for i in 0..N { | |
| println!("{i}: {:?}", self.data[i]); | |
| } | |
| } | |
| */ | |
| fn park_thread(&self) { | |
| unsafe { | |
| libc::syscall( | |
| libc::SYS_FUTEX, | |
| self.wait.as_ptr() as *const i32, | |
| libc::FUTEX_WAIT, | |
| 0, | |
| null::<c_int>(), | |
| null::<c_int>(), | |
| 0, | |
| ); | |
| } | |
| } | |
| fn wake_threads(&self) { | |
| unsafe { | |
| libc::syscall( | |
| libc::SYS_FUTEX, | |
| self.wait.as_ptr() as *const i32, | |
| libc::FUTEX_WAKE, | |
| 999, // wake all the waiters, could be i32:MAX. | |
| null::<c_int>(), | |
| null::<c_int>(), | |
| 0, | |
| ); | |
| } | |
| } | |
| } | |
| impl<T: Clone + Default, const N: usize> Drop for Queue<T, N> { | |
| fn drop(&mut self) { | |
| self.close(); | |
| } | |
| } | |
| #[cfg(test)] | |
| mod tests { | |
| use super::*; | |
| const NUM_ITEMS: usize = 40; | |
| #[derive(Default, Clone)] | |
| pub struct Item { | |
| pub val: usize, | |
| #[allow(dead_code)] | |
| pub s: &'static str, | |
| } | |
| impl Item { | |
| pub fn new(val: usize, s: &'static str) -> Self { | |
| Item { val, s } | |
| } | |
| } | |
| #[test] | |
| fn test_queue() { | |
| let q = Queue::<_, NUM_ITEMS>::new(); | |
| let mut c1 = q.consumer(); | |
| let mut c2 = q.consumer(); | |
| // Ideally we would do all this in threads, but no_std | |
| // Producer | |
| for i in 0..NUM_ITEMS { | |
| let i1 = Item::new(i, "x"); | |
| q.add(i1); | |
| } | |
| q.close(); | |
| // Consumer 1 | |
| for i in 0..10 { | |
| let got_c1 = c1.get_next().unwrap(); | |
| assert_eq!(i, got_c1.val); | |
| } | |
| // Consumer 1 | |
| for i in 0..15 { | |
| let got_c2 = c2.get_next().unwrap(); | |
| assert_eq!(i, got_c2.val); | |
| } | |
| } | |
| } | |
| #[link(name = "c", kind = "dylib")] | |
| unsafe extern "C" { | |
| pub fn syscall(num: c_long, ...) -> c_long; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment