1#[path = "slab/node.rs"]
13mod node;
14
15use node::{Node, NothingProducer, TaskWaiting};
16
17use crate::notify::{GenericNotify, Internal, Notification};
18use crate::sync::atomic::{AtomicBool, Ordering};
19use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
20use crate::sync::Arc;
21use crate::{RegisterResult, State, Task, TaskRef};
22
23use core::fmt;
24use core::marker::PhantomData;
25use core::mem;
26use core::num::NonZeroUsize;
27use core::ops;
28use core::pin::Pin;
29
30use alloc::vec::Vec;
31
impl<T> crate::Inner<T> {
    /// Tries to take the list lock and wrap it in a [`ListGuard`].
    ///
    /// Returns `None` when `Mutex::try_lock` gives up. The returned guard starts with an
    /// empty batch of tasks to wake.
    fn try_lock(&self) -> Option<ListGuard<'_, T>> {
        self.list.inner.try_lock().map(|guard| ListGuard {
            inner: self,
            guard: Some(guard),
            tasks: alloc::vec![],
        })
    }

    /// Makes a best-effort attempt to drain the pending-operation queue.
    ///
    /// Dropping a `ListGuard` processes queued nodes, so acquiring the lock and dropping it
    /// right away is enough. If the lock cannot be taken this does nothing.
    fn queue_update(&self) {
        drop(self.try_lock());
    }

    /// Adds `listener` to the list if it has not been inserted yet.
    ///
    /// On the fast path the listener becomes `Listener::HasNode` immediately. If the lock is
    /// unavailable, an insertion node is queued and the listener becomes `Listener::Queued`.
    pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
        // Already inserted (or queued): nothing to do.
        if listener.as_ref().as_pin_ref().is_some() {
            return;
        }

        match self.try_lock() {
            Some(mut lock) => {
                // Fast path: insert directly into the slab.
                let key = lock.insert(State::Created);
                *listener = Some(Listener::HasNode(key));
            }

            None => {
                // Slow path: queue the insertion for whoever processes the queue next.
                let (node, task_waiting) = Node::listener();
                // The queue is created unbounded in `List::new`; a failed push is treated
                // as a bug.
                self.list.queue.push(node).unwrap();
                *listener = Some(Listener::Queued(task_waiting));

                // Try to process the queue ourselves in case the lock was released meanwhile.
                self.queue_update();
            }
        }
    }

    /// Removes `listener` from the list, leaving `None` in its slot.
    ///
    /// `propagate` is forwarded to the slab removal (directly or through a queued node).
    ///
    /// Returns the listener's final state when the removal happened under the lock. Returns
    /// `None` when the listener was never inserted, when a queued insertion was cancelled,
    /// or when the removal itself had to be queued.
    pub(crate) fn remove(
        &self,
        mut listener: Pin<&mut Option<Listener<T>>>,
        propagate: bool,
    ) -> Option<State<T>> {
        loop {
            let state = match listener.as_mut().take() {
                Some(Listener::HasNode(key)) => {
                    match self.try_lock() {
                        Some(mut list) => {
                            // Fast path: remove directly from the slab.
                            list.remove(key, propagate)
                        }

                        None => {
                            // Slow path: queue the removal; the state is not available here.
                            let node = Node::RemoveListener {
                                listener: key,
                                propagate,
                            };

                            self.list.queue.push(node).unwrap();

                            self.queue_update();

                            None
                        }
                    }
                }

                Some(Listener::Queued(tw)) => {
                    // `cancel` yielding a key means the queued insertion already took place,
                    // so switch to the keyed form and retry the removal.
                    if let Some(key) = tw.cancel() {
                        *listener = Some(Listener::HasNode(key));
                        continue;
                    }

                    None
                }

                None => None,

                // `_EatGenericType` is never constructed in this file.
                _ => unreachable!(),
            };

            return state;
        }
    }

    /// Delivers `notify` to the list.
    ///
    /// Returns the count reported by the slab on the fast path. When the lock is unavailable
    /// the notification is queued and `0` is returned, because nothing has been delivered yet.
    #[cold]
    pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
        match self.try_lock() {
            Some(mut guard) => {
                // Fast path: notify under the lock.
                guard.notify(notify)
            }

            None => {
                // Slow path: queue a notification carrying the same count and `additional`
                // flag, with `NothingProducer` standing in for the tag source.
                // NOTE(review): how `NothingProducer` produces tags lives in `node.rs` — confirm.
                let node = Node::Notify(GenericNotify::new(
                    notify.count(Internal::new()),
                    notify.is_additional(Internal::new()),
                    NothingProducer::default(),
                ));

                self.list.queue.push(node).unwrap();

                self.queue_update();

                0
            }
        }
    }

    /// Registers `task` to be woken for `listener`, or reports that it was already notified.
    ///
    /// Returns `RegisterResult::NeverInserted` when the listener slot is `None`. Otherwise the
    /// result comes from the slab (fast path) or is `RegisterResult::Registered` when the
    /// registration had to be queued.
    pub(crate) fn register(
        &self,
        mut listener: Pin<&mut Option<Listener<T>>>,
        task: TaskRef<'_>,
    ) -> RegisterResult<T> {
        loop {
            match listener.as_mut().take() {
                Some(Listener::HasNode(key)) => {
                    // Put the key back; the slab's `register` reads it from the slot.
                    *listener = Some(Listener::HasNode(key));
                    match self.try_lock() {
                        Some(mut guard) => {
                            // Fast path: register under the lock.
                            return guard.register(listener, task);
                        }

                        None => {
                            // Slow path: queue the task as a `Node::Waiting`.
                            let node = Node::Waiting(task.into_task());
                            self.list.queue.push(node).unwrap();

                            self.queue_update();

                            return RegisterResult::Registered;
                        }
                    }
                }

                Some(Listener::Queued(task_waiting)) => {
                    // Give the queued insertion a chance to be applied first.
                    self.queue_update();

                    match task_waiting.status() {
                        Some(key) => {
                            assert!(key.get() != usize::MAX);

                            // The insertion went through: loop again using the real key.
                            *listener = Some(Listener::HasNode(key));
                        }

                        None => {
                            // Still queued: hand the task to the waiting handle instead.
                            task_waiting.register(task.into_task());
                            *listener = Some(Listener::Queued(task_waiting));

                            self.queue_update();

                            return RegisterResult::Registered;
                        }
                    }
                }

                None => return RegisterResult::NeverInserted,

                // `_EatGenericType` is never constructed in this file.
                _ => unreachable!(),
            }
        }
    }
}
221
/// The listener list: a slab behind a try-lock plus a queue of deferred operations.
#[derive(Debug)]
pub(crate) struct List<T> {
    /// The slab of listeners, guarded by the spin-based try-lock `Mutex` defined in this file.
    inner: Mutex<ListenerSlab<T>>,

