event_listener/slab/
node.rs

1//! An operation that can be delayed.
2
3//! The node that makes up queues.
4
5use crate::notify::{GenericNotify, Internal, NotificationPrivate, TagProducer};
6use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
7use crate::sync::Arc;
8use crate::sys::ListenerSlab;
9use crate::{State, Task};
10
11use alloc::boxed::Box;
12
13use core::fmt;
14use core::marker::PhantomData;
15use core::mem;
16use core::num::NonZeroUsize;
17use core::ptr;
18
/// A zero-sized marker parameterized over `T`.
///
/// It stores nothing except a `PhantomData<T>`. The `TagProducer` implementation
/// for this type uses `T` as its tag type.
pub(crate) struct NothingProducer<T>(PhantomData<T>);

// Implemented by hand (rather than derived) so that no `T: Default` bound is imposed.
impl<T> Default for NothingProducer<T> {
    /// Builds the producer; there is no state to initialize.
    fn default() -> Self {
        NothingProducer(PhantomData)
    }
}

// Implemented by hand (rather than derived) so that no `T: Debug` bound is imposed.
impl<T> fmt::Debug for NothingProducer<T> {
    /// Writes only the type name, as there are no fields worth showing.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("NothingProducer")
    }
}
32
33impl<T> TagProducer for NothingProducer<T> {
34    type Tag = T;
35
36    fn next_tag(&mut self) -> Self::Tag {
37        // This has to be a zero-sized type with no drop handler.
38        assert_eq!(mem::size_of::<Self::Tag>(), 0);
39        assert!(!mem::needs_drop::<Self::Tag>());
40
41        // SAFETY: As this is a ZST without a drop handler, zero is valid.
42        unsafe { mem::zeroed() }
43    }
44}
45
46/// A node in the backup queue.
47pub(crate) enum Node<T> {
48    /// This node is requesting to add a listener.
49    // For some reason, the MSRV build says this variant is never constructed.
50    #[allow(dead_code)]
51    AddListener {
52        /// The state of the listener that wants to be added.
53        task_waiting: Arc<TaskWaiting>,
54    },
55
56    /// This node is notifying a listener.
57    Notify(GenericNotify<NothingProducer<T>>),
58
59    /// This node is removing a listener.
60    RemoveListener {
61        /// The ID of the listener to remove.
62        listener: NonZeroUsize,
63
64        /// Whether to propagate notifications to the next listener.
65        propagate: bool,
66    },
67
68    /// We are waiting for the mutex to lock, so they can manipulate it.
69    Waiting(Task),
70}
71
72impl<T> fmt::Debug for Node<T> {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        match self {
75            Self::AddListener { .. } => f.write_str("AddListener"),
76            Self::Notify(notify) => f
77                .debug_struct("Notify")
78                .field("count", &notify.count(Internal::new()))
79                .field("is_additional", &notify.is_additional(Internal::new()))
80                .finish(),
81            Self::RemoveListener {
82                listener,
83                propagate,
84            } => f
85                .debug_struct("RemoveListener")
86                .field("listener", listener)
87                .field("propagate", propagate)
88                .finish(),
89            Self::Waiting(_) => f.write_str("Waiting"),
90        }
91    }
92}
93
94#[derive(Debug)]
95pub(crate) struct TaskWaiting {
96    /// The task that is being waited on.
97    task: AtomicCell<Task>,
98
99    /// The ID of the new entry.
100    ///
101    /// This is set to zero when the task is still queued, or usize::MAX when the node should not
102    /// be added at all.
103    entry_id: AtomicUsize,
104}
105
106impl<T> Node<T> {
107    pub(crate) fn listener() -> (Self, Arc<TaskWaiting>) {
108        // Create a new `TaskWaiting` structure.
109        let task_waiting = Arc::new(TaskWaiting {
110            task: AtomicCell::new(),
111            entry_id: AtomicUsize::new(0),
112        });
113
114        (
115            Self::AddListener {
116                task_waiting: task_waiting.clone(),
117            },
118            task_waiting,
119        )
120    }
121
122    /// Apply the node to the list.
123    pub(super) fn apply(self, list: &mut ListenerSlab<T>) -> Option<Task> {
124        match self {
125            Node::AddListener { task_waiting } => {
126                // If we're cancelled, do nothing.
127                if task_waiting.entry_id.load(Ordering::Relaxed) == usize::MAX {
128                    return task_waiting.task.take().map(|t| *t);
129                }
130
131                // Add a new entry to the list.
132                let key = list.insert(State::Created);
133                assert!(key.get() != usize::MAX);
134
135                // Send the new key to the listener and wake it if necessary.
136                let old_value = task_waiting.entry_id.swap(key.get(), Ordering::Release);
137
138                // If we're cancelled, remove ourselves from the list.
139                if old_value == usize::MAX {
140                    list.remove(key, false);
141                }
142
143                return task_waiting.task.take().map(|t| *t);
144            }
145            Node::Notify(notify) => {
146                // Notify the next `count` listeners.
147                list.notify(notify);
148            }
149            Node::RemoveListener {
150                listener,
151                propagate,
152            } => {
153                // Remove the listener from the list.
154                list.remove(listener, propagate);
155            }
156            Node::Waiting(task) => {
157                return Some(task);
158            }
159        }
160
161        None
162    }
163}
164
165impl TaskWaiting {
166    /// Determine if we are still queued.
167    ///
168    /// Returns `Some` with the entry ID if we are no longer queued.
169    pub(crate) fn status(&self) -> Option<NonZeroUsize> {
170        NonZeroUsize::new(self.entry_id.load(Ordering::Acquire))
171    }
172
173    /// Register a listener.
174    pub(crate) fn register(&self, task: Task) {
175        // Set the task.
176        if let Some(task) = self.task.replace(Some(Box::new(task))) {
177            task.wake();
178        }
179
180        // If the entry ID is non-zero, then we are no longer queued.
181        if self.status().is_some() {
182            // Wake the task.
183            if let Some(task) = self.task.take() {
184                task.wake();
185            }
186        }
187    }
188
189    /// Mark this listener as cancelled, indicating that it should not be inserted into the list.
190    ///
191    /// If this listener was already inserted into the list, returns the entry ID. Otherwise returns
192    /// `None`.
193    pub(crate) fn cancel(&self) -> Option<NonZeroUsize> {
194        // Set the entry ID to usize::MAX.
195        let id = self.entry_id.swap(usize::MAX, Ordering::Release);
196
197        // Wake the task.
198        if let Some(task) = self.task.take() {
199            task.wake();
200        }
201
202        // Return the entry ID if we were queued.
203        NonZeroUsize::new(id)
204    }
205}
206
/// A shared, atomically swappable slot for an optional heap-allocated value.
///
/// The stored pointer is either null (empty) or was produced by
/// `Box::into_raw` on a `Box<T>`.
#[derive(Debug)]
struct AtomicCell<T>(AtomicPtr<T>);

