heapless/mpmc.rs
//! A fixed capacity multiple-producer, multiple-consumer (MPMC) lock-free queue.
//!
//! **Note:** This module requires atomic compare-and-swap (CAS) instructions. On
//! targets where they're not natively available, they are emulated by the
//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
6//!
//! # Example
//!
//! This queue can be constructed in `const` context. Placing it in a `static` variable lets *all*
//! contexts (interrupts/threads/`main`) safely enqueue and dequeue items.
//!
//! ```
//! use core::sync::atomic::{AtomicU8, Ordering};
//!
//! use heapless::mpmc::Queue;
//!
//! static Q: Queue<u8, 2> = Queue::new();
//!
//! fn main() {
//!     // Configure systick interrupt.
//!
//!     loop {
//!         if let Some(x) = Q.dequeue() {
//!             println!("{}", x);
//!         } else {
//!             // Wait for interrupt.
//!         }
//! #       break
//!     }
//! }
//!
//! fn systick() {
//!     static COUNT: AtomicU8 = AtomicU8::new(0);
//!     let count = COUNT.fetch_add(1, Ordering::SeqCst);
//!
//! #   let _ =
//!     Q.enqueue(count);
//! }
//! ```
40//!
//! # Benchmark
//!
//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled
//! with `-C opt-level=z`:
//!
//! | Method                      | Time |  N |
//! |:----------------------------|-----:|---:|
//! | `Queue::<u8, 8>::enqueue()` |   34 |  0 |
//! | `Queue::<u8, 8>::enqueue()` |   52 |  1 |
//! | `Queue::<u8, 8>::enqueue()` |   69 |  2 |
//! | `Queue::<u8, 8>::dequeue()` |   35 |  0 |
//! | `Queue::<u8, 8>::dequeue()` |   53 |  1 |
//! | `Queue::<u8, 8>::dequeue()` |   71 |  2 |
//!
//! - N denotes the number of interruptions. On Cortex-M, an interruption consists of an
//!   interrupt handler preempting the would-be atomic section of the `enqueue`/`dequeue`
//!   operation. Note that it does *not* matter if the higher priority handler uses the queue or
//!   not.
//! - All execution times are in clock cycles (1 clock cycle = 125 ns).
//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
//!   `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
//!   and `enqueue` returning `Ok`.
63//!
//! # References
//!
//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
//! cache padding.
//!
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
70
71use core::{cell::UnsafeCell, mem::MaybeUninit};
72
73#[cfg(not(feature = "portable-atomic"))]
74use core::sync::atomic;
75#[cfg(feature = "portable-atomic")]
76use portable_atomic as atomic;
77
78use atomic::Ordering;
79
80use crate::storage::{OwnedStorage, Storage, ViewStorage};
81
// Integer widths used for queue positions and per-cell sequence numbers.
//
// With the `mpmc_large` feature the counters are pointer-sized; without it they are
// 8 bits wide. `IntSize` is the signed counterpart of `UintSize`, used when two
// counters are compared through a wrapping difference.

#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(feature = "mpmc_large")]
type UintSize = usize;
#[cfg(feature = "mpmc_large")]
type IntSize = isize;

