heapless/
spsc.rs

1//! A fixed capacity single-producer, single-consumer (SPSC) lock-free queue.
2//!
3//! *Note:* This module requires atomic load and store instructions. On
4//! targets where they're not natively available, they are emulated by the
5//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
6//!
7//! # Examples
8//!
9//! [`Queue`] can be used as a plain queue:
10//!
11//! ```
12//! use heapless::spsc::Queue;
13//!
14//! let mut queue: Queue<u8, 4> = Queue::new();
15//!
16//! assert_eq!(queue.enqueue(0), Ok(()));
17//! assert_eq!(queue.enqueue(1), Ok(()));
18//! assert_eq!(queue.enqueue(2), Ok(()));
19//! assert_eq!(queue.enqueue(3), Err(3)); // Queue is full.
20//!
21//! assert_eq!(queue.dequeue(), Some(0));
22//! ```
23//!
24//! [`Queue::split`] can be used to split the queue into a [`Producer`]/[`Consumer`] pair.
25//!
26//! After splitting a `&'static mut Queue`, the resulting [`Producer`] and [`Consumer`]
27//! can be moved into different execution contexts, e.g. threads, interrupt handlers, etc.
28//!
29//!
30//! ```
31//! use heapless::spsc::{Producer, Queue};
32//!
33//! #[derive(Debug)]
34//! enum Event {
35//!     A,
36//!     B,
37//! }
38//!
39//! fn main() {
40//!     let queue: &'static mut Queue<Event, 4> = {
41//!         static mut Q: Queue<Event, 4> = Queue::new();
42//!         // SAFETY: `Q` is only accessible in this scope
43//!         // and `main` is only called once.
44//!         unsafe { &mut Q }
45//!     };
46//!
47//!     let (producer, mut consumer) = queue.split();
48//!
49//!     // `producer` can be moved into `interrupt_handler` using a static mutex or the mechanism
50//!     // provided by the concurrency framework you are using, e.g. a resource in RTIC.
51//! #   let mut producer = producer;
52//! #   interrupt_handler(&mut producer);
53//!
54//!     loop {
55//!         match consumer.dequeue() {
56//!             Some(Event::A) => { /* .. */ }
57//!             Some(Event::B) => { /* .. */ }
58//!             None => { /* Sleep. */ }
59//!         }
60//! #       break
61//!     }
62//! }
63//!
64//! // This is a different execution context that can preempt `main`.
65//! fn interrupt_handler(producer: &mut Producer<'static, Event>) {
66//! #   let condition = true;
67//!
68//!     // ..
69//!
70//!     if condition {
71//!         producer.enqueue(Event::A).unwrap();
72//!     } else {
73//!         producer.enqueue(Event::B).unwrap();
74//!     }
75//!
76//!     // ..
77//! }
78//! ```
79//!
80//! # Benchmarks
81//!
82//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled
83//! with `-C opt-level=3`:
84//!
85//! | Method                         | Time |
86//! |:-------------------------------|-----:|
87//! | `Producer::<u8, _>::enqueue()` |   16 |
88//! | `Queue::<u8, _>::enqueue()`    |   14 |
89//! | `Consumer::<u8, _>::dequeue()` |   15 |
90//! | `Queue::<u8, _>::dequeue()`    |   12 |
91//!
92//! - All execution times are in clock cycles (1 clock cycle = 125 ns).
93//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
94//!   `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
95//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some` and
96//!   `enqueue` returning `Ok`.
97//!
98//! # References
99//!
100//! This is an implementation based on [https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular](
101//!   https://web.archive.org/web/20250117082625/https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular
102//! ).
103
104use core::{borrow::Borrow, cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
105
106#[cfg(not(feature = "portable-atomic"))]
107use core::sync::atomic;
108#[cfg(feature = "portable-atomic")]
109use portable_atomic as atomic;
110
111use atomic::{AtomicUsize, Ordering};
112
113use crate::storage::{OwnedStorage, Storage, ViewStorage};
114
115/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
116///
117/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
118/// struct if you want to write code that's generic over both.
119pub struct QueueInner<T, S: Storage> {
120    // this is from where we dequeue items
121    pub(crate) head: AtomicUsize,
122
123    // this is where we enqueue new items
124    pub(crate) tail: AtomicUsize,
125
126    pub(crate) buffer: S::Buffer<UnsafeCell<MaybeUninit<T>>>,
127}
128
129/// A statically allocated single-producer, single-consumer queue with a capacity of `N - 1`
130/// elements.
131///
132/// <div class="warning">
133///
134/// To get better performance, use a value for `N` that is a power of 2.
135///
136/// </div>
137///
138/// You will likely want to use [`split`](QueueInner::split) to create a producer-consumer pair.
139pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
140
141/// A [`Queue`] with dynamic capacity.
142///
143/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by
144/// reference.
145pub type QueueView<T> = QueueInner<T, ViewStorage>;
146
147impl<T, const N: usize> Queue<T, N> {
148    /// Creates an empty queue.
149    pub const fn new() -> Self {
150        const {
151            assert!(N > 1);
152        }
153
154        Queue {
155            head: AtomicUsize::new(0),
156            tail: AtomicUsize::new(0),
157            buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N],
158        }
159    }
160
161    /// Used in `Storage` implementation
162    pub(crate) fn as_view_private(&self) -> &QueueView<T> {
163        self
164    }
165
166    /// Used in `Storage` implementation
167    pub(crate) fn as_mut_view_private(&mut self) -> &mut QueueView<T> {
168        self
169    }
170}
171
172impl<T, S: Storage> QueueInner<T, S> {
173    /// Get a reference to the `Queue`, erasing the `N` const-generic.
174    pub fn as_view(&self) -> &QueueView<T> {
175        S::as_queue_view(self)
176    }
177
178    /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
179    pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
180        S::as_mut_queue_view(self)
181    }
182
183    #[inline]
184    fn increment(&self, val: usize) -> usize {
185        (val + 1) % self.n()
186    }
187
188    #[inline]
189    fn n(&self) -> usize {
190        self.buffer.borrow().len()
191    }
192
193    /// Returns the maximum number of elements the queue can hold.
194    #[inline]
195    pub fn capacity(&self) -> usize {
196        self.n() - 1
197    }
198
199    /// Returns the number of elements in the queue.
200    #[inline]
201    pub fn len(&self) -> usize {
202        let current_head = self.head.load(Ordering::Relaxed);
203        let current_tail = self.tail.load(Ordering::Relaxed);
204
205        current_tail
206            .wrapping_sub(current_head)
207            .wrapping_add(self.n())
208            % self.n()
209    }
210
211    /// Returns whether the queue is empty.
212    #[inline]
213    pub fn is_empty(&self) -> bool {
214        self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
215    }
216
217    /// Returns whether the queue is full.
218    #[inline]
219    pub fn is_full(&self) -> bool {
220        self.increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
221    }
222
223    /// Iterates from the front of the queue to the back.
224    pub fn iter(&self) -> Iter<'_, T> {
225        Iter {
226            rb: self.as_view(),
227            index: 0,
228            len: self.len(),
229        }
230    }
231
232    /// Returns an iterator that allows modifying each value.
233    pub fn iter_mut(&mut self) -> IterMut<'_, T> {
234        let len = self.len();
235        IterMut {
236            rb: self.as_view(),
237            index: 0,
238            len,
239        }
240    }
241
242    /// Adds an `item` to the end of the queue.
243    ///
244    /// Returns back the `item` if the queue is full.
245    #[inline]
246    pub fn enqueue(&mut self, item: T) -> Result<(), T> {
247        unsafe { self.inner_enqueue(item) }
248    }
249
250    /// Returns the item in the front of the queue, or `None` if the queue is empty.
251    #[inline]
252    pub fn dequeue(&mut self) -> Option<T> {
253        unsafe { self.inner_dequeue() }
254    }
255
256    /// Returns a reference to the item in the front of the queue without dequeuing it, or
257    /// `None` if the queue is empty.
258    ///
259    /// # Examples
260    /// ```
261    /// use heapless::spsc::Queue;
262    ///
263    /// let mut queue: Queue<u8, 235> = Queue::new();
264    /// let (mut producer, mut consumer) = queue.split();
265    /// assert_eq!(None, consumer.peek());
266    /// producer.enqueue(1);
267    /// assert_eq!(Some(&1), consumer.peek());
268    /// assert_eq!(Some(1), consumer.dequeue());
269    /// assert_eq!(None, consumer.peek());
270    /// ```
271    pub fn peek(&self) -> Option<&T> {
272        if self.is_empty() {
273            None
274        } else {
275            let head = self.head.load(Ordering::Relaxed);
276            Some(unsafe { &*(self.buffer.borrow().get_unchecked(head).get() as *const T) })
277        }
278    }
279
280    // The memory for enqueueing is "owned" by the tail pointer.
281    //
282    // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
283    // items without doing pointer arithmetic and accessing internal fields of this type.
284    unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
285        let current_tail = self.tail.load(Ordering::Relaxed);
286        let next_tail = self.increment(current_tail);
287
288        if next_tail == self.head.load(Ordering::Acquire) {
289            Err(val)
290        } else {
291            (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
292            self.tail.store(next_tail, Ordering::Release);
293
294            Ok(())
295        }
296    }
297
298    // The memory for enqueueing is "owned" by the tail pointer.
299    //
300    // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
301    // items without doing pointer arithmetic and accessing internal fields of this type.
302    unsafe fn inner_enqueue_unchecked(&self, val: T) {
303        let current_tail = self.tail.load(Ordering::Relaxed);
304
305        (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
306        self.tail
307            .store(self.increment(current_tail), Ordering::Release);
308    }
309
310    /// Adds an `item` to the end of the queue, without checking if it's full.
311    ///
312    /// # Safety
313    ///
314    /// If the queue is full, this operation will leak a value (`T`'s destructor won't run on
315    /// the value that got overwritten by `item`), *and* will allow the `dequeue` operation
316    /// to create a copy of `item`, which could result in `T`'s destructor running on `item`
317    /// twice.
318    pub unsafe fn enqueue_unchecked(&mut self, item: T) {
319        self.inner_enqueue_unchecked(item);
320    }
321
322    // The memory for dequeuing is "owned" by the head pointer.
323    //
324    // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
325    // items without doing pointer arithmetic and accessing internal fields of this type.
326    unsafe fn inner_dequeue(&self) -> Option<T> {
327        let current_head = self.head.load(Ordering::Relaxed);
328
329        if current_head == self.tail.load(Ordering::Acquire) {
330            None
331        } else {
332            let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
333
334            self.head
335                .store(self.increment(current_head), Ordering::Release);
336
337            Some(v)
338        }
339    }
340
341    // The memory for dequeuing is "owned" by the head pointer.
342    //
343    // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
344    // items without doing pointer arithmetic and accessing internal fields of this type.
345    unsafe fn inner_dequeue_unchecked(&self) -> T {
346        let current_head = self.head.load(Ordering::Relaxed);
347        let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
348
349        self.head
350            .store(self.increment(current_head), Ordering::Release);
351
352        v
353    }
354
355    /// Returns the item in the front of the queue, without checking if there is something in the
356    /// queue.
357    ///
358    /// # Safety
359    ///
360    /// The queue must not be empty. Calling this on an empty queue causes [undefined behavior].
361    ///
362    /// [undefined behavior]: https://doc.rust-lang.org/reference/behavior-considered-undefined.html
363    pub unsafe fn dequeue_unchecked(&mut self) -> T {
364        self.inner_dequeue_unchecked()
365    }
366
367    /// Splits a queue into producer and consumer endpoints.
368    ///
369    /// If you need this function in a `const` context,
370    /// check out [`Queue::split_const`] and [`QueueView::split_const`].
371    ///
372    /// # Examples
373    ///
374    /// Create a queue and split it at runtime
375    ///
376    /// ```
377    /// # use heapless::spsc::Queue;
378    /// let mut queue: Queue<(), 4> = Queue::new();
379    /// let (mut producer, mut consumer) = queue.split();
380    /// producer.enqueue(()).unwrap();
381    /// assert_eq!(consumer.dequeue(), Some(()));
382    /// ```
383    ///
384    /// Create a queue at compile time, split it at runtime,
385    /// and pass it to an interrupt handler via a mutex.
386    ///
387    /// ```
388    /// use core::cell::RefCell;
389    /// use critical_section::Mutex;
390    /// use heapless::spsc::{Producer, Queue};
391    ///
392    /// static PRODUCER: Mutex<RefCell<Option<Producer<'static, ()>>>> =
393    ///     { Mutex::new(RefCell::new(None)) };
394    ///
395    /// fn interrupt() {
396    ///     let mut producer = {
397    ///         static mut P: Option<Producer<'static, ()>> = None;
398    ///         // SAFETY: Mutable access to `P` is allowed exclusively in this scope
399    ///         // and `interrupt` cannot be called directly or preempt itself.
400    ///         unsafe { &mut P }
401    ///     }
402    ///     .get_or_insert_with(|| {
403    ///         critical_section::with(|cs| PRODUCER.borrow_ref_mut(cs).take().unwrap())
404    ///     });
405    ///
406    ///     producer.enqueue(()).unwrap();
407    /// }
408    ///
409    /// fn main() {
410    ///     let mut consumer = {
411    ///         let (p, c) = {
412    ///             static mut Q: Queue<(), 4> = Queue::new();
413    ///             // SAFETY: `Q` is only accessible in this scope
414    ///             // and `main` is only called once.
415    ///             #[allow(static_mut_refs)]
416    ///             unsafe {
417    ///                 Q.split()
418    ///             }
419    ///         };
420    ///
421    ///         critical_section::with(move |cs| {
422    ///             let mut producer = PRODUCER.borrow_ref_mut(cs);
423    ///             *producer = Some(p);
424    ///         });
425    ///
426    ///         c
427    ///     };
428    ///
429    ///     // Interrupt occurs.
430    /// #   interrupt();
431    ///
432    ///     consumer.dequeue().unwrap();
433    /// }
434    /// ```
435    pub fn split(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
436        (
437            Producer { rb: self.as_view() },
438            Consumer { rb: self.as_view() },
439        )
440    }
441}
442
443impl<T, const N: usize> Queue<T, N> {
444    /// Splits a queue into producer and consumer endpoints.
445    ///
446    /// Unlike [`Queue::split`](), this method can be used in a `const` context
447    ///
448    /// # Example
449    ///
450    /// Create and split a queue at compile time, and pass it to the main
451    /// function and an interrupt handler via a mutex at runtime.
452    ///
453    /// ```
454    /// use core::cell::RefCell;
455    ///
456    /// use critical_section::Mutex;
457    /// use heapless::spsc::{Consumer, Producer, Queue};
458    ///
459    /// static PC: (
460    ///     Mutex<RefCell<Option<Producer<'_, ()>>>>,
461    ///     Mutex<RefCell<Option<Consumer<'_, ()>>>>,
462    /// ) = {
463    ///     static mut Q: Queue<(), 4> = Queue::new();
464    ///     // SAFETY: `Q` is only accessible in this scope.
465    ///     #[allow(static_mut_refs)]
466    ///     let (p, c) = unsafe { Q.split_const() };
467    ///
468    ///     (
469    ///         Mutex::new(RefCell::new(Some(p))),
470    ///         Mutex::new(RefCell::new(Some(c))),
471    ///     )
472    /// };
473    ///
474    /// fn interrupt() {
475    ///     let mut producer = {
476    ///         static mut P: Option<Producer<'_, ()>> = None;
477    ///         // SAFETY: Mutable access to `P` is allowed exclusively in this scope
478    ///         // and `interrupt` cannot be called directly or preempt itself.
479    ///         unsafe { &mut P }
480    ///     }
481    ///     .get_or_insert_with(|| {
482    ///         critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
483    ///     });
484    ///
485    ///     producer.enqueue(()).unwrap();
486    /// }
487    ///
488    /// fn main() {
489    ///     let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
490    ///
491    ///     // Interrupt occurs.
492    /// #   interrupt();
493    ///
494    ///     consumer.dequeue().unwrap();
495    /// }
496    /// ```
497    pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
498        (Producer { rb: self }, Consumer { rb: self })
499    }
500}
501
502impl<T> QueueView<T> {
503    /// Splits a queue into producer and consumer endpoints.
504    ///
505    /// Unlike [`Queue::split`](), this method can be used in a `const` context
506    ///
507    /// # Example
508    ///
509    /// Create and split a queue at compile time, and pass it to the main
510    /// function and an interrupt handler via a mutex at runtime.
511    ///
512    /// ```
513    /// use core::cell::RefCell;
514    ///
515    /// use critical_section::Mutex;
516    /// use heapless::spsc::{Consumer, Producer, Queue, QueueView};
517    ///
518    /// static PC: (
519    ///     Mutex<RefCell<Option<Producer<'_, ()>>>>,
520    ///     Mutex<RefCell<Option<Consumer<'_, ()>>>>,
521    /// ) = {
522    ///     static mut Q: &mut QueueView<()> = &mut Queue::<(), 4>::new();
523    ///     // SAFETY: `Q` is only accessible in this scope.
524    ///     #[allow(static_mut_refs)]
525    ///     let (p, c) = unsafe { Q.split_const() };
526    ///
527    ///     (
528    ///         Mutex::new(RefCell::new(Some(p))),
529    ///         Mutex::new(RefCell::new(Some(c))),
530    ///     )
531    /// };
532    ///
533    /// fn interrupt() {
534    ///     let mut producer = {
535    ///         static mut P: Option<Producer<'_, ()>> = None;
536    ///         // SAFETY: Mutable access to `P` is allowed exclusively in this scope
537    ///         // and `interrupt` cannot be called directly or preempt itself.
538    ///         unsafe { &mut P }
539    ///     }
540    ///     .get_or_insert_with(|| {
541    ///         critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
542    ///     });
543    ///
544    ///     producer.enqueue(()).unwrap();
545    /// }
546    ///
547    /// fn main() {
548    ///     let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
549    ///
550    ///     // Interrupt occurs.
551    /// #   interrupt();
552    ///
553    ///     consumer.dequeue().unwrap();
554    /// }
555    /// ```
556    pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
557        (Producer { rb: self }, Consumer { rb: self })
558    }
559}
560
561impl<T, const N: usize> Default for Queue<T, N> {
562    fn default() -> Self {
563        Self::new()
564    }
565}
566
567impl<T, const N: usize> Clone for Queue<T, N>
568where
569    T: Clone,
570{
571    fn clone(&self) -> Self {
572        let mut new: Self = Self::new();
573
574        for s in self.iter() {
575            // SAFETY: `new.capacity() == self.capacity() >= self.len()`,
576            // so no overflow is possible.
577            unsafe {
578                new.enqueue_unchecked(s.clone());
579            }
580        }
581
582        new
583    }
584}
585
586impl<T, S, S2> PartialEq<QueueInner<T, S2>> for QueueInner<T, S>
587where
588    T: PartialEq,
589    S: Storage,
590    S2: Storage,
591{
592    fn eq(&self, other: &QueueInner<T, S2>) -> bool {
593        self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
594    }
595}
596
597impl<T, S: Storage> Eq for QueueInner<T, S> where T: Eq {}
598
599/// An iterator over the items of a queue.
600pub struct Iter<'a, T> {
601    rb: &'a QueueView<T>,
602    index: usize,
603    len: usize,
604}
605
606impl<T> Clone for Iter<'_, T> {
607    fn clone(&self) -> Self {
608        Self {
609            rb: self.rb,
610            index: self.index,
611            len: self.len,
612        }
613    }
614}
615
616/// An iterator over the items of a queue.
617pub struct IterMut<'a, T> {
618    rb: &'a QueueView<T>,
619    index: usize,
620    len: usize,
621}
622impl<'a, T> Iterator for Iter<'a, T> {
623    type Item = &'a T;
624
625    fn next(&mut self) -> Option<Self::Item> {
626        if self.index < self.len {
627            let head = self.rb.head.load(Ordering::Relaxed);
628
629            let i = (head + self.index) % self.rb.n();
630            self.index += 1;
631
632            Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
633        } else {
634            None
635        }
636    }
637}
638
639impl<'a, T> Iterator for IterMut<'a, T> {
640    type Item = &'a mut T;
641
642    fn next(&mut self) -> Option<Self::Item> {
643        if self.index < self.len {
644            let head = self.rb.head.load(Ordering::Relaxed);
645
646            let i = (head + self.index) % self.rb.n();
647            self.index += 1;
648
649            Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
650        } else {
651            None
652        }
653    }
654}
655
656impl<T> DoubleEndedIterator for Iter<'_, T> {
657    fn next_back(&mut self) -> Option<Self::Item> {
658        if self.index < self.len {
659            let head = self.rb.head.load(Ordering::Relaxed);
660
661            // self.len > 0, since it's larger than self.index > 0
662            let i = (head + self.len - 1) % self.rb.n();
663            self.len -= 1;
664            Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
665        } else {
666            None
667        }
668    }
669}
670
671impl<T> DoubleEndedIterator for IterMut<'_, T> {
672    fn next_back(&mut self) -> Option<Self::Item> {
673        if self.index < self.len {
674            let head = self.rb.head.load(Ordering::Relaxed);
675
676            // self.len > 0, since it's larger than self.index > 0
677            let i = (head + self.len - 1) % self.rb.n();
678            self.len -= 1;
679            Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
680        } else {
681            None
682        }
683    }
684}
685
686impl<T, S: Storage> Drop for QueueInner<T, S> {
687    fn drop(&mut self) {
688        for item in self {
689            unsafe {
690                ptr::drop_in_place(item);
691            }
692        }
693    }
694}
695
696impl<T, S> fmt::Debug for QueueInner<T, S>
697where
698    T: fmt::Debug,
699    S: Storage,
700{
701    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
702        f.debug_list().entries(self.iter()).finish()
703    }
704}
705
706impl<T, S> hash::Hash for QueueInner<T, S>
707where
708    T: hash::Hash,
709    S: Storage,
710{
711    fn hash<H: hash::Hasher>(&self, state: &mut H) {
712        // iterate over self in order
713        for t in self.iter() {
714            hash::Hash::hash(t, state);
715        }
716    }
717}
718
719impl<'a, T, S: Storage> IntoIterator for &'a QueueInner<T, S> {
720    type Item = &'a T;
721    type IntoIter = Iter<'a, T>;
722
723    fn into_iter(self) -> Self::IntoIter {
724        self.iter()
725    }
726}
727
728impl<'a, T, S: Storage> IntoIterator for &'a mut QueueInner<T, S> {
729    type Item = &'a mut T;
730    type IntoIter = IterMut<'a, T>;
731
732    fn into_iter(self) -> Self::IntoIter {
733        self.iter_mut()
734    }
735}
736
737/// A consumer; it can dequeue items from the queue.
738///
739/// **Note:** The consumer semantically owns the `head` pointer of the queue.
740pub struct Consumer<'a, T> {
741    rb: &'a QueueView<T>,
742}
743
744unsafe impl<T> Send for Consumer<'_, T> where T: Send {}
745
746/// A producer; it can enqueue items into the queue.
747///
748/// **Note:** The producer semantically owns the `tail` pointer of the queue.
749pub struct Producer<'a, T> {
750    rb: &'a QueueView<T>,
751}
752
753unsafe impl<T> Send for Producer<'_, T> where T: Send {}
754
755impl<T> Consumer<'_, T> {
756    /// Returns the item in the front of the queue, or `None` if the queue is empty.
757    #[inline]
758    pub fn dequeue(&mut self) -> Option<T> {
759        unsafe { self.rb.inner_dequeue() }
760    }
761
762    /// Returns the item in the front of the queue, without checking if there are elements in the
763    /// queue.
764    ///
765    /// # Safety
766    ///
767    /// See [`Queue::dequeue_unchecked`].
768    #[inline]
769    pub unsafe fn dequeue_unchecked(&mut self) -> T {
770        self.rb.inner_dequeue_unchecked()
771    }
772
773    /// Returns if there are any items to dequeue. When this returns `true`, at least the
774    /// first subsequent dequeue will succeed.
775    #[inline]
776    pub fn ready(&self) -> bool {
777        !self.rb.is_empty()
778    }
779
780    /// Returns the number of elements in the queue.
781    #[inline]
782    pub fn len(&self) -> usize {
783        self.rb.len()
784    }
785
786    /// Returns whether the queue is empty.
787    ///
788    /// # Examples
789    ///
790    /// ```
791    /// use heapless::spsc::Queue;
792    ///
793    /// let mut queue: Queue<u8, 235> = Queue::new();
794    /// let (mut producer, mut consumer) = queue.split();
795    /// assert!(consumer.is_empty());
796    /// ```
797    #[inline]
798    pub fn is_empty(&self) -> bool {
799        self.len() == 0
800    }
801
802    /// Returns the maximum number of elements the queue can hold.
803    #[inline]
804    pub fn capacity(&self) -> usize {
805        self.rb.capacity()
806    }
807
808    /// Returns the item in the front of the queue without dequeuing, or `None` if the queue is
809    /// empty.
810    ///
811    /// # Examples
812    ///
813    /// ```
814    /// use heapless::spsc::Queue;
815    ///
816    /// let mut queue: Queue<u8, 235> = Queue::new();
817    /// let (mut producer, mut consumer) = queue.split();
818    /// assert_eq!(None, consumer.peek());
819    /// producer.enqueue(1);
820    /// assert_eq!(Some(&1), consumer.peek());
821    /// assert_eq!(Some(1), consumer.dequeue());
822    /// assert_eq!(None, consumer.peek());
823    /// ```
824    #[inline]
825    pub fn peek(&self) -> Option<&T> {
826        self.rb.peek()
827    }
828}
829
830impl<T> Producer<'_, T> {
831    /// Adds an `item` to the end of the queue, returns back the `item` if the queue is full.
832    #[inline]
833    pub fn enqueue(&mut self, item: T) -> Result<(), T> {
834        unsafe { self.rb.inner_enqueue(item) }
835    }
836
837    /// Adds an `item` to the end of the queue, without checking if the queue is full.
838    ///
839    /// # Safety
840    ///
841    /// See [`Queue::enqueue_unchecked`].
842    #[inline]
843    pub unsafe fn enqueue_unchecked(&mut self, item: T) {
844        self.rb.inner_enqueue_unchecked(item);
845    }
846
847    /// Returns if there is any space to enqueue a new item. When this returns true, at
848    /// least the first subsequent enqueue will succeed.
849    #[inline]
850    pub fn ready(&self) -> bool {
851        !self.rb.is_full()
852    }
853
854    /// Returns the number of elements in the queue.
855    #[inline]
856    pub fn len(&self) -> usize {
857        self.rb.len()
858    }
859
860    /// Returns whether the queue is empty.
861    ///
862    /// # Examples
863    ///
864    /// ```
865    /// use heapless::spsc::Queue;
866    ///
867    /// let mut queue: Queue<u8, 235> = Queue::new();
868    /// let (mut producer, mut consumer) = queue.split();
869    /// assert!(producer.is_empty());
870    /// ```
871    #[inline]
872    pub fn is_empty(&self) -> bool {
873        self.len() == 0
874    }
875
876    /// Returns the maximum number of elements the queue can hold.
877    #[inline]
878    pub fn capacity(&self) -> usize {
879        self.rb.capacity()
880    }
881}
882
883#[cfg(test)]
884mod tests {
885    use std::hash::{Hash, Hasher};
886
887    use super::{Consumer, Producer, Queue};
888
889    use static_assertions::assert_not_impl_any;
890
891    // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
892    assert_not_impl_any!(Queue<*const (), 4>: Send);
893
894    // Ensure a `Producer` containing `!Send` values stays `!Send` itself.
895    assert_not_impl_any!(Producer<*const ()>: Send);
896
897    // Ensure a `Consumer` containing `!Send` values stays `!Send` itself.
898    assert_not_impl_any!(Consumer<*const ()>: Send);
899
900    #[test]
901    fn const_split() {
902        use critical_section::Mutex;
903        use std::cell::RefCell;
904
905        use super::{Consumer, Producer};
906
907        #[allow(clippy::type_complexity)]
908        static PC: (
909            Mutex<RefCell<Option<Producer<'_, ()>>>>,
910            Mutex<RefCell<Option<Consumer<'_, ()>>>>,
911        ) = {
912            static mut Q: Queue<(), 4> = Queue::new();
913            // SAFETY: `Q` is only accessible in this scope.
914            #[allow(static_mut_refs)]
915            let (p, c) = unsafe { Q.split_const() };
916
917            (
918                Mutex::new(RefCell::new(Some(p))),
919                Mutex::new(RefCell::new(Some(c))),
920            )
921        };
922        let producer = critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap());
923        let consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
924
925        let mut producer: Producer<'static, ()> = producer;
926        let mut consumer: Consumer<'static, ()> = consumer;
927
928        assert_eq!(producer.enqueue(()), Ok(()));
929        assert_eq!(consumer.dequeue(), Some(()));
930    }
931
932    #[test]
933    fn full() {
934        let mut rb: Queue<i32, 3> = Queue::new();
935
936        assert!(!rb.is_full());
937
938        rb.enqueue(1).unwrap();
939        assert!(!rb.is_full());
940
941        rb.enqueue(2).unwrap();
942        assert!(rb.is_full());
943    }
944
945    #[test]
946    fn empty() {
947        let mut rb: Queue<i32, 3> = Queue::new();
948
949        assert!(rb.is_empty());
950
951        rb.enqueue(1).unwrap();
952        assert!(!rb.is_empty());
953
954        rb.enqueue(2).unwrap();
955        assert!(!rb.is_empty());
956    }
957
958    #[test]
959    #[cfg_attr(miri, ignore)] // too slow
960    fn len() {
961        let mut rb: Queue<i32, 3> = Queue::new();
962
963        assert_eq!(rb.len(), 0);
964
965        rb.enqueue(1).unwrap();
966        assert_eq!(rb.len(), 1);
967
968        rb.enqueue(2).unwrap();
969        assert_eq!(rb.len(), 2);
970
971        for _ in 0..1_000_000 {
972            let v = rb.dequeue().unwrap();
973            println!("{v}");
974            rb.enqueue(v).unwrap();
975            assert_eq!(rb.len(), 2);
976        }
977    }
978
979    #[test]
980    #[cfg_attr(miri, ignore)] // too slow
981    fn try_overflow() {
982        const N: usize = 23;
983        let mut rb: Queue<i32, N> = Queue::new();
984
985        for i in 0..N as i32 - 1 {
986            rb.enqueue(i).unwrap();
987        }
988
989        for _ in 0..1_000_000 {
990            for i in 0..N as i32 - 1 {
991                let d = rb.dequeue().unwrap();
992                assert_eq!(d, i);
993                rb.enqueue(i).unwrap();
994            }
995        }
996    }
997
998    #[test]
999    fn sanity() {
1000        let mut rb: Queue<i32, 10> = Queue::new();
1001
1002        let (mut p, mut c) = rb.split();
1003
1004        assert!(p.ready());
1005
1006        assert!(!c.ready());
1007
1008        assert_eq!(c.dequeue(), None);
1009
1010        p.enqueue(0).unwrap();
1011
1012        assert_eq!(c.dequeue(), Some(0));
1013    }
1014
1015    #[test]
1016    fn static_new() {
1017        static mut _Q: Queue<i32, 4> = Queue::new();
1018    }
1019
1020    #[test]
1021    fn drop() {
1022        struct Droppable;
1023        impl Droppable {
1024            fn new() -> Self {
1025                unsafe {
1026                    COUNT += 1;
1027                }
1028                Self
1029            }
1030        }
1031
1032        impl Drop for Droppable {
1033            fn drop(&mut self) {
1034                unsafe {
1035                    COUNT -= 1;
1036                }
1037            }
1038        }
1039
1040        static mut COUNT: i32 = 0;
1041
1042        {
1043            let mut v: Queue<Droppable, 4> = Queue::new();
1044            v.enqueue(Droppable::new()).ok().unwrap();
1045            v.enqueue(Droppable::new()).ok().unwrap();
1046            v.dequeue().unwrap();
1047        }
1048
1049        assert_eq!(unsafe { COUNT }, 0);
1050
1051        {
1052            let mut v: Queue<Droppable, 4> = Queue::new();
1053            v.enqueue(Droppable::new()).ok().unwrap();
1054            v.enqueue(Droppable::new()).ok().unwrap();
1055        }
1056
1057        assert_eq!(unsafe { COUNT }, 0);
1058    }
1059
1060    #[test]
1061    fn iter() {
1062        let mut rb: Queue<i32, 4> = Queue::new();
1063
1064        rb.enqueue(0).unwrap();
1065        rb.dequeue().unwrap();
1066        rb.enqueue(1).unwrap();
1067        rb.enqueue(2).unwrap();
1068        rb.enqueue(3).unwrap();
1069
1070        let mut items = rb.iter();
1071
1072        // assert_eq!(items.next(), Some(&0));
1073        assert_eq!(items.next(), Some(&1));
1074        assert_eq!(items.next(), Some(&2));
1075        assert_eq!(items.next(), Some(&3));
1076        assert_eq!(items.next(), None);
1077    }
1078
1079    #[test]
1080    fn iter_double_ended() {
1081        let mut rb: Queue<i32, 4> = Queue::new();
1082
1083        rb.enqueue(0).unwrap();
1084        rb.enqueue(1).unwrap();
1085        rb.enqueue(2).unwrap();
1086
1087        let mut items = rb.iter();
1088
1089        assert_eq!(items.next(), Some(&0));
1090        assert_eq!(items.next_back(), Some(&2));
1091        assert_eq!(items.next(), Some(&1));
1092        assert_eq!(items.next(), None);
1093        assert_eq!(items.next_back(), None);
1094    }
1095
1096    #[test]
1097    fn iter_mut() {
1098        let mut rb: Queue<i32, 4> = Queue::new();
1099
1100        rb.enqueue(0).unwrap();
1101        rb.enqueue(1).unwrap();
1102        rb.enqueue(2).unwrap();
1103
1104        let mut items = rb.iter_mut();
1105
1106        assert_eq!(items.next(), Some(&mut 0));
1107        assert_eq!(items.next(), Some(&mut 1));
1108        assert_eq!(items.next(), Some(&mut 2));
1109        assert_eq!(items.next(), None);
1110    }
1111
1112    #[test]
1113    fn iter_mut_double_ended() {
1114        let mut rb: Queue<i32, 4> = Queue::new();
1115
1116        rb.enqueue(0).unwrap();
1117        rb.enqueue(1).unwrap();
1118        rb.enqueue(2).unwrap();
1119
1120        let mut items = rb.iter_mut();
1121
1122        assert_eq!(items.next(), Some(&mut 0));
1123        assert_eq!(items.next_back(), Some(&mut 2));
1124        assert_eq!(items.next(), Some(&mut 1));
1125        assert_eq!(items.next(), None);
1126        assert_eq!(items.next_back(), None);
1127    }
1128
1129    #[test]
1130    fn wrap_around() {
1131        let mut rb: Queue<i32, 4> = Queue::new();
1132
1133        rb.enqueue(0).unwrap();
1134        rb.enqueue(1).unwrap();
1135        rb.enqueue(2).unwrap();
1136        rb.dequeue().unwrap();
1137        rb.dequeue().unwrap();
1138        rb.dequeue().unwrap();
1139        rb.enqueue(3).unwrap();
1140        rb.enqueue(4).unwrap();
1141
1142        assert_eq!(rb.len(), 2);
1143    }
1144
1145    #[test]
1146    fn ready_flag() {
1147        let mut rb: Queue<i32, 3> = Queue::new();
1148        let (mut p, mut c) = rb.split();
1149        assert!(!c.ready());
1150        assert!(p.ready());
1151
1152        p.enqueue(0).unwrap();
1153
1154        assert!(c.ready());
1155        assert!(p.ready());
1156
1157        p.enqueue(1).unwrap();
1158
1159        assert!(c.ready());
1160        assert!(!p.ready());
1161
1162        c.dequeue().unwrap();
1163
1164        assert!(c.ready());
1165        assert!(p.ready());
1166
1167        c.dequeue().unwrap();
1168
1169        assert!(!c.ready());
1170        assert!(p.ready());
1171    }
1172
1173    #[test]
1174    fn clone() {
1175        let mut rb1: Queue<i32, 4> = Queue::new();
1176        rb1.enqueue(0).unwrap();
1177        rb1.enqueue(0).unwrap();
1178        rb1.dequeue().unwrap();
1179        rb1.enqueue(0).unwrap();
1180        let rb2 = rb1.clone();
1181        assert_eq!(rb1.capacity(), rb2.capacity());
1182        assert_eq!(rb1.len(), rb2.len());
1183        assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
1184    }
1185
1186    #[test]
1187    fn eq() {
1188        // generate two queues with same content
1189        // but different buffer alignment
1190        let mut rb1: Queue<i32, 4> = Queue::new();
1191        rb1.enqueue(0).unwrap();
1192        rb1.enqueue(0).unwrap();
1193        rb1.dequeue().unwrap();
1194        rb1.enqueue(0).unwrap();
1195        let mut rb2: Queue<i32, 4> = Queue::new();
1196        rb2.enqueue(0).unwrap();
1197        rb2.enqueue(0).unwrap();
1198        assert!(rb1 == rb2);
1199        // test for symmetry
1200        assert!(rb2 == rb1);
1201        // test for changes in content
1202        rb1.enqueue(0).unwrap();
1203        assert!(rb1 != rb2);
1204        rb2.enqueue(1).unwrap();
1205        assert!(rb1 != rb2);
1206        // test for refexive relation
1207        assert!(rb1 == rb1);
1208        assert!(rb2 == rb2);
1209    }
1210
1211    #[test]
1212    fn hash_equality() {
1213        // generate two queues with same content
1214        // but different buffer alignment
1215        let rb1 = {
1216            let mut rb1: Queue<i32, 4> = Queue::new();
1217            rb1.enqueue(0).unwrap();
1218            rb1.enqueue(0).unwrap();
1219            rb1.dequeue().unwrap();
1220            rb1.enqueue(0).unwrap();
1221            rb1
1222        };
1223        let rb2 = {
1224            let mut rb2: Queue<i32, 4> = Queue::new();
1225            rb2.enqueue(0).unwrap();
1226            rb2.enqueue(0).unwrap();
1227            rb2
1228        };
1229        let hash1 = {
1230            let mut hasher1 = hash32::FnvHasher::default();
1231            rb1.hash(&mut hasher1);
1232            hasher1.finish()
1233        };
1234        let hash2 = {
1235            let mut hasher2 = hash32::FnvHasher::default();
1236            rb2.hash(&mut hasher2);
1237            hasher2.finish()
1238        };
1239        assert_eq!(hash1, hash2);
1240    }
1241}