    /// Operations that could not be applied because the lock was unavailable.
    /// They are applied later by whoever holds a `ListGuard`.
    queue: concurrent_queue::ConcurrentQueue<Node<T>>,
}
230
231impl<T> List<T> {
232 pub(super) fn new() -> List<T> {
233 List {
234 inner: Mutex::new(ListenerSlab::new()),
235 queue: concurrent_queue::ConcurrentQueue::unbounded(),
236 }
237 }
238
239 pub(super) fn try_total_listeners(&self) -> Option<usize> {
241 self.inner.try_lock().map(|lock| lock.listeners.len())
242 }
243}
244
/// A held lock on the listener slab that also drains the operation queue when dropped.
pub(crate) struct ListGuard<'a, T> {
    /// The owning `Inner`, used to reach the queue and the shared `notified` counter.
    pub(crate) inner: &'a crate::Inner<T>,

    /// The underlying mutex guard; `None` only while `Drop` has temporarily released it.
    pub(crate) guard: Option<MutexGuard<'a, ListenerSlab<T>>>,

    /// Tasks collected while applying queued nodes; woken in `Drop` after the lock is released.
    tasks: Vec<Task>,
}
256
257impl<T> ListGuard<'_, T> {
258 #[cold]
259 fn process_nodes_slow(&mut self, start_node: Node<T>) {
260 let guard = self.guard.as_mut().unwrap();
261
262 self.tasks.extend(start_node.apply(guard));
264
265 while let Ok(node) = self.inner.list.queue.pop() {
267 self.tasks.extend(node.apply(guard));
268 }
269 }
270
271 #[inline]
272 fn process_nodes(&mut self) {
273 if let Ok(start_node) = self.inner.list.queue.pop() {
275 self.process_nodes_slow(start_node);
276 }
277 }
278}
279
280impl<T> ops::Deref for ListGuard<'_, T> {
281 type Target = ListenerSlab<T>;
282
283 fn deref(&self) -> &Self::Target {
284 self.guard.as_ref().unwrap()
285 }
286}
287
288impl<T> ops::DerefMut for ListGuard<'_, T> {
289 fn deref_mut(&mut self) -> &mut Self::Target {
290 self.guard.as_mut().unwrap()
291 }
292}
293
impl<T> Drop for ListGuard<'_, T> {
    /// Drains the queue, publishes the notified count, releases the lock and wakes the
    /// collected tasks. Repeats for as long as new nodes show up and the lock can be retaken.
    fn drop(&mut self) {
        while self.guard.is_some() {
            // Apply every operation that was queued while the lock was held.
            self.process_nodes();

            // Publish the notified count: the real value while some listener is still
            // un-notified, `usize::MAX` once `notified >= len`.
            let list = self.guard.take().unwrap();
            let notified = if list.notified < list.len {
                list.notified
            } else {
                usize::MAX
            };

            self.inner.notified.store(notified, Ordering::Release);

            // Release the lock before waking anything.
            drop(list);

            // Wake the collected tasks only now that the lock is no longer held.
            for task in self.tasks.drain(..) {
                task.wake();
            }

            // A node may have been pushed after `process_nodes` finished but before the lock
            // was released. If so, try to retake the lock and go around again; if the retake
            // fails, `guard` stays `None` and the loop ends.
            if !self.inner.list.queue.is_empty() {
                self.guard = self.inner.list.inner.try_lock();
            }
        }
    }
}
329
/// One slot of the slab's backing vector.
enum Entry<T> {
    /// A live listener, linked into the doubly linked list by slot index.
    Listener {
        /// The listener's current state.
        state: Cell<State<T>>,

        /// Index of the previous listener, or `None` if this is the head.
        prev: Cell<Option<NonZeroUsize>>,

        /// Index of the next listener, or `None` if this is the tail.
        next: Cell<Option<NonZeroUsize>>,
    },

