heapless/spsc.rs
1//! A fixed capacity single-producer, single-consumer (SPSC) lock-free queue.
2//!
3//! *Note:* This module requires atomic load and store instructions. On
4//! targets where they're not natively available, they are emulated by the
5//! [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
6//!
7//! # Examples
8//!
9//! [`Queue`] can be used as a plain queue:
10//!
11//! ```
12//! use heapless::spsc::Queue;
13//!
14//! let mut queue: Queue<u8, 4> = Queue::new();
15//!
16//! assert_eq!(queue.enqueue(0), Ok(()));
17//! assert_eq!(queue.enqueue(1), Ok(()));
18//! assert_eq!(queue.enqueue(2), Ok(()));
19//! assert_eq!(queue.enqueue(3), Err(3)); // Queue is full.
20//!
21//! assert_eq!(queue.dequeue(), Some(0));
22//! ```
23//!
24//! [`Queue::split`] can be used to split the queue into a [`Producer`]/[`Consumer`] pair.
25//!
26//! After splitting a `&'static mut Queue`, the resulting [`Producer`] and [`Consumer`]
27//! can be moved into different execution contexts, e.g. threads, interrupt handlers, etc.
28//!
29//!
30//! ```
31//! use heapless::spsc::{Producer, Queue};
32//!
33//! #[derive(Debug)]
34//! enum Event {
35//! A,
36//! B,
37//! }
38//!
39//! fn main() {
40//! let queue: &'static mut Queue<Event, 4> = {
41//! static mut Q: Queue<Event, 4> = Queue::new();
42//! // SAFETY: `Q` is only accessible in this scope
43//! // and `main` is only called once.
44//! unsafe { &mut Q }
45//! };
46//!
47//! let (producer, mut consumer) = queue.split();
48//!
49//! // `producer` can be moved into `interrupt_handler` using a static mutex or the mechanism
50//! // provided by the concurrency framework you are using, e.g. a resource in RTIC.
51//! # let mut producer = producer;
52//! # interrupt_handler(&mut producer);
53//!
54//! loop {
55//! match consumer.dequeue() {
56//! Some(Event::A) => { /* .. */ }
57//! Some(Event::B) => { /* .. */ }
58//! None => { /* Sleep. */ }
59//! }
60//! # break
61//! }
62//! }
63//!
64//! // This is a different execution context that can preempt `main`.
65//! fn interrupt_handler(producer: &mut Producer<'static, Event>) {
66//! # let condition = true;
67//!
68//! // ..
69//!
70//! if condition {
71//! producer.enqueue(Event::A).unwrap();
72//! } else {
73//! producer.enqueue(Event::B).unwrap();
74//! }
75//!
76//! // ..
77//! }
78//! ```
79//!
80//! # Benchmarks
81//!
82//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero flash wait cycles, compiled with `-C opt-level=3`:
83//!
84//! | Method | Time |
85//! |:-------------------------------|-----:|
86//! | `Producer::<u8, _>::enqueue()` | 16 |
87//! | `Queue::<u8, _>::enqueue()` | 14 |
88//! | `Consumer::<u8, _>::dequeue()` | 15 |
89//! | `Queue::<u8, _>::dequeue()` | 12 |
90//!
91//! - All execution times are in clock cycles (1 clock cycle = 125 ns).
92//! - Execution time is *dependent* on `mem::size_of::<T>()`, as both operations include
93//! `ptr::read::<T>()` or `ptr::write::<T>()` in their successful path.
94//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
95//! and `enqueue` returning `Ok`.
96//!
97//! # References
98//!
99//! This is an implementation based on [https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular](
100//! https://web.archive.org/web/20250117082625/https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular
101//! ).
102
103use core::{borrow::Borrow, cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
104
105#[cfg(not(feature = "portable-atomic"))]
106use core::sync::atomic;
107#[cfg(feature = "portable-atomic")]
108use portable_atomic as atomic;
109
110use atomic::{AtomicUsize, Ordering};
111
112use crate::storage::{OwnedStorage, Storage, ViewStorage};
113
114/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
115///
116/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
117/// struct if you want to write code that's generic over both.
118pub struct QueueInner<T, S: Storage> {
119 // this is from where we dequeue items
120 pub(crate) head: AtomicUsize,
121
122 // this is where we enqueue new items
123 pub(crate) tail: AtomicUsize,
124
125 pub(crate) buffer: S::Buffer<UnsafeCell<MaybeUninit<T>>>,
126}
127
128/// A statically allocated single-producer, single-consumer queue with a capacity of `N - 1` elements.
129///
130/// <div class="warning">
131///
132/// To get better performance, use a value for `N` that is a power of 2.
133///
134/// </div>
135///
136/// You will likely want to use [`split`](QueueInner::split) to create a producer-consumer pair.
137pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
138
139/// A [`Queue`] with dynamic capacity.
140///
141/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
142pub type QueueView<T> = QueueInner<T, ViewStorage>;
143
144impl<T, const N: usize> Queue<T, N> {
145 /// Creates an empty queue.
146 pub const fn new() -> Self {
147 const {
148 assert!(N > 1);
149 }
150
151 Queue {
152 head: AtomicUsize::new(0),
153 tail: AtomicUsize::new(0),
154 buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N],
155 }
156 }
157
158 /// Used in `Storage` implementation
159 pub(crate) fn as_view_private(&self) -> &QueueView<T> {
160 self
161 }
162
163 /// Used in `Storage` implementation
164 pub(crate) fn as_mut_view_private(&mut self) -> &mut QueueView<T> {
165 self
166 }
167}
168
169impl<T, S: Storage> QueueInner<T, S> {
170 /// Get a reference to the `Queue`, erasing the `N` const-generic.
171 pub fn as_view(&self) -> &QueueView<T> {
172 S::as_queue_view(self)
173 }
174
175 /// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
176 pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
177 S::as_mut_queue_view(self)
178 }
179
180 #[inline]
181 fn increment(&self, val: usize) -> usize {
182 (val + 1) % self.n()
183 }
184
185 #[inline]
186 fn n(&self) -> usize {
187 self.buffer.borrow().len()
188 }
189
190 /// Returns the maximum number of elements the queue can hold.
191 #[inline]
192 pub fn capacity(&self) -> usize {
193 self.n() - 1
194 }
195
196 /// Returns the number of elements in the queue.
197 #[inline]
198 pub fn len(&self) -> usize {
199 let current_head = self.head.load(Ordering::Relaxed);
200 let current_tail = self.tail.load(Ordering::Relaxed);
201
202 current_tail
203 .wrapping_sub(current_head)
204 .wrapping_add(self.n())
205 % self.n()
206 }
207
208 /// Returns whether the queue is empty.
209 #[inline]
210 pub fn is_empty(&self) -> bool {
211 self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
212 }
213
214 /// Returns whether the queue is full.
215 #[inline]
216 pub fn is_full(&self) -> bool {
217 self.increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
218 }
219
220 /// Iterates from the front of the queue to the back.
221 pub fn iter(&self) -> Iter<'_, T> {
222 Iter {
223 rb: self.as_view(),
224 index: 0,
225 len: self.len(),
226 }
227 }
228
229 /// Returns an iterator that allows modifying each value.
230 pub fn iter_mut(&mut self) -> IterMut<'_, T> {
231 let len = self.len();
232 IterMut {
233 rb: self.as_view(),
234 index: 0,
235 len,
236 }
237 }
238
239 /// Adds an `item` to the end of the queue.
240 ///
241 /// Returns back the `item` if the queue is full.
242 #[inline]
243 pub fn enqueue(&mut self, item: T) -> Result<(), T> {
244 unsafe { self.inner_enqueue(item) }
245 }
246
247 /// Returns the item in the front of the queue, or `None` if the queue is empty.
248 #[inline]
249 pub fn dequeue(&mut self) -> Option<T> {
250 unsafe { self.inner_dequeue() }
251 }
252
253 /// Returns a reference to the item in the front of the queue without dequeuing it, or
254 /// `None` if the queue is empty.
255 ///
256 /// # Examples
257 /// ```
258 /// use heapless::spsc::Queue;
259 ///
260 /// let mut queue: Queue<u8, 235> = Queue::new();
261 /// let (mut producer, mut consumer) = queue.split();
262 /// assert_eq!(None, consumer.peek());
263 /// producer.enqueue(1);
264 /// assert_eq!(Some(&1), consumer.peek());
265 /// assert_eq!(Some(1), consumer.dequeue());
266 /// assert_eq!(None, consumer.peek());
267 /// ```
268 pub fn peek(&self) -> Option<&T> {
269 if self.is_empty() {
270 None
271 } else {
272 let head = self.head.load(Ordering::Relaxed);
273 Some(unsafe { &*(self.buffer.borrow().get_unchecked(head).get() as *const T) })
274 }
275 }
276
277 // The memory for enqueueing is "owned" by the tail pointer.
278 //
279 // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
280 // items without doing pointer arithmetic and accessing internal fields of this type.
281 unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
282 let current_tail = self.tail.load(Ordering::Relaxed);
283 let next_tail = self.increment(current_tail);
284
285 if next_tail == self.head.load(Ordering::Acquire) {
286 Err(val)
287 } else {
288 (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
289 self.tail.store(next_tail, Ordering::Release);
290
291 Ok(())
292 }
293 }
294
295 // The memory for enqueueing is "owned" by the tail pointer.
296 //
297 // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
298 // items without doing pointer arithmetic and accessing internal fields of this type.
299 unsafe fn inner_enqueue_unchecked(&self, val: T) {
300 let current_tail = self.tail.load(Ordering::Relaxed);
301
302 (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
303 self.tail
304 .store(self.increment(current_tail), Ordering::Release);
305 }
306
307 /// Adds an `item` to the end of the queue, without checking if it's full.
308 ///
309 /// # Safety
310 ///
311 /// If the queue is full, this operation will leak a value (`T`'s destructor won't run on
312 /// the value that got overwritten by `item`), *and* will allow the `dequeue` operation
313 /// to create a copy of `item`, which could result in `T`'s destructor running on `item`
314 /// twice.
315 pub unsafe fn enqueue_unchecked(&mut self, item: T) {
316 self.inner_enqueue_unchecked(item);
317 }
318
319 // The memory for dequeuing is "owned" by the head pointer.
320 //
321 // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
322 // items without doing pointer arithmetic and accessing internal fields of this type.
323 unsafe fn inner_dequeue(&self) -> Option<T> {
324 let current_head = self.head.load(Ordering::Relaxed);
325
326 if current_head == self.tail.load(Ordering::Acquire) {
327 None
328 } else {
329 let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
330
331 self.head
332 .store(self.increment(current_head), Ordering::Release);
333
334 Some(v)
335 }
336 }
337
338 // The memory for dequeuing is "owned" by the head pointer.
339 //
340 // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
341 // items without doing pointer arithmetic and accessing internal fields of this type.
342 unsafe fn inner_dequeue_unchecked(&self) -> T {
343 let current_head = self.head.load(Ordering::Relaxed);
344 let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
345
346 self.head
347 .store(self.increment(current_head), Ordering::Release);
348
349 v
350 }
351
352 /// Returns the item in the front of the queue, without checking if there is something in the
353 /// queue.
354 ///
355 /// # Safety
356 ///
357 /// The queue must not be empty. Calling this on an empty queue causes [undefined behavior].
358 ///
359 /// [undefined behavior]: https://doc.rust-lang.org/reference/behavior-considered-undefined.html
360 pub unsafe fn dequeue_unchecked(&mut self) -> T {
361 self.inner_dequeue_unchecked()
362 }
363
364 /// Splits a queue into producer and consumer endpoints.
365 ///
366 /// If you need this function in a `const` context,
367 /// check out [`Queue::split_const`] and [`QueueView::split_const`].
368 ///
369 /// # Examples
370 ///
371 /// Create a queue and split it at runtime
372 ///
373 /// ```
374 /// # use heapless::spsc::Queue;
375 /// let mut queue: Queue<(), 4> = Queue::new();
376 /// let (mut producer, mut consumer) = queue.split();
377 /// producer.enqueue(()).unwrap();
378 /// assert_eq!(consumer.dequeue(), Some(()));
379 /// ```
380 ///
381 /// Create a queue at compile time, split it at runtime,
382 /// and pass it to an interrupt handler via a mutex.
383 ///
384 /// ```
385 /// use core::cell::RefCell;
386 /// use critical_section::Mutex;
387 /// use heapless::spsc::{Producer, Queue};
388 ///
389 /// static PRODUCER: Mutex<RefCell<Option<Producer<'static, ()>>>> =
390 /// { Mutex::new(RefCell::new(None)) };
391 ///
392 /// fn interrupt() {
393 /// let mut producer = {
394 /// static mut P: Option<Producer<'static, ()>> = None;
395 /// // SAFETY: Mutable access to `P` is allowed exclusively in this scope
396 /// // and `interrupt` cannot be called directly or preempt itself.
397 /// unsafe { &mut P }
398 /// }
399 /// .get_or_insert_with(|| {
400 /// critical_section::with(|cs| PRODUCER.borrow_ref_mut(cs).take().unwrap())
401 /// });
402 ///
403 /// producer.enqueue(()).unwrap();
404 /// }
405 ///
406 /// fn main() {
407 /// let mut consumer = {
408 /// let (p, c) = {
409 /// static mut Q: Queue<(), 4> = Queue::new();
410 /// // SAFETY: `Q` is only accessible in this scope
411 /// // and `main` is only called once.
412 /// #[allow(static_mut_refs)]
413 /// unsafe {
414 /// Q.split()
415 /// }
416 /// };
417 ///
418 /// critical_section::with(move |cs| {
419 /// let mut producer = PRODUCER.borrow_ref_mut(cs);
420 /// *producer = Some(p);
421 /// });
422 ///
423 /// c
424 /// };
425 ///
426 /// // Interrupt occurs.
427 /// # interrupt();
428 ///
429 /// consumer.dequeue().unwrap();
430 /// }
431 /// ```
432 pub fn split(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
433 (
434 Producer { rb: self.as_view() },
435 Consumer { rb: self.as_view() },
436 )
437 }
438}
439
440impl<T, const N: usize> Queue<T, N> {
441 /// Splits a queue into producer and consumer endpoints.
442 ///
443 /// Unlike [`Queue::split`](), this method can be used in a `const` context
444 ///
445 /// # Example
446 ///
447 /// Create and split a queue at compile time, and pass it to the main
448 /// function and an interrupt handler via a mutex at runtime.
449 ///
450 /// ```
451 /// use core::cell::RefCell;
452 ///
453 /// use critical_section::Mutex;
454 /// use heapless::spsc::{Consumer, Producer, Queue};
455 ///
456 /// static PC: (
457 /// Mutex<RefCell<Option<Producer<'_, ()>>>>,
458 /// Mutex<RefCell<Option<Consumer<'_, ()>>>>,
459 /// ) = {
460 /// static mut Q: Queue<(), 4> = Queue::new();
461 /// // SAFETY: `Q` is only accessible in this scope.
462 /// #[allow(static_mut_refs)]
463 /// let (p, c) = unsafe { Q.split_const() };
464 ///
465 /// (
466 /// Mutex::new(RefCell::new(Some(p))),
467 /// Mutex::new(RefCell::new(Some(c))),
468 /// )
469 /// };
470 ///
471 /// fn interrupt() {
472 /// let mut producer = {
473 /// static mut P: Option<Producer<'_, ()>> = None;
474 /// // SAFETY: Mutable access to `P` is allowed exclusively in this scope
475 /// // and `interrupt` cannot be called directly or preempt itself.
476 /// unsafe { &mut P }
477 /// }
478 /// .get_or_insert_with(|| {
479 /// critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
480 /// });
481 ///
482 /// producer.enqueue(()).unwrap();
483 /// }
484 ///
485 /// fn main() {
486 /// let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
487 ///
488 /// // Interrupt occurs.
489 /// # interrupt();
490 ///
491 /// consumer.dequeue().unwrap();
492 /// }
493 /// ```
494 pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
495 (Producer { rb: self }, Consumer { rb: self })
496 }
497}
498
499impl<T> QueueView<T> {
500 /// Splits a queue into producer and consumer endpoints.
501 ///
502 /// Unlike [`Queue::split`](), this method can be used in a `const` context
503 ///
504 /// # Example
505 ///
506 /// Create and split a queue at compile time, and pass it to the main
507 /// function and an interrupt handler via a mutex at runtime.
508 ///
509 /// ```
510 /// use core::cell::RefCell;
511 ///
512 /// use critical_section::Mutex;
513 /// use heapless::spsc::{Consumer, Producer, Queue, QueueView};
514 ///
515 /// static PC: (
516 /// Mutex<RefCell<Option<Producer<'_, ()>>>>,
517 /// Mutex<RefCell<Option<Consumer<'_, ()>>>>,
518 /// ) = {
519 /// static mut Q: &mut QueueView<()> = &mut Queue::<(), 4>::new();
520 /// // SAFETY: `Q` is only accessible in this scope.
521 /// #[allow(static_mut_refs)]
522 /// let (p, c) = unsafe { Q.split_const() };
523 ///
524 /// (
525 /// Mutex::new(RefCell::new(Some(p))),
526 /// Mutex::new(RefCell::new(Some(c))),
527 /// )
528 /// };
529 ///
530 /// fn interrupt() {
531 /// let mut producer = {
532 /// static mut P: Option<Producer<'_, ()>> = None;
533 /// // SAFETY: Mutable access to `P` is allowed exclusively in this scope
534 /// // and `interrupt` cannot be called directly or preempt itself.
535 /// unsafe { &mut P }
536 /// }
537 /// .get_or_insert_with(|| {
538 /// critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap())
539 /// });
540 ///
541 /// producer.enqueue(()).unwrap();
542 /// }
543 ///
544 /// fn main() {
545 /// let mut consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
546 ///
547 /// // Interrupt occurs.
548 /// # interrupt();
549 ///
550 /// consumer.dequeue().unwrap();
551 /// }
552 /// ```
553 pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
554 (Producer { rb: self }, Consumer { rb: self })
555 }
556}
557
558impl<T, const N: usize> Default for Queue<T, N> {
559 fn default() -> Self {
560 Self::new()
561 }
562}
563
564impl<T, const N: usize> Clone for Queue<T, N>
565where
566 T: Clone,
567{
568 fn clone(&self) -> Self {
569 let mut new: Self = Self::new();
570
571 for s in self.iter() {
572 // SAFETY: `new.capacity() == self.capacity() >= self.len()`,
573 // so no overflow is possible.
574 unsafe {
575 new.enqueue_unchecked(s.clone());
576 }
577 }
578
579 new
580 }
581}
582
583impl<T, S, S2> PartialEq<QueueInner<T, S2>> for QueueInner<T, S>
584where
585 T: PartialEq,
586 S: Storage,
587 S2: Storage,
588{
589 fn eq(&self, other: &QueueInner<T, S2>) -> bool {
590 self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
591 }
592}
593
594impl<T, S: Storage> Eq for QueueInner<T, S> where T: Eq {}
595
596/// An iterator over the items of a queue.
597pub struct Iter<'a, T> {
598 rb: &'a QueueView<T>,
599 index: usize,
600 len: usize,
601}
602
603impl<T> Clone for Iter<'_, T> {
604 fn clone(&self) -> Self {
605 Self {
606 rb: self.rb,
607 index: self.index,
608 len: self.len,
609 }
610 }
611}
612
613/// An iterator over the items of a queue.
614pub struct IterMut<'a, T> {
615 rb: &'a QueueView<T>,
616 index: usize,
617 len: usize,
618}
619impl<'a, T> Iterator for Iter<'a, T> {
620 type Item = &'a T;
621
622 fn next(&mut self) -> Option<Self::Item> {
623 if self.index < self.len {
624 let head = self.rb.head.load(Ordering::Relaxed);
625
626 let i = (head + self.index) % self.rb.n();
627 self.index += 1;
628
629 Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
630 } else {
631 None
632 }
633 }
634}
635
636impl<'a, T> Iterator for IterMut<'a, T> {
637 type Item = &'a mut T;
638
639 fn next(&mut self) -> Option<Self::Item> {
640 if self.index < self.len {
641 let head = self.rb.head.load(Ordering::Relaxed);
642
643 let i = (head + self.index) % self.rb.n();
644 self.index += 1;
645
646 Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
647 } else {
648 None
649 }
650 }
651}
652
653impl<T> DoubleEndedIterator for Iter<'_, T> {
654 fn next_back(&mut self) -> Option<Self::Item> {
655 if self.index < self.len {
656 let head = self.rb.head.load(Ordering::Relaxed);
657
658 // self.len > 0, since it's larger than self.index > 0
659 let i = (head + self.len - 1) % self.rb.n();
660 self.len -= 1;
661 Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
662 } else {
663 None
664 }
665 }
666}
667
668impl<T> DoubleEndedIterator for IterMut<'_, T> {
669 fn next_back(&mut self) -> Option<Self::Item> {
670 if self.index < self.len {
671 let head = self.rb.head.load(Ordering::Relaxed);
672
673 // self.len > 0, since it's larger than self.index > 0
674 let i = (head + self.len - 1) % self.rb.n();
675 self.len -= 1;
676 Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
677 } else {
678 None
679 }
680 }
681}
682
683impl<T, S: Storage> Drop for QueueInner<T, S> {
684 fn drop(&mut self) {
685 for item in self {
686 unsafe {
687 ptr::drop_in_place(item);
688 }
689 }
690 }
691}
692
693impl<T, S> fmt::Debug for QueueInner<T, S>
694where
695 T: fmt::Debug,
696 S: Storage,
697{
698 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
699 f.debug_list().entries(self.iter()).finish()
700 }
701}
702
703impl<T, S> hash::Hash for QueueInner<T, S>
704where
705 T: hash::Hash,
706 S: Storage,
707{
708 fn hash<H: hash::Hasher>(&self, state: &mut H) {
709 // iterate over self in order
710 for t in self.iter() {
711 hash::Hash::hash(t, state);
712 }
713 }
714}
715
716impl<'a, T, S: Storage> IntoIterator for &'a QueueInner<T, S> {
717 type Item = &'a T;
718 type IntoIter = Iter<'a, T>;
719
720 fn into_iter(self) -> Self::IntoIter {
721 self.iter()
722 }
723}
724
725impl<'a, T, S: Storage> IntoIterator for &'a mut QueueInner<T, S> {
726 type Item = &'a mut T;
727 type IntoIter = IterMut<'a, T>;
728
729 fn into_iter(self) -> Self::IntoIter {
730 self.iter_mut()
731 }
732}
733
734/// A consumer; it can dequeue items from the queue.
735///
736/// **Note:** The consumer semantically owns the `head` pointer of the queue.
737pub struct Consumer<'a, T> {
738 rb: &'a QueueView<T>,
739}
740
741unsafe impl<T> Send for Consumer<'_, T> where T: Send {}
742
743/// A producer; it can enqueue items into the queue.
744///
745/// **Note:** The producer semantically owns the `tail` pointer of the queue.
746pub struct Producer<'a, T> {
747 rb: &'a QueueView<T>,
748}
749
750unsafe impl<T> Send for Producer<'_, T> where T: Send {}
751
752impl<T> Consumer<'_, T> {
753 /// Returns the item in the front of the queue, or `None` if the queue is empty.
754 #[inline]
755 pub fn dequeue(&mut self) -> Option<T> {
756 unsafe { self.rb.inner_dequeue() }
757 }
758
759 /// Returns the item in the front of the queue, without checking if there are elements in the
760 /// queue.
761 ///
762 /// # Safety
763 ///
764 /// See [`Queue::dequeue_unchecked`].
765 #[inline]
766 pub unsafe fn dequeue_unchecked(&mut self) -> T {
767 self.rb.inner_dequeue_unchecked()
768 }
769
770 /// Returns if there are any items to dequeue. When this returns `true`, at least the
771 /// first subsequent dequeue will succeed.
772 #[inline]
773 pub fn ready(&self) -> bool {
774 !self.rb.is_empty()
775 }
776
777 /// Returns the number of elements in the queue.
778 #[inline]
779 pub fn len(&self) -> usize {
780 self.rb.len()
781 }
782
783 /// Returns whether the queue is empty.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use heapless::spsc::Queue;
789 ///
790 /// let mut queue: Queue<u8, 235> = Queue::new();
791 /// let (mut producer, mut consumer) = queue.split();
792 /// assert!(consumer.is_empty());
793 /// ```
794 #[inline]
795 pub fn is_empty(&self) -> bool {
796 self.len() == 0
797 }
798
799 /// Returns the maximum number of elements the queue can hold.
800 #[inline]
801 pub fn capacity(&self) -> usize {
802 self.rb.capacity()
803 }
804
805 /// Returns the item in the front of the queue without dequeuing, or `None` if the queue is
806 /// empty.
807 ///
808 /// # Examples
809 ///
810 /// ```
811 /// use heapless::spsc::Queue;
812 ///
813 /// let mut queue: Queue<u8, 235> = Queue::new();
814 /// let (mut producer, mut consumer) = queue.split();
815 /// assert_eq!(None, consumer.peek());
816 /// producer.enqueue(1);
817 /// assert_eq!(Some(&1), consumer.peek());
818 /// assert_eq!(Some(1), consumer.dequeue());
819 /// assert_eq!(None, consumer.peek());
820 /// ```
821 #[inline]
822 pub fn peek(&self) -> Option<&T> {
823 self.rb.peek()
824 }
825}
826
827impl<T> Producer<'_, T> {
828 /// Adds an `item` to the end of the queue, returns back the `item` if the queue is full.
829 #[inline]
830 pub fn enqueue(&mut self, item: T) -> Result<(), T> {
831 unsafe { self.rb.inner_enqueue(item) }
832 }
833
834 /// Adds an `item` to the end of the queue, without checking if the queue is full.
835 ///
836 /// # Safety
837 ///
838 /// See [`Queue::enqueue_unchecked`].
839 #[inline]
840 pub unsafe fn enqueue_unchecked(&mut self, item: T) {
841 self.rb.inner_enqueue_unchecked(item);
842 }
843
844 /// Returns if there is any space to enqueue a new item. When this returns true, at
845 /// least the first subsequent enqueue will succeed.
846 #[inline]
847 pub fn ready(&self) -> bool {
848 !self.rb.is_full()
849 }
850
851 /// Returns the number of elements in the queue.
852 #[inline]
853 pub fn len(&self) -> usize {
854 self.rb.len()
855 }
856
857 /// Returns whether the queue is empty.
858 ///
859 /// # Examples
860 ///
861 /// ```
862 /// use heapless::spsc::Queue;
863 ///
864 /// let mut queue: Queue<u8, 235> = Queue::new();
865 /// let (mut producer, mut consumer) = queue.split();
866 /// assert!(producer.is_empty());
867 /// ```
868 #[inline]
869 pub fn is_empty(&self) -> bool {
870 self.len() == 0
871 }
872
873 /// Returns the maximum number of elements the queue can hold.
874 #[inline]
875 pub fn capacity(&self) -> usize {
876 self.rb.capacity()
877 }
878}
879
880#[cfg(test)]
881mod tests {
882 use std::hash::{Hash, Hasher};
883
884 use super::{Consumer, Producer, Queue};
885
886 use static_assertions::assert_not_impl_any;
887
888 // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
889 assert_not_impl_any!(Queue<*const (), 4>: Send);
890
891 // Ensure a `Producer` containing `!Send` values stays `!Send` itself.
892 assert_not_impl_any!(Producer<*const ()>: Send);
893
894 // Ensure a `Consumer` containing `!Send` values stays `!Send` itself.
895 assert_not_impl_any!(Consumer<*const ()>: Send);
896
897 #[test]
898 fn const_split() {
899 use critical_section::Mutex;
900 use std::cell::RefCell;
901
902 use super::{Consumer, Producer};
903
904 #[allow(clippy::type_complexity)]
905 static PC: (
906 Mutex<RefCell<Option<Producer<'_, ()>>>>,
907 Mutex<RefCell<Option<Consumer<'_, ()>>>>,
908 ) = {
909 static mut Q: Queue<(), 4> = Queue::new();
910 // SAFETY: `Q` is only accessible in this scope.
911 #[allow(static_mut_refs)]
912 let (p, c) = unsafe { Q.split_const() };
913
914 (
915 Mutex::new(RefCell::new(Some(p))),
916 Mutex::new(RefCell::new(Some(c))),
917 )
918 };
919 let producer = critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap());
920 let consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
921
922 let mut producer: Producer<'static, ()> = producer;
923 let mut consumer: Consumer<'static, ()> = consumer;
924
925 assert_eq!(producer.enqueue(()), Ok(()));
926 assert_eq!(consumer.dequeue(), Some(()));
927 }
928
929 #[test]
930 fn full() {
931 let mut rb: Queue<i32, 3> = Queue::new();
932
933 assert!(!rb.is_full());
934
935 rb.enqueue(1).unwrap();
936 assert!(!rb.is_full());
937
938 rb.enqueue(2).unwrap();
939 assert!(rb.is_full());
940 }
941
942 #[test]
943 fn empty() {
944 let mut rb: Queue<i32, 3> = Queue::new();
945
946 assert!(rb.is_empty());
947
948 rb.enqueue(1).unwrap();
949 assert!(!rb.is_empty());
950
951 rb.enqueue(2).unwrap();
952 assert!(!rb.is_empty());
953 }
954
955 #[test]
956 #[cfg_attr(miri, ignore)] // too slow
957 fn len() {
958 let mut rb: Queue<i32, 3> = Queue::new();
959
960 assert_eq!(rb.len(), 0);
961
962 rb.enqueue(1).unwrap();
963 assert_eq!(rb.len(), 1);
964
965 rb.enqueue(2).unwrap();
966 assert_eq!(rb.len(), 2);
967
968 for _ in 0..1_000_000 {
969 let v = rb.dequeue().unwrap();
970 println!("{v}");
971 rb.enqueue(v).unwrap();
972 assert_eq!(rb.len(), 2);
973 }
974 }
975
976 #[test]
977 #[cfg_attr(miri, ignore)] // too slow
978 fn try_overflow() {
979 const N: usize = 23;
980 let mut rb: Queue<i32, N> = Queue::new();
981
982 for i in 0..N as i32 - 1 {
983 rb.enqueue(i).unwrap();
984 }
985
986 for _ in 0..1_000_000 {
987 for i in 0..N as i32 - 1 {
988 let d = rb.dequeue().unwrap();
989 assert_eq!(d, i);
990 rb.enqueue(i).unwrap();
991 }
992 }
993 }
994
995 #[test]
996 fn sanity() {
997 let mut rb: Queue<i32, 10> = Queue::new();
998
999 let (mut p, mut c) = rb.split();
1000
1001 assert!(p.ready());
1002
1003 assert!(!c.ready());
1004
1005 assert_eq!(c.dequeue(), None);
1006
1007 p.enqueue(0).unwrap();
1008
1009 assert_eq!(c.dequeue(), Some(0));
1010 }
1011
1012 #[test]
1013 fn static_new() {
1014 static mut _Q: Queue<i32, 4> = Queue::new();
1015 }
1016
1017 #[test]
1018 fn drop() {
1019 struct Droppable;
1020 impl Droppable {
1021 fn new() -> Self {
1022 unsafe {
1023 COUNT += 1;
1024 }
1025 Self
1026 }
1027 }
1028
1029 impl Drop for Droppable {
1030 fn drop(&mut self) {
1031 unsafe {
1032 COUNT -= 1;
1033 }
1034 }
1035 }
1036
1037 static mut COUNT: i32 = 0;
1038
1039 {
1040 let mut v: Queue<Droppable, 4> = Queue::new();
1041 v.enqueue(Droppable::new()).ok().unwrap();
1042 v.enqueue(Droppable::new()).ok().unwrap();
1043 v.dequeue().unwrap();
1044 }
1045
1046 assert_eq!(unsafe { COUNT }, 0);
1047
1048 {
1049 let mut v: Queue<Droppable, 4> = Queue::new();
1050 v.enqueue(Droppable::new()).ok().unwrap();
1051 v.enqueue(Droppable::new()).ok().unwrap();
1052 }
1053
1054 assert_eq!(unsafe { COUNT }, 0);
1055 }
1056
1057 #[test]
1058 fn iter() {
1059 let mut rb: Queue<i32, 4> = Queue::new();
1060
1061 rb.enqueue(0).unwrap();
1062 rb.dequeue().unwrap();
1063 rb.enqueue(1).unwrap();
1064 rb.enqueue(2).unwrap();
1065 rb.enqueue(3).unwrap();
1066
1067 let mut items = rb.iter();
1068
1069 // assert_eq!(items.next(), Some(&0));
1070 assert_eq!(items.next(), Some(&1));
1071 assert_eq!(items.next(), Some(&2));
1072 assert_eq!(items.next(), Some(&3));
1073 assert_eq!(items.next(), None);
1074 }
1075
1076 #[test]
1077 fn iter_double_ended() {
1078 let mut rb: Queue<i32, 4> = Queue::new();
1079
1080 rb.enqueue(0).unwrap();
1081 rb.enqueue(1).unwrap();
1082 rb.enqueue(2).unwrap();
1083
1084 let mut items = rb.iter();
1085
1086 assert_eq!(items.next(), Some(&0));
1087 assert_eq!(items.next_back(), Some(&2));
1088 assert_eq!(items.next(), Some(&1));
1089 assert_eq!(items.next(), None);
1090 assert_eq!(items.next_back(), None);
1091 }
1092
1093 #[test]
1094 fn iter_mut() {
1095 let mut rb: Queue<i32, 4> = Queue::new();
1096
1097 rb.enqueue(0).unwrap();
1098 rb.enqueue(1).unwrap();
1099 rb.enqueue(2).unwrap();
1100
1101 let mut items = rb.iter_mut();
1102
1103 assert_eq!(items.next(), Some(&mut 0));
1104 assert_eq!(items.next(), Some(&mut 1));
1105 assert_eq!(items.next(), Some(&mut 2));
1106 assert_eq!(items.next(), None);
1107 }
1108
1109 #[test]
1110 fn iter_mut_double_ended() {
1111 let mut rb: Queue<i32, 4> = Queue::new();
1112
1113 rb.enqueue(0).unwrap();
1114 rb.enqueue(1).unwrap();
1115 rb.enqueue(2).unwrap();
1116
1117 let mut items = rb.iter_mut();
1118
1119 assert_eq!(items.next(), Some(&mut 0));
1120 assert_eq!(items.next_back(), Some(&mut 2));
1121 assert_eq!(items.next(), Some(&mut 1));
1122 assert_eq!(items.next(), None);
1123 assert_eq!(items.next_back(), None);
1124 }
1125
1126 #[test]
1127 fn wrap_around() {
1128 let mut rb: Queue<i32, 4> = Queue::new();
1129
1130 rb.enqueue(0).unwrap();
1131 rb.enqueue(1).unwrap();
1132 rb.enqueue(2).unwrap();
1133 rb.dequeue().unwrap();
1134 rb.dequeue().unwrap();
1135 rb.dequeue().unwrap();
1136 rb.enqueue(3).unwrap();
1137 rb.enqueue(4).unwrap();
1138
1139 assert_eq!(rb.len(), 2);
1140 }
1141
1142 #[test]
1143 fn ready_flag() {
1144 let mut rb: Queue<i32, 3> = Queue::new();
1145 let (mut p, mut c) = rb.split();
1146 assert!(!c.ready());
1147 assert!(p.ready());
1148
1149 p.enqueue(0).unwrap();
1150
1151 assert!(c.ready());
1152 assert!(p.ready());
1153
1154 p.enqueue(1).unwrap();
1155
1156 assert!(c.ready());
1157 assert!(!p.ready());
1158
1159 c.dequeue().unwrap();
1160
1161 assert!(c.ready());
1162 assert!(p.ready());
1163
1164 c.dequeue().unwrap();
1165
1166 assert!(!c.ready());
1167 assert!(p.ready());
1168 }
1169
1170 #[test]
1171 fn clone() {
1172 let mut rb1: Queue<i32, 4> = Queue::new();
1173 rb1.enqueue(0).unwrap();
1174 rb1.enqueue(0).unwrap();
1175 rb1.dequeue().unwrap();
1176 rb1.enqueue(0).unwrap();
1177 let rb2 = rb1.clone();
1178 assert_eq!(rb1.capacity(), rb2.capacity());
1179 assert_eq!(rb1.len(), rb2.len());
1180 assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
1181 }
1182
1183 #[test]
1184 fn eq() {
1185 // generate two queues with same content
1186 // but different buffer alignment
1187 let mut rb1: Queue<i32, 4> = Queue::new();
1188 rb1.enqueue(0).unwrap();
1189 rb1.enqueue(0).unwrap();
1190 rb1.dequeue().unwrap();
1191 rb1.enqueue(0).unwrap();
1192 let mut rb2: Queue<i32, 4> = Queue::new();
1193 rb2.enqueue(0).unwrap();
1194 rb2.enqueue(0).unwrap();
1195 assert!(rb1 == rb2);
1196 // test for symmetry
1197 assert!(rb2 == rb1);
1198 // test for changes in content
1199 rb1.enqueue(0).unwrap();
1200 assert!(rb1 != rb2);
1201 rb2.enqueue(1).unwrap();
1202 assert!(rb1 != rb2);
1203 // test for refexive relation
1204 assert!(rb1 == rb1);
1205 assert!(rb2 == rb2);
1206 }
1207
1208 #[test]
1209 fn hash_equality() {
1210 // generate two queues with same content
1211 // but different buffer alignment
1212 let rb1 = {
1213 let mut rb1: Queue<i32, 4> = Queue::new();
1214 rb1.enqueue(0).unwrap();
1215 rb1.enqueue(0).unwrap();
1216 rb1.dequeue().unwrap();
1217 rb1.enqueue(0).unwrap();
1218 rb1
1219 };
1220 let rb2 = {
1221 let mut rb2: Queue<i32, 4> = Queue::new();
1222 rb2.enqueue(0).unwrap();
1223 rb2.enqueue(0).unwrap();
1224 rb2
1225 };
1226 let hash1 = {
1227 let mut hasher1 = hash32::FnvHasher::default();
1228 rb1.hash(&mut hasher1);
1229 hasher1.finish()
1230 };
1231 let hash2 = {
1232 let mut hasher2 = hash32::FnvHasher::default();
1233 rb2.hash(&mut hasher2);
1234 hasher2.finish()
1235 };
1236 assert_eq!(hash1, hash2);
1237 }
1238}