heapless/
mpmc.rs

//! A fixed-capacity multiple-producer, multiple-consumer (MPMC) lock-free queue.
//!
//! **Note:** This module requires atomic compare-and-swap (CAS) instructions. On
//! targets where they're not natively available, they are emulated by the
//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
//!
//! # Example
//!
//! This queue can be constructed in `const` context. Placing it in a `static` variable lets *all*
//! contexts (interrupts/threads/`main`) safely enqueue and dequeue items.
//!
//! ```
//! use core::sync::atomic::{AtomicU8, Ordering};
//!
//! use heapless::mpmc::Queue;
//!
//! static Q: Queue<u8, 2> = Queue::new();
//!
//! fn main() {
//!     // Configure systick interrupt.
//!
//!     loop {
//!         if let Some(x) = Q.dequeue() {
//!             println!("{}", x);
//!         } else {
//!             // Wait for interrupt.
//!         }
//! #       break
//!     }
//! }
//!
//! fn systick() {
//!     static COUNT: AtomicU8 = AtomicU8::new(0);
//!     let count = COUNT.fetch_add(1, Ordering::SeqCst);
//!
//! #   let _ =
//!     Q.enqueue(count);
//! }
//! ```
//!
//! # Benchmark
//!
//! Measured on an ARM Cortex-M3 core running at 8 MHz with zero flash wait cycles, compiled with `-C opt-level=z`:
//!
//! | Method                      | Time (cycles) | N  |
//! |:----------------------------|--------------:|---:|
//! | `Queue::<u8, 8>::enqueue()` |            34 |  0 |
//! | `Queue::<u8, 8>::enqueue()` |            52 |  1 |
//! | `Queue::<u8, 8>::enqueue()` |            69 |  2 |
//! | `Queue::<u8, 8>::dequeue()` |            35 |  0 |
//! | `Queue::<u8, 8>::dequeue()` |            53 |  1 |
//! | `Queue::<u8, 8>::dequeue()` |            71 |  2 |
//!
//! - N denotes the number of interruptions. On Cortex-M, an interruption consists of an
//!   interrupt handler preempting the would-be atomic section of the `enqueue`/`dequeue`
//!   operation. Note that it does *not* matter whether the higher-priority handler uses the
//!   queue or not.
//! - All execution times are in clock cycles (1 clock cycle = 125 ns at 8 MHz).
//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
//!   a `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
//!   and `enqueue` returning `Ok`.
//!
//! # References
//!
//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
//! cache padding.
//!
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

use core::{cell::UnsafeCell, mem::MaybeUninit};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;

use atomic::Ordering;

use crate::storage::{OwnedStorage, Storage, ViewStorage};

// The `mpmc_large` feature widens the position counters from `u8` to `usize`,
// lifting the 128-element capacity limit at the cost of larger queue fields.
#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;

#[cfg(feature = "mpmc_large")]
type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;

#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;

/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
///
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
/// struct if you want to write code that's generic over both.
pub struct QueueInner<T, S: Storage> {
    dequeue_pos: AtomicTargetSize,
    enqueue_pos: AtomicTargetSize,
    buffer: UnsafeCell<S::Buffer<Cell<T>>>,
}

/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
///
/// <div class="warning">
///
/// `N` must be a power of 2.
///
/// </div>
///
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;

/// A [`Queue`] with dynamic capacity.
///
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
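///
/// For example, a shared reference to a [`Queue`] coerces to a shared reference to a `QueueView`:
///
/// ```rust
/// # use heapless::mpmc::{Queue, QueueView};
/// let queue: Queue<u8, 2> = Queue::new();
/// let view: &QueueView<u8> = &queue;
/// assert_eq!(view.dequeue(), None);
/// ```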
pub type QueueView<T> = QueueInner<T, ViewStorage>;

impl<T, const N: usize> Queue<T, N> {
    /// Creates an empty queue.
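    ///
    /// `new` is usable in `const` context, so the queue can be placed in a `static`:
    ///
    /// ```
    /// use heapless::mpmc::Queue;
    ///
    /// static Q: Queue<u8, 4> = Queue::new();
    ///
    /// assert_eq!(Q.dequeue(), None);
    /// ```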
    pub const fn new() -> Self {
        const {
            // Compile-time capacity checks: `N` must be a power of two greater
            // than 1, and it must fit in the position counter type.
            assert!(N > 1);
            assert!(N.is_power_of_two());
            assert!(N < UintSize::MAX as usize);
        }

        let mut cell_count = 0;

        // Initialize each cell's sequence number to its own index, marking
        // every slot as free for the first lap of enqueues.
        let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
        while cell_count != N {
            result_cells[cell_count] = Cell::new(cell_count);
            cell_count += 1;
        }

        Self {
            buffer: UnsafeCell::new(result_cells),
            dequeue_pos: AtomicTargetSize::new(0),
            enqueue_pos: AtomicTargetSize::new(0),
        }
    }

    /// Used in `Storage` implementation.
    pub(crate) fn as_view_private(&self) -> &QueueView<T> {
        self
    }

    /// Used in `Storage` implementation.
    pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
        self
    }
}

impl<T, S: Storage> QueueInner<T, S> {
    /// Returns the maximum number of elements the queue can hold.
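    ///
    /// For the owned [`Queue`], the capacity equals the const parameter `N`:
    ///
    /// ```
    /// use heapless::mpmc::Queue;
    ///
    /// let q: Queue<u8, 4> = Queue::new();
    /// assert_eq!(q.capacity(), 4);
    /// ```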
    #[inline]
    pub fn capacity(&self) -> usize {
        S::len(self.buffer.get())
    }

    /// Get a reference to the `Queue`, erasing the `N` const-generic.
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// let queue: Queue<u8, 2> = Queue::new();
    /// let view: &QueueView<u8> = queue.as_view();
    /// ```
    ///
    /// It is often preferable to do the same through type coercion, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// let queue: Queue<u8, 2> = Queue::new();
    /// let view: &QueueView<u8> = &queue;
    /// ```
    #[inline]
    pub fn as_view(&self) -> &QueueView<T> {
        S::as_mpmc_view(self)
    }

    /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// let mut queue: Queue<u8, 2> = Queue::new();
    /// let view: &mut QueueView<u8> = queue.as_mut_view();
    /// ```
    ///
    /// It is often preferable to do the same through type coercion, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// let mut queue: Queue<u8, 2> = Queue::new();
    /// let view: &mut QueueView<u8> = &mut queue;
    /// ```
    #[inline]
    pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
        S::as_mpmc_mut_view(self)
    }

    /// Returns the bit mask used to map a position onto a buffer index
    /// (`capacity - 1`; the capacity is a power of two).
    fn mask(&self) -> UintSize {
        (S::len(self.buffer.get()) - 1) as _
    }

    /// Returns the item at the front of the queue, or `None` if the queue is empty.
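    ///
    /// ```
    /// use heapless::mpmc::Queue;
    ///
    /// let q: Queue<u8, 4> = Queue::new();
    /// assert_eq!(q.dequeue(), None);
    /// q.enqueue(42).unwrap();
    /// assert_eq!(q.dequeue(), Some(42));
    /// ```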
    pub fn dequeue(&self) -> Option<T> {
        unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
    }

    /// Adds `item` to the end of the queue.
    ///
    /// Returns the `item` back if the queue is full.
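    ///
    /// ```
    /// use heapless::mpmc::Queue;
    ///
    /// let q: Queue<u8, 2> = Queue::new();
    /// assert_eq!(q.enqueue(1), Ok(()));
    /// assert_eq!(q.enqueue(2), Ok(()));
    /// // The queue is full; the rejected item is handed back.
    /// assert_eq!(q.enqueue(3), Err(3));
    /// ```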
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        unsafe {
            enqueue(
                S::as_ptr(self.buffer.get()),
                &self.enqueue_pos,
                self.mask(),
                item,
            )
        }
    }
}

impl<T, const N: usize> Default for Queue<T, N> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T, S: Storage> Drop for QueueInner<T, S> {
    fn drop(&mut self) {
        // Drop all elements currently in the queue.
        while self.dequeue().is_some() {}
    }
}

// The queue may be shared across threads and interrupt contexts, so an element
// enqueued in one context can be dequeued (and therefore moved) in another;
// hence the `T: Send` bound.
unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}

/// A single queue slot, tagged with a sequence number as in Vyukov's algorithm:
/// `sequence == pos` marks the slot as free for an enqueue at position `pos`, and
/// `sequence == pos + 1` marks it as holding data for a dequeue at position `pos`.
struct Cell<T> {
    data: MaybeUninit<T>,
    sequence: AtomicTargetSize,
}

impl<T> Cell<T> {
    const fn new(seq: usize) -> Self {
        Self {
            data: MaybeUninit::uninit(),
            sequence: AtomicTargetSize::new(seq as UintSize),
        }
    }
}

unsafe fn dequeue<T>(
    buffer: *mut Cell<T>,
    dequeue_pos: &AtomicTargetSize,
    mask: UintSize,
) -> Option<T> {
    let mut pos = dequeue_pos.load(Ordering::Relaxed);

    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        // A cell that is ready for a dequeue at `pos` has `sequence == pos + 1`
        // (all arithmetic is wrapping).
        let dif = (seq as IntSize).wrapping_sub(pos.wrapping_add(1) as IntSize);

        match dif.cmp(&0) {
            core::cmp::Ordering::Equal => {
                // The cell holds data for this position; try to claim it.
                if dequeue_pos
                    .compare_exchange_weak(
                        pos,
                        pos.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            }
            core::cmp::Ordering::Less => {
                // The cell has not been written yet: the queue is empty.
                return None;
            }
            core::cmp::Ordering::Greater => {
                // Our snapshot of the position is stale; reload and retry.
                pos = dequeue_pos.load(Ordering::Relaxed);
            }
        }
    }

    let data = (*cell).data.as_ptr().read();
    // Free the cell for the enqueue that will next wrap around to this slot.
    (*cell)
        .sequence
        .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
    Some(data)
}

unsafe fn enqueue<T>(
    buffer: *mut Cell<T>,
    enqueue_pos: &AtomicTargetSize,
    mask: UintSize,
    item: T,
) -> Result<(), T> {
    let mut pos = enqueue_pos.load(Ordering::Relaxed);

    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        // A cell that is free for an enqueue at `pos` has `sequence == pos`.
        let dif = (seq as IntSize).wrapping_sub(pos as IntSize);

        match dif.cmp(&0) {
            core::cmp::Ordering::Equal => {
                // The cell is free for this position; try to claim it.
                if enqueue_pos
                    .compare_exchange_weak(
                        pos,
                        pos.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            }
            core::cmp::Ordering::Less => {
                // The cell still holds data from the previous lap: the queue is full.
                return Err(item);
            }
            core::cmp::Ordering::Greater => {
                // Our snapshot of the position is stale; reload and retry.
                pos = enqueue_pos.load(Ordering::Relaxed);
            }
        }
    }

    (*cell).data.as_mut_ptr().write(item);
    // Publish the data: mark the cell as ready for a dequeue at this position.
    (*cell)
        .sequence
        .store(pos.wrapping_add(1), Ordering::Release);
    Ok(())
}

#[cfg(test)]
mod tests {
    use static_assertions::assert_not_impl_any;

    use super::Queue;

    // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
    assert_not_impl_any!(Queue<*const (), 4>: Send);

    #[test]
    fn memory_leak() {
        droppable!();

        let q = Queue::<_, 2>::new();
        q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
        q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
        drop(q);

        assert_eq!(Droppable::count(), 0);
    }

    #[test]
    fn sanity() {
        let q = Queue::<_, 2>::new();
        q.enqueue(0).unwrap();
        q.enqueue(1).unwrap();
        assert!(q.enqueue(2).is_err());

        assert_eq!(q.dequeue(), Some(0));
        assert_eq!(q.dequeue(), Some(1));
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn drain_at_pos255() {
        // Exercise wraparound of the `u8` position counters: advance the
        // positions to 255, then check that dequeuing from the empty queue
        // still terminates.
        let q = Queue::<_, 2>::new();
        for _ in 0..255 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }

        // Queue is empty; this should not block forever.
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn full_at_wrapped_pos0() {
        // Fill the queue just as the enqueue position wraps from 255 to 0.
        let q = Queue::<_, 2>::new();
        for _ in 0..254 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }
        assert!(q.enqueue(0).is_ok());
        assert!(q.enqueue(0).is_ok());
        // Queue is full; this should not block forever.
        assert!(q.enqueue(0).is_err());
    }

    #[test]
    fn enqueue_full() {
        #[cfg(not(feature = "mpmc_large"))]
        const CAPACITY: usize = 128;

        #[cfg(feature = "mpmc_large")]
        const CAPACITY: usize = 256;

        let q: Queue<u8, CAPACITY> = Queue::new();

        assert_eq!(q.capacity(), CAPACITY);

        for _ in 0..CAPACITY {
            q.enqueue(0xAA).unwrap();
        }

        // Queue is full; this should not block forever.
        q.enqueue(0x55).unwrap_err();
    }
}