1use core::{cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
98
99#[cfg(not(feature = "portable-atomic"))]
100use core::sync::atomic;
101#[cfg(feature = "portable-atomic")]
102use portable_atomic as atomic;
103
104use atomic::{AtomicUsize, Ordering};
105
/// A fixed-size ring-buffer queue backed by an inline array of `N` slots.
///
/// One slot is always kept free so that "empty" (`head == tail`) can be told apart
/// from "full" (`tail + 1 == head`, modulo `N`); the usable capacity is therefore `N - 1`.
pub struct Queue<T, const N: usize> {
    /// Index of the slot that the next dequeue reads from.
    pub(crate) head: AtomicUsize,

    /// Index of the slot that the next enqueue writes to.
    pub(crate) tail: AtomicUsize,

    /// Element storage; only the slots in `head..tail` (with wrap-around) are initialized.
    pub(crate) buffer: [UnsafeCell<MaybeUninit<T>>; N],
}
119
120impl<T, const N: usize> Queue<T, N> {
121 const INIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());
122
123 #[inline]
124 fn increment(val: usize) -> usize {
125 (val + 1) % N
126 }
127
128 pub const fn new() -> Self {
130 crate::sealed::greater_than_1::<N>();
132
133 Queue {
134 head: AtomicUsize::new(0),
135 tail: AtomicUsize::new(0),
136 buffer: [Self::INIT; N],
137 }
138 }
139
140 #[inline]
142 pub const fn capacity(&self) -> usize {
143 N - 1
144 }
145
146 #[inline]
148 pub fn len(&self) -> usize {
149 let current_head = self.head.load(Ordering::Relaxed);
150 let current_tail = self.tail.load(Ordering::Relaxed);
151
152 current_tail.wrapping_sub(current_head).wrapping_add(N) % N
153 }
154
155 #[inline]
157 pub fn is_empty(&self) -> bool {
158 self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
159 }
160
161 #[inline]
163 pub fn is_full(&self) -> bool {
164 Self::increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
165 }
166
167 pub fn iter(&self) -> Iter<'_, T, N> {
169 Iter {
170 rb: self,
171 index: 0,
172 len: self.len(),
173 }
174 }
175
176 pub fn iter_mut(&mut self) -> IterMut<'_, T, N> {
178 let len = self.len();
179 IterMut {
180 rb: self,
181 index: 0,
182 len,
183 }
184 }
185
186 #[inline]
190 pub fn enqueue(&mut self, val: T) -> Result<(), T> {
191 unsafe { self.inner_enqueue(val) }
192 }
193
194 #[inline]
196 pub fn dequeue(&mut self) -> Option<T> {
197 unsafe { self.inner_dequeue() }
198 }
199
200 pub fn peek(&self) -> Option<&T> {
216 if !self.is_empty() {
217 let head = self.head.load(Ordering::Relaxed);
218 Some(unsafe { &*(self.buffer.get_unchecked(head).get() as *const T) })
219 } else {
220 None
221 }
222 }
223
224 unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
228 let current_tail = self.tail.load(Ordering::Relaxed);
229 let next_tail = Self::increment(current_tail);
230
231 if next_tail != self.head.load(Ordering::Acquire) {
232 (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
233 self.tail.store(next_tail, Ordering::Release);
234
235 Ok(())
236 } else {
237 Err(val)
238 }
239 }
240
241 unsafe fn inner_enqueue_unchecked(&self, val: T) {
245 let current_tail = self.tail.load(Ordering::Relaxed);
246
247 (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
248 self.tail
249 .store(Self::increment(current_tail), Ordering::Release);
250 }
251
252 pub unsafe fn enqueue_unchecked(&mut self, val: T) {
261 self.inner_enqueue_unchecked(val)
262 }
263
264 unsafe fn inner_dequeue(&self) -> Option<T> {
268 let current_head = self.head.load(Ordering::Relaxed);
269
270 if current_head == self.tail.load(Ordering::Acquire) {
271 None
272 } else {
273 let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();
274
275 self.head
276 .store(Self::increment(current_head), Ordering::Release);
277
278 Some(v)
279 }
280 }
281
282 unsafe fn inner_dequeue_unchecked(&self) -> T {
286 let current_head = self.head.load(Ordering::Relaxed);
287 let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();
288
289 self.head
290 .store(Self::increment(current_head), Ordering::Release);
291
292 v
293 }
294
295 pub unsafe fn dequeue_unchecked(&mut self) -> T {
302 self.inner_dequeue_unchecked()
303 }
304
305 pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) {
307 (Producer { rb: self }, Consumer { rb: self })
308 }
309}
310
311impl<T, const N: usize> Default for Queue<T, N> {
312 fn default() -> Self {
313 Self::new()
314 }
315}
316
317impl<T, const N: usize> Clone for Queue<T, N>
318where
319 T: Clone,
320{
321 fn clone(&self) -> Self {
322 let mut new: Queue<T, N> = Queue::new();
323
324 for s in self.iter() {
325 unsafe {
326 new.enqueue_unchecked(s.clone());
329 }
330 }
331
332 new
333 }
334}
335
336impl<T, const N: usize, const N2: usize> PartialEq<Queue<T, N2>> for Queue<T, N>
337where
338 T: PartialEq,
339{
340 fn eq(&self, other: &Queue<T, N2>) -> bool {
341 self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
342 }
343}
344
345impl<T, const N: usize> Eq for Queue<T, N> where T: Eq {}
346
/// Front-to-back iterator over shared references to the elements of a [`Queue`].
pub struct Iter<'a, T, const N: usize> {
    /// The queue being iterated.
    rb: &'a Queue<T, N>,
    /// Offset from the queue's `head` of the next element yielded from the front.
    index: usize,
    /// One past the offset of the next element yielded from the back; iteration is
    /// finished once `index` reaches `len`.
    len: usize,
}
353
354impl<'a, T, const N: usize> Clone for Iter<'a, T, N> {
355 fn clone(&self) -> Self {
356 Self {
357 rb: self.rb,
358 index: self.index,
359 len: self.len,
360 }
361 }
362}
363
/// Front-to-back iterator over mutable references to the elements of a [`Queue`].
pub struct IterMut<'a, T, const N: usize> {
    /// The queue being iterated, borrowed exclusively.
    rb: &'a mut Queue<T, N>,
    /// Offset from the queue's `head` of the next element yielded from the front.
    index: usize,
    /// One past the offset of the next element yielded from the back; iteration is
    /// finished once `index` reaches `len`.
    len: usize,
}
370
371impl<'a, T, const N: usize> Iterator for Iter<'a, T, N> {
372 type Item = &'a T;
373
374 fn next(&mut self) -> Option<Self::Item> {
375 if self.index < self.len {
376 let head = self.rb.head.load(Ordering::Relaxed);
377
378 let i = (head + self.index) % N;
379 self.index += 1;
380
381 Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) })
382 } else {
383 None
384 }
385 }
386}
387
388impl<'a, T, const N: usize> Iterator for IterMut<'a, T, N> {
389 type Item = &'a mut T;
390
391 fn next(&mut self) -> Option<Self::Item> {
392 if self.index < self.len {
393 let head = self.rb.head.load(Ordering::Relaxed);
394
395 let i = (head + self.index) % N;
396 self.index += 1;
397
398 Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) })
399 } else {
400 None
401 }
402 }
403}
404
405impl<'a, T, const N: usize> DoubleEndedIterator for Iter<'a, T, N> {
406 fn next_back(&mut self) -> Option<Self::Item> {
407 if self.index < self.len {
408 let head = self.rb.head.load(Ordering::Relaxed);
409
410 let i = (head + self.len - 1) % N;
412 self.len -= 1;
413 Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) })
414 } else {
415 None
416 }
417 }
418}
419
420impl<'a, T, const N: usize> DoubleEndedIterator for IterMut<'a, T, N> {
421 fn next_back(&mut self) -> Option<Self::Item> {
422 if self.index < self.len {
423 let head = self.rb.head.load(Ordering::Relaxed);
424
425 let i = (head + self.len - 1) % N;
427 self.len -= 1;
428 Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) })
429 } else {
430 None
431 }
432 }
433}
434
435impl<T, const N: usize> Drop for Queue<T, N> {
436 fn drop(&mut self) {
437 for item in self {
438 unsafe {
439 ptr::drop_in_place(item);
440 }
441 }
442 }
443}
444
445impl<T, const N: usize> fmt::Debug for Queue<T, N>
446where
447 T: fmt::Debug,
448{
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 f.debug_list().entries(self.iter()).finish()
451 }
452}
453
454impl<T, const N: usize> hash::Hash for Queue<T, N>
455where
456 T: hash::Hash,
457{
458 fn hash<H: hash::Hasher>(&self, state: &mut H) {
459 for t in self.iter() {
461 hash::Hash::hash(t, state);
462 }
463 }
464}
465
466impl<'a, T, const N: usize> IntoIterator for &'a Queue<T, N> {
467 type Item = &'a T;
468 type IntoIter = Iter<'a, T, N>;
469
470 fn into_iter(self) -> Self::IntoIter {
471 self.iter()
472 }
473}
474
475impl<'a, T, const N: usize> IntoIterator for &'a mut Queue<T, N> {
476 type Item = &'a mut T;
477 type IntoIter = IterMut<'a, T, N>;
478
479 fn into_iter(self) -> Self::IntoIter {
480 self.iter_mut()
481 }
482}
483
/// The dequeueing endpoint of a [`Queue`], obtained from [`Queue::split`].
pub struct Consumer<'a, T, const N: usize> {
    /// The queue this endpoint removes elements from.
    rb: &'a Queue<T, N>,
}