#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;
96
97/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
98///
99/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
100/// struct if you want to write code that's generic over both.
101pub struct QueueInner<T, S: Storage> {
102 dequeue_pos: AtomicTargetSize,
103 enqueue_pos: AtomicTargetSize,
104 buffer: UnsafeCell<S::Buffer<Cell<T>>>,
105}
106
/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
///
/// <div class="warning">
///
/// `N` must be a power of 2 and greater than 1.
///
/// </div>
///
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
///
/// These constraints are checked by the assertions in [`Queue::new`].
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
117
/// A [`Queue`] with dynamic capacity.
///
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
///
/// A view can also be obtained explicitly with [`QueueInner::as_view`] or
/// [`QueueInner::as_mut_view`].
pub type QueueView<T> = QueueInner<T, ViewStorage>;
122
123impl<T, const N: usize> Queue<T, N> {
124 /// Creates an empty queue.
125 pub const fn new() -> Self {
126 const {
127 assert!(N > 1);
128 assert!(N.is_power_of_two());
129 assert!(N < UintSize::MAX as usize);
130 }
131
132 let mut cell_count = 0;
133
134 let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
135 while cell_count != N {
136 result_cells[cell_count] = Cell::new(cell_count);
137 cell_count += 1;
138 }
139
140 Self {
141 buffer: UnsafeCell::new(result_cells),
142 dequeue_pos: AtomicTargetSize::new(0),
143 enqueue_pos: AtomicTargetSize::new(0),
144 }
145 }
146
147 /// Used in `Storage` implementation.
148 pub(crate) fn as_view_private(&self) -> &QueueView<T> {
149 self
150 }
151 /// Used in `Storage` implementation.
152 pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
153 self
154 }
155}
156
157impl<T, S: Storage> QueueInner<T, S> {
158 /// Returns the maximum number of elements the queue can hold.
159 #[inline]
160 pub fn capacity(&self) -> usize {
161 S::len(self.buffer.get())
162 }
163
164 /// Get a reference to the `Queue`, erasing the `N` const-generic.
165 ///
166 ///
167 /// ```rust
168 /// # use heapless::mpmc::{Queue, QueueView};
169 /// let queue: Queue<u8, 2> = Queue::new();
170 /// let view: &QueueView<u8> = queue.as_view();
171 /// ```
172 ///
173 /// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
174 ///
175 /// ```rust
176 /// # use heapless::mpmc::{Queue, QueueView};
177 /// let queue: Queue<u8, 2> = Queue::new();
178 /// let view: &QueueView<u8> = &queue;
179 /// ```
180 #[inline]
181 pub fn as_view(&self) -> &QueueView<T> {
182 S::as_mpmc_view(self)
183 }
184
185 /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
186 ///
187 /// ```rust
188 /// # use heapless::mpmc::{Queue, QueueView};
189 /// let mut queue: Queue<u8, 2> = Queue::new();
190 /// let view: &mut QueueView<u8> = queue.as_mut_view();
191 /// ```
192 ///
193 /// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
194 ///
195 /// ```rust
196 /// # use heapless::mpmc::{Queue, QueueView};
197 /// let mut queue: Queue<u8, 2> = Queue::new();
198 /// let view: &mut QueueView<u8> = &mut queue;
199 /// ```
200 #[inline]
201 pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
202 S::as_mpmc_mut_view(self)
203 }
204
205 fn mask(&self) -> UintSize {
206 (S::len(self.buffer.get()) - 1) as _
207 }
208
209 /// Returns the item in the front of the queue, or `None` if the queue is empty.
210 pub fn dequeue(&self) -> Option<T> {
211 unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
212 }
213
214 /// Adds an `item` to the end of the queue.
215 ///
216 /// Returns back the `item` if the queue is full.
217 pub fn enqueue(&self, item: T) -> Result<(), T> {
218 unsafe {
219 enqueue(
220 S::as_ptr(self.buffer.get()),
221 &self.enqueue_pos,
222 self.mask(),
223 item,
224 )
225 }
226 }
227}
228
229impl<T, const N: usize> Default for Queue<T, N> {
230 fn default() -> Self {
231 Self::new()
232 }
233}
234
235impl<T, S: Storage> Drop for QueueInner<T, S> {
236 fn drop(&mut self) {
237 // Drop all elements currently in the queue.
238 while self.dequeue().is_some() {}
239 }
240}
241
242unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}
243
244struct Cell<T> {
245 data: MaybeUninit<T>,
246 sequence: AtomicTargetSize,
247}
248
249impl<T> Cell<T> {
250 const fn new(seq: usize) -> Self {
251 Self {
252 data: MaybeUninit::uninit(),
253 sequence: AtomicTargetSize::new(seq as UintSize),
254 }
255 }
256}
257
258unsafe fn dequeue<T>(
259 buffer: *mut Cell<T>,
260 dequeue_pos: &AtomicTargetSize,
261 mask: UintSize,
262) -> Option<T> {
263 let mut pos = dequeue_pos.load(Ordering::Relaxed);
264
265 let mut cell;
266 loop {
267 cell = buffer.add(usize::from(pos & mask));
268 let seq = (*cell).sequence.load(Ordering::Acquire);
269 let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
270
271 match dif.cmp(&0) {
272 core::cmp::Ordering::Equal => {
273 if dequeue_pos
274 .compare_exchange_weak(
275 pos,
276 pos.wrapping_add(1),
277 Ordering::Relaxed,
278 Ordering::Relaxed,
279 )
280 .is_ok()
281 {
282 break;
283 }
284 }
285 core::cmp::Ordering::Less => {
286 return None;
287 }
288 core::cmp::Ordering::Greater => {
289 pos = dequeue_pos.load(Ordering::Relaxed);
290 }
291 }
292 }
293
294 let data = (*cell).data.as_ptr().read();
295 (*cell)
296 .sequence
297 .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
298 Some(data)
299}
300
301unsafe fn enqueue<T>(
302 buffer: *mut Cell<T>,
303 enqueue_pos: &AtomicTargetSize,
304 mask: UintSize,
305 item: T,
306) -> Result<(), T> {
307 let mut pos = enqueue_pos.load(Ordering::Relaxed);
308
309 let mut cell;
310 loop {
311 cell = buffer.add(usize::from(pos & mask));
312 let seq = (*cell).sequence.load(Ordering::Acquire);
313 let dif = (seq as IntSize).wrapping_sub(pos as IntSize);
314
315 match dif.cmp(&0) {
316 core::cmp::Ordering::Equal => {
317 if enqueue_pos
318 .compare_exchange_weak(
319 pos,
320 pos.wrapping_add(1),
321 Ordering::Relaxed,
322 Ordering::Relaxed,
323 )
324 .is_ok()
325 {
326 break;
327 }
328 }
329 core::cmp::Ordering::Less => {
330 return Err(item);
331 }
332 core::cmp::Ordering::Greater => {
333 pos = enqueue_pos.load(Ordering::Relaxed);
334 }
335 }
336 }
337
338 (*cell).data.as_mut_ptr().write(item);
339 (*cell)
340 .sequence
341 .store(pos.wrapping_add(1), Ordering::Release);
342 Ok(())
343}
344
#[cfg(test)]
mod tests {
    use static_assertions::assert_not_impl_any;

