heapless/
spsc.rs

1//! A fixed capacity single-producer, single-consumer (SPSC) lock-free queue.
2//!
3//! *Note:* This module requires atomic load and store instructions. On
4//! targets where they're not natively available, they are emulated by the
5//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
6//!
7//! # Examples
8//!
9//! [`Queue`] can be used as a plain queue:
10//!
11//! ```
12//! use heapless::spsc::Queue;
13//!
14//! let mut queue: Queue<u8, 4> = Queue::new();
15//!
16//! assert_eq!(queue.enqueue(0), Ok(()));
17//! assert_eq!(queue.enqueue(1), Ok(()));
18//! assert_eq!(queue.enqueue(2), Ok(()));
19//! assert_eq!(queue.enqueue(3), Err(3)); // Queue is full.
20//!
21//! assert_eq!(queue.dequeue(), Some(0));
22//! ```
23//!
24//! [`Queue::split`] can be used to split the queue into a [`Producer`]/[`Consumer`] pair.
25//!
//! After splitting a `&'static mut Queue`, the resulting [`Producer`] and [`Consumer`]
//! can be moved into different execution contexts, e.g. threads or interrupt handlers.
//!
30//! ```
31//! use heapless::spsc::{Producer, Queue};
32//!
33//! #[derive(Debug)]
34//! enum Event {
35//!     A,
36//!     B,
37//! }
38//!
39//! fn main() {
40//!     let queue: &'static mut Queue<Event, 4> = {
41//!         static mut Q: Queue<Event, 4> = Queue::new();
42//!         // SAFETY: `Q` is only accessible in this scope
43//!         // and `main` is only called once.
44//!         unsafe { &mut Q }
45//!     };
46//!
47//!     let (producer, mut consumer) = queue.split();
48//!
49//!     // `producer` can be moved into `interrupt_handler` using a static mutex or the mechanism
50//!     // provided by the concurrency framework you are using, e.g. a resource in RTIC.
51//! #   let mut producer = producer;
52//! #   interrupt_handler(&mut producer);
53//!
54//!     loop {
55//!         match consumer.dequeue() {
56//!             Some(Event::A) => { /* .. */ }
57//!             Some(Event::B) => { /* .. */ }
58//!             None => { /* Sleep. */ }
59//!         }
60//! #       break
61//!     }
62//! }
63//!
64//! // This is a different execution context that can preempt `main`.
65//! fn interrupt_handler(producer: &mut Producer<'static, Event>) {
66//! #   let condition = true;
67//!
68//!     // ..
69//!
70//!     if condition {
71//!         producer.enqueue(Event::A).unwrap();
72//!     } else {
73//!         producer.enqueue(Event::B).unwrap();
74//!     }
75//!
76//!     // ..
77//! }
78//! ```
79//!
80//! # Benchmarks
81//!
82//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled with `-C opt-level=3`:
83//!
84//! | Method                         | Time |
85//! |:-------------------------------|-----:|
86//! | `Producer::<u8, _>::enqueue()` |   16 |
87//! | `Queue::<u8, _>::enqueue()`    |   14 |
88//! | `Consumer::<u8, _>::dequeue()` |   15 |
89//! | `Queue::<u8, _>::dequeue()`    |   12 |
90//!
91//! - All execution times are in clock cycles (1 clock cycle = 125 ns).
92//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
93//!   `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
94//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
95//!   and `enqueue` returning `Ok`.
96//!
97//! # References
98//!
99//! This is an implementation based on [https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular](
100//!   https://web.archive.org/web/20250117082625/https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular
101//! ).
102
103use core::{borrow::Borrow, cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
104
105#[cfg(not(feature = "portable-atomic"))]
106use core::sync::atomic;
107#[cfg(feature = "portable-atomic")]
108use portable_atomic as atomic;
109
110use atomic::{AtomicUsize, Ordering};
111
112use crate::storage::{OwnedStorage, Storage, ViewStorage};
113
/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
///
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
/// struct if you want to write code that's generic over both.
///
/// The queue is a ring buffer addressed by two indices. It is empty when `head == tail`
/// and full when advancing `tail` by one slot would make it equal to `head`, so one slot
/// of the buffer always stays unused.
pub struct QueueInner<T, S: Storage> {
    // Index of the slot from which the next item is dequeued.
    pub(crate) head: AtomicUsize,

    // Index of the slot into which the next item is enqueued.
    pub(crate) tail: AtomicUsize,

    // Backing slots. A slot is only initialized while it lies between `head` (inclusive)
    // and `tail` (exclusive), hence the `MaybeUninit`; the `UnsafeCell` allows writing
    // through a shared reference.
    pub(crate) buffer: S::Buffer<UnsafeCell<MaybeUninit<T>>>,
}
127
/// A statically allocated single-producer, single-consumer queue with a capacity of `N - 1` elements.
///
/// `N` must be greater than 1; [`Queue::new`] checks this at compile time.
///
/// <div class="warning">
///
/// To get better performance, use a value for `N` that is a power of 2.
///
/// </div>
///
/// You will likely want to use [`split`](QueueInner::split) to create a producer-consumer pair.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
138
/// A [`Queue`] with dynamic capacity.
///
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
///
/// Use it to write code that does not depend on the `N` const generic of [`Queue`].
pub type QueueView<T> = QueueInner<T, ViewStorage>;
143
144impl<T, const N: usize> Queue<T, N> {
145    /// Creates an empty queue.
146    pub const fn new() -> Self {
147        const {
148            assert!(N > 1);
149        }
150
151        Queue {
152            head: AtomicUsize::new(0),
153            tail: AtomicUsize::new(0),
154            buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N],
155        }
156    }
157
158    /// Used in `Storage` implementation
159    pub(crate) fn as_view_private(&self) -> &QueueView<T> {
160        self
161    }
162
163    /// Used in `Storage` implementation
164    pub(crate) fn as_mut_view_private(&mut self) -> &mut QueueView<T> {
165        self
166    }
167}
168
169impl<T, S: Storage> QueueInner<T, S> {
    /// Get a reference to the `Queue`, erasing the `N` const-generic.
    ///
    /// The conversion itself is delegated to the [`Storage`] implementation `S`.
    pub fn as_view(&self) -> &QueueView<T> {
        S::as_queue_view(self)
    }
174
    /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
    ///
    /// The conversion itself is delegated to the [`Storage`] implementation `S`.
    pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
        S::as_mut_queue_view(self)
    }