// A consumer may be moved to another thread whenever the elements themselves may be.
unsafe impl<'a, T, const N: usize> Send for Consumer<'a, T, N> where T: Send {}
491
/// The enqueueing endpoint of a [`Queue`], obtained from [`Queue::split`].
pub struct Producer<'a, T, const N: usize> {
    /// The queue this endpoint adds elements to.
    rb: &'a Queue<T, N>,
}

// A producer may be moved to another thread whenever the elements themselves may be.
unsafe impl<'a, T, const N: usize> Send for Producer<'a, T, N> where T: Send {}
499
impl<'a, T, const N: usize> Consumer<'a, T, N> {
    /// Removes and returns the element at the front of the queue, or `None` if the
    /// queue is empty.
    #[inline]
    pub fn dequeue(&mut self) -> Option<T> {
        // SAFETY: `&mut self` ensures this consumer performs one dequeue at a time.
        unsafe { self.rb.inner_dequeue() }
    }

    /// Removes and returns the element at the front of the queue without checking
    /// whether the queue is empty.
    ///
    /// # Safety
    ///
    /// The caller must ensure the queue is not empty; otherwise an uninitialized
    /// slot is read.
    #[inline]
    pub unsafe fn dequeue_unchecked(&mut self) -> T {
        self.rb.inner_dequeue_unchecked()
    }

