1use core::cmp;
6use managed::ManagedSlice;
7
8use crate::storage::Resettable;
9
10use super::{Empty, Full};
11
/// A ring buffer of `T` elements laid over a `ManagedSlice`.
///
/// The queued (allocated) region starts at index `read_at` of the storage and
/// spans `length` elements, wrapping around the end of the storage when
/// necessary. Everything else in the storage is the unallocated region.
#[derive(Debug)]
pub struct RingBuffer<'a, T: 'a> {
    /// Backing storage; its length is the capacity of the buffer.
    storage: ManagedSlice<'a, T>,
    /// Index into `storage` of the oldest queued element.
    read_at: usize,
    /// Number of elements currently queued.
    length: usize,
}
32
33impl<'a, T: 'a> RingBuffer<'a, T> {
34 pub fn new<S>(storage: S) -> RingBuffer<'a, T>
38 where
39 S: Into<ManagedSlice<'a, T>>,
40 {
41 RingBuffer {
42 storage: storage.into(),
43 read_at: 0,
44 length: 0,
45 }
46 }
47
48 pub fn clear(&mut self) {
50 self.read_at = 0;
51 self.length = 0;
52 }
53
54 pub fn capacity(&self) -> usize {
56 self.storage.len()
57 }
58
59 pub fn reset(&mut self)
61 where
62 T: Resettable,
63 {
64 self.clear();
65 for elem in self.storage.iter_mut() {
66 elem.reset();
67 }
68 }
69
70 pub fn len(&self) -> usize {
72 self.length
73 }
74
75 pub fn window(&self) -> usize {
77 self.capacity() - self.len()
78 }
79
80 pub fn contiguous_window(&self) -> usize {
83 cmp::min(self.window(), self.capacity() - self.get_idx(self.length))
84 }
85
86 pub fn is_empty(&self) -> bool {
88 self.len() == 0
89 }
90
91 pub fn is_full(&self) -> bool {
93 self.window() == 0
94 }
95
96 fn get_idx(&self, idx: usize) -> usize {
99 let len = self.capacity();
100 if len > 0 {
101 (self.read_at + idx) % len
102 } else {
103 0
104 }
105 }
106
107 fn get_idx_unchecked(&self, idx: usize) -> usize {
110 (self.read_at + idx) % self.capacity()
111 }
112}
113
114impl<'a, T: 'a> RingBuffer<'a, T> {
117 pub fn enqueue_one_with<'b, R, E, F>(&'b mut self, f: F) -> Result<Result<R, E>, Full>
120 where
121 F: FnOnce(&'b mut T) -> Result<R, E>,
122 {
123 if self.is_full() {
124 return Err(Full);
125 }
126
127 let index = self.get_idx_unchecked(self.length);
128 let res = f(&mut self.storage[index]);
129 if res.is_ok() {
130 self.length += 1;
131 }
132 Ok(res)
133 }
134
135 pub fn enqueue_one(&mut self) -> Result<&mut T, Full> {
140 self.enqueue_one_with(Ok)?
141 }
142
143 pub fn dequeue_one_with<'b, R, E, F>(&'b mut self, f: F) -> Result<Result<R, E>, Empty>
146 where
147 F: FnOnce(&'b mut T) -> Result<R, E>,
148 {
149 if self.is_empty() {
150 return Err(Empty);
151 }
152
153 let next_at = self.get_idx_unchecked(1);
154 let res = f(&mut self.storage[self.read_at]);
155
156 if res.is_ok() {
157 self.length -= 1;
158 self.read_at = next_at;
159 }
160 Ok(res)
161 }
162
163 pub fn dequeue_one(&mut self) -> Result<&mut T, Empty> {
168 self.dequeue_one_with(Ok)?
169 }
170}
171
172impl<'a, T: 'a> RingBuffer<'a, T> {
175 pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
182 where
183 F: FnOnce(&'b mut [T]) -> (usize, R),
184 {
185 if self.length == 0 {
186 self.read_at = 0;
189 }
190
191 let write_at = self.get_idx(self.length);
192 let max_size = self.contiguous_window();
193 let (size, result) = f(&mut self.storage[write_at..write_at + max_size]);
194 assert!(size <= max_size);
195 self.length += size;
196 (size, result)
197 }
198
199 #[must_use]
205 pub fn enqueue_many(&mut self, size: usize) -> &mut [T] {
206 self.enqueue_many_with(|buf| {
207 let size = cmp::min(size, buf.len());
208 (size, &mut buf[..size])
209 })
210 .1
211 }
212
213 #[must_use]
216 pub fn enqueue_slice(&mut self, data: &[T]) -> usize
217 where
218 T: Copy,
219 {
220 let (size_1, data) = self.enqueue_many_with(|buf| {
221 let size = cmp::min(buf.len(), data.len());
222 buf[..size].copy_from_slice(&data[..size]);
223 (size, &data[size..])
224 });
225 let (size_2, ()) = self.enqueue_many_with(|buf| {
226 let size = cmp::min(buf.len(), data.len());
227 buf[..size].copy_from_slice(&data[..size]);
228 (size, ())
229 });
230 size_1 + size_2
231 }
232
233 pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
240 where
241 F: FnOnce(&'b mut [T]) -> (usize, R),
242 {
243 let capacity = self.capacity();
244 let max_size = cmp::min(self.len(), capacity - self.read_at);
245 let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]);
246 assert!(size <= max_size);
247 self.read_at = if capacity > 0 {
248 (self.read_at + size) % capacity
249 } else {
250 0
251 };
252 self.length -= size;
253 (size, result)
254 }
255
256 #[must_use]
262 pub fn dequeue_many(&mut self, size: usize) -> &mut [T] {
263 self.dequeue_many_with(|buf| {
264 let size = cmp::min(size, buf.len());
265 (size, &mut buf[..size])
266 })
267 .1
268 }
269
270 #[must_use]
273 pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize
274 where
275 T: Copy,
276 {
277 let (size_1, data) = self.dequeue_many_with(|buf| {
278 let size = cmp::min(buf.len(), data.len());
279 data[..size].copy_from_slice(&buf[..size]);
280 (size, &mut data[size..])
281 });
282 let (size_2, ()) = self.dequeue_many_with(|buf| {
283 let size = cmp::min(buf.len(), data.len());
284 data[..size].copy_from_slice(&buf[..size]);
285 (size, ())
286 });
287 size_1 + size_2
288 }
289}
290
291impl<'a, T: 'a> RingBuffer<'a, T> {
294 #[must_use]
297 pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
298 let start_at = self.get_idx(self.length + offset);
299 if offset > self.window() {
301 return &mut [];
302 }
303 let clamped_window = self.window() - offset;
305 if size > clamped_window {
306 size = clamped_window
307 }
308 let until_end = self.capacity() - start_at;
310 if size > until_end {
311 size = until_end
312 }
313
314 &mut self.storage[start_at..start_at + size]
315 }
316
317 #[must_use]
321 pub fn write_unallocated(&mut self, offset: usize, data: &[T]) -> usize
322 where
323 T: Copy,
324 {
325 let (size_1, offset, data) = {
326 let slice = self.get_unallocated(offset, data.len());
327 let slice_len = slice.len();
328 slice.copy_from_slice(&data[..slice_len]);
329 (slice_len, offset + slice_len, &data[slice_len..])
330 };
331 let size_2 = {
332 let slice = self.get_unallocated(offset, data.len());
333 let slice_len = slice.len();
334 slice.copy_from_slice(&data[..slice_len]);
335 slice_len
336 };
337 size_1 + size_2
338 }
339
340 pub fn enqueue_unallocated(&mut self, count: usize) {
345 assert!(count <= self.window());
346 self.length += count;
347 }
348
349 #[must_use]
352 pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
353 let start_at = self.get_idx(offset);
354 if offset > self.length {
356 return &mut [];
357 }
358 let clamped_length = self.length - offset;
360 if size > clamped_length {
361 size = clamped_length
362 }
363 let until_end = self.capacity() - start_at;
365 if size > until_end {
366 size = until_end
367 }
368
369 &self.storage[start_at..start_at + size]
370 }
371
372 #[must_use]
376 pub fn read_allocated(&mut self, offset: usize, data: &mut [T]) -> usize
377 where
378 T: Copy,
379 {
380 let (size_1, offset, data) = {
381 let slice = self.get_allocated(offset, data.len());
382 data[..slice.len()].copy_from_slice(slice);
383 (slice.len(), offset + slice.len(), &mut data[slice.len()..])
384 };
385 let size_2 = {
386 let slice = self.get_allocated(offset, data.len());
387 data[..slice.len()].copy_from_slice(slice);
388 slice.len()
389 };
390 size_1 + size_2
391 }
392
393 pub fn dequeue_allocated(&mut self, count: usize) {
398 assert!(count <= self.len());
399 self.length -= count;
400 self.read_at = self.get_idx(count);
401 }
402}
403
404impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
405 fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
406 RingBuffer::new(slice)
407 }
408}
409
// Unit tests for `RingBuffer`. Several of them inspect or set the private
// `storage` and `length` fields directly to check the exact storage layout.
#[cfg(test)]
mod test {
    use super::*;