179
180    #[inline]
181    fn increment(&self, val: usize) -> usize {
182        (val + 1) % self.n()
183    }
184
    // Number of slots in the backing buffer. This is one more than `capacity()`.
    #[inline]
    fn n(&self) -> usize {
        self.buffer.borrow().len()
    }
189
190    /// Returns the maximum number of elements the queue can hold.
191    #[inline]
192    pub fn capacity(&self) -> usize {
193        self.n() - 1
194    }
195
196    /// Returns the number of elements in the queue.
197    #[inline]
198    pub fn len(&self) -> usize {
199        let current_head = self.head.load(Ordering::Relaxed);
200        let current_tail = self.tail.load(Ordering::Relaxed);
201
202        current_tail
203            .wrapping_sub(current_head)
204            .wrapping_add(self.n())
205            % self.n()
206    }
207
208    /// Returns whether the queue is empty.
209    #[inline]
210    pub fn is_empty(&self) -> bool {
211        self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
212    }
213
214    /// Returns whether the queue is full.
215    #[inline]
216    pub fn is_full(&self) -> bool {
217        self.increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
218    }
219
220    /// Iterates from the front of the queue to the back.
221    pub fn iter(&self) -> Iter<'_, T> {
222        Iter {
223            rb: self.as_view(),
224            index: 0,
225            len: self.len(),
226        }
227    }
228
229    /// Returns an iterator that allows modifying each value.
230    pub fn iter_mut(&mut self) -> IterMut<'_, T> {
231        let len = self.len();
232        IterMut {
233            rb: self.as_view(),
234            index: 0,
235            len,
236        }
237    }
238
    /// Adds an `item` to the end of the queue.
    ///
    /// Returns back the `item` if the queue is full.
    #[inline]
    pub fn enqueue(&mut self, item: T) -> Result<(), T> {
        // SAFETY: `&mut self` gives exclusive access to the queue, so no `Producer` or
        // `Consumer` handle can touch it while this call runs.
        unsafe { self.inner_enqueue(item) }
    }
246
    /// Returns the item in the front of the queue, or `None` if the queue is empty.
    #[inline]
    pub fn dequeue(&mut self) -> Option<T> {
        // SAFETY: `&mut self` gives exclusive access to the queue, so no `Producer` or
        // `Consumer` handle can touch it while this call runs.
        unsafe { self.inner_dequeue() }
    }
252
253    /// Returns a reference to the item in the front of the queue without dequeuing it, or
254    /// `None` if the queue is empty.
255    ///
256    /// # Examples
257    /// ```
258    /// use heapless::spsc::Queue;
259    ///
260    /// let mut queue: Queue<u8, 235> = Queue::new();
261    /// let (mut producer, mut consumer) = queue.split();
262    /// assert_eq!(None, consumer.peek());
263    /// producer.enqueue(1);
264    /// assert_eq!(Some(&1), consumer.peek());
265    /// assert_eq!(Some(1), consumer.dequeue());
266    /// assert_eq!(None, consumer.peek());
267    /// ```
268    pub fn peek(&self) -> Option<&T> {
269        if self.is_empty() {
270            None
271        } else {
272            let head = self.head.load(Ordering::Relaxed);
273            Some(unsafe { &*(self.buffer.borrow().get_unchecked(head).get() as *const T) })
274        }
275    }
276
    // The memory for enqueueing is "owned" by the tail pointer.
    //
    // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    //
    // Returns `Err(val)` without touching the buffer when the queue is full.
    //
    // NOTE(review): the safety contract is not written down here; presumably at most one
    // context may enqueue at any time — confirm.
    unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
        let current_tail = self.tail.load(Ordering::Relaxed);
        let next_tail = self.increment(current_tail);

        // `Acquire` pairs with the `Release` store to `head` in the dequeue functions, so
        // the consumer has finished reading a slot before it can be reused here.
        if next_tail == self.head.load(Ordering::Acquire) {
            // Advancing `tail` would make it equal to `head`: the queue is full.
            Err(val)
        } else {
            (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
            // Publish the item: `Release` orders the slot write above before the new `tail`.
            self.tail.store(next_tail, Ordering::Release);

            Ok(())
        }
    }
