heapless/spsc.rs
1//! A fixed capacity single-producer, single-consumer (SPSC) lock-free queue.
2//!
3//! *Note:* This module requires atomic load and store instructions. On
4//! targets where they're not natively available, they are emulated by the
5//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
6//!
7//! # Examples
8//!
9//! [`Queue`] can be used as a plain queue:
10//!
11//! ```
12//! use heapless::spsc::Queue;
13//!
14//! let mut queue: Queue<u8, 4> = Queue::new();
15//!
16//! assert_eq!(queue.enqueue(0), Ok(()));
17//! assert_eq!(queue.enqueue(1), Ok(()));
18//! assert_eq!(queue.enqueue(2), Ok(()));
19//! assert_eq!(queue.enqueue(3), Err(3)); // Queue is full.
20//!
21//! assert_eq!(queue.dequeue(), Some(0));
22//! ```
23//!
24//! [`Queue::split`] can be used to split the queue into a [`Producer`]/[`Consumer`] pair.
25//!
26//! After splitting a `&'static mut Queue`, the resulting [`Producer`] and [`Consumer`]
27//! can be moved into different execution contexts, e.g. threads, interrupt handlers, etc.
28//!
29//!
30//! ```
31//! use heapless::spsc::{Producer, Queue};
32//!
33//! #[derive(Debug)]
34//! enum Event {
35//! A,
36//! B,
37//! }
38//!
39//! fn main() {
40//! let queue: &'static mut Queue<Event, 4> = {
41//! static mut Q: Queue<Event, 4> = Queue::new();
42//! // SAFETY: `Q` is only accessible in this scope
43//! // and `main` is only called once.
44//! unsafe { &mut Q }
45//! };
46//!
47//! let (producer, mut consumer) = queue.split();
48//!
49//! // `producer` can be moved into `interrupt_handler` using a static mutex or the mechanism
50//! // provided by the concurrency framework you are using, e.g. a resource in RTIC.
51//! # let mut producer = producer;
52//! # interrupt_handler(&mut producer);
53//!
54//! loop {
55//! match consumer.dequeue() {
56//! Some(Event::A) => { /* .. */ }
57//! Some(Event::B) => { /* .. */ }
58//! None => { /* Sleep. */ }
59//! }
60//! # break
61//! }
62//! }
63//!
64//! // This is a different execution context that can preempt `main`.
65//! fn interrupt_handler(producer: &mut Producer<'static, Event>) {
66//! # let condition = true;
67//!
68//! // ..
69//!
70//! if condition {
71//! producer.enqueue(Event::A).unwrap();
72//! } else {
73//! producer.enqueue(Event::B).unwrap();
74//! }
75//!
76//! // ..
77//! }
78//! ```
79//!
80//! # Benchmarks
81//!
82//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled
83//! with `-C opt-level=3`:
84//!
85//! | Method | Time |
86//! |:-------------------------------|-----:|
87//! | `Producer::<u8, _>::enqueue()` | 16 |
88//! | `Queue::<u8, _>::enqueue()` | 14 |
89//! | `Consumer::<u8, _>::dequeue()` | 15 |
90//! | `Queue::<u8, _>::dequeue()` | 12 |
91//!
92//! - All execution times are in clock cycles (1 clock cycle = 125 ns).
93//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
94//! `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
95//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some` and
96//! `enqueue` returning `Ok`.
97//!
98//! # References
99//!
100//! This is an implementation based on [https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular](
101//! https://web.archive.org/web/20250117082625/https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular
102//! ).
103
104use core::{borrow::Borrow, cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
105
106#[cfg(not(feature = "portable-atomic"))]
107use core::sync::atomic;
108#[cfg(feature = "portable-atomic")]
109use portable_atomic as atomic;
110
111use atomic::{AtomicUsize, Ordering};
112
113use crate::storage::{OwnedStorage, Storage, ViewStorage};
114
115/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
116///
117/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
118/// struct if you want to write code that's generic over both.
119pub struct QueueInner<T, S: Storage> {
120 // this is from where we dequeue items
121 pub(crate) head: AtomicUsize,
122
123 // this is where we enqueue new items
124 pub(crate) tail: AtomicUsize,
125
126 pub(crate) buffer: S::Buffer<UnsafeCell<MaybeUninit<T>>>,
127}
128
129/// A statically allocated single-producer, single-consumer queue with a capacity of `N - 1`
130/// elements.
131///
132/// <div class="warning">
133///
134/// To get better performance, use a value for `N` that is a power of 2.
135///
136/// </div>
137///
138/// You will likely want to use [`split`](QueueInner::split) to create a producer-consumer pair.
139pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
140
141/// A [`Queue`] with dynamic capacity.
142///
143/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by
144/// reference.
145pub type QueueView<T> = QueueInner<T, ViewStorage>;
146
147impl<T, const N: usize> Queue<T, N> {
148 /// Creates an empty queue.
149 pub const fn new() -> Self {
150 const {
151 assert!(N > 1);
152 }
153
154 Queue {
155 head: AtomicUsize::new(0),
156 tail: AtomicUsize::new(0),
157 buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N],
158 }
159 }
160
161 /// Used in `Storage` implementation
162 pub(crate) fn as_view_private(&self) -> &QueueView<T> {
163 self
164 }
165
166 /// Used in `Storage` implementation
167 pub(crate) fn as_mut_view_private(&mut self) -> &mut QueueView<T> {
168 self
169 }
170}
171
172impl<T, S: Storage> QueueInner<T, S> {
173 /// Get a reference to the `Queue`, erasing the `N` const-generic.
174 pub fn as_view(&self) -> &QueueView<T> {
175 S::as_queue_view(self)
176 }
177
178 /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
179 pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
180 S::as_mut_queue_view(self)
181 }
182
183 #[inline]
184 fn increment(&self, val: usize) -> usize {
185 (val + 1) % self.n()
186 }
187
188 #[inline]
189 fn n(&self) -> usize {
190 self.buffer.borrow().len()
191 }
192
193 /// Returns the maximum number of elements the queue can hold.
194 #[inline]
195 pub fn capacity(&self) -> usize {
196 self.n() - 1
197 }
198
199 /// Returns the number of elements in the queue.
200 #[inline]
201 pub fn len(&self) -> usize {
202 let current_head = self.head.load(Ordering::Relaxed);
203 let current_tail = self.tail.load(Ordering::Relaxed);
204
205 current_tail
206 .wrapping_sub(current_head)
207 .wrapping_add(self.n())
208 % self.n()
209 }
210
211 /// Returns whether the queue is empty.
212 #[inline]
213 pub fn is_empty(&self) -> bool {
214 self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
215 }
216
217 /// Returns whether the queue is full.
218 #[inline]
219 pub fn is_full(&self) -> bool {
220 self.increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
221 }
222
223 /// Iterates from the front of the queue to the back.
224 pub fn iter(&self) -> Iter<'_, T> {
225 Iter {
226 rb: self.as_view(),
227 index: 0,
228 len: self.len(),
229 }
230 }
231
232 /// Returns an iterator that allows modifying each value.
233 pub fn iter_mut(&mut self) -> IterMut<'_, T> {
234 let len = self.len();
235 IterMut {
236 rb: self.as_view(),
237 index: 0,
238 len,
239 }
240 }
241
242 /// Adds an `item` to the end of the queue.
243 ///
244 /// Returns back the `item` if the queue is full.
245 #[inline]
246 pub fn enqueue(&mut self, item: T) -> Result<(), T> {
247 unsafe { self.inner_enqueue(item) }
248 }
249
250 /// Returns the item in the front of the queue, or `None` if the queue is empty.
251 #[inline]
252 pub fn dequeue(&mut self) -> Option<T> {
253 unsafe { self.inner_dequeue() }
254 }
255
256 /// Returns a reference to the item in the front of the queue without dequeuing it, or
257 /// `None` if the queue is empty.
258 ///
259 /// # Examples
260 /// ```
261 /// use heapless::spsc::Queue;
262 ///
263 /// let mut queue: Queue<u8, 235> = Queue::new();
264 /// let (mut producer, mut consumer) = queue.split();
265 /// assert_eq!(None, consumer.peek());
266 /// producer.enqueue(1);
267 /// assert_eq!(Some(&1), consumer.peek());
268 /// assert_eq!(Some(1), consumer.dequeue());
269 /// assert_eq!(None, consumer.peek());
270 /// ```
271 pub fn peek(&self) -> Option<&T> {
272 if self.is_empty() {
273 None
274 } else {
275 let head = self.head.load(Ordering::Relaxed);
276 Some(unsafe { &*(self.buffer.borrow().get_unchecked(head).get() as *const T) })
277 }
278 }
279
280 // The memory for enqueueing is "owned" by the tail pointer.
281 //
282 // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
283 // items without doing pointer arithmetic and accessing internal fields of this type.
284 unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
285 let current_tail = self.tail.load(Ordering::Relaxed);
286 let next_tail = self.increment(current_tail);
287
288 if next_tail == self.head.load(Ordering::Acquire) {
289 Err(val)
290 } else {
291 (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
292 self.tail.store(next_tail, Ordering::Release);
293
294 Ok(())
295 }
296 }
297
298 // The memory for enqueueing is "owned" by the tail pointer.
299 //
300 // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
301 // items without doing pointer arithmetic and accessing internal fields of this type.
302 unsafe fn inner_enqueue_unchecked(&self, val: T) {
303 let current_tail = self.tail.load(Ordering::Relaxed);
304
305 (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
306 self.tail
307 .store(self.increment(current_tail), Ordering::Release);
308 }
309
310 /// Adds an `item` to the end of the queue, without checking if it's full.
311 ///
312 /// # Safety
313 ///
314 /// If the queue is full, this operation will leak a value (`T`'s destructor won't run on
315 /// the value that got overwritten by `item`), *and* will allow the `dequeue` operation
316 /// to create a copy of `item`, which could result in `T`'s destructor running on `item`
317 /// twice.
318 pub unsafe fn enqueue_unchecked(&mut self, item: T) {
319 self.inner_enqueue_unchecked(item);
320 }
321
322 // The memory for dequeuing is "owned" by the head pointer.
323 //
324 // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
325 // items without doing pointer arithmetic and accessing internal fields of this type.
326 unsafe fn inner_dequeue(&self) -> Option<T> {
327 let current_head = self.head.load(Ordering::Relaxed);
328
329 if current_head == self.tail.load(Ordering::Acquire) {
330 None
331 } else {
332 let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
333
334 self.head
335 .store(self.increment(current_head), Ordering::Release);
336
337 Some(v)
338 }
339 }
340
341 // The memory for dequeuing is "owned" by the head pointer.
342 //
343 // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
344 // items without doing pointer arithmetic and accessing internal fields of this type.
345 unsafe fn inner_dequeue_unchecked(&self) -> T {
346 let current_head = self.head.load(Ordering::Relaxed);
347 let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
348
349 self.head
350 .store(self.increment(current_head), Ordering::Release);
351
352 v
353 }
354
355 /// Returns the item in the front of the queue, without checking if there is something in the
356 /// queue.
357 ///
358 /// # Safety
359 ///
360 /// The queue must not be empty. Calling this on an empty queue causes [undefined behavior].
361 ///
362 /// [undefined behavior]: https://doc.rust-lang.org/reference/behavior-considered-undefined.html
363 pub unsafe fn dequeue_unchecked(&mut self) -> T {
364 self.inner_dequeue_unchecked()
365 }
366
367 /// Splits a queue into producer and consumer endpoints.
368 ///
369 /// If you need this function in a `const` context,
370 /// check out [`Queue::split_const`] and [`QueueView::split_const`].
371 ///
372 /// # Examples
373 ///
374 /// Create a queue and split it at runtime
375 ///
376 /// ```
377 /// # use heapless::spsc::Queue;
378 /// let mut queue: Queue<(), 4> = Queue::new();
379 /// let (mut producer, mut consumer) = queue.split();
380 /// producer.enqueue(()).unwrap();
381 /// assert_eq!(consumer.dequeue(), Some(()));
382 /// ```
383 ///
384 /// Create a queue at compile time, split it at runtime,
385 /// and pass it to an interrupt handler via a mutex.
386 ///
387 /// ```
388 /// use core::cell::RefCell;
389 /// use critical_section::Mutex;
390 /// use heapless::spsc::{Producer, Queue};
391 ///
392 /// static PRODUCER: Mutex<RefCell<Option<Producer<'static, ()>>>> =
393 /// { Mutex::new(RefCell::new(None)) };
394 ///
395 /// fn interrupt() {
396 /// let mut producer = {
397 /// static mut P: Option<Producer<'static, ()>> = None;
398 /// // SAFETY: Mutable access to `P` is allowed exclusively in this scope
399 /// // and `interrupt` cannot be called directly or preempt itself.
400 /// unsafe { &mut P }
401 /// }
402 /// .get_or_insert_with(|| {
403 /// critical_section::with(|cs| PRODUCER.borrow_ref_mut(cs).take().unwrap())
404 /// });
405 ///
406 /// producer.enqueue(()).unwrap();
407 /// }
408 ///
409 /// fn main() {
410 /// let mut consumer = {
411 /// let (p, c) = {
412 /// static mut Q: Queue<(), 4> = Queue::new();
413 /// // SAFETY: `Q` is only accessible in this scope
414 /// // and `main` is only called once.
415 /// #[allow(static_mut_refs)]
416 /// unsafe {
417 /// Q.split()
418 /// }
419 /// };
420 ///
421 /// critical_section::with(move |cs| {
422 /// let mut producer = PRODUCER.borrow_ref_mut(cs);
423 /// *producer = Some(p);
424 /// });
425 ///
426 /// c
427 /// };
428 ///
429 /// // Interrupt occurs.
430 /// # interrupt();
431 ///
432 /// consumer.dequeue().unwrap();
433 /// }
434 /// ```
435 pub fn split(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
436 (
437 Producer { rb: self.as_view() },
438 Consumer { rb: self.as_view() },
439 )
440 }
441}
442
443impl<T, const N: usize> Queue<T, N> {
444 /// Splits a queue into producer and consumer endpoints.
445 ///
446 /// Unlike [`Queue::split`](), this method can be used in a `const` context
447 ///
448 /// # Example
449 ///
450 /// Create and split a queue at compile time, and pass it to the main
451 /// function and an interrupt handler via a mutex at runtime.
452 ///
453 /// ```
454 /// use core::cell::RefCell;
455 ///
456 /// use critical_section::Mutex;
457 /// use heapless::spsc::{Consumer, Producer, Queue};
458 ///
459 /// static PC: (
460 /// Mutex<RefCell<Option<Producer<'_, ()>>>>,
461 /// Mutex<RefCell<Option<Consumer<'_, ()>>>>,
462 /// ) = {
463 /// static mut Q: Queue<(), 4> = Queue::new();
464 /// // SAFETY: `Q` is only accessible in this scope.
465 /// #[allow(static_mut_refs)]
466 /// let (p, c) = unsafe { Q.split_const() };
467 ///
468 /// (
469 /// Mutex::new(RefCell::new(Some(p))),
470 /// Mutex::new(RefCell::new(Some(c))),
471 /// )
472 /// };
473 ///
474 /// fn interrupt() {
475 /// let mut producer = {
476 /// static mut P: Option<Producer<'_, ()>> = None;
477 /// // SAFETY: Mutable access to `P` is allowed exclusively in this scope
478 /// // and `interrupt` cannot be called directly or preempt itself.
479 /// unsafe { &mut P }
480 /// }
481 /// .get_or_insert_with(|| {
482 /// critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
483 /// });
484 ///
485 /// producer.enqueue(()).unwrap();
486 /// }
487 ///
488 /// fn main() {
489 /// let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
490 ///
491 /// // Interrupt occurs.
492 /// # interrupt();
493 ///
494 /// consumer.dequeue().unwrap();
495 /// }
496 /// ```
497 pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
498 (Producer { rb: self }, Consumer { rb: self })
499 }
500}
501
502impl<T> QueueView<T> {
503 /// Splits a queue into producer and consumer endpoints.
504 ///
505 /// Unlike [`Queue::split`](), this method can be used in a `const` context
506 ///
507 /// # Example
508 ///
509 /// Create and split a queue at compile time, and pass it to the main
510 /// function and an interrupt handler via a mutex at runtime.
511 ///
512 /// ```
513 /// use core::cell::RefCell;
514 ///
515 /// use critical_section::Mutex;
516 /// use heapless::spsc::{Consumer, Producer, Queue, QueueView};
517 ///
518 /// static PC: (
519 /// Mutex<RefCell<Option<Producer<'_, ()>>>>,
520 /// Mutex<RefCell<Option<Consumer<'_, ()>>>>,
521 /// ) = {
522 /// static mut Q: &mut QueueView<()> = &mut Queue::<(), 4>::new();
523 /// // SAFETY: `Q` is only accessible in this scope.
524 /// #[allow(static_mut_refs)]
525 /// let (p, c) = unsafe { Q.split_const() };
526 ///
527 /// (
528 /// Mutex::new(RefCell::new(Some(p))),
529 /// Mutex::new(RefCell::new(Some(c))),
530 /// )
531 /// };
532 ///
533 /// fn interrupt() {
534 /// let mut producer = {
535 /// static mut P: Option<Producer<'_, ()>> = None;
536 /// // SAFETY: Mutable access to `P` is allowed exclusively in this scope
537 /// // and `interrupt` cannot be called directly or preempt itself.
538 /// unsafe { &mut P }
539 /// }
540 /// .get_or_insert_with(|| {
541 /// critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
542 /// });
543 ///
544 /// producer.enqueue(()).unwrap();
545 /// }
546 ///
547 /// fn main() {
548 /// let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
549 ///
550 /// // Interrupt occurs.
551 /// # interrupt();
552 ///
553 /// consumer.dequeue().unwrap();
554 /// }
555 /// ```
556 pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
557 (Producer { rb: self }, Consumer { rb: self })
558 }
559}
560
561impl<T, const N: usize> Default for Queue<T, N> {
562 fn default() -> Self {
563 Self::new()
564 }
565}
566
567impl<T, const N: usize> Clone for Queue<T, N>
568where
569 T: Clone,
570{
571 fn clone(&self) -> Self {
572 let mut new: Self = Self::new();
573
574 for s in self.iter() {
575 // SAFETY: `new.capacity() == self.capacity() >= self.len()`,
576 // so no overflow is possible.
577 unsafe {
578 new.enqueue_unchecked(s.clone());
579 }
580 }
581
582 new
583 }
584}
585
586impl<T, S, S2> PartialEq<QueueInner<T, S2>> for QueueInner<T, S>
587where
588 T: PartialEq,
589 S: Storage,
590 S2: Storage,
591{
592 fn eq(&self, other: &QueueInner<T, S2>) -> bool {
593 self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
594 }
595}
596
597impl<T, S: Storage> Eq for QueueInner<T, S> where T: Eq {}
598
599/// An iterator over the items of a queue.
600pub struct Iter<'a, T> {
601 rb: &'a QueueView<T>,
602 index: usize,
603 len: usize,
604}
605
606impl<T> Clone for Iter<'_, T> {
607 fn clone(&self) -> Self {
608 Self {
609 rb: self.rb,
610 index: self.index,
611 len: self.len,
612 }
613 }
614}
615
616/// An iterator over the items of a queue.
617pub struct IterMut<'a, T> {
618 rb: &'a QueueView<T>,
619 index: usize,
620 len: usize,
621}
622impl<'a, T> Iterator for Iter<'a, T> {
623 type Item = &'a T;
624
625 fn next(&mut self) -> Option<Self::Item> {
626 if self.index < self.len {
627 let head = self.rb.head.load(Ordering::Relaxed);
628
629 let i = (head + self.index) % self.rb.n();
630 self.index += 1;
631
632 Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
633 } else {
634 None
635 }
636 }
637}
638
639impl<'a, T> Iterator for IterMut<'a, T> {
640 type Item = &'a mut T;
641
642 fn next(&mut self) -> Option<Self::Item> {
643 if self.index < self.len {
644 let head = self.rb.head.load(Ordering::Relaxed);
645
646 let i = (head + self.index) % self.rb.n();
647 self.index += 1;
648
649 Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
650 } else {
651 None
652 }
653 }
654}
655
656impl<T> DoubleEndedIterator for Iter<'_, T> {
657 fn next_back(&mut self) -> Option<Self::Item> {
658 if self.index < self.len {
659 let head = self.rb.head.load(Ordering::Relaxed);
660
661 // self.len > 0, since it's larger than self.index > 0
662 let i = (head + self.len - 1) % self.rb.n();
663 self.len -= 1;
664 Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
665 } else {
666 None
667 }
668 }
669}
670
671impl<T> DoubleEndedIterator for IterMut<'_, T> {
672 fn next_back(&mut self) -> Option<Self::Item> {
673 if self.index < self.len {
674 let head = self.rb.head.load(Ordering::Relaxed);
675
676 // self.len > 0, since it's larger than self.index > 0
677 let i = (head + self.len - 1) % self.rb.n();
678 self.len -= 1;
679 Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
680 } else {
681 None
682 }
683 }
684}
685
686impl<T, S: Storage> Drop for QueueInner<T, S> {
687 fn drop(&mut self) {
688 for item in self {
689 unsafe {
690 ptr::drop_in_place(item);
691 }
692 }
693 }
694}
695
696impl<T, S> fmt::Debug for QueueInner<T, S>
697where
698 T: fmt::Debug,
699 S: Storage,
700{
701 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
702 f.debug_list().entries(self.iter()).finish()
703 }
704}
705
706impl<T, S> hash::Hash for QueueInner<T, S>
707where
708 T: hash::Hash,
709 S: Storage,
710{
711 fn hash<H: hash::Hasher>(&self, state: &mut H) {
712 // iterate over self in order
713 for t in self.iter() {
714 hash::Hash::hash(t, state);
715 }
716 }
717}
718
719impl<'a, T, S: Storage> IntoIterator for &'a QueueInner<T, S> {
720 type Item = &'a T;
721 type IntoIter = Iter<'a, T>;
722
723 fn into_iter(self) -> Self::IntoIter {
724 self.iter()
725 }
726}
727
728impl<'a, T, S: Storage> IntoIterator for &'a mut QueueInner<T, S> {
729 type Item = &'a mut T;
730 type IntoIter = IterMut<'a, T>;
731
732 fn into_iter(self) -> Self::IntoIter {
733 self.iter_mut()
734 }
735}
736
737/// A consumer; it can dequeue items from the queue.
738///
739/// **Note:** The consumer semantically owns the `head` pointer of the queue.
740pub struct Consumer<'a, T> {
741 rb: &'a QueueView<T>,
742}
743
744unsafe impl<T> Send for Consumer<'_, T> where T: Send {}
745
746/// A producer; it can enqueue items into the queue.
747///
748/// **Note:** The producer semantically owns the `tail` pointer of the queue.
749pub struct Producer<'a, T> {
750 rb: &'a QueueView<T>,
751}
752
753unsafe impl<T> Send for Producer<'_, T> where T: Send {}
754
755impl<T> Consumer<'_, T> {
756 /// Returns the item in the front of the queue, or `None` if the queue is empty.
757 #[inline]
758 pub fn dequeue(&mut self) -> Option<T> {
759 unsafe { self.rb.inner_dequeue() }
760 }
761
762 /// Returns the item in the front of the queue, without checking if there are elements in the
763 /// queue.
764 ///
765 /// # Safety
766 ///
767 /// See [`Queue::dequeue_unchecked`].
768 #[inline]
769 pub unsafe fn dequeue_unchecked(&mut self) -> T {
770 self.rb.inner_dequeue_unchecked()
771 }
772
773 /// Returns if there are any items to dequeue. When this returns `true`, at least the
774 /// first subsequent dequeue will succeed.
775 #[inline]
776 pub fn ready(&self) -> bool {
777 !self.rb.is_empty()
778 }
779
780 /// Returns the number of elements in the queue.
781 #[inline]
782 pub fn len(&self) -> usize {
783 self.rb.len()
784 }
785
786 /// Returns whether the queue is empty.
787 ///
788 /// # Examples
789 ///
790 /// ```
791 /// use heapless::spsc::Queue;
792 ///
793 /// let mut queue: Queue<u8, 235> = Queue::new();
794 /// let (mut producer, mut consumer) = queue.split();
795 /// assert!(consumer.is_empty());
796 /// ```
797 #[inline]
798 pub fn is_empty(&self) -> bool {
799 self.len() == 0
800 }
801
802 /// Returns the maximum number of elements the queue can hold.
803 #[inline]
804 pub fn capacity(&self) -> usize {
805 self.rb.capacity()
806 }
807
808 /// Returns the item in the front of the queue without dequeuing, or `None` if the queue is
809 /// empty.
810 ///
811 /// # Examples
812 ///
813 /// ```
814 /// use heapless::spsc::Queue;
815 ///
816 /// let mut queue: Queue<u8, 235> = Queue::new();
817 /// let (mut producer, mut consumer) = queue.split();
818 /// assert_eq!(None, consumer.peek());
819 /// producer.enqueue(1);
820 /// assert_eq!(Some(&1), consumer.peek());
821 /// assert_eq!(Some(1), consumer.dequeue());
822 /// assert_eq!(None, consumer.peek());
823 /// ```
824 #[inline]
825 pub fn peek(&self) -> Option<&T> {
826 self.rb.peek()
827 }
828}
829
830impl<T> Producer<'_, T> {
831 /// Adds an `item` to the end of the queue, returns back the `item` if the queue is full.
832 #[inline]
833 pub fn enqueue(&mut self, item: T) -> Result<(), T> {
834 unsafe { self.rb.inner_enqueue(item) }
835 }
836
837 /// Adds an `item` to the end of the queue, without checking if the queue is full.
838 ///
839 /// # Safety
840 ///
841 /// See [`Queue::enqueue_unchecked`].
842 #[inline]
843 pub unsafe fn enqueue_unchecked(&mut self, item: T) {
844 self.rb.inner_enqueue_unchecked(item);
845 }
846
847 /// Returns if there is any space to enqueue a new item. When this returns true, at
848 /// least the first subsequent enqueue will succeed.
849 #[inline]
850 pub fn ready(&self) -> bool {
851 !self.rb.is_full()
852 }
853
854 /// Returns the number of elements in the queue.
855 #[inline]
856 pub fn len(&self) -> usize {
857 self.rb.len()
858 }
859
860 /// Returns whether the queue is empty.
861 ///
862 /// # Examples
863 ///
864 /// ```
865 /// use heapless::spsc::Queue;
866 ///
867 /// let mut queue: Queue<u8, 235> = Queue::new();
868 /// let (mut producer, mut consumer) = queue.split();
869 /// assert!(producer.is_empty());
870 /// ```
871 #[inline]
872 pub fn is_empty(&self) -> bool {
873 self.len() == 0
874 }
875
876 /// Returns the maximum number of elements the queue can hold.
877 #[inline]
878 pub fn capacity(&self) -> usize {
879 self.rb.capacity()
880 }
881}
882
#[cfg(test)]
mod tests {
    use std::hash::{Hash, Hasher};

