1#[cfg(feature = "net")]
2pub(crate) mod device;
3#[cfg(feature = "net")]
4pub(crate) mod network;
5pub(crate) mod task;
6#[cfg(feature = "vsock")]
7pub(crate) mod vsock;
8
9use alloc::sync::Arc;
10use alloc::task::Wake;
11use core::future::Future;
12use core::pin::pin;
13use core::sync::atomic::AtomicU32;
14use core::task::{Context, Poll, Waker};
15use core::time::Duration;
16
17use crossbeam_utils::Backoff;
18use hermit_sync::without_interrupts;
19#[cfg(feature = "net")]
20use smoltcp::time::Instant;
21
22use crate::arch::core_local;
23use crate::errno::Errno;
24use crate::executor::task::AsyncTask;
25use crate::io;
26#[cfg(feature = "net")]
27use crate::scheduler::PerCoreSchedulerExt;
28use crate::synch::futex::*;
29
/// A slot that remembers at most one [`Waker`] so it can be woken later.
#[derive(Debug)]
pub(crate) struct WakerRegistration {
    /// The currently registered waker, if any.
    waker: Option<Waker>,
}

impl WakerRegistration {
    /// Creates a registration with no waker stored.
    pub const fn new() -> Self {
        Self { waker: None }
    }

    /// Registers `w` as the waker to notify on the next [`wake`](Self::wake).
    ///
    /// If the stored waker already wakes the same task as `w`, the stored
    /// waker is kept and no clone is made. Otherwise the slot is overwritten
    /// with a clone of `w`; a previously stored waker is dropped without
    /// being woken.
    pub fn register(&mut self, w: &Waker) {
        let already_registered = self
            .waker
            .as_ref()
            .is_some_and(|current| current.will_wake(w));

        if !already_registered {
            self.waker = Some(w.clone());
        }
    }

    /// Wakes the registered waker, if there is one, and empties the slot.
    ///
    /// Calling this on an empty registration does nothing.
    #[allow(dead_code)]
    pub fn wake(&mut self) {
        let Some(registered) = self.waker.take() else {
            return;
        };
        registered.wake();
    }
}
64
65struct TaskNotify {
66 futex: AtomicU32,
68}
69
70impl TaskNotify {
71 pub const fn new() -> Self {
72 Self {
73 futex: AtomicU32::new(0),
74 }
75 }
76
77 pub fn wait(&self, timeout: Option<u64>) {
78 let _ = futex_wait_and_set(&self.futex, 0, timeout, Flags::RELATIVE, 0);
83 }
84}
85
86impl Wake for TaskNotify {
87 fn wake(self: Arc<Self>) {
88 self.wake_by_ref();
89 }
90
91 fn wake_by_ref(self: &Arc<Self>) {
92 let _ = futex_wake_or_set(&self.futex, 1, u32::MAX);
93 }
94}
95
96pub(crate) fn run() {
97 without_interrupts(|| {
98 for _ in 0..3 {
102 if !core_local::ex().try_tick() {
103 break;
104 }
105 }
106 });
107}
108
109#[cfg_attr(
111 not(any(feature = "shell", feature = "net", feature = "vsock")),
112 expect(dead_code)
113)]
114pub(crate) fn spawn<F>(future: F)
115where
116 F: Future<Output = ()> + Send + 'static,
117{
118 core_local::ex().spawn(AsyncTask::new(future)).detach();
119}
120
/// Initializes the feature-gated executor subsystems.
///
/// Calls the network initializer when the `net` feature is enabled, followed
/// by the vsock initializer when the `vsock` feature is enabled. With neither
/// feature enabled this function does nothing.
pub fn init() {
    #[cfg(feature = "net")]
    {
        crate::executor::network::init();
    }

    #[cfg(feature = "vsock")]
    {
        crate::executor::vsock::init();
    }
}
127
128pub(crate) fn poll_on<F, T>(future: F) -> io::Result<T>
130where
131 F: Future<Output = io::Result<T>>,
132{
133 let mut cx = Context::from_waker(Waker::noop());
134 let mut future = pin!(future);
135
136 loop {
137 run();
139
140 if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
141 return t;
142 }
143 }
144}
145
146pub(crate) fn block_on<F, T>(future: F, timeout: Option<Duration>) -> io::Result<T>
148where
149 F: Future<Output = io::Result<T>>,
150{
151 let backoff = Backoff::new();
152 let start = crate::arch::kernel::systemtime::now_micros();
153 let task_notify = Arc::new(TaskNotify::new());
154 let waker = task_notify.clone().into();
155 let mut cx = Context::from_waker(&waker);
156 let mut future = pin!(future);
157
158 loop {
159 let result = future.as_mut().poll(&mut cx);
161
162 run();
164
165 let now = crate::arch::kernel::systemtime::now_micros();
166 if let Poll::Ready(t) = result {
167 #[cfg(feature = "net")]
169 {
170 if let Some(mut guard) = crate::executor::network::NIC.try_lock() {
171 let delay = if let Ok(nic) = guard.as_nic_mut() {
172 nic.set_polling_mode(false);
173
174 nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
175 .map(|d| d.total_micros())
176 } else {
177 None
178 };
179 core_local::core_scheduler().add_network_timer(
180 delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
181 );
182 }
183 }
184
185 return t;
186 }
187
188 if let Some(duration) = timeout
189 && Duration::from_micros(now - start) >= duration
190 {
191 #[cfg(feature = "net")]
193 {
194 if let Some(mut guard) = crate::executor::network::NIC.try_lock() {
195 let delay = if let Ok(nic) = guard.as_nic_mut() {
196 nic.set_polling_mode(false);
197
198 nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
199 .map(|d| d.total_micros())
200 } else {
201 None
202 };
203 core_local::core_scheduler().add_network_timer(
204 delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
205 );
206 }
207 }
208
209 return Err(Errno::Time);
210 }
211
212 #[cfg(feature = "net")]
213 if backoff.is_completed() {
214 let delay = if let Some(mut guard) = crate::executor::network::NIC.try_lock() {
215 if let Ok(nic) = guard.as_nic_mut() {
216 nic.set_polling_mode(false);
217
218 nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
219 .map(|d| d.total_micros())
220 } else {
221 None
222 }
223 } else {
224 None
225 };
226
227 if delay.unwrap_or(10_000_000) > 10_000 {
228 core_local::core_scheduler().add_network_timer(
229 delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
230 );
231 let wakeup_time =
232 timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
233
234 task_notify.wait(wakeup_time);
236
237 if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
239 nic.set_polling_mode(true);
240 }
241
242 backoff.reset();
243 }
244 } else {
245 backoff.snooze();
246 }
247
248 #[cfg(not(feature = "net"))]
249 {
250 if backoff.is_completed() {
251 let wakeup_time =
252 timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
253
254 task_notify.wait(wakeup_time);
256
257 backoff.reset();
259 } else {
260 backoff.snooze();
261 }
262 }
263 }
264}