    /// A freed slot; the payload is the index of the next free slot in the free list.
    Empty(NonZeroUsize),

    /// Placeholder occupying index 0 so that real keys can be `NonZeroUsize`.
    Sentinel,
}
350
/// A state temporarily moved out of its `Cell` so it can be inspected by reference.
///
/// While this value exists the cell holds `State::NotifiedTaken`; the `Drop` impl moves the
/// real state back.
struct TakenState<'a, T> {
    /// The cell the state was taken from and will be returned to.
    slot: &'a Cell<State<T>>,
    /// The state currently on loan from `slot`.
    state: State<T>,
}
355
356impl<T> Drop for TakenState<'_, T> {
357 fn drop(&mut self) {
358 self.slot
359 .set(mem::replace(&mut self.state, State::NotifiedTaken));
360 }
361}
362
363impl<T> fmt::Debug for TakenState<'_, T> {
364 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365 fmt::Debug::fmt(&self.state, f)
366 }
367}
368
369impl<T: PartialEq> PartialEq for TakenState<'_, T> {
370 fn eq(&self, other: &Self) -> bool {
371 self.state == other.state
372 }
373}
374
375impl<'a, T> TakenState<'a, T> {
376 fn new(slot: &'a Cell<State<T>>) -> Self {
377 let state = slot.replace(State::NotifiedTaken);
378 Self { slot, state }
379 }
380}
381
382impl<T> fmt::Debug for Entry<T> {
383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384 match self {
385 Entry::Listener { state, next, prev } => f
386 .debug_struct("Listener")
387 .field("state", &TakenState::new(state))
388 .field("prev", prev)
389 .field("next", next)
390 .finish(),
391 Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(),
392 Entry::Sentinel => f.debug_tuple("Sentinel").finish(),
393 }
394 }
395}
396
397impl<T: PartialEq> PartialEq for Entry<T> {
398 fn eq(&self, other: &Entry<T>) -> bool {
399 match (self, other) {
400 (
401 Self::Listener {
402 state: state1,
403 prev: prev1,
404 next: next1,
405 },
406 Self::Listener {
407 state: state2,
408 prev: prev2,
409 next: next2,
410 },
411 ) => {
412 if TakenState::new(state1) != TakenState::new(state2) {
413 return false;
414 }
415
416 prev1.get() == prev2.get() && next1.get() == next2.get()
417 }
418 (Self::Empty(next1), Self::Empty(next2)) => next1 == next2,
419 (Self::Sentinel, Self::Sentinel) => true,
420 _ => false,
421 }
422 }
423}
424
425impl<T> Entry<T> {
426 fn state(&self) -> &Cell<State<T>> {
427 match self {
428 Entry::Listener { state, .. } => state,
429 _ => unreachable!(),
430 }
431 }
432
433 fn prev(&self) -> &Cell<Option<NonZeroUsize>> {
434 match self {
435 Entry::Listener { prev, .. } => prev,
436 _ => unreachable!(),
437 }
438 }
439
440 fn next(&self) -> &Cell<Option<NonZeroUsize>> {
441 match self {
442 Entry::Listener { next, .. } => next,
443 _ => unreachable!(),
444 }
445 }
446}
447
/// A doubly linked list of listeners stored in a vector, with freed slots chained into a
/// free list.
pub(crate) struct ListenerSlab<T> {
    /// Backing storage; index 0 always holds `Entry::Sentinel`.
    listeners: Vec<Entry<T>>,

