heapless/
mpmc.rs

1//! A fixed capacity multiple-producer, multiple-consumer (MPMC) lock-free queue.
2//!
3//! # Deprecation
4//!
5//! <div class="warning">
6//! The current implementation of `mpmc` is marked as deprecated due to not being truly lock-free
7//! </div>
8//!
//! If a thread is parked, or pre-empted for a long time by a higher-priority task
//! during an `enqueue` or `dequeue` operation, it is possible that the queue ends up
//! in a state where no other task can successfully enqueue or dequeue items from it
//! until the pre-empted task can finish its operation.
13//!
//! In that case, [`enqueue`](QueueInner::enqueue) and [`dequeue`](QueueInner::dequeue)
//! will report a failure (`Err` and `None` respectively), but will not panic or reach
//! undefined behaviour.
16//!
17//! This makes `mpmc` unsuitable for some use cases such as using it as a pool of objects.
18//!
19//! ## When can this queue be used?
20//!
21//! This queue should be used for cross-task communication only when items sent over the queue
22//! can be dropped in case of concurrent operations, or when it is possible to retry
23//! the dequeue/enqueue operation after other tasks have had the opportunity to make progress.
24//!
//! In that case you can safely ignore the deprecation warning by placing
//! `#[expect(deprecated)]` where `new` is called.
27//!
//! For more information, and possible alternatives, please see
//! <https://github.com/rust-embedded/heapless/issues/583>.
30//!
31//!
32//! **Note:** This module requires atomic compare-and-swap (CAS) instructions. On
33//! targets where they're not natively available, they are emulated by the
34//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
35//!
36//! # Example
37//!
38//! This queue can be constructed in `const` context. Placing it in a `static` variable lets *all*
39//! contexts (interrupts/threads/`main`) safely enqueue and dequeue items.
40//!
41//! ```
42//! use core::sync::atomic::{AtomicU8, Ordering};
43//!
44//! use heapless::mpmc::Queue;
45//!
46//! static Q: Queue<u8, 2> = Queue::new();
47//!
48//! fn main() {
49//!     // Configure systick interrupt.
50//!
51//!     loop {
52//!         if let Some(x) = Q.dequeue() {
53//!             println!("{}", x);
54//!         } else {
55//!             // Wait for interrupt.
56//!         }
57//! #       break
58//!     }
59//! }
60//!
61//! fn systick() {
62//!     static COUNT: AtomicU8 = AtomicU8::new(0);
63//!     let count = COUNT.fetch_add(1, Ordering::SeqCst);
64//!
65//! #   let _ =
66//!     Q.enqueue(count);
67//! }
68//! ```
69//!
70//! # Benchmark
71//!
72//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled
73//! with `-C opt-level=z`:
74//!
75//! | Method                      | Time | N  |
76//! |:----------------------------|-----:|---:|
77//! | `Queue::<u8, 8>::enqueue()` |   34 |  0 |
78//! | `Queue::<u8, 8>::enqueue()` |   52 |  1 |
79//! | `Queue::<u8, 8>::enqueue()` |   69 |  2 |
80//! | `Queue::<u8, 8>::dequeue()` |   35 |  0 |
81//! | `Queue::<u8, 8>::dequeue()` |   53 |  1 |
82//! | `Queue::<u8, 8>::dequeue()` |   71 |  2 |
83//!
84//! - N denotes the number of interruptions. On Cortex-M, an interruption consists of an interrupt
85//!   handler preempting the would-be atomic section of the `enqueue`/`dequeue` operation. Note that
86//!   it does *not* matter if the higher priority handler uses the queue or not.
87//! - All execution times are in clock cycles (1 clock cycle = 125 ns).
88//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
89//!   `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
90//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some` and
91//!   `enqueue` returning `Ok`.
92//!
93//! # References
94//!
95//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
96//! cache padding.
97//!
98//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
99
100use core::{cell::UnsafeCell, mem::MaybeUninit};
101
102#[cfg(not(feature = "portable-atomic"))]
103use core::sync::atomic;
104#[cfg(feature = "portable-atomic")]
105use portable_atomic as atomic;
106
107use atomic::Ordering;
108
109use crate::storage::{OwnedStorage, Storage, ViewStorage};
110
// Atomic integer type used for the queue's position counters and for each cell's sequence
// number: pointer-sized with the `mpmc_large` feature, 8-bit otherwise.
#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;