impl<T> AtomicCell<T> {
    /// Create a new, empty `AtomicCell`.
    fn new() -> Self {
        AtomicCell(AtomicPtr::new(ptr::null_mut()))
    }

    /// Swap the value out.
    ///
    /// Stores `value` (or empties the cell when it is `None`) and returns
    /// whatever the cell held beforehand.
    fn replace(&self, value: Option<Box<T>>) -> Option<Box<T>> {
        let previous = if let Some(boxed) = value {
            self.0.swap(Box::into_raw(boxed), Ordering::AcqRel)
        } else {
            // Acquire is needed to synchronize with the store of a non-null ptr, but since a
            // null ptr will never be dereferenced, there is no need to synchronize the store
            // of a null ptr.
            self.0.swap(ptr::null_mut(), Ordering::Acquire)
        };

        // A null pointer means the cell was empty.
        ptr::NonNull::new(previous).map(|non_null| {
            // SAFETY:
            // - AcqRel/Acquire ensures that it does not read a pointer to potentially
            //   invalid memory.
            // - `NonNull::new` has established that the pointer is not null.
            // - We do not store invalid pointers other than null in self.0.
            unsafe { Box::from_raw(non_null.as_ptr()) }
        })
    }

    /// Take the value out, leaving the cell empty.
    fn take(&self) -> Option<Box<T>> {
        self.replace(None)
    }
}

impl<T> Drop for AtomicCell<T> {
    /// Frees the value still held by the cell, if any.
    fn drop(&mut self) {
        drop(self.take());
    }
}
249}