    /// Returns `true` when there is at least one element to dequeue.
    #[inline]
    pub fn ready(&self) -> bool {
        !self.rb.is_empty()
    }

    /// Returns the number of elements currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.rb.len()
    }

    /// Returns the maximum number of elements the queue can hold.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }

    /// Returns a reference to the front element without removing it, or `None` if
    /// the queue is empty.
    #[inline]
    pub fn peek(&self) -> Option<&T> {
        self.rb.peek()
    }
}
555
impl<'a, T, const N: usize> Producer<'a, T, N> {
    /// Appends `val` at the back of the queue.
    ///
    /// Returns `Err(val)`, handing the value back, when the queue is full.
    #[inline]
    pub fn enqueue(&mut self, val: T) -> Result<(), T> {
        // SAFETY: `&mut self` ensures this producer performs one enqueue at a time.
        unsafe { self.rb.inner_enqueue(val) }
    }

    /// Appends `val` at the back of the queue without checking whether it is full.
    ///
    /// # Safety
    ///
    /// The caller must ensure the queue is not full; no fullness check is performed
    /// before the slot is written.
    #[inline]
    pub unsafe fn enqueue_unchecked(&mut self, val: T) {
        self.rb.inner_enqueue_unchecked(val)
    }

    /// Returns `true` when there is room to enqueue at least one more element.
    #[inline]
    pub fn ready(&self) -> bool {
        !self.rb.is_full()
    }