    /// Index of the first listener in the linked list, if any.
    head: Option<NonZeroUsize>,

    /// Index of the last listener in the linked list, if any.
    tail: Option<NonZeroUsize>,

    /// Index of the next listener that `notify` will deliver to, if any.
    start: Option<NonZeroUsize>,

    /// Number of listeners currently in a notified state.
    notified: usize,

    /// Number of live listeners in the list.
    len: usize,

    /// Index of the first free slot; equals `listeners.len()` when no freed slot is available
    /// for reuse.
    first_empty: NonZeroUsize,
}
472
473impl<T> fmt::Debug for ListenerSlab<T> {
474 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475 f.debug_struct("ListenerSlab")
476 .field("listeners", &self.listeners)
477 .field("head", &self.head)
478 .field("tail", &self.tail)
479 .field("start", &self.start)
480 .field("notified", &self.notified)
481 .field("len", &self.len)
482 .field("first_empty", &self.first_empty)
483 .finish()
484 }
485}
486
487impl<T> ListenerSlab<T> {
488 pub(crate) fn new() -> Self {
490 Self {
491 listeners: alloc::vec![Entry::Sentinel],
492 head: None,
493 tail: None,
494 start: None,
495 notified: 0,
496 len: 0,
497 first_empty: unsafe { NonZeroUsize::new_unchecked(1) },
498 }
499 }
500
501 pub(crate) fn insert(&mut self, state: State<T>) -> NonZeroUsize {
503 let key = {
505 let entry = Entry::Listener {
506 state: Cell::new(state),
507 prev: Cell::new(self.tail),
508 next: Cell::new(None),
509 };
510
511 let key = self.first_empty;
512 if self.first_empty.get() == self.listeners.len() {
513 self.listeners.push(entry);
515
516 self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) };
518 } else {
519 let slot = &mut self.listeners[key.get()];
521 let next = match mem::replace(slot, entry) {
522 Entry::Empty(next) => next,
523 _ => unreachable!(),
524 };
525
526 self.first_empty = next;
527 }
528
529 key
530 };
531
532 match mem::replace(&mut self.tail, Some(key)) {
534 None => self.head = Some(key),
535 Some(tail) => {
536 let tail = &self.listeners[tail.get()];
537 tail.next().set(Some(key));
538 }
539 }
540
541 if self.start.is_none() {
544 self.start = Some(key);
545 }
546
547 self.len += 1;
549
550 key
551 }
552
553 pub(crate) fn remove(&mut self, key: NonZeroUsize, propagate: bool) -> Option<State<T>> {
555 let entry = &self.listeners[key.get()];
556 let prev = entry.prev().get();
557 let next = entry.next().get();
558
559 match prev {
561 None => self.head = next,
562 Some(p) => self.listeners[p.get()].next().set(next),
563 }
564
565 match next {
567 None => self.tail = prev,
568 Some(n) => self.listeners[n.get()].prev().set(prev),
569 }
570
571 if self.start == Some(key) {
573 self.start = next;
574 }
575
576 let entry = mem::replace(
578 &mut self.listeners[key.get()],
579 Entry::Empty(self.first_empty),
580 );
581 self.first_empty = key;
582
583 let mut state = match entry {
584 Entry::Listener { state, .. } => state.into_inner(),
585 _ => unreachable!(),
586 };
587
588 if state.is_notified() {
590 self.notified = self.notified.saturating_sub(1);
591
592 if propagate {
593 let state = mem::replace(&mut state, State::NotifiedTaken);
595 if let State::Notified { tag, additional } = state {
596 let tags = {
597 let mut tag = Some(tag);
598 move || tag.take().expect("called more than once")
599 };
600
601 self.notify(GenericNotify::new(1, additional, tags));
602 }
603 }
604 }
605 self.len -= 1;
606
607 Some(state)
608 }
609
610 #[cold]
612 pub(crate) fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
613 let mut n = notify.count(Internal::new());
614 let is_additional = notify.is_additional(Internal::new());
615 if !is_additional {
616 if n <= self.notified {
618 return 0;
619 }
620 n -= self.notified;
621 }
622
623 let original_count = n;
624 while n > 0 {
625 n -= 1;
626
627 match self.start {
629 None => return original_count - n - 1,
630
631 Some(e) => {
632 let entry = &self.listeners[e.get()];
634 self.start = entry.next().get();
635
636 let tag = notify.next_tag(Internal::new());
638 if let State::Task(task) = entry.state().replace(State::Notified {
639 tag,
640 additional: is_additional,
641 }) {
642 task.wake();
643 }
644
645 self.notified += 1;
647 }
648 }
649 }
650
651 original_count - n
652 }
653
654 pub(crate) fn register(
659 &mut self,
660 mut listener: Pin<&mut Option<Listener<T>>>,
661 task: TaskRef<'_>,
662 ) -> RegisterResult<T> {
663 let key = match *listener {
664 Some(Listener::HasNode(key)) => key,
665 _ => return RegisterResult::NeverInserted,
666 };
667
668 let entry = &self.listeners[key.get()];
669
670 match entry.state().replace(State::NotifiedTaken) {
672 State::Notified { tag, .. } => {
673 self.remove(key, false);
675 *listener = None;
676 RegisterResult::Notified(tag)
677 }
678
679 State::Task(other_task) => {
680 if task.will_wake(other_task.as_task_ref()) {
682 entry.state().set(State::Task(other_task));
683 } else {
684 entry.state().set(State::Task(task.into_task()));
685 }
686
687 RegisterResult::Registered
688 }
689
690 _ => {
691 entry.state().set(State::Task(task.into_task()));
693 RegisterResult::Registered
694 }
695 }
696 }
697}
698
/// A handle to a listener's place in the list.
pub(crate) enum Listener<T> {
    /// The listener lives in the slab under this key.
    HasNode(NonZeroUsize),