// Plain unsigned integer matching `AtomicTargetSize`; used for positions and the index mask.
#[cfg(feature = "mpmc_large")]
type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;

// Signed integer of the same width as `UintSize`; used to compute the wrapping difference
// between a cell's sequence number and a position.
#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;
125
/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
///
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
/// struct if you want to write code that's generic over both.
pub struct QueueInner<T, S: Storage> {
    /// Position counter advanced by each successful `dequeue`.
    dequeue_pos: AtomicTargetSize,
    /// Position counter advanced by each successful `enqueue`.
    enqueue_pos: AtomicTargetSize,
    /// Ring buffer of cells; wrapped in `UnsafeCell` because it is accessed through `&self`.
    buffer: UnsafeCell<S::Buffer<Cell<T>>>,
}
135
/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
///
/// <div class="warning">
///
/// `N` must be a power of 2 and greater than 1; `Queue::new()` asserts this in a `const` block.
///
/// </div>
///
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled: positions are
/// then stored in a `u8`, and `N` must be strictly smaller than the maximum value of that type.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
146
/// A [`Queue`] with dynamic capacity.
///
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by
/// reference. A view can also be obtained explicitly with [`QueueInner::as_view`] or
/// [`QueueInner::as_mut_view`].
pub type QueueView<T> = QueueInner<T, ViewStorage>;
152
153impl<T, const N: usize> Queue<T, N> {
154    #[deprecated(
155        note = "See the documentation of Queue::new() for more information: https://docs.rs/heapless/latest/heapless/mpmc/type.Queue.html#method.new"
156    )]
157    /// Creates an empty queue.
158    ///
159    /// # Deprecation
160    ///
161    /// <div class="warning">
162    /// The current implementation of `mpmc` is marked as deprecated due to not being truly
163    /// lock-free </div>
164    ///
165    /// If a thread is parked, or pre-empted for a long time by an higher-priority task
166    /// during an `enqueue` or `dequeue` operation, it is possible that the queue ends-up
167    /// in a state were no other task can successfully enqueue or dequeue items from it
168    /// until the pre-empted task can finish its operation.
169    ///
170    /// In that case, [`enqueue`](QueueInner::dequeue) and [`dequeue`](QueueInner::enqueue)
171    /// will return an error, but will not panic or reach undefined behaviour
172    ///
173    /// This makes `mpmc` unsuitable for some use cases such as using it as a pool of objects.
174    ///
175    /// ## When can this queue be used?
176    ///
177    /// This queue should be used for cross-task communication only when items sent over the queue
178    /// can be dropped in case of concurrent operations, or when it is possible to retry
179    /// the dequeue/enqueue operation after other tasks have had the opportunity to make progress.
180    ///
181    /// In that case you can safely ignore the warnings using `#[expect(deprecated)]`
182    /// when `new` is called
183    ///
184    /// For more information, and possible alternative, please see
185    /// <https://github.com/rust-embedded/heapless/issues/583>
186    pub const fn new() -> Self {
187        const {
188            assert!(N > 1);
189            assert!(N.is_power_of_two());
190            assert!(N < UintSize::MAX as usize);
191        }
192
193        let mut cell_count = 0;
194
195        let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
196        while cell_count != N {
197            result_cells[cell_count] = Cell::new(cell_count);
198            cell_count += 1;
199        }
200
201        Self {
202            buffer: UnsafeCell::new(result_cells),
203            dequeue_pos: AtomicTargetSize::new(0),
204            enqueue_pos: AtomicTargetSize::new(0),
205        }
206    }
207
208    /// Used in `Storage` implementation.
209    pub(crate) fn as_view_private(&self) -> &QueueView<T> {
210        self
211    }
212    /// Used in `Storage` implementation.
213    pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
214        self
215    }
216}
217
218impl<T, S: Storage> QueueInner<T, S> {
219    /// Returns the maximum number of elements the queue can hold.
220    #[inline]
221    pub fn capacity(&self) -> usize {
222        S::len(self.buffer.get())
223    }
224
225    /// Get a reference to the `Queue`, erasing the `N` const-generic.
226    ///
227    ///
228    /// ```rust
229    /// # use heapless::mpmc::{Queue, QueueView};
230    /// let queue: Queue<u8, 2> = Queue::new();
231    /// let view: &QueueView<u8> = queue.as_view();
232    /// ```
233    ///
234    /// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements
235    /// `Unsize<QueueView<T>>`:
236    ///
237    /// ```rust
238    /// # use heapless::mpmc::{Queue, QueueView};
239    /// let queue: Queue<u8, 2> = Queue::new();
240    /// let view: &QueueView<u8> = &queue;
241    /// ```
242    #[inline]
243    pub fn as_view(&self) -> &QueueView<T> {
244        S::as_mpmc_view(self)
245    }
246
247    /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
248    ///
249    /// ```rust
250    /// # use heapless::mpmc::{Queue, QueueView};
251    /// ##[expect(deprecated)]
252    /// let mut queue: Queue<u8, 2> = Queue::new();
253    /// let view: &mut QueueView<u8> = queue.as_mut_view();
254    /// ```
255    ///
256    /// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements
257    /// `Unsize<QueueView<T>>`:
258    ///
259    /// ```rust
260    /// # use heapless::mpmc::{Queue, QueueView};
261    /// ##[expect(deprecated)]
262    /// let mut queue: Queue<u8, 2> = Queue::new();
263    /// let view: &mut QueueView<u8> = &mut queue;
264    /// ```
265    #[inline]
266    pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
267        S::as_mpmc_mut_view(self)
268    }
269
270    fn mask(&self) -> UintSize {
271        (S::len(self.buffer.get()) - 1) as _
272    }
273
274    /// Returns the item in the front of the queue, or `None` if the queue is empty.
275    pub fn dequeue(&self) -> Option<T> {
276        unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
277    }
278
279    /// Adds an `item` to the end of the queue.
280    ///
281    /// Returns back the `item` if the queue is full.
282    pub fn enqueue(&self, item: T) -> Result<(), T> {
283        unsafe {
284            enqueue(
285                S::as_ptr(self.buffer.get()),
286                &self.enqueue_pos,
287                self.mask(),
288                item,
289            )
290        }
291    }
292}
293
294impl<T, const N: usize> Default for Queue<T, N> {
295    fn default() -> Self {
296        #[allow(deprecated)]
297        Self::new()
298    }
299}
300
301impl<T, S: Storage> Drop for QueueInner<T, S> {
302    fn drop(&mut self) {
303        // Drop all elements currently in the queue.
304        while self.dequeue().is_some() {}
305    }
306}
307
// `enqueue` and `dequeue` take `&self` and move values of type `T` into and out of the queue,
// so an item enqueued through one shared reference may be dequeued through another on a
// different thread; hence the `T: Send` bound. All shared state visible in this file (the two
// position counters and each cell's sequence number) is accessed through atomics.
unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}
309
/// One slot of the queue's ring buffer.
struct Cell<T> {
    /// Storage for the item. Written by `enqueue` and read out by `dequeue`; starts out
    /// uninitialized.
    data: MaybeUninit<T>,
    /// Sequence number compared against the enqueue/dequeue positions to decide whether this
    /// slot can currently be written or read.
    sequence: AtomicTargetSize,
}
314
315impl<T> Cell<T> {
316    const fn new(seq: usize) -> Self {
317        Self {
318            data: MaybeUninit::uninit(),
319            sequence: AtomicTargetSize::new(seq as UintSize),
320        }
321    }
322}
323
324unsafe fn dequeue<T>(
325    buffer: *mut Cell<T>,
326    dequeue_pos: &AtomicTargetSize,
327    mask: UintSize,
328) -> Option<T> {
329    let mut pos = dequeue_pos.load(Ordering::Relaxed);
330
331    let mut cell;
332    loop {
333        cell = buffer.add(usize::from(pos & mask));
334        let seq = (*cell).sequence.load(Ordering::Acquire);
335        let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
336
337        match dif.cmp(&0) {
338            core::cmp::Ordering::Equal => {
339                if dequeue_pos
340                    .compare_exchange_weak(
341                        pos,
342                        pos.wrapping_add(1),
343                        Ordering::Relaxed,
344                        Ordering::Relaxed,
345                    )
346                    .is_ok()
347                {
348                    break;
349                }
350            }
351            core::cmp::Ordering::Less => {
352                return None;
353            }
354            core::cmp::Ordering::Greater => {
355                pos = dequeue_pos.load(Ordering::Relaxed);
356            }
357        }
358    }
359
360    let data = (*cell).data.as_ptr().read();
361    (*cell)
362        .sequence
363        .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
364    Some(data)
365}
366
367unsafe fn enqueue<T>(
368    buffer: *mut Cell<T>,
369    enqueue_pos: &AtomicTargetSize,
370    mask: UintSize,
371    item: T,
372) -> Result<(), T> {
373    let mut pos = enqueue_pos.load(Ordering::Relaxed);
374
375    let mut cell;
376    loop {
377        cell = buffer.add(usize::from(pos & mask));
378        let seq = (*cell).sequence.load(Ordering::Acquire);
379        let dif = (seq as IntSize).wrapping_sub(pos as IntSize);
380
381        match dif.cmp(&0) {
382            core::cmp::Ordering::Equal => {
383                if enqueue_pos
384                    .compare_exchange_weak(
385                        pos,
386                        pos.wrapping_add(1),
387                        Ordering::Relaxed,
388                        Ordering::Relaxed,
389                    )
390                    .is_ok()
391                {
392                    break;
393                }
394            }
395            core::cmp::Ordering::Less => {
396                return Err(item);
397            }
398            core::cmp::Ordering::Greater => {
399                pos = enqueue_pos.load(Ordering::Relaxed);
400            }
401        }
402    }
403
404    (*cell).data.as_mut_ptr().write(item);
405    (*cell)
406        .sequence
407        .store(pos.wrapping_add(1), Ordering::Release);
408    Ok(())
409}
410
#[cfg(test)]
mod tests {
    use static_assertions::assert_not_impl_any;

