heapless/mpmc.rs

//! A fixed capacity Multiple-Producer Multiple-Consumer (MPMC) lock-free queue
//!
//! NOTE: This module requires atomic CAS operations. On targets where they're not natively available,
//! they are emulated by the [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
//!
//! # Example
//!
//! This queue can be constructed in "const context". Placing it in a `static` variable lets *all*
//! contexts (interrupts / threads / `main`) safely enqueue and dequeue items from it.
//!
//! ``` ignore
//! #![no_main]
//! #![no_std]
//!
//! use panic_semihosting as _;
//!
//! use cortex_m::{asm, peripheral::syst::SystClkSource};
//! use cortex_m_rt::{entry, exception};
//! use cortex_m_semihosting::hprintln;
//! use heapless::mpmc::Q2;
//!
//! static Q: Q2<u8> = Q2::new();
//!
//! #[entry]
//! fn main() -> ! {
//!     if let Some(p) = cortex_m::Peripherals::take() {
//!         let mut syst = p.SYST;
//!
//!         // configures the system timer to trigger a SysTick exception every second
//!         syst.set_clock_source(SystClkSource::Core);
//!         syst.set_reload(12_000_000);
//!         syst.enable_counter();
//!         syst.enable_interrupt();
//!     }
//!
//!     loop {
//!         if let Some(x) = Q.dequeue() {
//!             hprintln!("{}", x).ok();
//!         } else {
//!             asm::wfi();
//!         }
//!     }
//! }
//!
//! #[exception]
//! fn SysTick() {
//!     static mut COUNT: u8 = 0;
//!
//!     Q.enqueue(*COUNT).ok();
//!     *COUNT += 1;
//! }
//! ```
//!
//! # Benchmark
//!
//! Measured on an ARM Cortex-M3 core running at 8 MHz with zero Flash wait cycles.
//!
//! N| `Q8::<u8>::enqueue().ok()` (`z`) | `Q8::<u8>::dequeue()` (`z`) |
//! -|----------------------------------|-----------------------------|
//! 0|34                                |35                           |
//! 1|52                                |53                           |
//! 2|69                                |71                           |
//!
//! - `N` denotes the number of *interruptions*. On Cortex-M, an interruption consists of an
//!   interrupt handler preempting the would-be atomic section of the `enqueue` / `dequeue`
//!   operation. Note that it does *not* matter if the higher priority handler uses the queue or
//!   not.
//! - All execution times are in clock cycles. 1 clock cycle = 125 ns.
//! - Execution time is *dependent* on `mem::size_of::<T>()`. Both operations include one
//!   `memcpy(T)` in their successful path.
//! - The optimization level is indicated in parentheses.
//! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue`
//!   and `Ok` is returned by `enqueue`).
//!
//! # Portability
//!
//! This module requires CAS atomic instructions which are not available on all architectures
//! (e.g. ARMv6-M (`thumbv6m-none-eabi`) and MSP430 (`msp430-none-elf`)). These atomics can,
//! however, be emulated with [`portable-atomic`](https://crates.io/crates/portable-atomic), which is
//! enabled with the `cas` feature and is enabled by default for `thumbv6m-none-eabi` and `riscv32`
//! targets.
//!
//! # References
//!
//! This is an implementation of Dmitry Vyukov's ["Bounded MPMC queue"][0] minus the cache padding.
//!
//! [0]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

use core::{cell::UnsafeCell, mem::MaybeUninit};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;

use atomic::Ordering;

#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;

// Unsigned type matching the atomic's value type; used for positions, masks and
// sequence numbers.
#[cfg(feature = "mpmc_large")]
type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;

// Signed counterpart; used for the wrapping difference between a cell's sequence
// number and a position, which must be able to go negative.
#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;

/// MPMC queue with a capacity for 2 elements.
pub type Q2<T> = MpMcQueue<T, 2>;

/// MPMC queue with a capacity for 4 elements.
pub type Q4<T> = MpMcQueue<T, 4>;

/// MPMC queue with a capacity for 8 elements.
pub type Q8<T> = MpMcQueue<T, 8>;

/// MPMC queue with a capacity for 16 elements.
pub type Q16<T> = MpMcQueue<T, 16>;

/// MPMC queue with a capacity for 32 elements.
pub type Q32<T> = MpMcQueue<T, 32>;

/// MPMC queue with a capacity for 64 elements.
pub type Q64<T> = MpMcQueue<T, 64>;

/// MPMC queue with a capacity for N elements.
///
/// N must be a power of 2. The maximum value of N is `u8::MAX` - 1 if the `mpmc_large`
/// feature is not enabled.
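///
/// A minimal usage sketch (running on a host target with native CAS support):
///
/// ```
/// use heapless::mpmc::MpMcQueue;
///
/// // `N` must be a power of two; 4 is used here just as an example
/// let q: MpMcQueue<u8, 4> = MpMcQueue::new();
/// assert_eq!(q.enqueue(1), Ok(()));
/// assert_eq!(q.dequeue(), Some(1));
/// ```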
pub struct MpMcQueue<T, const N: usize> {
    buffer: UnsafeCell<[Cell<T>; N]>,
    dequeue_pos: AtomicTargetSize,
    enqueue_pos: AtomicTargetSize,
}

impl<T, const N: usize> MpMcQueue<T, N> {
    const MASK: UintSize = (N - 1) as UintSize;
    const EMPTY_CELL: Cell<T> = Cell::new(0);

    const ASSERT: [(); 1] = [()];

    /// Creates an empty queue
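    ///
    /// A minimal sketch of the `const` construction pattern described in the module docs:
    ///
    /// ```
    /// use heapless::mpmc::Q4;
    ///
    /// // `new` is a `const fn`, so the queue can be placed in a `static`
    /// static Q: Q4<u8> = Q4::new();
    ///
    /// assert!(Q.enqueue(0).is_ok());
    /// assert_eq!(Q.dequeue(), Some(0));
    /// ```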
    pub const fn new() -> Self {
        // Const assert
        crate::sealed::greater_than_1::<N>();
        crate::sealed::power_of_two::<N>();

        // Const assert on size.
        Self::ASSERT[!(N < (UintSize::MAX as usize)) as usize];

        let mut cell_count = 0;

        // Each cell starts with its own index as the sequence number, marking it as
        // empty and ready for the producer that will claim it on the first lap.
        let mut result_cells: [Cell<T>; N] = [Self::EMPTY_CELL; N];
        while cell_count != N {
            result_cells[cell_count] = Cell::new(cell_count);
            cell_count += 1;
        }

        Self {
            buffer: UnsafeCell::new(result_cells),
            dequeue_pos: AtomicTargetSize::new(0),
            enqueue_pos: AtomicTargetSize::new(0),
        }
    }

    /// Returns the item in the front of the queue, or `None` if the queue is empty
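    ///
    /// A minimal single-context sketch:
    ///
    /// ```
    /// use heapless::mpmc::Q2;
    ///
    /// let q = Q2::new();
    /// assert_eq!(q.dequeue(), None);
    ///
    /// q.enqueue(1).unwrap();
    /// assert_eq!(q.dequeue(), Some(1));
    /// ```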
    pub fn dequeue(&self) -> Option<T> {
        unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, Self::MASK) }
    }

    /// Adds an `item` to the end of the queue
    ///
    /// Returns the `item` back if the queue is full
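    ///
    /// A minimal single-context sketch of the full-queue case:
    ///
    /// ```
    /// use heapless::mpmc::Q2;
    ///
    /// let q = Q2::new();
    /// assert!(q.enqueue(1).is_ok());
    /// assert!(q.enqueue(2).is_ok());
    ///
    /// // the queue is full; the rejected item is handed back
    /// assert_eq!(q.enqueue(3), Err(3));
    /// ```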
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        unsafe {
            enqueue(
                self.buffer.get() as *mut _,
                &self.enqueue_pos,
                Self::MASK,
                item,
            )
        }
    }
}

impl<T, const N: usize> Default for MpMcQueue<T, N> {
    fn default() -> Self {
        Self::new()
    }
}

unsafe impl<T, const N: usize> Sync for MpMcQueue<T, N> where T: Send {}

struct Cell<T> {
    data: MaybeUninit<T>,
    // Sequence number used to coordinate producers and consumers (see Vyukov's algorithm).
    sequence: AtomicTargetSize,
}

impl<T> Cell<T> {
    const fn new(seq: usize) -> Self {
        Self {
            data: MaybeUninit::uninit(),
            sequence: AtomicTargetSize::new(seq as UintSize),
        }
    }
}

unsafe fn dequeue<T>(
    buffer: *mut Cell<T>,
    dequeue_pos: &AtomicTargetSize,
    mask: UintSize,
) -> Option<T> {
    let mut pos = dequeue_pos.load(Ordering::Relaxed);

    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);

        if dif == 0 {
            // The cell holds an item for this position; try to claim the position.
            if dequeue_pos
                .compare_exchange_weak(
                    pos,
                    pos.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                break;
            }
        } else if dif < 0 {
            // The cell has not been written yet: the queue is empty.
            return None;
        } else {
            // Another consumer claimed this position first; retry with a fresh position.
            pos = dequeue_pos.load(Ordering::Relaxed);
        }
    }

    let data = (*cell).data.as_ptr().read();
    // Mark the cell as free for a producer one lap ahead.
    (*cell)
        .sequence
        .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
    Some(data)
}

unsafe fn enqueue<T>(
    buffer: *mut Cell<T>,
    enqueue_pos: &AtomicTargetSize,
    mask: UintSize,
    item: T,
) -> Result<(), T> {
    let mut pos = enqueue_pos.load(Ordering::Relaxed);

    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        let dif = (seq as IntSize).wrapping_sub(pos as IntSize);

        if dif == 0 {
            // The cell is free for this position; try to claim the position.
            if enqueue_pos
                .compare_exchange_weak(
                    pos,
                    pos.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                break;
            }
        } else if dif < 0 {
            // The cell still holds an item from the previous lap: the queue is full.
            return Err(item);
        } else {
            // Another producer claimed this position first; retry with a fresh position.
            pos = enqueue_pos.load(Ordering::Relaxed);
        }
    }

    (*cell).data.as_mut_ptr().write(item);
    // Publish the item to consumers waiting on this position.
    (*cell)
        .sequence
        .store(pos.wrapping_add(1), Ordering::Release);
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::Q2;

    #[test]
    fn sanity() {
        let q = Q2::new();
        q.enqueue(0).unwrap();
        q.enqueue(1).unwrap();
        assert!(q.enqueue(2).is_err());

        assert_eq!(q.dequeue(), Some(0));
        assert_eq!(q.dequeue(), Some(1));
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn drain_at_pos255() {
        let q = Q2::new();
        for _ in 0..255 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }
        // this should not block forever
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn full_at_wrapped_pos0() {
        let q = Q2::new();
        for _ in 0..254 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }
        assert!(q.enqueue(0).is_ok());
        assert!(q.enqueue(0).is_ok());
        // this should not block forever
        assert!(q.enqueue(0).is_err());
    }
}