    /// The insertion is still sitting in the operation queue; this handle tracks it.
    Queued(Arc<TaskWaiting>),

    /// Never constructed in this file; exists only so the type parameter `T` is used.
    _EatGenericType(PhantomData<T>),
}
709
710impl<T> fmt::Debug for Listener<T> {
711 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
712 match self {
713 Self::HasNode(key) => f.debug_tuple("HasNode").field(key).finish(),
714 Self::Queued(tw) => f.debug_tuple("Queued").field(tw).finish(),
715 Self::_EatGenericType(_) => unreachable!(),
716 }
717 }
718}
719
// Implemented by hand so that `Listener<T>` is `Unpin` for every `T`: the enum only ever
// stores a key or an `Arc` handle, and `T` appears solely inside `PhantomData`.
impl<T> Unpin for Listener<T> {}
721
722impl<T> PartialEq for Listener<T> {
723 fn eq(&self, other: &Self) -> bool {
724 match (self, other) {
725 (Self::HasNode(a), Self::HasNode(b)) => a == b,
726 (Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b),
727 _ => false,
728 }
729 }
730}
731
/// A minimal try-lock-only mutex built from an atomic flag.
///
/// There is no blocking `lock`; callers get `None` from `try_lock` when the lock cannot be
/// acquired.
pub(crate) struct Mutex<T> {
    /// The protected value; accessed only through a `MutexGuard`.
    value: UnsafeCell<T>,