    /// `len`, `window`, `is_empty` and `is_full` must all follow `length`.
    #[test]
    fn test_buffer_length_changes() {
        let mut ring = RingBuffer::new(vec![0; 2]);
        assert!(ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 0);
        assert_eq!(ring.capacity(), 2);
        assert_eq!(ring.window(), 2);

        // Set the private field directly instead of enqueueing.
        ring.length = 1;
        assert!(!ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 1);
        assert_eq!(ring.capacity(), 2);
        assert_eq!(ring.window(), 1);

        ring.length = 2;
        assert!(!ring.is_empty());
        assert!(ring.is_full());
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.capacity(), 2);
        assert_eq!(ring.window(), 0);
    }

    /// Single-element enqueue/dequeue through the callback API, including the
    /// `Full` and `Empty` error paths (where the callback must not run).
    #[test]
    fn test_buffer_enqueue_dequeue_one_with() {
        let mut ring = RingBuffer::new(vec![0; 5]);
        assert_eq!(
            ring.dequeue_one_with(|_| -> Result::<(), ()> { unreachable!() }),
            Err(Empty)
        );

        ring.enqueue_one_with(Ok::<_, ()>).unwrap().unwrap();
        assert!(!ring.is_empty());
        assert!(!ring.is_full());

        for i in 1..5 {
            ring.enqueue_one_with(|e| Ok::<_, ()>(*e = i))
                .unwrap()
                .unwrap();
            assert!(!ring.is_empty());
        }
        assert!(ring.is_full());
        assert_eq!(
            ring.enqueue_one_with(|_| -> Result::<(), ()> { unreachable!() }),
            Err(Full)
        );

        // Elements come back out in the order they went in.
        for i in 0..5 {
            assert_eq!(
                ring.dequeue_one_with(|e| Ok::<_, ()>(*e)).unwrap().unwrap(),
                i
            );
            assert!(!ring.is_full());
        }
        assert_eq!(
            ring.dequeue_one_with(|_| -> Result::<(), ()> { unreachable!() }),
            Err(Empty)
        );
        assert!(ring.is_empty());
    }

    /// Same scenario as above, through the reference-returning API.
    #[test]
    fn test_buffer_enqueue_dequeue_one() {
        let mut ring = RingBuffer::new(vec![0; 5]);
        assert_eq!(ring.dequeue_one(), Err(Empty));

        ring.enqueue_one().unwrap();
        assert!(!ring.is_empty());
        assert!(!ring.is_full());

        for i in 1..5 {
            *ring.enqueue_one().unwrap() = i;
            assert!(!ring.is_empty());
        }
        assert!(ring.is_full());
        assert_eq!(ring.enqueue_one(), Err(Full));

        for i in 0..5 {
            assert_eq!(*ring.dequeue_one().unwrap(), i);
            assert!(!ring.is_full());
        }
        assert_eq!(ring.dequeue_one(), Err(Empty));
        assert!(ring.is_empty());
    }

    /// `enqueue_many_with` must offer the contiguous free run and enqueue only
    /// as many elements as the callback reports.
    #[test]
    fn test_buffer_enqueue_many_with() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(
            ring.enqueue_many_with(|buf| {
                assert_eq!(buf.len(), 12);
                buf[0..2].copy_from_slice(b"ab");
                (2, true)
            }),
            (2, true)
        );
        assert_eq!(ring.len(), 2);
        assert_eq!(&ring.storage[..], b"ab..........");

        // Four bytes are written but only two are reported, so "XX" stays
        // unallocated and is overwritten by the next enqueue.
        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 2);
            buf[0..4].copy_from_slice(b"cdXX");
            (2, ())
        });
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"abcdXX......");

        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 4);
            buf[0..4].copy_from_slice(b"efgh");
            (4, ())
        });
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdefgh....");

        for _ in 0..4 {
            *ring.dequeue_one().unwrap() = b'.';
        }
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"....efgh....");

        // Only the run up to the end of the storage is offered here...
        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 8);
            buf[0..4].copy_from_slice(b"ijkl");
            (4, ())
        });
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"....efghijkl");

        // ...and the wrapped-around run at the start is offered next.
        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 4);
            buf[0..4].copy_from_slice(b"abcd");
            (4, ())
        });
        assert_eq!(ring.len(), 12);
        assert_eq!(&ring.storage[..], b"abcdefghijkl");

        for _ in 0..4 {
            *ring.dequeue_one().unwrap() = b'.';
        }
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcd....ijkl");
    }

    /// `enqueue_many` returns at most the requested number of elements.
    #[test]
    fn test_buffer_enqueue_many() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdefgh....");

        // Eight requested, but only four are free.
        ring.enqueue_many(8).copy_from_slice(b"ijkl");
        assert_eq!(ring.len(), 12);
        assert_eq!(&ring.storage[..], b"abcdefghijkl");
    }

    /// `enqueue_slice` must handle data that wraps around the storage end.
    #[test]
    fn test_buffer_enqueue_slice() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdefgh....");

        for _ in 0..4 {
            *ring.dequeue_one().unwrap() = b'.';
        }
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"....efgh....");

        // "ijkl" fills the tail of the storage, "abcd" wraps to the front.
        assert_eq!(ring.enqueue_slice(b"ijklabcd"), 8);
        assert_eq!(ring.len(), 12);
        assert_eq!(&ring.storage[..], b"abcdefghijkl");
    }

    /// `dequeue_many_with` must offer the contiguous queued run and dequeue
    /// only as many elements as the callback reports.
    #[test]
    fn test_buffer_dequeue_many_with() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);

        assert_eq!(
            ring.dequeue_many_with(|buf| {
                assert_eq!(buf.len(), 12);
                assert_eq!(buf, b"abcdefghijkl");
                buf[..4].copy_from_slice(b"....");
                (4, true)
            }),
            (4, true)
        );
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"....efghijkl");

        ring.dequeue_many_with(|buf| {
            assert_eq!(buf, b"efghijkl");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"........ijkl");

        // This enqueue wraps around to the start of the storage.
        assert_eq!(ring.enqueue_slice(b"abcd"), 4);
        assert_eq!(ring.len(), 8);

        // The queued data is now split in two runs, offered one at a time.
        ring.dequeue_many_with(|buf| {
            assert_eq!(buf, b"ijkl");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        ring.dequeue_many_with(|buf| {
            assert_eq!(buf, b"abcd");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        assert_eq!(ring.len(), 0);
        assert_eq!(&ring.storage[..], b"............");
    }

    /// `dequeue_many` returns at most the requested number of elements.
    #[test]
    fn test_buffer_dequeue_many() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);

        {
            let buf = ring.dequeue_many(8);
            assert_eq!(buf, b"abcdefgh");
            buf.copy_from_slice(b"........");
        }
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"........ijkl");

        {
            // Eight requested, but only four are queued.
            let buf = ring.dequeue_many(8);
            assert_eq!(buf, b"ijkl");
            buf.copy_from_slice(b"....");
        }
        assert_eq!(ring.len(), 0);
        assert_eq!(&ring.storage[..], b"............");
    }

    /// `dequeue_slice` must handle data that wraps around the storage end.
    #[test]
    fn test_buffer_dequeue_slice() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);

        {
            let mut buf = [0; 8];
            assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
            assert_eq!(&buf[..], b"abcdefgh");
            assert_eq!(ring.len(), 4);
        }

        assert_eq!(ring.enqueue_slice(b"abcd"), 4);

        {
            // "ijkl" sits at the tail of the storage, "abcd" at the front.
            let mut buf = [0; 8];
            assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
            assert_eq!(&buf[..], b"ijklabcd");
            assert_eq!(ring.len(), 0);
        }
    }

    /// `get_unallocated` gives access to free space at an offset without
    /// enqueueing anything.
    #[test]
    fn test_buffer_get_unallocated() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        // An offset beyond the free window yields an empty slice.
        assert_eq!(ring.get_unallocated(16, 4), b"");

        {
            let buf = ring.get_unallocated(0, 4);
            buf.copy_from_slice(b"abcd");
        }
        assert_eq!(&ring.storage[..], b"abcd........");

        let buf_enqueued = ring.enqueue_many(4);
        assert_eq!(buf_enqueued.len(), 4);
        assert_eq!(ring.len(), 4);

        {
            // Eight requested, but only four remain before the storage end.
            let buf = ring.get_unallocated(4, 8);
            buf.copy_from_slice(b"ijkl");
        }
        assert_eq!(&ring.storage[..], b"abcd....ijkl");

        ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
        ring.dequeue_many(4).copy_from_slice(b"abcd");
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdEFGHIJKL");

        {
            // The free space has wrapped around to the start of the storage.
            let buf = ring.get_unallocated(0, 8);
            buf.copy_from_slice(b"ABCD");
        }
        assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL");
    }

    /// `write_unallocated` copies across the wrap-around and truncates when
    /// the data does not fit.
    #[test]
    fn test_buffer_write_unallocated() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);
        // Move the read/write position to the middle of the storage.
        ring.enqueue_many(6).copy_from_slice(b"abcdef");
        ring.dequeue_many(6).copy_from_slice(b"ABCDEF");

        assert_eq!(ring.write_unallocated(0, b"ghi"), 3);
        assert_eq!(ring.get_unallocated(0, 3), b"ghi");

        assert_eq!(ring.write_unallocated(3, b"jklmno"), 6);
        assert_eq!(ring.get_unallocated(3, 3), b"jkl");

        // Only three free elements remain at offset 9.
        assert_eq!(ring.write_unallocated(9, b"pqrstu"), 3);
        assert_eq!(ring.get_unallocated(9, 3), b"pqr");
    }

    /// `get_allocated` gives access to queued data at an offset without
    /// dequeueing anything.
    #[test]
    fn test_buffer_get_allocated() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.get_allocated(16, 4), b"");
        assert_eq!(ring.get_allocated(0, 4), b"");

        let len_enqueued = ring.enqueue_slice(b"abcd");
        assert_eq!(ring.get_allocated(0, 8), b"abcd");
        assert_eq!(len_enqueued, 4);

        let len_enqueued = ring.enqueue_slice(b"efghijkl");
        ring.dequeue_many(4).copy_from_slice(b"....");
        assert_eq!(ring.get_allocated(4, 8), b"ijkl");
        assert_eq!(len_enqueued, 8);

        // The returned slice stops at the storage end even though more data
        // is queued after the wrap-around.
        let len_enqueued = ring.enqueue_slice(b"abcd");
        assert_eq!(ring.get_allocated(4, 8), b"ijkl");
        assert_eq!(len_enqueued, 4);
    }

    /// `read_allocated` copies across the wrap-around and truncates when less
    /// data is queued than requested.
    #[test]
    fn test_buffer_read_allocated() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);
        ring.enqueue_many(12).copy_from_slice(b"abcdefghijkl");

        let mut data = [0; 6];
        assert_eq!(ring.read_allocated(0, &mut data[..]), 6);
        assert_eq!(&data[..], b"abcdef");

        ring.dequeue_many(6).copy_from_slice(b"ABCDEF");
        ring.enqueue_many(3).copy_from_slice(b"mno");

        let mut data = [0; 6];
        assert_eq!(ring.read_allocated(3, &mut data[..]), 6);
        assert_eq!(&data[..], b"jklmno");

        // Only three elements are queued past offset 6; the rest of `data`
        // keeps its initial zeroes.
        let mut data = [0; 6];
        assert_eq!(ring.read_allocated(6, &mut data[..]), 3);
        assert_eq!(&data[..], b"mno\x00\x00\x00");
    }

    /// A zero-capacity buffer must not panic.
    #[test]
    fn test_buffer_with_no_capacity() {
        let mut no_capacity: RingBuffer<u8> = RingBuffer::new(vec![]);

        // Exercise the operations that reduce an index modulo the capacity and
        // make sure none of them divides by zero.
        assert_eq!(no_capacity.get_unallocated(0, 0), &[]);
        assert_eq!(no_capacity.get_allocated(0, 0), &[]);
        no_capacity.dequeue_allocated(0);
        assert_eq!(no_capacity.enqueue_many(0), &[]);
        assert_eq!(no_capacity.enqueue_one(), Err(Full));
        assert_eq!(no_capacity.contiguous_window(), 0);
    }

    /// After the buffer has been used and fully drained, an enqueue of the
    /// whole capacity must be served as one contiguous slice.
    #[test]
    fn test_buffer_write_wholly() {
        let mut ring = RingBuffer::new(vec![b'.'; 8]);
        ring.enqueue_many(2).copy_from_slice(b"ab");
        ring.enqueue_many(2).copy_from_slice(b"cd");
        assert_eq!(ring.len(), 4);
        let buf_dequeued = ring.dequeue_many(4);
        assert_eq!(buf_dequeued, b"abcd");
        assert_eq!(ring.len(), 0);

        let large = ring.enqueue_many(8);
        assert_eq!(large.len(), 8);
    }
}