    use super::Queue;

    // A queue holding `!Send` values must never become `Send` itself.
    assert_not_impl_any!(Queue<*const (), 4>: Send);

    /// Dropping a non-empty queue must drop every item still stored in it.
    #[test]
    fn memory_leak() {
        droppable!();

        #[expect(deprecated)]
        let queue = Queue::<_, 2>::new();
        for _ in 0..2 {
            queue
                .enqueue(Droppable::new())
                .unwrap_or_else(|_| panic!());
        }
        drop(queue);

        assert_eq!(Droppable::count(), 0);
    }

    /// Basic FIFO behaviour, including the full and empty conditions.
    #[test]
    fn sanity() {
        #[expect(deprecated)]
        let queue = Queue::<_, 2>::new();
        queue.enqueue(0).unwrap();
        queue.enqueue(1).unwrap();
        assert!(queue.enqueue(2).is_err());

        assert_eq!(queue.dequeue(), Some(0));
        assert_eq!(queue.dequeue(), Some(1));
        assert_eq!(queue.dequeue(), None);
    }

    /// After 255 enqueue/dequeue pairs, dequeuing from the empty queue must return `None`.
    #[test]
    fn drain_at_pos255() {
        #[expect(deprecated)]
        let queue = Queue::<_, 2>::new();
        for _ in 0..255 {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
        }

        // Queue is empty, this should not block forever.
        assert_eq!(queue.dequeue(), None);
    }

    /// After 254 enqueue/dequeue pairs and two more enqueues, the queue must report full.
    #[test]
    fn full_at_wrapped_pos0() {
        #[expect(deprecated)]
        let queue = Queue::<_, 2>::new();
        for _ in 0..254 {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
        }
        for _ in 0..2 {
            assert!(queue.enqueue(0).is_ok());
        }
        // this should not block forever
        assert!(queue.enqueue(0).is_err());
    }

    /// Filling a queue to capacity must succeed; one more enqueue must fail.
    #[test]
    fn enqueue_full() {
        #[cfg(not(feature = "mpmc_large"))]
        const CAPACITY: usize = 128;

        #[cfg(feature = "mpmc_large")]
        const CAPACITY: usize = 256;

        #[expect(deprecated)]
        let queue: Queue<u8, CAPACITY> = Queue::new();

        assert_eq!(queue.capacity(), CAPACITY);

        for _ in 0..CAPACITY {
            queue.enqueue(0xAA).unwrap();
        }

        // Queue is full, this should not block forever.
        queue.enqueue(0x55).unwrap_err();
    }
}