    /// `true` while a `MutexGuard` for this mutex exists.
    locked: AtomicBool,
}
740
741impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
742 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
743 if let Some(lock) = self.try_lock() {
744 f.debug_tuple("Mutex").field(&*lock).finish()
745 } else {
746 f.write_str("Mutex { <locked> }")
747 }
748 }
749}
750
impl<T> Mutex<T> {
    /// Creates a new, unlocked mutex holding `value`.
    pub(crate) fn new(value: T) -> Self {
        Self {
            value: UnsafeCell::new(value),
            locked: AtomicBool::new(false),
        }
    }

    /// Tries to acquire the lock.
    ///
    /// Returns a guard on success. If the first attempt fails, falls back to a bounded spin
    /// and returns `None` once that gives up.
    pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
        // Fast path: one strong compare-exchange from unlocked to locked.
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            // The flag is ours; hand out access to the value.
            Some(MutexGuard {
                mutex: self,
                guard: self.value.get(),
            })
        } else {
            self.try_lock_slow()
        }
    }

    /// Contended path of `try_lock`: spins for a bounded number of iterations waiting for the
    /// flag to clear, and returns `None` once the spin budget is exhausted.
    #[cold]
    fn try_lock_slow(&self) -> Option<MutexGuard<'_, T>> {
        // Total number of failed flag reads tolerated across the whole call.
        let mut spins = 100u32;

        loop {
            // The weak variant may fail spuriously, which is fine inside a retry loop.
            if self
                .locked
                .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return Some(MutexGuard {
                    mutex: self,
                    guard: self.value.get(),
                });
            }

            // Wait using plain loads rather than repeated compare-exchange attempts.
            while self.locked.load(Ordering::Relaxed) {
                // `checked_sub` yields `None` at zero, and `?` turns that into giving up.
                spins = spins.checked_sub(1)?;
            }
        }
    }
}
805
/// Proof that the mutex is locked; releases the lock when dropped.
pub(crate) struct MutexGuard<'a, T> {
    /// The mutex this guard belongs to, used to clear the `locked` flag on drop.
    mutex: &'a Mutex<T>,
    /// Pointer to the protected value, obtained from the mutex's `UnsafeCell`.
    guard: ConstPtr<T>,
}
810
impl<'a, T> Drop for MutexGuard<'a, T> {
    /// Releases the lock by clearing the `locked` flag.
    fn drop(&mut self) {
        // The `Release` store is the counterpart of the `Acquire` compare-exchange in
        // `Mutex::try_lock`.
        self.mutex.locked.store(false, Ordering::Release);
    }
}
816
impl<'a, T> ops::Deref for MutexGuard<'a, T> {
    type Target = T;

