event_listener/
slab.rs

1//! Implementation of `event-listener` built exclusively on atomics.
2//!
3//! On `no_std`, we don't have access to `Mutex`, so we can't use intrusive linked lists like the `std`
4//! implementation. Normally, we would use a concurrent atomic queue to store listeners, but benchmarks
5//! show that using queues in this way is very slow, especially for the single threaded use-case.
6//!
7//! We've found that it's easier to assume that the `Event` won't be under high contention in most use
8//! cases. Therefore, we use a spinlock that protects a linked list of listeners, and fall back to an
9//! atomic queue if the lock is contended. Benchmarks show that this is about 20% slower than the std
10//! implementation, but still much faster than using a queue.
11
12#[path = "slab/node.rs"]
13mod node;
14
15use node::{Node, NothingProducer, TaskWaiting};
16
17use crate::notify::{GenericNotify, Internal, Notification};
18use crate::sync::atomic::{AtomicBool, Ordering};
19use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
20use crate::sync::Arc;
21use crate::{RegisterResult, State, Task, TaskRef};
22
23use core::fmt;
24use core::marker::PhantomData;
25use core::mem;
26use core::num::NonZeroUsize;
27use core::ops;
28use core::pin::Pin;
29
30use alloc::vec::Vec;
31
32impl<T> crate::Inner<T> {
33    /// Locks the list.
34    fn try_lock(&self) -> Option<ListGuard<'_, T>> {
35        self.list.inner.try_lock().map(|guard| ListGuard {
36            inner: self,
37            guard: Some(guard),
38            tasks: alloc::vec![],
39        })
40    }
41
42    /// Force a queue update.
43    fn queue_update(&self) {
44        // Locking and unlocking the mutex will drain the queue if there is no contention.
45        drop(self.try_lock());
46    }
47
48    /// Add a new listener to the list.
49    ///
50    /// Does nothing if the list is already registered.
51    pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
52        if listener.as_ref().as_pin_ref().is_some() {
53            // Already inserted.
54            return;
55        }
56
57        match self.try_lock() {
58            Some(mut lock) => {
59                let key = lock.insert(State::Created);
60                *listener = Some(Listener::HasNode(key));
61            }
62
63            None => {
64                // Push it to the queue.
65                let (node, task_waiting) = Node::listener();
66                self.list.queue.push(node).unwrap();
67                *listener = Some(Listener::Queued(task_waiting));
68
69                // Force a queue update.
70                self.queue_update();
71            }
72        }
73    }
74
75    /// Remove a listener from the list.
76    pub(crate) fn remove(
77        &self,
78        mut listener: Pin<&mut Option<Listener<T>>>,
79        propagate: bool,
80    ) -> Option<State<T>> {
81        loop {
82            let state = match listener.as_mut().take() {
83                Some(Listener::HasNode(key)) => {
84                    match self.try_lock() {
85                        Some(mut list) => {
86                            // Fast path removal.
87                            list.remove(key, propagate)
88                        }
89
90                        None => {
91                            // Slow path removal.
92                            // This is why intrusive lists don't work on no_std.
93                            let node = Node::RemoveListener {
94                                listener: key,
95                                propagate,
96                            };
97
98                            self.list.queue.push(node).unwrap();
99
100                            // Force a queue update.
101                            self.queue_update();
102
103                            None
104                        }
105                    }
106                }
107
108                Some(Listener::Queued(tw)) => {
109                    // Make sure it's not added after the queue is drained.
110                    if let Some(key) = tw.cancel() {
111                        // If it was already added, set up our listener and try again.
112                        *listener = Some(Listener::HasNode(key));
113                        continue;
114                    }
115
116                    None
117                }
118
119                None => None,
120
121                _ => unreachable!(),
122            };
123
124            return state;
125        }
126    }
127
128    /// Notifies a number of entries.
129    #[cold]
130    pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
131        match self.try_lock() {
132            Some(mut guard) => {
133                // Notify the listeners.
134                guard.notify(notify)
135            }
136
137            None => {
138                // Push it to the queue.
139                let node = Node::Notify(GenericNotify::new(
140                    notify.count(Internal::new()),
141                    notify.is_additional(Internal::new()),
142                    NothingProducer::default(),
143                ));
144
145                self.list.queue.push(node).unwrap();
146
147                // Force a queue update.
148                self.queue_update();
149
150                // We haven't notified anyone yet.
151                0
152            }
153        }
154    }
155
156    /// Register a task to be notified when the event is triggered.
157    ///
158    /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
159    /// isn't inserted, returns `None`.
160    pub(crate) fn register(
161        &self,
162        mut listener: Pin<&mut Option<Listener<T>>>,
163        task: TaskRef<'_>,
164    ) -> RegisterResult<T> {
165        loop {
166            match listener.as_mut().take() {
167                Some(Listener::HasNode(key)) => {
168                    *listener = Some(Listener::HasNode(key));
169                    match self.try_lock() {
170                        Some(mut guard) => {
171                            // Fast path registration.
172                            return guard.register(listener, task);
173                        }
174
175                        None => {
176                            // Wait for the lock.
177                            let node = Node::Waiting(task.into_task());
178                            self.list.queue.push(node).unwrap();
179
180                            // Force a queue update.
181                            self.queue_update();
182
183                            return RegisterResult::Registered;
184                        }
185                    }
186                }
187
188                Some(Listener::Queued(task_waiting)) => {
189                    // Force a queue update.
190                    self.queue_update();
191
192                    // Are we done yet?
193                    match task_waiting.status() {
194                        Some(key) => {
195                            assert!(key.get() != usize::MAX);
196
197                            // We're inserted now, adjust state.
198                            *listener = Some(Listener::HasNode(key));
199                        }
200
201                        None => {
202                            // We're still queued, so register the task.
203                            task_waiting.register(task.into_task());
204                            *listener = Some(Listener::Queued(task_waiting));
205
206                            // Force a queue update.
207                            self.queue_update();
208
209                            return RegisterResult::Registered;
210                        }
211                    }
212                }
213
214                None => return RegisterResult::NeverInserted,
215
216                _ => unreachable!(),
217            }
218        }
219    }
220}
221
222#[derive(Debug)]
223pub(crate) struct List<T> {
224    /// The inner list.
225    inner: Mutex<ListenerSlab<T>>,
226
227    /// The queue of pending operations.
228    queue: concurrent_queue::ConcurrentQueue<Node<T>>,
229}
230
231impl<T> List<T> {
232    pub(super) fn new() -> List<T> {
233        List {
234            inner: Mutex::new(ListenerSlab::new()),
235            queue: concurrent_queue::ConcurrentQueue::unbounded(),
236        }
237    }
238
239    /// Try to get the total number of listeners without blocking.
240    pub(super) fn try_total_listeners(&self) -> Option<usize> {
241        self.inner.try_lock().map(|lock| lock.listeners.len())
242    }
243}
244
245/// The guard returned by [`Inner::lock`].
246pub(crate) struct ListGuard<'a, T> {
247    /// Reference to the inner state.
248    pub(crate) inner: &'a crate::Inner<T>,
249
250    /// The locked list.
251    pub(crate) guard: Option<MutexGuard<'a, ListenerSlab<T>>>,
252
253    /// Tasks to wake up once this guard is dropped.
254    tasks: Vec<Task>,
255}
256
257impl<T> ListGuard<'_, T> {
258    #[cold]
259    fn process_nodes_slow(&mut self, start_node: Node<T>) {
260        let guard = self.guard.as_mut().unwrap();
261
262        // Process the start node.
263        self.tasks.extend(start_node.apply(guard));
264
265        // Process all remaining nodes.
266        while let Ok(node) = self.inner.list.queue.pop() {
267            self.tasks.extend(node.apply(guard));
268        }
269    }
270
271    #[inline]
272    fn process_nodes(&mut self) {
273        // Process every node left in the queue.
274        if let Ok(start_node) = self.inner.list.queue.pop() {
275            self.process_nodes_slow(start_node);
276        }
277    }
278}
279
280impl<T> ops::Deref for ListGuard<'_, T> {
281    type Target = ListenerSlab<T>;
282
283    fn deref(&self) -> &Self::Target {
284        self.guard.as_ref().unwrap()
285    }
286}
287
288impl<T> ops::DerefMut for ListGuard<'_, T> {
289    fn deref_mut(&mut self) -> &mut Self::Target {
290        self.guard.as_mut().unwrap()
291    }
292}
293
294impl<T> Drop for ListGuard<'_, T> {
295    fn drop(&mut self) {
296        while self.guard.is_some() {
297            // Process every node left in the queue.
298            self.process_nodes();
299
300            // Update the atomic `notified` counter.
301            let list = self.guard.take().unwrap();
302            let notified = if list.notified < list.len {
303                list.notified
304            } else {
305                usize::MAX
306            };
307
308            self.inner.notified.store(notified, Ordering::Release);
309
310            // Drop the actual lock.
311            drop(list);
312
313            // Wakeup all tasks.
314            for task in self.tasks.drain(..) {
315                task.wake();
316            }
317
318            // There is a deadlock where a node is pushed to the end of the queue after we've finished
319            // process_nodes() but before we've finished dropping the lock. This can lead to some
320            // notifications not being properly delivered, or listeners not being added to the list.
321            // Therefore check before we finish dropping if there is anything left in the queue, and
322            // if so, lock it again and force a queue update.
323            if !self.inner.list.queue.is_empty() {
324                self.guard = self.inner.list.inner.try_lock();
325            }
326        }
327    }
328}
329
330/// An entry representing a registered listener.
331enum Entry<T> {
332    /// Contains the listener state.
333    Listener {
334        /// The state of the listener.
335        state: Cell<State<T>>,
336
337        /// The previous listener in the list.
338        prev: Cell<Option<NonZeroUsize>>,
339
340        /// The next listener in the list.
341        next: Cell<Option<NonZeroUsize>>,
342    },
343
344    /// An empty slot that contains the index of the next empty slot.
345    Empty(NonZeroUsize),
346
347    /// Sentinel value.
348    Sentinel,
349}
350
351struct TakenState<'a, T> {
352    slot: &'a Cell<State<T>>,
353    state: State<T>,
354}
355
356impl<T> Drop for TakenState<'_, T> {
357    fn drop(&mut self) {
358        self.slot
359            .set(mem::replace(&mut self.state, State::NotifiedTaken));
360    }
361}
362
363impl<T> fmt::Debug for TakenState<'_, T> {
364    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365        fmt::Debug::fmt(&self.state, f)
366    }
367}
368
369impl<T: PartialEq> PartialEq for TakenState<'_, T> {
370    fn eq(&self, other: &Self) -> bool {
371        self.state == other.state
372    }
373}
374
375impl<'a, T> TakenState<'a, T> {
376    fn new(slot: &'a Cell<State<T>>) -> Self {
377        let state = slot.replace(State::NotifiedTaken);
378        Self { slot, state }
379    }
380}
381
382impl<T> fmt::Debug for Entry<T> {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        match self {
385            Entry::Listener { state, next, prev } => f
386                .debug_struct("Listener")
387                .field("state", &TakenState::new(state))
388                .field("prev", prev)
389                .field("next", next)
390                .finish(),
391            Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(),
392            Entry::Sentinel => f.debug_tuple("Sentinel").finish(),
393        }
394    }
395}
396
397impl<T: PartialEq> PartialEq for Entry<T> {
398    fn eq(&self, other: &Entry<T>) -> bool {
399        match (self, other) {
400            (
401                Self::Listener {
402                    state: state1,
403                    prev: prev1,
404                    next: next1,
405                },
406                Self::Listener {
407                    state: state2,
408                    prev: prev2,
409                    next: next2,
410                },
411            ) => {
412                if TakenState::new(state1) != TakenState::new(state2) {
413                    return false;
414                }
415
416                prev1.get() == prev2.get() && next1.get() == next2.get()
417            }
418            (Self::Empty(next1), Self::Empty(next2)) => next1 == next2,
419            (Self::Sentinel, Self::Sentinel) => true,
420            _ => false,
421        }
422    }
423}
424
425impl<T> Entry<T> {
426    fn state(&self) -> &Cell<State<T>> {
427        match self {
428            Entry::Listener { state, .. } => state,
429            _ => unreachable!(),
430        }
431    }
432
433    fn prev(&self) -> &Cell<Option<NonZeroUsize>> {
434        match self {
435            Entry::Listener { prev, .. } => prev,
436            _ => unreachable!(),
437        }
438    }
439
440    fn next(&self) -> &Cell<Option<NonZeroUsize>> {
441        match self {
442            Entry::Listener { next, .. } => next,
443            _ => unreachable!(),
444        }
445    }
446}
447
448/// A linked list of entries.
449pub(crate) struct ListenerSlab<T> {
450    /// The raw list of entries.
451    listeners: Vec<Entry<T>>,
452
453    /// First entry in the list.
454    head: Option<NonZeroUsize>,
455
456    /// Last entry in the list.
457    tail: Option<NonZeroUsize>,
458
459    /// The first unnotified entry in the list.
460    start: Option<NonZeroUsize>,
461
462    /// The number of notified entries in the list.
463    notified: usize,
464
465    /// The total number of listeners.
466    len: usize,
467
468    /// The index of the first `Empty` entry, or the length of the list plus one if there
469    /// are no empty entries.
470    first_empty: NonZeroUsize,
471}
472
473impl<T> fmt::Debug for ListenerSlab<T> {
474    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475        f.debug_struct("ListenerSlab")
476            .field("listeners", &self.listeners)
477            .field("head", &self.head)
478            .field("tail", &self.tail)
479            .field("start", &self.start)
480            .field("notified", &self.notified)
481            .field("len", &self.len)
482            .field("first_empty", &self.first_empty)
483            .finish()
484    }
485}
486
487impl<T> ListenerSlab<T> {
488    /// Create a new, empty list.
489    pub(crate) fn new() -> Self {
490        Self {
491            listeners: alloc::vec![Entry::Sentinel],
492            head: None,
493            tail: None,
494            start: None,
495            notified: 0,
496            len: 0,
497            first_empty: unsafe { NonZeroUsize::new_unchecked(1) },
498        }
499    }
500
501    /// Inserts a new entry into the list.
502    pub(crate) fn insert(&mut self, state: State<T>) -> NonZeroUsize {
503        // Add the new entry into the list.
504        let key = {
505            let entry = Entry::Listener {
506                state: Cell::new(state),
507                prev: Cell::new(self.tail),
508                next: Cell::new(None),
509            };
510
511            let key = self.first_empty;
512            if self.first_empty.get() == self.listeners.len() {
513                // No empty entries, so add a new entry.
514                self.listeners.push(entry);
515
516                // SAFETY: Guaranteed to not overflow, since the Vec would have panicked already.
517                self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) };
518            } else {
519                // There is an empty entry, so replace it.
520                let slot = &mut self.listeners[key.get()];
521                let next = match mem::replace(slot, entry) {
522                    Entry::Empty(next) => next,
523                    _ => unreachable!(),
524                };
525
526                self.first_empty = next;
527            }
528
529            key
530        };
531
532        // Replace the tail with the new entry.
533        match mem::replace(&mut self.tail, Some(key)) {
534            None => self.head = Some(key),
535            Some(tail) => {
536                let tail = &self.listeners[tail.get()];
537                tail.next().set(Some(key));
538            }
539        }
540
541        // If there are no listeners that have been notified, then the new listener is the next
542        // listener to be notified.
543        if self.start.is_none() {
544            self.start = Some(key);
545        }
546
547        // Increment the length.
548        self.len += 1;
549
550        key
551    }
552
553    /// Removes an entry from the list and returns its state.
554    pub(crate) fn remove(&mut self, key: NonZeroUsize, propagate: bool) -> Option<State<T>> {
555        let entry = &self.listeners[key.get()];
556        let prev = entry.prev().get();
557        let next = entry.next().get();
558
559        // Unlink from the previous entry.
560        match prev {
561            None => self.head = next,
562            Some(p) => self.listeners[p.get()].next().set(next),
563        }
564
565        // Unlink from the next entry.
566        match next {
567            None => self.tail = prev,
568            Some(n) => self.listeners[n.get()].prev().set(prev),
569        }
570
571        // If this was the first unnotified entry, move the pointer to the next one.
572        if self.start == Some(key) {
573            self.start = next;
574        }
575
576        // Extract the state.
577        let entry = mem::replace(
578            &mut self.listeners[key.get()],
579            Entry::Empty(self.first_empty),
580        );
581        self.first_empty = key;
582
583        let mut state = match entry {
584            Entry::Listener { state, .. } => state.into_inner(),
585            _ => unreachable!(),
586        };
587
588        // Update the counters.
589        if state.is_notified() {
590            self.notified = self.notified.saturating_sub(1);
591
592            if propagate {
593                // Propagate the notification to the next entry.
594                let state = mem::replace(&mut state, State::NotifiedTaken);
595                if let State::Notified { tag, additional } = state {
596                    let tags = {
597                        let mut tag = Some(tag);
598                        move || tag.take().expect("called more than once")
599                    };
600
601                    self.notify(GenericNotify::new(1, additional, tags));
602                }
603            }
604        }
605        self.len -= 1;
606
607        Some(state)
608    }
609
610    /// Notifies a number of listeners.
611    #[cold]
612    pub(crate) fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
613        let mut n = notify.count(Internal::new());
614        let is_additional = notify.is_additional(Internal::new());
615        if !is_additional {
616            // Make sure we're not notifying more than we have.
617            if n <= self.notified {
618                return 0;
619            }
620            n -= self.notified;
621        }
622
623        let original_count = n;
624        while n > 0 {
625            n -= 1;
626
627            // Notify the next entry.
628            match self.start {
629                None => return original_count - n - 1,
630
631                Some(e) => {
632                    // Get the entry and move the pointer forwards.
633                    let entry = &self.listeners[e.get()];
634                    self.start = entry.next().get();
635
636                    // Set the state to `Notified` and notify.
637                    let tag = notify.next_tag(Internal::new());
638                    if let State::Task(task) = entry.state().replace(State::Notified {
639                        tag,
640                        additional: is_additional,
641                    }) {
642                        task.wake();
643                    }
644
645                    // Bump the notified count.
646                    self.notified += 1;
647                }
648            }
649        }
650
651        original_count - n
652    }
653
654    /// Register a task to be notified when the event is triggered.
655    ///
656    /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
657    /// isn't inserted, returns `None`.
658    pub(crate) fn register(
659        &mut self,
660        mut listener: Pin<&mut Option<Listener<T>>>,
661        task: TaskRef<'_>,
662    ) -> RegisterResult<T> {
663        let key = match *listener {
664            Some(Listener::HasNode(key)) => key,
665            _ => return RegisterResult::NeverInserted,
666        };
667
668        let entry = &self.listeners[key.get()];
669
670        // Take the state out and check it.
671        match entry.state().replace(State::NotifiedTaken) {
672            State::Notified { tag, .. } => {
673                // The listener was already notified, so we don't need to do anything.
674                self.remove(key, false);
675                *listener = None;
676                RegisterResult::Notified(tag)
677            }
678
679            State::Task(other_task) => {
680                // Only replace the task if it's not the same as the one we're registering.
681                if task.will_wake(other_task.as_task_ref()) {
682                    entry.state().set(State::Task(other_task));
683                } else {
684                    entry.state().set(State::Task(task.into_task()));
685                }
686
687                RegisterResult::Registered
688            }
689
690            _ => {
691                // Register the task.
692                entry.state().set(State::Task(task.into_task()));
693                RegisterResult::Registered
694            }
695        }
696    }
697}
698
699pub(crate) enum Listener<T> {
700    /// The listener has a node inside of the linked list.
701    HasNode(NonZeroUsize),
702
703    /// The listener has an entry in the queue that may or may not have a task waiting.
704    Queued(Arc<TaskWaiting>),
705
706    /// Eat the generic type for consistency.
707    _EatGenericType(PhantomData<T>),
708}
709
710impl<T> fmt::Debug for Listener<T> {
711    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
712        match self {
713            Self::HasNode(key) => f.debug_tuple("HasNode").field(key).finish(),
714            Self::Queued(tw) => f.debug_tuple("Queued").field(tw).finish(),
715            Self::_EatGenericType(_) => unreachable!(),
716        }
717    }
718}
719
720impl<T> Unpin for Listener<T> {}
721
722impl<T> PartialEq for Listener<T> {
723    fn eq(&self, other: &Self) -> bool {
724        match (self, other) {
725            (Self::HasNode(a), Self::HasNode(b)) => a == b,
726            (Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b),
727            _ => false,
728        }
729    }
730}
731
732/// A simple mutex type that optimistically assumes that the lock is uncontended.
733pub(crate) struct Mutex<T> {
734    /// The inner value.
735    value: UnsafeCell<T>,
736
737    /// Whether the mutex is locked.
738    locked: AtomicBool,
739}
740
741impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
742    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
743        if let Some(lock) = self.try_lock() {
744            f.debug_tuple("Mutex").field(&*lock).finish()
745        } else {
746            f.write_str("Mutex { <locked> }")
747        }
748    }
749}
750
751impl<T> Mutex<T> {
752    /// Create a new mutex.
753    pub(crate) fn new(value: T) -> Self {
754        Self {
755            value: UnsafeCell::new(value),
756            locked: AtomicBool::new(false),
757        }
758    }
759
760    /// Lock the mutex.
761    pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
762        // Try to lock the mutex.
763        if self
764            .locked
765            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
766            .is_ok()
767        {
768            // We have successfully locked the mutex.
769            Some(MutexGuard {
770                mutex: self,
771                guard: self.value.get(),
772            })
773        } else {
774            self.try_lock_slow()
775        }
776    }
777
778    #[cold]
779    fn try_lock_slow(&self) -> Option<MutexGuard<'_, T>> {
780        // Assume that the contention is short-term.
781        // Spin for a while to see if the mutex becomes unlocked.
782        let mut spins = 100u32;
783
784        loop {
785            if self
786                .locked
787                .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
788                .is_ok()
789            {
790                // We have successfully locked the mutex.
791                return Some(MutexGuard {
792                    mutex: self,
793                    guard: self.value.get(),
794                });
795            }
796
797            // Use atomic loads instead of compare-exchange.
798            while self.locked.load(Ordering::Relaxed) {
799                // Return None once we've exhausted the number of spins.
800                spins = spins.checked_sub(1)?;
801            }
802        }
803    }
804}
805
806pub(crate) struct MutexGuard<'a, T> {
807    mutex: &'a Mutex<T>,
808    guard: ConstPtr<T>,
809}
810
811impl<'a, T> Drop for MutexGuard<'a, T> {
812    fn drop(&mut self) {
813        self.mutex.locked.store(false, Ordering::Release);
814    }
815}
816
817impl<'a, T> ops::Deref for MutexGuard<'a, T> {
818    type Target = T;
819
820    fn deref(&self) -> &T {
821        unsafe { self.guard.deref() }
822    }
823}
824
825impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
826    fn deref_mut(&mut self) -> &mut T {
827        unsafe { self.guard.deref_mut() }
828    }
829}
830
831unsafe impl<T: Send> Send for Mutex<T> {}
832unsafe impl<T: Send> Sync for Mutex<T> {}
833
834#[cfg(test)]
835mod tests {
836    use super::*;
837
838    #[cfg(target_family = "wasm")]
839    use wasm_bindgen_test::wasm_bindgen_test as test;
840
841    #[test]
842    fn smoke_mutex() {
843        let mutex = Mutex::new(0);
844
845        {
846            let mut guard = mutex.try_lock().unwrap();
847            *guard += 1;
848        }
849
850        {
851            let mut guard = mutex.try_lock().unwrap();
852            *guard += 1;
853        }
854
855        let guard = mutex.try_lock().unwrap();
856        assert_eq!(*guard, 2);
857    }
858
859    #[test]
860    fn smoke_listener_slab() {
861        let mut listeners = ListenerSlab::<()>::new();
862
863        // Insert a few listeners.
864        let key1 = listeners.insert(State::Created);
865        let key2 = listeners.insert(State::Created);
866        let key3 = listeners.insert(State::Created);
867
868        assert_eq!(listeners.len, 3);
869        assert_eq!(listeners.notified, 0);
870        assert_eq!(listeners.tail, Some(key3));
871        assert_eq!(listeners.head, Some(key1));
872        assert_eq!(listeners.start, Some(key1));
873        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
874        assert_eq!(listeners.listeners[0], Entry::Sentinel);
875        assert_eq!(
876            listeners.listeners[1],
877            Entry::Listener {
878                state: Cell::new(State::Created),
879                prev: Cell::new(None),
880                next: Cell::new(Some(key2)),
881            }
882        );
883        assert_eq!(
884            listeners.listeners[2],
885            Entry::Listener {
886                state: Cell::new(State::Created),
887                prev: Cell::new(Some(key1)),
888                next: Cell::new(Some(key3)),
889            }
890        );
891        assert_eq!(
892            listeners.listeners[3],
893            Entry::Listener {
894                state: Cell::new(State::Created),
895                prev: Cell::new(Some(key2)),
896                next: Cell::new(None),
897            }
898        );
899
900        // Remove one.
901        assert_eq!(listeners.remove(key2, false), Some(State::Created));
902
903        assert_eq!(listeners.len, 2);
904        assert_eq!(listeners.notified, 0);
905        assert_eq!(listeners.tail, Some(key3));
906        assert_eq!(listeners.head, Some(key1));
907        assert_eq!(listeners.start, Some(key1));
908        assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
909        assert_eq!(listeners.listeners[0], Entry::Sentinel);
910        assert_eq!(
911            listeners.listeners[1],
912            Entry::Listener {
913                state: Cell::new(State::Created),
914                prev: Cell::new(None),
915                next: Cell::new(Some(key3)),
916            }
917        );
918        assert_eq!(
919            listeners.listeners[2],
920            Entry::Empty(NonZeroUsize::new(4).unwrap())
921        );
922        assert_eq!(
923            listeners.listeners[3],
924            Entry::Listener {
925                state: Cell::new(State::Created),
926                prev: Cell::new(Some(key1)),
927                next: Cell::new(None),
928            }
929        );
930    }
931
932    #[test]
933    fn listener_slab_notify() {
934        let mut listeners = ListenerSlab::new();
935
936        // Insert a few listeners.
937        let key1 = listeners.insert(State::Created);
938        let key2 = listeners.insert(State::Created);
939        let key3 = listeners.insert(State::Created);
940
941        // Notify one.
942        listeners.notify(GenericNotify::new(1, true, || ()));
943
944        assert_eq!(listeners.len, 3);
945        assert_eq!(listeners.notified, 1);
946        assert_eq!(listeners.tail, Some(key3));
947        assert_eq!(listeners.head, Some(key1));
948        assert_eq!(listeners.start, Some(key2));
949        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
950        assert_eq!(listeners.listeners[0], Entry::Sentinel);
951        assert_eq!(
952            listeners.listeners[1],
953            Entry::Listener {
954                state: Cell::new(State::Notified {
955                    additional: true,
956                    tag: ()
957                }),
958                prev: Cell::new(None),
959                next: Cell::new(Some(key2)),
960            }
961        );
962        assert_eq!(
963            listeners.listeners[2],
964            Entry::Listener {
965                state: Cell::new(State::Created),
966                prev: Cell::new(Some(key1)),
967                next: Cell::new(Some(key3)),
968            }
969        );
970        assert_eq!(
971            listeners.listeners[3],
972            Entry::Listener {
973                state: Cell::new(State::Created),
974                prev: Cell::new(Some(key2)),
975                next: Cell::new(None),
976            }
977        );
978
979        // Remove the notified listener.
980        assert_eq!(
981            listeners.remove(key1, false),
982            Some(State::Notified {
983                additional: true,
984                tag: ()
985            })
986        );
987
988        assert_eq!(listeners.len, 2);
989        assert_eq!(listeners.notified, 0);
990        assert_eq!(listeners.tail, Some(key3));
991        assert_eq!(listeners.head, Some(key2));
992        assert_eq!(listeners.start, Some(key2));
993        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
994        assert_eq!(listeners.listeners[0], Entry::Sentinel);
995        assert_eq!(
996            listeners.listeners[1],
997            Entry::Empty(NonZeroUsize::new(4).unwrap())
998        );
999        assert_eq!(
1000            listeners.listeners[2],
1001            Entry::Listener {
1002                state: Cell::new(State::Created),
1003                prev: Cell::new(None),
1004                next: Cell::new(Some(key3)),
1005            }
1006        );
1007        assert_eq!(
1008            listeners.listeners[3],
1009            Entry::Listener {
1010                state: Cell::new(State::Created),
1011                prev: Cell::new(Some(key2)),
1012                next: Cell::new(None),
1013            }
1014        );
1015    }
1016
1017    #[test]
1018    fn listener_slab_register() {
1019        let woken = Arc::new(AtomicBool::new(false));
1020        let waker = waker_fn::waker_fn({
1021            let woken = woken.clone();
1022            move || woken.store(true, Ordering::SeqCst)
1023        });
1024
1025        let mut listeners = ListenerSlab::new();
1026
1027        // Insert a few listeners.
1028        let key1 = listeners.insert(State::Created);
1029        let key2 = listeners.insert(State::Created);
1030        let key3 = listeners.insert(State::Created);
1031
1032        // Register one.
1033        assert_eq!(
1034            listeners.register(
1035                Pin::new(&mut Some(Listener::HasNode(key2))),
1036                TaskRef::Waker(&waker)
1037            ),
1038            RegisterResult::Registered
1039        );
1040
1041        assert_eq!(listeners.len, 3);
1042        assert_eq!(listeners.notified, 0);
1043        assert_eq!(listeners.tail, Some(key3));
1044        assert_eq!(listeners.head, Some(key1));
1045        assert_eq!(listeners.start, Some(key1));
1046        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
1047        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1048        assert_eq!(
1049            listeners.listeners[1],
1050            Entry::Listener {
1051                state: Cell::new(State::Created),
1052                prev: Cell::new(None),
1053                next: Cell::new(Some(key2)),
1054            }
1055        );
1056        assert_eq!(
1057            listeners.listeners[2],
1058            Entry::Listener {
1059                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
1060                prev: Cell::new(Some(key1)),
1061                next: Cell::new(Some(key3)),
1062            }
1063        );
1064        assert_eq!(
1065            listeners.listeners[3],
1066            Entry::Listener {
1067                state: Cell::new(State::Created),
1068                prev: Cell::new(Some(key2)),
1069                next: Cell::new(None),
1070            }
1071        );
1072
1073        // Notify the listener.
1074        listeners.notify(GenericNotify::new(2, false, || ()));
1075
1076        assert_eq!(listeners.len, 3);
1077        assert_eq!(listeners.notified, 2);
1078        assert_eq!(listeners.tail, Some(key3));
1079        assert_eq!(listeners.head, Some(key1));
1080        assert_eq!(listeners.start, Some(key3));
1081        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
1082        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1083        assert_eq!(
1084            listeners.listeners[1],
1085            Entry::Listener {
1086                state: Cell::new(State::Notified {
1087                    additional: false,
1088                    tag: (),
1089                }),
1090                prev: Cell::new(None),
1091                next: Cell::new(Some(key2)),
1092            }
1093        );
1094        assert_eq!(
1095            listeners.listeners[2],
1096            Entry::Listener {
1097                state: Cell::new(State::Notified {
1098                    additional: false,
1099                    tag: (),
1100                }),
1101                prev: Cell::new(Some(key1)),
1102                next: Cell::new(Some(key3)),
1103            }
1104        );
1105        assert_eq!(
1106            listeners.listeners[3],
1107            Entry::Listener {
1108                state: Cell::new(State::Created),
1109                prev: Cell::new(Some(key2)),
1110                next: Cell::new(None),
1111            }
1112        );
1113
1114        assert!(woken.load(Ordering::SeqCst));
1115        assert_eq!(
1116            listeners.register(
1117                Pin::new(&mut Some(Listener::HasNode(key2))),
1118                TaskRef::Waker(&waker)
1119            ),
1120            RegisterResult::Notified(())
1121        );
1122    }
1123
1124    #[test]
1125    fn listener_slab_notify_prop() {
1126        let woken = Arc::new(AtomicBool::new(false));
1127        let waker = waker_fn::waker_fn({
1128            let woken = woken.clone();
1129            move || woken.store(true, Ordering::SeqCst)
1130        });
1131
1132        let mut listeners = ListenerSlab::new();
1133
1134        // Insert a few listeners.
1135        let key1 = listeners.insert(State::Created);
1136        let key2 = listeners.insert(State::Created);
1137        let key3 = listeners.insert(State::Created);
1138
1139        // Register one.
1140        assert_eq!(
1141            listeners.register(
1142                Pin::new(&mut Some(Listener::HasNode(key2))),
1143                TaskRef::Waker(&waker)
1144            ),
1145            RegisterResult::Registered
1146        );
1147
1148        assert_eq!(listeners.len, 3);
1149        assert_eq!(listeners.notified, 0);
1150        assert_eq!(listeners.tail, Some(key3));
1151        assert_eq!(listeners.head, Some(key1));
1152        assert_eq!(listeners.start, Some(key1));
1153        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
1154        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1155        assert_eq!(
1156            listeners.listeners[1],
1157            Entry::Listener {
1158                state: Cell::new(State::Created),
1159                prev: Cell::new(None),
1160                next: Cell::new(Some(key2)),
1161            }
1162        );
1163        assert_eq!(
1164            listeners.listeners[2],
1165            Entry::Listener {
1166                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
1167                prev: Cell::new(Some(key1)),
1168                next: Cell::new(Some(key3)),
1169            }
1170        );
1171        assert_eq!(
1172            listeners.listeners[3],
1173            Entry::Listener {
1174                state: Cell::new(State::Created),
1175                prev: Cell::new(Some(key2)),
1176                next: Cell::new(None),
1177            }
1178        );
1179
1180        // Notify the first listener.
1181        listeners.notify(GenericNotify::new(1, false, || ()));
1182
1183        assert_eq!(listeners.len, 3);
1184        assert_eq!(listeners.notified, 1);
1185        assert_eq!(listeners.tail, Some(key3));
1186        assert_eq!(listeners.head, Some(key1));
1187        assert_eq!(listeners.start, Some(key2));
1188        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
1189        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1190        assert_eq!(
1191            listeners.listeners[1],
1192            Entry::Listener {
1193                state: Cell::new(State::Notified {
1194                    additional: false,
1195                    tag: (),
1196                }),
1197                prev: Cell::new(None),
1198                next: Cell::new(Some(key2)),
1199            }
1200        );
1201        assert_eq!(
1202            listeners.listeners[2],
1203            Entry::Listener {
1204                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
1205                prev: Cell::new(Some(key1)),
1206                next: Cell::new(Some(key3)),
1207            }
1208        );
1209        assert_eq!(
1210            listeners.listeners[3],
1211            Entry::Listener {
1212                state: Cell::new(State::Created),
1213                prev: Cell::new(Some(key2)),
1214                next: Cell::new(None),
1215            }
1216        );
1217
1218        // Calling notify again should not change anything.
1219        listeners.notify(GenericNotify::new(1, false, || ()));
1220
1221        assert_eq!(listeners.len, 3);
1222        assert_eq!(listeners.notified, 1);
1223        assert_eq!(listeners.tail, Some(key3));
1224        assert_eq!(listeners.head, Some(key1));
1225        assert_eq!(listeners.start, Some(key2));
1226        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
1227        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1228        assert_eq!(
1229            listeners.listeners[1],
1230            Entry::Listener {
1231                state: Cell::new(State::Notified {
1232                    additional: false,
1233                    tag: (),
1234                }),
1235                prev: Cell::new(None),
1236                next: Cell::new(Some(key2)),
1237            }
1238        );
1239        assert_eq!(
1240            listeners.listeners[2],
1241            Entry::Listener {
1242                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
1243                prev: Cell::new(Some(key1)),
1244                next: Cell::new(Some(key3)),
1245            }
1246        );
1247        assert_eq!(
1248            listeners.listeners[3],
1249            Entry::Listener {
1250                state: Cell::new(State::Created),
1251                prev: Cell::new(Some(key2)),
1252                next: Cell::new(None),
1253            }
1254        );
1255
1256        // Remove the first listener.
1257        assert_eq!(
1258            listeners.remove(key1, false),
1259            Some(State::Notified {
1260                additional: false,
1261                tag: ()
1262            })
1263        );
1264
1265        assert_eq!(listeners.len, 2);
1266        assert_eq!(listeners.notified, 0);
1267        assert_eq!(listeners.tail, Some(key3));
1268        assert_eq!(listeners.head, Some(key2));
1269        assert_eq!(listeners.start, Some(key2));
1270        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
1271        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1272        assert_eq!(
1273            listeners.listeners[1],
1274            Entry::Empty(NonZeroUsize::new(4).unwrap())
1275        );
1276        assert_eq!(*listeners.listeners[2].prev(), Cell::new(None));
1277        assert_eq!(*listeners.listeners[2].next(), Cell::new(Some(key3)));
1278        assert_eq!(
1279            listeners.listeners[3],
1280            Entry::Listener {
1281                state: Cell::new(State::Created),
1282                prev: Cell::new(Some(key2)),
1283                next: Cell::new(None),
1284            }
1285        );
1286
1287        // Notify the second listener.
1288        listeners.notify(GenericNotify::new(1, false, || ()));
1289        assert!(woken.load(Ordering::SeqCst));
1290
1291        assert_eq!(listeners.len, 2);
1292        assert_eq!(listeners.notified, 1);
1293        assert_eq!(listeners.tail, Some(key3));
1294        assert_eq!(listeners.head, Some(key2));
1295        assert_eq!(listeners.start, Some(key3));
1296        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
1297        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1298        assert_eq!(
1299            listeners.listeners[1],
1300            Entry::Empty(NonZeroUsize::new(4).unwrap())
1301        );
1302        assert_eq!(
1303            listeners.listeners[2],
1304            Entry::Listener {
1305                state: Cell::new(State::Notified {
1306                    additional: false,
1307                    tag: (),
1308                }),
1309                prev: Cell::new(None),
1310                next: Cell::new(Some(key3)),
1311            }
1312        );
1313        assert_eq!(
1314            listeners.listeners[3],
1315            Entry::Listener {
1316                state: Cell::new(State::Created),
1317                prev: Cell::new(Some(key2)),
1318                next: Cell::new(None),
1319            }
1320        );
1321
1322        // Remove and propagate the second listener.
1323        assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken));
1324
1325        // The third listener should be notified.
1326        assert_eq!(listeners.len, 1);
1327        assert_eq!(listeners.notified, 1);
1328        assert_eq!(listeners.tail, Some(key3));
1329        assert_eq!(listeners.head, Some(key3));
1330        assert_eq!(listeners.start, None);
1331        assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
1332        assert_eq!(listeners.listeners[0], Entry::Sentinel);
1333        assert_eq!(
1334            listeners.listeners[1],
1335            Entry::Empty(NonZeroUsize::new(4).unwrap())
1336        );
1337        assert_eq!(
1338            listeners.listeners[2],
1339            Entry::Empty(NonZeroUsize::new(1).unwrap())
1340        );
1341        assert_eq!(
1342            listeners.listeners[3],
1343            Entry::Listener {
1344                state: Cell::new(State::Notified {
1345                    additional: false,
1346                    tag: (),
1347                }),
1348                prev: Cell::new(None),
1349                next: Cell::new(None),
1350            }
1351        );
1352
1353        // Remove the third listener.
1354        assert_eq!(
1355            listeners.remove(key3, false),
1356            Some(State::Notified {
1357                additional: false,
1358                tag: ()
1359            })
1360        );
1361    }
1362
1363    #[test]
1364    fn uncontended_inner() {
1365        let inner = crate::Inner::new();
1366
1367        // Register two listeners.
1368        let (mut listener1, mut listener2, mut listener3) = (None, None, None);
1369        inner.insert(Pin::new(&mut listener1));
1370        inner.insert(Pin::new(&mut listener2));
1371        inner.insert(Pin::new(&mut listener3));
1372
1373        assert_eq!(
1374            listener1,
1375            Some(Listener::HasNode(NonZeroUsize::new(1).unwrap()))
1376        );
1377        assert_eq!(
1378            listener2,
1379            Some(Listener::HasNode(NonZeroUsize::new(2).unwrap()))
1380        );
1381
1382        // Register a waker in the second listener.
1383        let woken = Arc::new(AtomicBool::new(false));
1384        let waker = waker_fn::waker_fn({
1385            let woken = woken.clone();
1386            move || woken.store(true, Ordering::SeqCst)
1387        });
1388        assert_eq!(
1389            inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)),
1390            RegisterResult::Registered
1391        );
1392
1393        // Notify the first listener.
1394        inner.notify(GenericNotify::new(1, false, || ()));
1395        assert!(!woken.load(Ordering::SeqCst));
1396
1397        // Another notify should do nothing.
1398        inner.notify(GenericNotify::new(1, false, || ()));
1399        assert!(!woken.load(Ordering::SeqCst));
1400
1401        // Receive the notification.
1402        assert_eq!(
1403            inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)),
1404            RegisterResult::Notified(())
1405        );
1406
1407        // First listener is already removed.
1408        assert!(listener1.is_none());
1409
1410        // Notify the second listener.
1411        inner.notify(GenericNotify::new(1, false, || ()));
1412        assert!(woken.load(Ordering::SeqCst));
1413
1414        // Remove the second listener and propagate the notification.
1415        assert_eq!(
1416            inner.remove(Pin::new(&mut listener2), true),
1417            Some(State::NotifiedTaken)
1418        );
1419
1420        // Second listener is already removed.
1421        assert!(listener2.is_none());
1422
1423        // Third listener should be notified.
1424        assert_eq!(
1425            inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)),
1426            RegisterResult::Notified(())
1427        );
1428    }
1429}