    /// Returns the number of elements currently in the queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.rb.len()
    }

    /// Returns the maximum number of elements the queue can hold.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }
}
590
#[cfg(test)]
mod tests {
    use std::hash::{Hash, Hasher};

    use crate::spsc::Queue;

    /// A queue with `N = 3` is full after two elements.
    #[test]
    fn full() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert!(!rb.is_full());

        rb.enqueue(1).unwrap();
        assert!(!rb.is_full());

        rb.enqueue(2).unwrap();
        assert!(rb.is_full());
    }

    /// A queue is empty only before the first enqueue.
    #[test]
    fn empty() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert!(rb.is_empty());

        rb.enqueue(1).unwrap();
        assert!(!rb.is_empty());

        rb.enqueue(2).unwrap();
        assert!(!rb.is_empty());
    }

    /// `len` stays correct while the indices wrap around the buffer many times.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn len() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert_eq!(rb.len(), 0);

        rb.enqueue(1).unwrap();
        assert_eq!(rb.len(), 1);

        rb.enqueue(2).unwrap();
        assert_eq!(rb.len(), 2);

        // No per-iteration printing: a million formatted lines add nothing to the check.
        for _ in 0..1_000_000 {
            let v = rb.dequeue().unwrap();
            rb.enqueue(v).unwrap();
            assert_eq!(rb.len(), 2);
        }
    }

    /// Elements keep their FIFO order across many wrap-arounds of a full queue.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn try_overflow() {
        const N: usize = 23;
        let mut rb: Queue<i32, N> = Queue::new();

        for i in 0..N as i32 - 1 {
            rb.enqueue(i).unwrap();
        }

        for _ in 0..1_000_000 {
            for i in 0..N as i32 - 1 {
                let d = rb.dequeue().unwrap();
                assert_eq!(d, i);
                rb.enqueue(i).unwrap();
            }
        }
    }

    /// Basic round trip through the split producer/consumer endpoints.
    #[test]
    fn sanity() {
        let mut rb: Queue<i32, 10> = Queue::new();

        let (mut p, mut c) = rb.split();

        assert!(p.ready());

        assert!(!c.ready());

        assert_eq!(c.dequeue(), None);

        p.enqueue(0).unwrap();

        assert_eq!(c.dequeue(), Some(0));
    }

    /// `Queue::new` is usable in a `static` initializer.
    #[test]
    fn static_new() {
        static mut _Q: Queue<i32, 4> = Queue::new();
    }

    /// Dropping a queue drops exactly the elements still stored in it.
    #[test]
    fn drop() {
        struct Droppable;
        impl Droppable {
            fn new() -> Self {
                unsafe {
                    COUNT += 1;
                }
                Droppable
            }
        }

        impl Drop for Droppable {
            fn drop(&mut self) {
                unsafe {
                    COUNT -= 1;
                }
            }
        }

        // Number of live `Droppable` values.
        static mut COUNT: i32 = 0;

        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.dequeue().unwrap();
        }

        assert_eq!(unsafe { COUNT }, 0);

        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
        }

        assert_eq!(unsafe { COUNT }, 0);
    }

    /// Iteration starts at the current head, not at slot 0.
    #[test]
    fn iter() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.enqueue(3).unwrap();

        let mut items = rb.iter();

        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), Some(&2));
        assert_eq!(items.next(), Some(&3));
        assert_eq!(items.next(), None);
    }

    /// Front and back cursors meet in the middle without yielding an element twice.
    #[test]
    fn iter_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter();

        assert_eq!(items.next(), Some(&0));
        assert_eq!(items.next_back(), Some(&2));
        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    /// Mutable iteration visits the elements front to back.
    #[test]
    fn iter_mut() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter_mut();

        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), Some(&mut 2));
        assert_eq!(items.next(), None);
    }

    /// Mutable front and back cursors meet in the middle without overlap.
    #[test]
    fn iter_mut_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter_mut();

        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next_back(), Some(&mut 2));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    /// `len` is correct after the tail index has wrapped past the end of the buffer.
    #[test]
    fn wrap_around() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(3).unwrap();
        rb.enqueue(4).unwrap();

        assert_eq!(rb.len(), 2);
    }

    /// `ready` on both endpoints tracks the fill level through a fill/drain cycle.
    #[test]
    fn ready_flag() {
        let mut rb: Queue<i32, 3> = Queue::new();
        let (mut p, mut c) = rb.split();
        assert!(!c.ready());
        assert!(p.ready());

        p.enqueue(0).unwrap();

        assert!(c.ready());
        assert!(p.ready());

        p.enqueue(1).unwrap();

        assert!(c.ready());
        assert!(!p.ready());

        c.dequeue().unwrap();

        assert!(c.ready());
        assert!(p.ready());

        c.dequeue().unwrap();

        assert!(!c.ready());
        assert!(p.ready());
    }

    /// A clone has the same capacity, length and contents as its source.
    #[test]
    fn clone() {
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let rb2 = rb1.clone();
        assert_eq!(rb1.capacity(), rb2.capacity());
        assert_eq!(rb1.len(), rb2.len());
        assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
    }

    /// Equality depends on the contents only, not on where the indices sit.
    #[test]
    fn eq() {
        // `rb1` and `rb2` hold the same elements but with different head/tail positions.
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let mut rb2: Queue<i32, 4> = Queue::new();
        rb2.enqueue(0).unwrap();
        rb2.enqueue(0).unwrap();
        assert!(rb1 == rb2);
        // Symmetry.
        assert!(rb2 == rb1);
        // Different lengths.
        rb1.enqueue(0).unwrap();
        assert!(rb1 != rb2);
        // Same length, different contents.
        rb2.enqueue(1).unwrap();
        assert!(rb1 != rb2);
        // Reflexivity.
        assert!(rb1 == rb1);
        assert!(rb2 == rb2);
    }

    /// Queues with equal contents hash identically regardless of index positions.
    #[test]
    fn hash_equality() {
        let rb1 = {
            let mut rb1: Queue<i32, 4> = Queue::new();
            rb1.enqueue(0).unwrap();
            rb1.enqueue(0).unwrap();
            rb1.dequeue().unwrap();
            rb1.enqueue(0).unwrap();
            rb1
        };
        let rb2 = {
            let mut rb2: Queue<i32, 4> = Queue::new();
            rb2.enqueue(0).unwrap();
            rb2.enqueue(0).unwrap();
            rb2
        };
        let hash1 = {
            let mut hasher1 = hash32::FnvHasher::default();
            rb1.hash(&mut hasher1);
            hasher1.finish()
        };
        let hash2 = {
            let mut hasher2 = hash32::FnvHasher::default();
            rb2.hash(&mut hasher2);
            hasher2.finish()
        };
        assert_eq!(hash1, hash2);
    }
}