    use super::{Consumer, Producer, Queue};

    use static_assertions::assert_not_impl_any;

    // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
    assert_not_impl_any!(Queue<*const (), 4>: Send);

    // Ensure a `Producer` containing `!Send` values stays `!Send` itself.
    assert_not_impl_any!(Producer<*const ()>: Send);

    // Ensure a `Consumer` containing `!Send` values stays `!Send` itself.
    assert_not_impl_any!(Consumer<*const ()>: Send);

    // A queue can be created and split inside a `static` initializer.
    #[test]
    fn const_split() {
        use critical_section::Mutex;
        use std::cell::RefCell;

        #[allow(clippy::type_complexity)]
        static PC: (
            Mutex<RefCell<Option<Producer<'_, ()>>>>,
            Mutex<RefCell<Option<Consumer<'_, ()>>>>,
        ) = {
            static mut Q: Queue<(), 4> = Queue::new();
            // SAFETY: `Q` is only accessible in this scope.
            #[allow(static_mut_refs)]
            let (p, c) = unsafe { Q.split_const() };

            (
                Mutex::new(RefCell::new(Some(p))),
                Mutex::new(RefCell::new(Some(c))),
            )
        };
        let producer = critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap());
        let consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());

        let mut producer: Producer<'static, ()> = producer;
        let mut consumer: Consumer<'static, ()> = consumer;

        assert_eq!(producer.enqueue(()), Ok(()));
        assert_eq!(consumer.dequeue(), Some(()));
    }

    #[test]
    fn full() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert!(!rb.is_full());

        rb.enqueue(1).unwrap();
        assert!(!rb.is_full());

        rb.enqueue(2).unwrap();
        assert!(rb.is_full());
    }

    #[test]
    fn empty() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert!(rb.is_empty());

        rb.enqueue(1).unwrap();
        assert!(!rb.is_empty());

        rb.enqueue(2).unwrap();
        assert!(!rb.is_empty());
    }

    // `len` stays correct while the indices wrap around the buffer many times.
    #[test]
    #[cfg_attr(miri, ignore)] // too slow
    fn len() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert_eq!(rb.len(), 0);

        rb.enqueue(1).unwrap();
        assert_eq!(rb.len(), 1);

        rb.enqueue(2).unwrap();
        assert_eq!(rb.len(), 2);

        for _ in 0..1_000_000 {
            let v = rb.dequeue().unwrap();
            rb.enqueue(v).unwrap();
            assert_eq!(rb.len(), 2);
        }
    }

    // Items keep their order while a full queue is cycled many times.
    #[test]
    #[cfg_attr(miri, ignore)] // too slow
    fn try_overflow() {
        const N: usize = 23;
        let mut rb: Queue<i32, N> = Queue::new();

        for i in 0..N as i32 - 1 {
            rb.enqueue(i).unwrap();
        }

        for _ in 0..1_000_000 {
            for i in 0..N as i32 - 1 {
                let d = rb.dequeue().unwrap();
                assert_eq!(d, i);
                rb.enqueue(i).unwrap();
            }
        }
    }

    #[test]
    fn sanity() {
        let mut rb: Queue<i32, 10> = Queue::new();

        let (mut p, mut c) = rb.split();

        assert!(p.ready());

        assert!(!c.ready());

        assert_eq!(c.dequeue(), None);

        p.enqueue(0).unwrap();

        assert_eq!(c.dequeue(), Some(0));
    }

    // `Queue::new` is usable in a `static` initializer.
    #[test]
    fn static_new() {
        static mut _Q: Queue<i32, 4> = Queue::new();
    }

    // Dropping a queue drops exactly the items that are still inside it.
    #[test]
    fn drop() {
        struct Droppable;
        impl Droppable {
            fn new() -> Self {
                unsafe {
                    COUNT += 1;
                }
                Self
            }
        }

        impl Drop for Droppable {
            fn drop(&mut self) {
                unsafe {
                    COUNT -= 1;
                }
            }
        }

        static mut COUNT: i32 = 0;

        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.dequeue().unwrap();
        }

        assert_eq!(unsafe { COUNT }, 0);

        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
        }

        assert_eq!(unsafe { COUNT }, 0);
    }

    #[test]
    fn iter() {
        let mut rb: Queue<i32, 4> = Queue::new();

        // Enqueue and dequeue once so that iteration starts at a non-zero `head`
        // and the dequeued `0` must not show up.
        rb.enqueue(0).unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.enqueue(3).unwrap();

        let mut items = rb.iter();

        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), Some(&2));
        assert_eq!(items.next(), Some(&3));
        assert_eq!(items.next(), None);
    }

    #[test]
    fn iter_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter();

        assert_eq!(items.next(), Some(&0));
        assert_eq!(items.next_back(), Some(&2));
        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    #[test]
    fn iter_mut() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter_mut();

        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), Some(&mut 2));
        assert_eq!(items.next(), None);
    }

    #[test]
    fn iter_mut_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter_mut();

        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next_back(), Some(&mut 2));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    #[test]
    fn wrap_around() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(3).unwrap();
        rb.enqueue(4).unwrap();

        assert_eq!(rb.len(), 2);
    }

    #[test]
    fn ready_flag() {
        let mut rb: Queue<i32, 3> = Queue::new();
        let (mut p, mut c) = rb.split();
        assert!(!c.ready());
        assert!(p.ready());

        p.enqueue(0).unwrap();

        assert!(c.ready());
        assert!(p.ready());

        p.enqueue(1).unwrap();

        assert!(c.ready());
        assert!(!p.ready());

        c.dequeue().unwrap();

        assert!(c.ready());
        assert!(p.ready());

        c.dequeue().unwrap();

        assert!(!c.ready());
        assert!(p.ready());
    }

    #[test]
    fn clone() {
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let rb2 = rb1.clone();
        assert_eq!(rb1.capacity(), rb2.capacity());
        assert_eq!(rb1.len(), rb2.len());
        assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
    }

    #[test]
    fn eq() {
        // generate two queues with same content
        // but different buffer alignment
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let mut rb2: Queue<i32, 4> = Queue::new();
        rb2.enqueue(0).unwrap();
        rb2.enqueue(0).unwrap();
        assert!(rb1 == rb2);
        // test for symmetry
        assert!(rb2 == rb1);
        // test for changes in content
        rb1.enqueue(0).unwrap();
        assert!(rb1 != rb2);
        rb2.enqueue(1).unwrap();
        assert!(rb1 != rb2);
        // test for reflexive relation
        assert!(rb1 == rb1);
        assert!(rb2 == rb2);
    }

    #[test]
    fn hash_equality() {
        // generate two queues with same content
        // but different buffer alignment
        let rb1 = {
            let mut rb1: Queue<i32, 4> = Queue::new();
            rb1.enqueue(0).unwrap();
            rb1.enqueue(0).unwrap();
            rb1.dequeue().unwrap();
            rb1.enqueue(0).unwrap();
            rb1
        };
        let rb2 = {
            let mut rb2: Queue<i32, 4> = Queue::new();
            rb2.enqueue(0).unwrap();
            rb2.enqueue(0).unwrap();
            rb2
        };
        let hash1 = {
            let mut hasher1 = hash32::FnvHasher::default();
            rb1.hash(&mut hasher1);
            hasher1.finish()
        };
        let hash2 = {
            let mut hasher2 = hash32::FnvHasher::default();
            rb2.hash(&mut hasher2);
            hasher2.finish()
        };
        assert_eq!(hash1, hash2);
    }
}