294
    // The memory for enqueueing is "owned" by the tail pointer.
    //
    // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    //
    // Unlike `inner_enqueue`, this never reads `head`: the caller is responsible for
    // making sure the queue is not full.
    unsafe fn inner_enqueue_unchecked(&self, val: T) {
        let current_tail = self.tail.load(Ordering::Relaxed);

        // Write into the slot at `tail` without checking whether it is free.
        (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
        // Publish the item: `Release` orders the slot write above before the new `tail`.
        self.tail
            .store(self.increment(current_tail), Ordering::Release);
    }
306
    /// Adds an `item` to the end of the queue, without checking if it's full.
    ///
    /// # Safety
    ///
    /// If the queue is full, this operation will leak a value (`T`'s destructor won't run on
    /// the value that got overwritten by `item`), *and* will allow the `dequeue` operation
    /// to create a copy of `item`, which could result in `T`'s destructor running on `item`
    /// twice.
    pub unsafe fn enqueue_unchecked(&mut self, item: T) {
        // The "queue is not full" requirement is forwarded to the caller of this function.
        self.inner_enqueue_unchecked(item);
    }
318
    // The memory for dequeuing is "owned" by the head pointer.
    //
    // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    //
    // Returns `None` when the queue is empty.
    //
    // NOTE(review): the safety contract is not written down here; presumably at most one
    // context may dequeue at any time — confirm.
    unsafe fn inner_dequeue(&self) -> Option<T> {
        let current_head = self.head.load(Ordering::Relaxed);

        // `Acquire` pairs with the `Release` store to `tail` in the enqueue functions, so
        // the producer's write to the slot is visible before the slot is read below.
        if current_head == self.tail.load(Ordering::Acquire) {
            // `head == tail`: nothing to dequeue.
            None
        } else {
            // Move the item out of the slot; the slot is treated as uninitialized afterwards.
            let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();

            // Hand the slot back: `Release` orders the read above before the new `head`.
            self.head
                .store(self.increment(current_head), Ordering::Release);

            Some(v)
        }
    }
337
    // The memory for dequeuing is "owned" by the head pointer.
    //
    // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    //
    // Unlike `inner_dequeue`, this never reads `tail`: the caller is responsible for
    // making sure the queue is not empty.
    unsafe fn inner_dequeue_unchecked(&self) -> T {
        let current_head = self.head.load(Ordering::Relaxed);
        // Move the item out of the slot at `head` without checking that one is present.
        let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();

        // Hand the slot back: `Release` orders the read above before the new `head`.
        self.head
            .store(self.increment(current_head), Ordering::Release);

        v
    }
351
    /// Returns the item in the front of the queue, without checking if there is something in the
    /// queue.
    ///
    /// # Safety
    ///
    /// The queue must not be empty. Calling this on an empty queue causes [undefined behavior].
    ///
    /// [undefined behavior]: https://doc.rust-lang.org/reference/behavior-considered-undefined.html
    pub unsafe fn dequeue_unchecked(&mut self) -> T {
        // The "queue is not empty" requirement is forwarded to the caller of this function.
        self.inner_dequeue_unchecked()
    }
363
364    /// Splits a queue into producer and consumer endpoints.
365    ///
366    /// If you need this function in a `const` context,
367    /// check out [`Queue::split_const`] and [`QueueView::split_const`].
368    ///
369    /// # Examples
370    ///
371    /// Create a queue and split it at runtime
372    ///
373    /// ```
374    /// # use heapless::spsc::Queue;
375    /// let mut queue: Queue<(), 4> = Queue::new();
376    /// let (mut producer, mut consumer) = queue.split();
377    /// producer.enqueue(()).unwrap();
378    /// assert_eq!(consumer.dequeue(), Some(()));
379    /// ```
380    ///
381    /// Create a queue at compile time, split it at runtime,
382    /// and pass it to an interrupt handler via a mutex.
383    ///
384    /// ```
385    /// use core::cell::RefCell;
386    /// use critical_section::Mutex;
387    /// use heapless::spsc::{Producer, Queue};
388    ///
389    /// static PRODUCER: Mutex<RefCell<Option<Producer<'static, ()>>>> =
390    ///     { Mutex::new(RefCell::new(None)) };
391    ///
392    /// fn interrupt() {
393    ///     let mut producer = {
394    ///         static mut P: Option<Producer<'static, ()>> = None;
395    ///         // SAFETY: Mutable access to `P` is allowed exclusively in this scope
396    ///         // and `interrupt` cannot be called directly or preempt itself.
397    ///         unsafe { &mut P }
398    ///     }
399    ///     .get_or_insert_with(|| {
400    ///         critical_section::with(|cs| PRODUCER.borrow_ref_mut(cs).take().unwrap())
401    ///     });
402    ///
403    ///     producer.enqueue(()).unwrap();
404    /// }
405    ///
406    /// fn main() {
407    ///     let mut consumer = {
408    ///         let (p, c) = {
409    ///             static mut Q: Queue<(), 4> = Queue::new();
410    ///             // SAFETY: `Q` is only accessible in this scope
411    ///             // and `main` is only called once.
412    ///             #[allow(static_mut_refs)]
413    ///             unsafe {
414    ///                 Q.split()
415    ///             }
416    ///         };
417    ///
418    ///         critical_section::with(move |cs| {
419    ///             let mut producer = PRODUCER.borrow_ref_mut(cs);
420    ///             *producer = Some(p);
421    ///         });
422    ///
423    ///         c
424    ///     };
425    ///
426    ///     // Interrupt occurs.
427    /// #   interrupt();
428    ///
429    ///     consumer.dequeue().unwrap();
430    /// }
431    /// ```
432    pub fn split(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
433        (
434            Producer { rb: self.as_view() },
435            Consumer { rb: self.as_view() },
436        )
437    }
438}
439
440impl<T, const N: usize> Queue<T, N> {
441    /// Splits a queue into producer and consumer endpoints.
442    ///
443    /// Unlike [`Queue::split`](), this method can be used in a `const` context
444    ///
445    /// # Example
446    ///
447    /// Create and split a queue at compile time, and pass it to the main
448    /// function and an interrupt handler via a mutex at runtime.
449    ///
450    /// ```
451    /// use core::cell::RefCell;
452    ///
453    /// use critical_section::Mutex;
454    /// use heapless::spsc::{Consumer, Producer, Queue};
455    ///
456    /// static PC: (
457    ///     Mutex<RefCell<Option<Producer<'_, ()>>>>,
458    ///     Mutex<RefCell<Option<Consumer<'_, ()>>>>,
459    /// ) = {
460    ///     static mut Q: Queue<(), 4> = Queue::new();
461    ///     // SAFETY: `Q` is only accessible in this scope.
462    ///     #[allow(static_mut_refs)]
463    ///     let (p, c) = unsafe { Q.split_const() };
464    ///
465    ///     (
466    ///         Mutex::new(RefCell::new(Some(p))),
467    ///         Mutex::new(RefCell::new(Some(c))),
468    ///     )
469    /// };
470    ///
471    /// fn interrupt() {
472    ///     let mut producer = {
473    ///         static mut P: Option<Producer<'_, ()>> = None;
474    ///         // SAFETY: Mutable access to `P` is allowed exclusively in this scope
475    ///         // and `interrupt` cannot be called directly or preempt itself.
476    ///         unsafe { &mut P }
477    ///     }
478    ///     .get_or_insert_with(|| {
479    ///         critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
480    ///     });
481    ///
482    ///     producer.enqueue(()).unwrap();
483    /// }
484    ///
485    /// fn main() {
486    ///     let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
487    ///
488    ///     // Interrupt occurs.
489    /// #   interrupt();
490    ///
491    ///     consumer.dequeue().unwrap();
492    /// }
493    /// ```
494    pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
495        (Producer { rb: self }, Consumer { rb: self })
496    }
497}
498
499impl<T> QueueView<T> {
500    /// Splits a queue into producer and consumer endpoints.
501    ///
502    /// Unlike [`Queue::split`](), this method can be used in a `const` context
503    ///
504    /// # Example
505    ///
506    /// Create and split a queue at compile time, and pass it to the main
507    /// function and an interrupt handler via a mutex at runtime.
508    ///
509    /// ```
510    /// use core::cell::RefCell;
511    ///
512    /// use critical_section::Mutex;
513    /// use heapless::spsc::{Consumer, Producer, Queue, QueueView};
514    ///
515    /// static PC: (
516    ///     Mutex<RefCell<Option<Producer<'_, ()>>>>,
517    ///     Mutex<RefCell<Option<Consumer<'_, ()>>>>,
518    /// ) = {
519    ///     static mut Q: &mut QueueView<()> = &mut Queue::<(), 4>::new();
520    ///     // SAFETY: `Q` is only accessible in this scope.
521    ///     #[allow(static_mut_refs)]
522    ///     let (p, c) = unsafe { Q.split_const() };
523    ///
524    ///     (
525    ///         Mutex::new(RefCell::new(Some(p))),
526    ///         Mutex::new(RefCell::new(Some(c))),
527    ///     )
528    /// };
529    ///
530    /// fn interrupt() {
531    ///     let mut producer = {
532    ///         static mut P: Option<Producer<'_, ()>> = None;
533    ///         // SAFETY: Mutable access to `P` is allowed exclusively in this scope
534    ///         // and `interrupt` cannot be called directly or preempt itself.
535    ///         unsafe { &mut P }
536    ///     }
537    ///     .get_or_insert_with(|| {
538    ///         critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
539    ///     });
540    ///
541    ///     producer.enqueue(()).unwrap();
542    /// }
543    ///
544    /// fn main() {
545    ///     let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
546    ///
547    ///     // Interrupt occurs.
548    /// #   interrupt();
549    ///
550    ///     consumer.dequeue().unwrap();
551    /// }
552    /// ```
553    pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
554        (Producer { rb: self }, Consumer { rb: self })
555    }
556}
557
558impl<T, const N: usize> Default for Queue<T, N> {
559    fn default() -> Self {
560        Self::new()
561    }
562}
563
564impl<T, const N: usize> Clone for Queue<T, N>
565where
566    T: Clone,
567{
568    fn clone(&self) -> Self {
569        let mut new: Self = Self::new();
570
571        for s in self.iter() {
572            // SAFETY: `new.capacity() == self.capacity() >= self.len()`,
573            // so no overflow is possible.
574            unsafe {
575                new.enqueue_unchecked(s.clone());
576            }
577        }
578
579        new
580    }
581}
582
583impl<T, S, S2> PartialEq<QueueInner<T, S2>> for QueueInner<T, S>
584where
585    T: PartialEq,
586    S: Storage,
587    S2: Storage,
588{
589    fn eq(&self, other: &QueueInner<T, S2>) -> bool {
590        self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
591    }
592}
593
// Marker impl: equality is element-wise (see the `PartialEq` impl), so it is a full
// equivalence relation whenever `T: Eq`.
impl<T, S: Storage> Eq for QueueInner<T, S> where T: Eq {}
595
/// An iterator over the items of a queue.
///
/// Yields shared references, front to back.
pub struct Iter<'a, T> {
    // Queue being iterated.
    rb: &'a QueueView<T>,
    // Offset from `head` of the next item yielded from the front.
    index: usize,
    // Offset from `head` one past the next item yielded from the back.
    len: usize,
}
602
603impl<T> Clone for Iter<'_, T> {
604    fn clone(&self) -> Self {
605        Self {
606            rb: self.rb,
607            index: self.index,
608            len: self.len,
609        }
610    }
611}
612
/// An iterator over the items of a queue.
///
/// Yields mutable references, front to back.
pub struct IterMut<'a, T> {
    // Queue being iterated. Items are mutated through the `UnsafeCell` slots.
    rb: &'a QueueView<T>,
    // Offset from `head` of the next item yielded from the front.
    index: usize,
    // Offset from `head` one past the next item yielded from the back.
    len: usize,
}
619impl<'a, T> Iterator for Iter<'a, T> {
620    type Item = &'a T;
621
622    fn next(&mut self) -> Option<Self::Item> {
623        if self.index < self.len {
624            let head = self.rb.head.load(Ordering::Relaxed);
625
626            let i = (head + self.index) % self.rb.n();
627            self.index += 1;
628
629            Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
630        } else {
631            None
632        }
633    }
634}
635
636impl<'a, T> Iterator for IterMut<'a, T> {
637    type Item = &'a mut T;
638
639    fn next(&mut self) -> Option<Self::Item> {
640        if self.index < self.len {
641            let head = self.rb.head.load(Ordering::Relaxed);
642
643            let i = (head + self.index) % self.rb.n();
644            self.index += 1;
645
646            Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
647        } else {
648            None
649        }
650    }
651}
652
653impl<T> DoubleEndedIterator for Iter<'_, T> {
654    fn next_back(&mut self) -> Option<Self::Item> {
655        if self.index < self.len {
656            let head = self.rb.head.load(Ordering::Relaxed);
657
658            // self.len > 0, since it's larger than self.index > 0
659            let i = (head + self.len - 1) % self.rb.n();
660            self.len -= 1;
661            Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
662        } else {
663            None
664        }
665    }
666}
667
668impl<T> DoubleEndedIterator for IterMut<'_, T> {
669    fn next_back(&mut self) -> Option<Self::Item> {
670        if self.index < self.len {
671            let head = self.rb.head.load(Ordering::Relaxed);
672
673            // self.len > 0, since it's larger than self.index > 0
674            let i = (head + self.len - 1) % self.rb.n();
675            self.len -= 1;
676            Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
677        } else {
678            None
679        }
680    }
681}
682
683impl<T, S: Storage> Drop for QueueInner<T, S> {
684    fn drop(&mut self) {
685        for item in self {
686            unsafe {
687                ptr::drop_in_place(item);
688            }
689        }
690    }
691}
692
693impl<T, S> fmt::Debug for QueueInner<T, S>
694where
695    T: fmt::Debug,
696    S: Storage,
697{
698    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
699        f.debug_list().entries(self.iter()).finish()
700    }
701}
702
703impl<T, S> hash::Hash for QueueInner<T, S>
704where
705    T: hash::Hash,
706    S: Storage,
707{
708    fn hash<H: hash::Hasher>(&self, state: &mut H) {
709        // iterate over self in order
710        for t in self.iter() {
711            hash::Hash::hash(t, state);
712        }
713    }
714}
715
716impl<'a, T, S: Storage> IntoIterator for &'a QueueInner<T, S> {
717    type Item = &'a T;
718    type IntoIter = Iter<'a, T>;
719
720    fn into_iter(self) -> Self::IntoIter {
721        self.iter()
722    }
723}
724
725impl<'a, T, S: Storage> IntoIterator for &'a mut QueueInner<T, S> {
726    type Item = &'a mut T;
727    type IntoIter = IterMut<'a, T>;
728
729    fn into_iter(self) -> Self::IntoIter {
730        self.iter_mut()
731    }
732}
733
/// A consumer; it can dequeue items from the queue.
///
/// Created by [`QueueInner::split`] or one of the `split_const` methods.
///
/// **Note:** The consumer semantically owns the `head` pointer of the queue.
pub struct Consumer<'a, T> {
    // Queue shared with the matching `Producer`.
    rb: &'a QueueView<T>,
}
740
// Moving a `Consumer` to another execution context moves the ability to take `T` values
// out of the queue there, hence the `T: Send` bound.
// NOTE(review): the full soundness argument is not written down in this file — confirm.
unsafe impl<T> Send for Consumer<'_, T> where T: Send {}
742
/// A producer; it can enqueue items into the queue.
///
/// Created by [`QueueInner::split`] or one of the `split_const` methods.
///
/// **Note:** The producer semantically owns the `tail` pointer of the queue.
pub struct Producer<'a, T> {
    // Queue shared with the matching `Consumer`.
    rb: &'a QueueView<T>,
}
749
// Moving a `Producer` to another execution context means `T` values are handed to the
// queue from there, hence the `T: Send` bound.
// NOTE(review): the full soundness argument is not written down in this file — confirm.
unsafe impl<T> Send for Producer<'_, T> where T: Send {}
751
752impl<T> Consumer<'_, T> {
753    /// Returns the item in the front of the queue, or `None` if the queue is empty.
754    #[inline]
755    pub fn dequeue(&mut self) -> Option<T> {
756        unsafe { self.rb.inner_dequeue() }
757    }
758
759    /// Returns the item in the front of the queue, without checking if there are elements in the
760    /// queue.
761    ///
762    /// # Safety
763    ///
764    /// See [`Queue::dequeue_unchecked`].
765    #[inline]
766    pub unsafe fn dequeue_unchecked(&mut self) -> T {
767        self.rb.inner_dequeue_unchecked()
768    }
769
770    /// Returns if there are any items to dequeue. When this returns `true`, at least the
771    /// first subsequent dequeue will succeed.
772    #[inline]
773    pub fn ready(&self) -> bool {
774        !self.rb.is_empty()
775    }
776
777    /// Returns the number of elements in the queue.
778    #[inline]
779    pub fn len(&self) -> usize {
780        self.rb.len()
781    }
782
783    /// Returns whether the queue is empty.
784    ///
785    /// # Examples
786    ///
787    /// ```
788    /// use heapless::spsc::Queue;
789    ///
790    /// let mut queue: Queue<u8, 235> = Queue::new();
791    /// let (mut producer, mut consumer) = queue.split();
792    /// assert!(consumer.is_empty());
793    /// ```
794    #[inline]
795    pub fn is_empty(&self) -> bool {
796        self.len() == 0
797    }
798
799    /// Returns the maximum number of elements the queue can hold.
800    #[inline]
801    pub fn capacity(&self) -> usize {
802        self.rb.capacity()
803    }
804
805    /// Returns the item in the front of the queue without dequeuing, or `None` if the queue is
806    /// empty.
807    ///
808    /// # Examples
809    ///
810    /// ```
811    /// use heapless::spsc::Queue;
812    ///
813    /// let mut queue: Queue<u8, 235> = Queue::new();
814    /// let (mut producer, mut consumer) = queue.split();
815    /// assert_eq!(None, consumer.peek());
816    /// producer.enqueue(1);
817    /// assert_eq!(Some(&1), consumer.peek());
818    /// assert_eq!(Some(1), consumer.dequeue());
819    /// assert_eq!(None, consumer.peek());
820    /// ```
821    #[inline]
822    pub fn peek(&self) -> Option<&T> {
823        self.rb.peek()
824    }
825}
826
827impl<T> Producer<'_, T> {
828    /// Adds an `item` to the end of the queue, returns back the `item` if the queue is full.
829    #[inline]
830    pub fn enqueue(&mut self, item: T) -> Result<(), T> {
831        unsafe { self.rb.inner_enqueue(item) }
832    }
833
834    /// Adds an `item` to the end of the queue, without checking if the queue is full.
835    ///
836    /// # Safety
837    ///
838    /// See [`Queue::enqueue_unchecked`].
839    #[inline]
840    pub unsafe fn enqueue_unchecked(&mut self, item: T) {
841        self.rb.inner_enqueue_unchecked(item);
842    }
843
844    /// Returns if there is any space to enqueue a new item. When this returns true, at
845    /// least the first subsequent enqueue will succeed.
846    #[inline]
847    pub fn ready(&self) -> bool {
848        !self.rb.is_full()
849    }
850
851    /// Returns the number of elements in the queue.
852    #[inline]
853    pub fn len(&self) -> usize {
854        self.rb.len()
855    }
856
857    /// Returns whether the queue is empty.
858    ///
859    /// # Examples
860    ///
861    /// ```
862    /// use heapless::spsc::Queue;
863    ///
864    /// let mut queue: Queue<u8, 235> = Queue::new();
865    /// let (mut producer, mut consumer) = queue.split();
866    /// assert!(producer.is_empty());
867    /// ```
868    #[inline]
869    pub fn is_empty(&self) -> bool {
870        self.len() == 0
871    }
872
873    /// Returns the maximum number of elements the queue can hold.
874    #[inline]
875    pub fn capacity(&self) -> usize {
876        self.rb.capacity()
877    }
878}
879
880#[cfg(test)]
881mod tests {
882    use std::hash::{Hash, Hasher};
883
884    use super::{Consumer, Producer, Queue};
885
886    use static_assertions::assert_not_impl_any;
887
888    // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
889    assert_not_impl_any!(Queue<*const (), 4>: Send);
890
891    // Ensure a `Producer` containing `!Send` values stays `!Send` itself.
892    assert_not_impl_any!(Producer<*const ()>: Send);
893
894    // Ensure a `Consumer` containing `!Send` values stays `!Send` itself.
895    assert_not_impl_any!(Consumer<*const ()>: Send);
896
    // Checks that a queue can be created and split inside a `static` initializer, and that
    // the resulting endpoints work at run time.
    #[test]
    fn const_split() {
        use critical_section::Mutex;
        use std::cell::RefCell;

        use super::{Consumer, Producer};

        #[allow(clippy::type_complexity)]
        static PC: (
            Mutex<RefCell<Option<Producer<'_, ()>>>>,
            Mutex<RefCell<Option<Consumer<'_, ()>>>>,
        ) = {
            static mut Q: Queue<(), 4> = Queue::new();
            // SAFETY: `Q` is only accessible in this scope.
            #[allow(static_mut_refs)]
            let (p, c) = unsafe { Q.split_const() };

            (
                Mutex::new(RefCell::new(Some(p))),
                Mutex::new(RefCell::new(Some(c))),
            )
        };
        // Take each endpoint out of its mutex-protected slot.
        let producer = critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap());
        let consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());

        // The type annotations assert that both endpoints carry the `'static` lifetime.
        let mut producer: Producer<'static, ()> = producer;
        let mut consumer: Consumer<'static, ()> = consumer;

        assert_eq!(producer.enqueue(()), Ok(()));
        assert_eq!(consumer.dequeue(), Some(()));
    }