    use super::Queue;

    // A `Queue` of `!Send` values must itself remain `!Send`.
    assert_not_impl_any!(Queue<*const (), 4>: Send);

    /// Dropping a queue that still holds elements must drop those elements as well.
    #[test]
    fn memory_leak() {
        droppable!();

        let queue = Queue::<_, 2>::new();
        for _ in 0..2 {
            if queue.enqueue(Droppable::new()).is_err() {
                panic!();
            }
        }
        drop(queue);

        assert_eq!(Droppable::count(), 0);
    }

    /// Fill a two-element queue, check that it rejects a third item, then drain it in order.
    #[test]
    fn sanity() {
        let queue = Queue::<_, 2>::new();
        queue.enqueue(0).unwrap();
        queue.enqueue(1).unwrap();
        assert!(queue.enqueue(2).is_err());

        for expected in [Some(0), Some(1), None] {
            assert_eq!(queue.dequeue(), expected);
        }
    }

    /// After 255 enqueue/dequeue round trips an empty queue must still report empty.
    #[test]
    fn drain_at_pos255() {
        let queue = Queue::<_, 2>::new();
        (0..255).for_each(|_| {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
        });

        // Queue is empty, this should not block forever.
        assert_eq!(queue.dequeue(), None);
    }

    /// After 254 round trips, filling the queue must still be followed by a "full" error.
    #[test]
    fn full_at_wrapped_pos0() {
        let queue = Queue::<_, 2>::new();
        (0..254).for_each(|_| {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
        });
        for _ in 0..2 {
            assert!(queue.enqueue(0).is_ok());
        }
        // this should not block forever
        assert!(queue.enqueue(0).is_err());
    }

    /// A queue filled to capacity must reject one more item instead of spinning.
    #[test]
    fn enqueue_full() {
        const CAPACITY: usize = if cfg!(feature = "mpmc_large") {
            256
        } else {
            128
        };

        let queue: Queue<u8, CAPACITY> = Queue::new();

        assert_eq!(queue.capacity(), CAPACITY);

        (0..CAPACITY).for_each(|_| queue.enqueue(0xAA).unwrap());

        // Queue is full, this should not block forever.
        assert!(queue.enqueue(0x55).is_err());
    }
}