event_listener/slab/
node.rs1use crate::notify::{GenericNotify, Internal, NotificationPrivate, TagProducer};
6use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
7use crate::sync::Arc;
8use crate::sys::ListenerSlab;
9use crate::{State, Task};
10
11use alloc::boxed::Box;
12
13use core::fmt;
14use core::marker::PhantomData;
15use core::mem;
16use core::num::NonZeroUsize;
17use core::ptr;
18
/// Zero-sized marker that names the type parameter `T` without storing a
/// value of it; its only field is a `PhantomData<T>`.
pub(crate) struct NothingProducer<T>(PhantomData<T>);

// Written by hand so that no bound is placed on `T`.
impl<T> Default for NothingProducer<T> {
    /// Returns the single possible value of this marker type.
    fn default() -> Self {
        NothingProducer(PhantomData)
    }
}

impl<T> fmt::Debug for NothingProducer<T> {
    /// Prints the bare type name; the marker has no fields to show.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("NothingProducer")
    }
}
32
33impl<T> TagProducer for NothingProducer<T> {
34 type Tag = T;
35
36 fn next_tag(&mut self) -> Self::Tag {
37 assert_eq!(mem::size_of::<Self::Tag>(), 0);
39 assert!(!mem::needs_drop::<Self::Tag>());
40
41 unsafe { mem::zeroed() }
43 }
44}
45
46pub(crate) enum Node<T> {
48 #[allow(dead_code)]
51 AddListener {
52 task_waiting: Arc<TaskWaiting>,
54 },
55
56 Notify(GenericNotify<NothingProducer<T>>),
58
59 RemoveListener {
61 listener: NonZeroUsize,
63
64 propagate: bool,
66 },
67
68 Waiting(Task),
70}
71
72impl<T> fmt::Debug for Node<T> {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 match self {
75 Self::AddListener { .. } => f.write_str("AddListener"),
76 Self::Notify(notify) => f
77 .debug_struct("Notify")
78 .field("count", ¬ify.count(Internal::new()))
79 .field("is_additional", ¬ify.is_additional(Internal::new()))
80 .finish(),
81 Self::RemoveListener {
82 listener,
83 propagate,
84 } => f
85 .debug_struct("RemoveListener")
86 .field("listener", listener)
87 .field("propagate", propagate)
88 .finish(),
89 Self::Waiting(_) => f.write_str("Waiting"),
90 }
91 }
92}
93
94#[derive(Debug)]
95pub(crate) struct TaskWaiting {
96 task: AtomicCell<Task>,
98
99 entry_id: AtomicUsize,
104}
105
106impl<T> Node<T> {
107 pub(crate) fn listener() -> (Self, Arc<TaskWaiting>) {
108 let task_waiting = Arc::new(TaskWaiting {
110 task: AtomicCell::new(),
111 entry_id: AtomicUsize::new(0),
112 });
113
114 (
115 Self::AddListener {
116 task_waiting: task_waiting.clone(),
117 },
118 task_waiting,
119 )
120 }
121
122 pub(super) fn apply(self, list: &mut ListenerSlab<T>) -> Option<Task> {
124 match self {
125 Node::AddListener { task_waiting } => {
126 if task_waiting.entry_id.load(Ordering::Relaxed) == usize::MAX {
128 return task_waiting.task.take().map(|t| *t);
129 }
130
131 let key = list.insert(State::Created);
133 assert!(key.get() != usize::MAX);
134
135 let old_value = task_waiting.entry_id.swap(key.get(), Ordering::Release);
137
138 if old_value == usize::MAX {
140 list.remove(key, false);
141 }
142
143 return task_waiting.task.take().map(|t| *t);
144 }
145 Node::Notify(notify) => {
146 list.notify(notify);
148 }
149 Node::RemoveListener {
150 listener,
151 propagate,
152 } => {
153 list.remove(listener, propagate);
155 }
156 Node::Waiting(task) => {
157 return Some(task);
158 }
159 }
160
161 None
162 }
163}
164
165impl TaskWaiting {
166 pub(crate) fn status(&self) -> Option<NonZeroUsize> {
170 NonZeroUsize::new(self.entry_id.load(Ordering::Acquire))
171 }
172
173 pub(crate) fn register(&self, task: Task) {
175 if let Some(task) = self.task.replace(Some(Box::new(task))) {
177 task.wake();
178 }
179
180 if self.status().is_some() {
182 if let Some(task) = self.task.take() {
184 task.wake();
185 }
186 }
187 }
188
189 pub(crate) fn cancel(&self) -> Option<NonZeroUsize> {
194 let id = self.entry_id.swap(usize::MAX, Ordering::Release);
196
197 if let Some(task) = self.task.take() {
199 task.wake();
200 }
201
202 NonZeroUsize::new(id)
204 }
205}
206
/// A slot holding at most one boxed `T`, swapped in and out through an
/// `AtomicPtr`; a null pointer means the slot is empty.
#[derive(Debug)]
struct AtomicCell<T>(AtomicPtr<T>);

impl<T> AtomicCell<T> {
    /// Creates an empty cell.
    fn new() -> Self {
        let empty: *mut T = ptr::null_mut();
        Self(AtomicPtr::new(empty))
    }

    /// Stores `value` (or empties the cell for `None`) and returns what was
    /// held before, if anything.
    fn replace(&self, value: Option<Box<T>>) -> Option<Box<T>> {
        // Storing a box uses `AcqRel`; storing null uses `Acquire` only.
        let previous = if let Some(boxed) = value {
            self.0.swap(Box::into_raw(boxed), Ordering::AcqRel)
        } else {
            self.0.swap(ptr::null_mut(), Ordering::Acquire)
        };

        ptr::NonNull::new(previous).map(|owned| {
            // SAFETY: the only non-null pointers ever stored in the cell come
            // from `Box::into_raw` above, and the swap has just removed this
            // one, so the box is reconstructed exactly once.
            unsafe { Box::from_raw(owned.as_ptr()) }
        })
    }

    /// Empties the cell and returns its previous content.
    fn take(&self) -> Option<Box<T>> {
        self.replace(None)
    }
}

impl<T> Drop for AtomicCell<T> {
    /// Frees a value still held by the cell so that it is not leaked.
    fn drop(&mut self) {
        drop(self.take());
    }
}