    /// Gives shared access to the protected value.
    fn deref(&self) -> &T {
        // SAFETY: a guard is only created after the `locked` flag was successfully set, and
        // the flag stays set until this guard is dropped, so no other guard can exist.
        unsafe { self.guard.deref() }
    }
}
824
impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
    /// Gives exclusive access to the protected value.
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: a guard is only created after the `locked` flag was successfully set, and
        // the flag stays set until this guard is dropped, so this guard is the only way to
        // reach the value; `&mut self` rules out aliasing through the guard itself.
        unsafe { self.guard.deref_mut() }
    }
}
830
// SAFETY: in this file the protected value is only reachable through a `MutexGuard`, and the
// atomic `locked` flag lets at most one guard exist at a time. The value can therefore move
// between threads but is never accessed concurrently, so `T: Send` is the only bound needed.
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
833
#[cfg(test)]
mod tests {
    use super::*;

    // On wasm targets, run the tests through `wasm_bindgen_test` instead of the default harness.
    #[cfg(target_family = "wasm")]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// The mutex can be locked repeatedly and mutations persist across lock cycles.
    #[test]
    fn smoke_mutex() {
        let mutex = Mutex::new(0);

        {
            let mut guard = mutex.try_lock().unwrap();
            *guard += 1;
        }

        {
            let mut guard = mutex.try_lock().unwrap();
            *guard += 1;
        }

        let guard = mutex.try_lock().unwrap();
        assert_eq!(*guard, 2);
    }

