heapless/mpmc.rs
//! A fixed capacity multiple-producer, multiple-consumer (MPMC) lock-free queue.
//!
//! # Deprecation
//!
//! <div class="warning">
//! The current implementation of `mpmc` is marked as deprecated because it is not truly lock-free.
//! </div>
//!
//! If a thread is parked or pre-empted for a long time by a higher-priority task
//! during an `enqueue` or `dequeue` operation, it is possible for the queue to end up
//! in a state where no other task can successfully enqueue or dequeue items from it
//! until the pre-empted task can finish its operation.
//!
//! In that case, [`enqueue`](QueueInner::enqueue) and [`dequeue`](QueueInner::dequeue)
//! will return an error, but will not panic or reach undefined behaviour.
//!
//! This makes `mpmc` unsuitable for some use cases, such as using it as a pool of objects.
//!
//! ## When can this queue be used?
//!
//! This queue should be used for cross-task communication only when items sent over the queue
//! can be dropped in case of concurrent operations, or when it is possible to retry
//! the dequeue/enqueue operation after other tasks have had the opportunity to make progress.
//!
//! In that case you can safely silence the deprecation warning with `#[expect(deprecated)]`
//! where `new` is called.
//!
//! For more information, and possible alternatives, please see
//! <https://github.com/rust-embedded/heapless/issues/583>
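//!
//! As a sketch of the retry pattern described above (the element type, capacity, and busy-loop
//! are illustrative only; a real application would let other tasks run between attempts):
//!
//! ```
//! use heapless::mpmc::Queue;
//!
//! #[expect(deprecated)]
//! static Q: Queue<u8, 4> = Queue::new();
//!
//! // Keep retrying until the enqueue succeeds.
//! while Q.enqueue(42).is_err() {
//!     // Busy-wait; other tasks would make progress here.
//! }
//! assert_eq!(Q.dequeue(), Some(42));
//! ```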
//!
//! **Note:** This module requires atomic compare-and-swap (CAS) instructions. On
//! targets where they're not natively available, they are emulated by the
//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
//!
//! # Example
//!
//! This queue can be constructed in `const` context. Placing it in a `static` variable lets *all*
//! contexts (interrupts/threads/`main`) safely enqueue and dequeue items.
//! ```
//! use core::sync::atomic::{AtomicU8, Ordering};
//!
//! use heapless::mpmc::Queue;
//!
//! # #[expect(deprecated)]
//! static Q: Queue<u8, 2> = Queue::new();
//!
//! fn main() {
//!     // Configure systick interrupt.
//!
//!     loop {
//!         if let Some(x) = Q.dequeue() {
//!             println!("{}", x);
//!         } else {
//!             // Wait for interrupt.
//!         }
//!         # break
//!     }
//! }
//!
//! fn systick() {
//!     static COUNT: AtomicU8 = AtomicU8::new(0);
//!     let count = COUNT.fetch_add(1, Ordering::SeqCst);
//!
//!     # let _ =
//!     Q.enqueue(count);
//! }
//! ```
//!
//! # Benchmark
//!
//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled
//! with `-C opt-level=z`:
//!
//! | Method                      | Time | N |
//! |:----------------------------|-----:|--:|
//! | `Queue::<u8, 8>::enqueue()` |   34 | 0 |
//! | `Queue::<u8, 8>::enqueue()` |   52 | 1 |
//! | `Queue::<u8, 8>::enqueue()` |   69 | 2 |
//! | `Queue::<u8, 8>::dequeue()` |   35 | 0 |
//! | `Queue::<u8, 8>::dequeue()` |   53 | 1 |
//! | `Queue::<u8, 8>::dequeue()` |   71 | 2 |
//!
//! - `N` denotes the number of interruptions. On Cortex-M, an interruption consists of an
//!   interrupt handler preempting the would-be atomic section of the `enqueue`/`dequeue`
//!   operation. Note that it does *not* matter whether the higher-priority handler uses the
//!   queue or not.
//! - All execution times are in clock cycles (1 clock cycle = 125 ns at 8 MHz).
//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include a
//!   `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
//!   and `enqueue` returning `Ok`.
//!
//! # References
//!
//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
//! cache padding.
//!
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

use core::{cell::UnsafeCell, mem::MaybeUninit};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;

use atomic::Ordering;

use crate::storage::{OwnedStorage, Storage, ViewStorage};

// With the `mpmc_large` feature, positions and sequence numbers are full
// `usize`s; otherwise they are `u8`s, which keeps the queue small but caps the
// capacity at 128.
#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;

#[cfg(feature = "mpmc_large")]
type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;

#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;

/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
///
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
/// struct if you want to write code that's generic over both.
pub struct QueueInner<T, S: Storage> {
    /// Position of the next `dequeue`.
    dequeue_pos: AtomicTargetSize,
    /// Position of the next `enqueue`.
    enqueue_pos: AtomicTargetSize,
    /// The storage slots, each tagged with a sequence number.
    buffer: UnsafeCell<S::Buffer<Cell<T>>>,
}

/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
///
/// <div class="warning">
///
/// `N` must be a power of 2.
///
/// </div>
///
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;

/// A [`Queue`] with dynamic capacity.
///
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by
/// reference.
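///
/// A minimal usage sketch (the `consume` function taking `&QueueView<u8>` is illustrative):
///
/// ```rust
/// # use heapless::mpmc::{Queue, QueueView};
/// ##[expect(deprecated)]
/// static Q: Queue<u8, 2> = Queue::new();
///
/// fn consume(q: &QueueView<u8>) -> Option<u8> {
///     q.dequeue()
/// }
/// # assert_eq!(consume(&Q), None);
/// ```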
pub type QueueView<T> = QueueInner<T, ViewStorage>;

impl<T, const N: usize> Queue<T, N> {
    #[deprecated(
        note = "See the documentation of Queue::new() for more information: https://docs.rs/heapless/latest/heapless/mpmc/type.Queue.html#method.new"
    )]
    /// Creates an empty queue.
    ///
    /// # Deprecation
    ///
    /// <div class="warning">
    /// The current implementation of `mpmc` is marked as deprecated because it is not truly
    /// lock-free.
    /// </div>
    ///
    /// If a thread is parked or pre-empted for a long time by a higher-priority task
    /// during an `enqueue` or `dequeue` operation, it is possible for the queue to end up
    /// in a state where no other task can successfully enqueue or dequeue items from it
    /// until the pre-empted task can finish its operation.
    ///
    /// In that case, [`enqueue`](QueueInner::enqueue) and [`dequeue`](QueueInner::dequeue)
    /// will return an error, but will not panic or reach undefined behaviour.
    ///
    /// This makes `mpmc` unsuitable for some use cases, such as using it as a pool of objects.
    ///
    /// ## When can this queue be used?
    ///
    /// This queue should be used for cross-task communication only when items sent over the
    /// queue can be dropped in case of concurrent operations, or when it is possible to retry
    /// the dequeue/enqueue operation after other tasks have had the opportunity to make
    /// progress.
    ///
    /// In that case you can safely silence the deprecation warning with `#[expect(deprecated)]`
    /// where `new` is called.
    ///
    /// For more information, and possible alternatives, please see
    /// <https://github.com/rust-embedded/heapless/issues/583>
    pub const fn new() -> Self {
        const {
            assert!(N > 1);
            assert!(N.is_power_of_two());
            // With `u8` positions (no `mpmc_large`), this caps `N` at 128.
            assert!(N < UintSize::MAX as usize);
        }

        // Initialize each cell's sequence number to its own index, marking
        // every slot as empty and writable for the first lap of enqueues.
        let mut cell_count = 0;

        let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
        while cell_count != N {
            result_cells[cell_count] = Cell::new(cell_count);
            cell_count += 1;
        }

        Self {
            buffer: UnsafeCell::new(result_cells),
            dequeue_pos: AtomicTargetSize::new(0),
            enqueue_pos: AtomicTargetSize::new(0),
        }
    }

    /// Used in `Storage` implementation.
    pub(crate) fn as_view_private(&self) -> &QueueView<T> {
        self
    }
    /// Used in `Storage` implementation.
    pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
        self
    }
}

impl<T, S: Storage> QueueInner<T, S> {
    /// Returns the maximum number of elements the queue can hold.
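    ///
    /// A minimal usage sketch (the capacity of 4 is chosen arbitrarily):
    ///
    /// ```rust
    /// # use heapless::mpmc::Queue;
    /// ##[expect(deprecated)]
    /// let q: Queue<u8, 4> = Queue::new();
    /// assert_eq!(q.capacity(), 4);
    /// ```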
    #[inline]
    pub fn capacity(&self) -> usize {
        S::len(self.buffer.get())
    }

    /// Get a reference to the `Queue`, erasing the `N` const-generic.
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// ##[expect(deprecated)]
    /// let queue: Queue<u8, 2> = Queue::new();
    /// let view: &QueueView<u8> = queue.as_view();
    /// ```
    ///
    /// It is often preferable to do the same through type coercion, since `Queue<T, N>` implements
    /// `Unsize<QueueView<T>>`:
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// ##[expect(deprecated)]
    /// let queue: Queue<u8, 2> = Queue::new();
    /// let view: &QueueView<u8> = &queue;
    /// ```
    #[inline]
    pub fn as_view(&self) -> &QueueView<T> {
        S::as_mpmc_view(self)
    }

    /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// ##[expect(deprecated)]
    /// let mut queue: Queue<u8, 2> = Queue::new();
    /// let view: &mut QueueView<u8> = queue.as_mut_view();
    /// ```
    ///
    /// It is often preferable to do the same through type coercion, since `Queue<T, N>` implements
    /// `Unsize<QueueView<T>>`:
    ///
    /// ```rust
    /// # use heapless::mpmc::{Queue, QueueView};
    /// ##[expect(deprecated)]
    /// let mut queue: Queue<u8, 2> = Queue::new();
    /// let view: &mut QueueView<u8> = &mut queue;
    /// ```
    #[inline]
    pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
        S::as_mpmc_mut_view(self)
    }

    // The capacity is a power of two, so `capacity - 1` is a mask that wraps a
    // position into a buffer index.
    fn mask(&self) -> UintSize {
        (S::len(self.buffer.get()) - 1) as _
    }

    /// Returns the item in the front of the queue, or `None` if the queue is empty.
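    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// # use heapless::mpmc::Queue;
    /// ##[expect(deprecated)]
    /// let q: Queue<u8, 2> = Queue::new();
    /// assert_eq!(q.dequeue(), None);
    /// q.enqueue(1).unwrap();
    /// assert_eq!(q.dequeue(), Some(1));
    /// ```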
    pub fn dequeue(&self) -> Option<T> {
        unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
    }

    /// Adds an `item` to the end of the queue.
    ///
    /// Returns back the `item` if the queue is full.
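    ///
    /// A minimal usage sketch, including the full-queue case:
    ///
    /// ```rust
    /// # use heapless::mpmc::Queue;
    /// ##[expect(deprecated)]
    /// let q: Queue<u8, 2> = Queue::new();
    /// q.enqueue(1).unwrap();
    /// q.enqueue(2).unwrap();
    /// // The queue is full; the rejected item is handed back instead of being dropped.
    /// assert_eq!(q.enqueue(3), Err(3));
    /// ```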
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        unsafe {
            enqueue(
                S::as_ptr(self.buffer.get()),
                &self.enqueue_pos,
                self.mask(),
                item,
            )
        }
    }
}

impl<T, const N: usize> Default for Queue<T, N> {
    fn default() -> Self {
        #[allow(deprecated)]
        Self::new()
    }
}

impl<T, S: Storage> Drop for QueueInner<T, S> {
    fn drop(&mut self) {
        // Drop all elements currently in the queue.
        while self.dequeue().is_some() {}
    }
}

// SAFETY: all cross-thread access to the buffer is serialized by the atomic
// positions and per-cell sequence numbers; `T: Send` is required because items
// are moved between threads.
unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}

// A single queue slot: possibly-uninitialized storage for one item, plus the
// sequence number that tracks the slot's state (see the module-level references).
struct Cell<T> {
    data: MaybeUninit<T>,
    sequence: AtomicTargetSize,
}

impl<T> Cell<T> {
    const fn new(seq: usize) -> Self {
        Self {
            data: MaybeUninit::uninit(),
            sequence: AtomicTargetSize::new(seq as UintSize),
        }
    }
}

/// # Safety
///
/// `buffer` must point to an array of `mask + 1` `Cell`s, and `mask + 1` must be a
/// power of two.
unsafe fn dequeue<T>(
    buffer: *mut Cell<T>,
    dequeue_pos: &AtomicTargetSize,
    mask: UintSize,
) -> Option<T> {
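    // Vyukov-style protocol (see the module-level references): the slot at
    // `pos & mask` is ready to dequeue once a producer has advanced its sequence
    // number to `pos + 1`. The comparisons below use wrapping signed arithmetic
    // so they remain correct when the positions wrap around.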
    let mut pos = dequeue_pos.load(Ordering::Relaxed);

    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);

        match dif.cmp(&0) {
            core::cmp::Ordering::Equal => {
                // The slot is ready; try to claim this position.
                if dequeue_pos
                    .compare_exchange_weak(
                        pos,
                        pos.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            }
            core::cmp::Ordering::Less => {
                // The slot has not been written yet: the queue is empty.
                return None;
            }
            core::cmp::Ordering::Greater => {
                // Another consumer already claimed this position; reload and retry.
                pos = dequeue_pos.load(Ordering::Relaxed);
            }
        }
    }

    let data = (*cell).data.as_ptr().read();
    // Release the slot to producers one lap ahead (`pos + N`).
    (*cell)
        .sequence
        .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
    Some(data)
}

/// # Safety
///
/// `buffer` must point to an array of `mask + 1` `Cell`s, and `mask + 1` must be a
/// power of two.
unsafe fn enqueue<T>(
    buffer: *mut Cell<T>,
    enqueue_pos: &AtomicTargetSize,
    mask: UintSize,
    item: T,
) -> Result<(), T> {
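    // Mirror image of `dequeue`: the slot at `pos & mask` is free for writing
    // when its sequence number equals `pos`; a sequence number behind `pos`
    // means the slot still holds an item from the previous lap, i.e. the queue
    // is full.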
    let mut pos = enqueue_pos.load(Ordering::Relaxed);

    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        let dif = (seq as IntSize).wrapping_sub(pos as IntSize);

        match dif.cmp(&0) {
            core::cmp::Ordering::Equal => {
                // The slot is free; try to claim this position.
                if enqueue_pos
                    .compare_exchange_weak(
                        pos,
                        pos.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            }
            core::cmp::Ordering::Less => {
                // The slot still holds an item from the previous lap: the queue is full.
                return Err(item);
            }
            core::cmp::Ordering::Greater => {
                // Another producer already claimed this position; reload and retry.
                pos = enqueue_pos.load(Ordering::Relaxed);
            }
        }
    }

    (*cell).data.as_mut_ptr().write(item);
    // Publish the item to consumers by setting the sequence to `pos + 1`.
    (*cell)
        .sequence
        .store(pos.wrapping_add(1), Ordering::Release);
    Ok(())
}

#[cfg(test)]
mod tests {
    use static_assertions::assert_not_impl_any;

    use super::Queue;

    // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
    assert_not_impl_any!(Queue<*const (), 4>: Send);

    #[test]
    fn memory_leak() {
        droppable!();

        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
        q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
        drop(q);

        assert_eq!(Droppable::count(), 0);
    }

    #[test]
    fn sanity() {
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        q.enqueue(0).unwrap();
        q.enqueue(1).unwrap();
        assert!(q.enqueue(2).is_err());

        assert_eq!(q.dequeue(), Some(0));
        assert_eq!(q.dequeue(), Some(1));
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn drain_at_pos255() {
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        for _ in 0..255 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }

        // Queue is empty; this should not block forever.
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn full_at_wrapped_pos0() {
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        for _ in 0..254 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }
        assert!(q.enqueue(0).is_ok());
        assert!(q.enqueue(0).is_ok());
        // Queue is full at a wrapped position; this should not block forever.
        assert!(q.enqueue(0).is_err());
    }

    #[test]
    fn enqueue_full() {
        #[cfg(not(feature = "mpmc_large"))]
        const CAPACITY: usize = 128;

        #[cfg(feature = "mpmc_large")]
        const CAPACITY: usize = 256;

        #[expect(deprecated)]
        let q: Queue<u8, CAPACITY> = Queue::new();

        assert_eq!(q.capacity(), CAPACITY);

        for _ in 0..CAPACITY {
            q.enqueue(0xAA).unwrap();
        }

        // Queue is full; this should not block forever.
        q.enqueue(0x55).unwrap_err();
    }
}
493}