928
929    #[test]
930    fn full() {
931        let mut rb: Queue<i32, 3> = Queue::new();
932
933        assert!(!rb.is_full());
934
935        rb.enqueue(1).unwrap();
936        assert!(!rb.is_full());
937
938        rb.enqueue(2).unwrap();
939        assert!(rb.is_full());
940    }
941
942    #[test]
943    fn empty() {
944        let mut rb: Queue<i32, 3> = Queue::new();
945
946        assert!(rb.is_empty());
947
948        rb.enqueue(1).unwrap();
949        assert!(!rb.is_empty());
950
951        rb.enqueue(2).unwrap();
952        assert!(!rb.is_empty());
953    }
954
955    #[test]
956    #[cfg_attr(miri, ignore)] // too slow
957    fn len() {
958        let mut rb: Queue<i32, 3> = Queue::new();
959
960        assert_eq!(rb.len(), 0);
961
962        rb.enqueue(1).unwrap();
963        assert_eq!(rb.len(), 1);
964
965        rb.enqueue(2).unwrap();
966        assert_eq!(rb.len(), 2);
967
968        for _ in 0..1_000_000 {
969            let v = rb.dequeue().unwrap();
970            println!("{v}");
971            rb.enqueue(v).unwrap();
972            assert_eq!(rb.len(), 2);
973        }
974    }
975
976    #[test]
977    #[cfg_attr(miri, ignore)] // too slow
978    fn try_overflow() {
979        const N: usize = 23;
980        let mut rb: Queue<i32, N> = Queue::new();
981
982        for i in 0..N as i32 - 1 {
983            rb.enqueue(i).unwrap();
984        }
985
986        for _ in 0..1_000_000 {
987            for i in 0..N as i32 - 1 {
988                let d = rb.dequeue().unwrap();
989                assert_eq!(d, i);
990                rb.enqueue(i).unwrap();
991            }
992        }
993    }
994
995    #[test]
996    fn sanity() {
997        let mut rb: Queue<i32, 10> = Queue::new();
998
999        let (mut p, mut c) = rb.split();
1000
1001        assert!(p.ready());
1002
1003        assert!(!c.ready());
1004
1005        assert_eq!(c.dequeue(), None);
1006
1007        p.enqueue(0).unwrap();
1008
1009        assert_eq!(c.dequeue(), Some(0));
1010    }
1011
1012    #[test]
1013    fn static_new() {
1014        static mut _Q: Queue<i32, 4> = Queue::new();
1015    }
1016
1017    #[test]
1018    fn drop() {
1019        struct Droppable;
1020        impl Droppable {
1021            fn new() -> Self {
1022                unsafe {
1023                    COUNT += 1;
1024                }
1025                Self
1026            }
1027        }
1028
1029        impl Drop for Droppable {
1030            fn drop(&mut self) {
1031                unsafe {
1032                    COUNT -= 1;
1033                }
1034            }
1035        }
1036
1037        static mut COUNT: i32 = 0;
1038
1039        {
1040            let mut v: Queue<Droppable, 4> = Queue::new();
1041            v.enqueue(Droppable::new()).ok().unwrap();
1042            v.enqueue(Droppable::new()).ok().unwrap();
1043            v.dequeue().unwrap();
1044        }
1045
1046        assert_eq!(unsafe { COUNT }, 0);
1047
1048        {
1049            let mut v: Queue<Droppable, 4> = Queue::new();
1050            v.enqueue(Droppable::new()).ok().unwrap();
1051            v.enqueue(Droppable::new()).ok().unwrap();
1052        }
1053
1054        assert_eq!(unsafe { COUNT }, 0);
1055    }
1056
1057    #[test]
1058    fn iter() {
1059        let mut rb: Queue<i32, 4> = Queue::new();
1060
1061        rb.enqueue(0).unwrap();
1062        rb.dequeue().unwrap();
1063        rb.enqueue(1).unwrap();
1064        rb.enqueue(2).unwrap();
1065        rb.enqueue(3).unwrap();
1066
1067        let mut items = rb.iter();
1068
1069        // assert_eq!(items.next(), Some(&0));
1070        assert_eq!(items.next(), Some(&1));
1071        assert_eq!(items.next(), Some(&2));
1072        assert_eq!(items.next(), Some(&3));
1073        assert_eq!(items.next(), None);
1074    }
1075
1076    #[test]
1077    fn iter_double_ended() {
1078        let mut rb: Queue<i32, 4> = Queue::new();
1079
1080        rb.enqueue(0).unwrap();
1081        rb.enqueue(1).unwrap();
1082        rb.enqueue(2).unwrap();
1083
1084        let mut items = rb.iter();
1085
1086        assert_eq!(items.next(), Some(&0));
1087        assert_eq!(items.next_back(), Some(&2));
1088        assert_eq!(items.next(), Some(&1));
1089        assert_eq!(items.next(), None);
1090        assert_eq!(items.next_back(), None);
1091    }
1092
1093    #[test]
1094    fn iter_mut() {
1095        let mut rb: Queue<i32, 4> = Queue::new();
1096
1097        rb.enqueue(0).unwrap();
1098        rb.enqueue(1).unwrap();
1099        rb.enqueue(2).unwrap();
1100
1101        let mut items = rb.iter_mut();
1102
1103        assert_eq!(items.next(), Some(&mut 0));
1104        assert_eq!(items.next(), Some(&mut 1));
1105        assert_eq!(items.next(), Some(&mut 2));
1106        assert_eq!(items.next(), None);
1107    }
1108
1109    #[test]
1110    fn iter_mut_double_ended() {
1111        let mut rb: Queue<i32, 4> = Queue::new();
1112
1113        rb.enqueue(0).unwrap();
1114        rb.enqueue(1).unwrap();
1115        rb.enqueue(2).unwrap();
1116
1117        let mut items = rb.iter_mut();
1118
1119        assert_eq!(items.next(), Some(&mut 0));
1120        assert_eq!(items.next_back(), Some(&mut 2));
1121        assert_eq!(items.next(), Some(&mut 1));
1122        assert_eq!(items.next(), None);
1123        assert_eq!(items.next_back(), None);
1124    }
1125
1126    #[test]
1127    fn wrap_around() {
1128        let mut rb: Queue<i32, 4> = Queue::new();
1129
1130        rb.enqueue(0).unwrap();
1131        rb.enqueue(1).unwrap();
1132        rb.enqueue(2).unwrap();
1133        rb.dequeue().unwrap();
1134        rb.dequeue().unwrap();
1135        rb.dequeue().unwrap();
1136        rb.enqueue(3).unwrap();
1137        rb.enqueue(4).unwrap();
1138
1139        assert_eq!(rb.len(), 2);
1140    }
1141
1142    #[test]
1143    fn ready_flag() {
1144        let mut rb: Queue<i32, 3> = Queue::new();
1145        let (mut p, mut c) = rb.split();
1146        assert!(!c.ready());
1147        assert!(p.ready());
1148
1149        p.enqueue(0).unwrap();
1150
1151        assert!(c.ready());
1152        assert!(p.ready());
1153
1154        p.enqueue(1).unwrap();
1155
1156        assert!(c.ready());
1157        assert!(!p.ready());
1158
1159        c.dequeue().unwrap();
1160
1161        assert!(c.ready());
1162        assert!(p.ready());
1163
1164        c.dequeue().unwrap();
1165
1166        assert!(!c.ready());
1167        assert!(p.ready());
1168    }
1169
1170    #[test]
1171    fn clone() {
1172        let mut rb1: Queue<i32, 4> = Queue::new();
1173        rb1.enqueue(0).unwrap();
1174        rb1.enqueue(0).unwrap();
1175        rb1.dequeue().unwrap();
1176        rb1.enqueue(0).unwrap();
1177        let rb2 = rb1.clone();
1178        assert_eq!(rb1.capacity(), rb2.capacity());
1179        assert_eq!(rb1.len(), rb2.len());
1180        assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
1181    }
1182
1183    #[test]
1184    fn eq() {
1185        // generate two queues with same content
1186        // but different buffer alignment
1187        let mut rb1: Queue<i32, 4> = Queue::new();
1188        rb1.enqueue(0).unwrap();
1189        rb1.enqueue(0).unwrap();
1190        rb1.dequeue().unwrap();
1191        rb1.enqueue(0).unwrap();
1192        let mut rb2: Queue<i32, 4> = Queue::new();
1193        rb2.enqueue(0).unwrap();
1194        rb2.enqueue(0).unwrap();
1195        assert!(rb1 == rb2);
1196        // test for symmetry
1197        assert!(rb2 == rb1);
1198        // test for changes in content
1199        rb1.enqueue(0).unwrap();
1200        assert!(rb1 != rb2);
1201        rb2.enqueue(1).unwrap();
1202        assert!(rb1 != rb2);
1203        // test for refexive relation
1204        assert!(rb1 == rb1);
1205        assert!(rb2 == rb2);
1206    }
1207
1208    #[test]
1209    fn hash_equality() {
1210        // generate two queues with same content
1211        // but different buffer alignment
1212        let rb1 = {
1213            let mut rb1: Queue<i32, 4> = Queue::new();
1214            rb1.enqueue(0).unwrap();
1215            rb1.enqueue(0).unwrap();
1216            rb1.dequeue().unwrap();
1217            rb1.enqueue(0).unwrap();
1218            rb1
1219        };
1220        let rb2 = {
1221            let mut rb2: Queue<i32, 4> = Queue::new();
1222            rb2.enqueue(0).unwrap();
1223            rb2.enqueue(0).unwrap();
1224            rb2
1225        };
1226        let hash1 = {
1227            let mut hasher1 = hash32::FnvHasher::default();
1228            rb1.hash(&mut hasher1);
1229            hasher1.finish()
1230        };
1231        let hash2 = {
1232            let mut hasher2 = hash32::FnvHasher::default();
1233            rb2.hash(&mut hasher2);
1234            hasher2.finish()
1235        };
1236        assert_eq!(hash1, hash2);
1237    }
1238}