    /// Inserting three listeners and removing the middle one keeps the links and the free
    /// list consistent.
    #[test]
    fn smoke_listener_slab() {
        let mut listeners = ListenerSlab::<()>::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove the middle listener.
        assert_eq!(listeners.remove(key2, false), Some(State::Created));

        // Slot 2 is now the head of the free list and points at the old free index 4.
        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key1)),
                next: Cell::new(None),
            }
        );
    }

    /// A single additional notification marks the first listener and advances the cursor;
    /// removing that listener brings the notified count back down.
    #[test]
    fn listener_slab_notify() {
        let mut listeners = ListenerSlab::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        // Notify one listener.
        listeners.notify(GenericNotify::new(1, true, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: true,
                    tag: ()
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove the notified listener without propagating its notification.
        assert_eq!(
            listeners.remove(key1, false),
            Some(State::Notified {
                additional: true,
                tag: ()
            })
        );

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key2));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );
    }

    /// Registering stores the waker; notifying wakes it; registering again afterwards
    /// reports the notification.
    #[test]
    fn listener_slab_register() {
        let woken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn::waker_fn({
            let woken = woken.clone();
            move || woken.store(true, Ordering::SeqCst)
        });

        let mut listeners = ListenerSlab::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        // Register a task on the middle listener.
        assert_eq!(
            listeners.register(
                Pin::new(&mut Some(Listener::HasNode(key2))),
                TaskRef::Waker(&waker)
            ),
            RegisterResult::Registered
        );

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify two listeners: the first two in the list, including the registered one.
        listeners.notify(GenericNotify::new(2, false, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 2);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key3));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // The registered waker was woken, and registering again reports the notification.
        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(
            listeners.register(
                Pin::new(&mut Some(Listener::HasNode(key2))),
                TaskRef::Waker(&waker)
            ),
            RegisterResult::Notified(())
        );
    }

    /// Non-additional notifications do not stack, and removing a notified listener with
    /// `propagate` passes its notification on to the next listener.
    #[test]
    fn listener_slab_notify_prop() {
        let woken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn::waker_fn({
            let woken = woken.clone();
            move || woken.store(true, Ordering::SeqCst)
        });

        let mut listeners = ListenerSlab::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        // Register a task on the middle listener.
        assert_eq!(
            listeners.register(
                Pin::new(&mut Some(Listener::HasNode(key2))),
                TaskRef::Waker(&waker)
            ),
            RegisterResult::Registered
        );

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify the first listener.
        listeners.notify(GenericNotify::new(1, false, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify one again; one listener is already notified, so nothing should change.
        listeners.notify(GenericNotify::new(1, false, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove the notified first listener without propagating.
        assert_eq!(
            listeners.remove(key1, false),
            Some(State::Notified {
                additional: false,
                tag: ()
            })
        );

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key2));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        // Only the links of slot 2 are checked here; its state still holds the waker.
        assert_eq!(*listeners.listeners[2].prev(), Cell::new(None));
        assert_eq!(*listeners.listeners[2].next(), Cell::new(Some(key3)));
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify again; this time the listener with the registered waker is reached.
        listeners.notify(GenericNotify::new(1, false, || ()));
        assert!(woken.load(Ordering::SeqCst));

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key2));
        assert_eq!(listeners.start, Some(key3));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove the notified listener and propagate its notification.
        assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken));

        // The notification moved on to the last listener, so the count is back at 1.
        assert_eq!(listeners.len, 1);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key3));
        assert_eq!(listeners.start, None);
        assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Empty(NonZeroUsize::new(1).unwrap())
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(None),
            }
        );

        // Remove the final listener.
        assert_eq!(
            listeners.remove(key3, false),
            Some(State::Notified {
                additional: false,
                tag: ()
            })
        );
    }

    /// Exercises `Inner` when the lock is never contended, so every call takes the fast path.
    #[test]
    fn uncontended_inner() {
        let inner = crate::Inner::new();

        // Register two listeners.
        let (mut listener1, mut listener2, mut listener3) = (None, None, None);
        inner.insert(Pin::new(&mut listener1));
        inner.insert(Pin::new(&mut listener2));
        inner.insert(Pin::new(&mut listener3));

        assert_eq!(
            listener1,
            Some(Listener::HasNode(NonZeroUsize::new(1).unwrap()))
        );
        assert_eq!(
            listener2,
            Some(Listener::HasNode(NonZeroUsize::new(2).unwrap()))
        );

        // Register a waker on the second listener.
        let woken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn::waker_fn({
            let woken = woken.clone();
            move || woken.store(true, Ordering::SeqCst)
        });
        assert_eq!(
            inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)),
            RegisterResult::Registered
        );

        // The first notification goes to the first listener, which has no waker.
        inner.notify(GenericNotify::new(1, false, || ()));
        assert!(!woken.load(Ordering::SeqCst));

        // Another non-additional notification changes nothing while one is already pending.
        inner.notify(GenericNotify::new(1, false, || ()));
        assert!(!woken.load(Ordering::SeqCst));

        // Registering on the first listener reports that it was notified.
        assert_eq!(
            inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)),
            RegisterResult::Notified(())
        );

        // Consuming the notification also removed the listener.
        assert!(listener1.is_none());

        // Notifying now reaches the second listener and wakes its waker.
        inner.notify(GenericNotify::new(1, false, || ()));
        assert!(woken.load(Ordering::SeqCst));

        // Remove the second listener, propagating its notification to the third.
        assert_eq!(
            inner.remove(Pin::new(&mut listener2), true),
            Some(State::NotifiedTaken)
        );

        // The second listener's slot was cleared.
        assert!(listener2.is_none());

        // The third listener received the propagated notification.
        assert_eq!(
            inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)),
            RegisterResult::Notified(())
        );
    }
}