1#![allow(dead_code)]
2
3#[cfg(any(feature = "tcp", feature = "udp"))]
4pub(crate) mod device;
5#[cfg(any(feature = "tcp", feature = "udp"))]
6pub(crate) mod network;
7pub(crate) mod task;
8#[cfg(feature = "vsock")]
9pub(crate) mod vsock;
10
11use alloc::sync::Arc;
12use alloc::task::Wake;
13use core::future::Future;
14use core::pin::pin;
15use core::sync::atomic::AtomicU32;
16use core::task::{Context, Poll, Waker};
17use core::time::Duration;
18
19use crossbeam_utils::Backoff;
20use hermit_sync::without_interrupts;
21#[cfg(any(feature = "tcp", feature = "udp"))]
22use smoltcp::time::Instant;
23
24use crate::arch::core_local::*;
25#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "pci")))]
26use crate::drivers::mmio::get_network_driver;
27#[cfg(any(feature = "tcp", feature = "udp"))]
28use crate::drivers::net::NetworkDriver;
29#[cfg(all(any(feature = "tcp", feature = "udp"), feature = "pci"))]
30use crate::drivers::pci::get_network_driver;
31use crate::executor::task::AsyncTask;
32use crate::io;
33#[cfg(any(feature = "tcp", feature = "udp"))]
34use crate::scheduler::PerCoreSchedulerExt;
35use crate::synch::futex::*;
36
/// A slot that stores at most one [`Waker`] to be woken later.
#[derive(Debug)]
pub(crate) struct WakerRegistration {
    /// The currently registered waker, if any.
    waker: Option<Waker>,
}

impl WakerRegistration {
    /// Creates an empty registration that holds no waker.
    pub const fn new() -> Self {
        Self { waker: None }
    }

    /// Registers `w`, replacing any previously registered waker.
    ///
    /// If the stored waker reports via [`Waker::will_wake`] that it wakes the
    /// same task as `w`, the stored waker is kept and `w` is not cloned.
    pub fn register(&mut self, w: &Waker) {
        let already_registered = self
            .waker
            .as_ref()
            .is_some_and(|current| current.will_wake(w));

        if !already_registered {
            self.waker = Some(w.clone());
        }
    }

    /// Wakes the registered waker, if there is one, and clears the slot.
    ///
    /// Calling this on an empty registration does nothing.
    pub fn wake(&mut self) {
        let Some(registered) = self.waker.take() else {
            return;
        };
        registered.wake();
    }
}
70
71struct TaskNotify {
72 futex: AtomicU32,
74}
75
76impl TaskNotify {
77 pub const fn new() -> Self {
78 Self {
79 futex: AtomicU32::new(0),
80 }
81 }
82
83 pub fn wait(&self, timeout: Option<u64>) {
84 let _ = futex_wait_and_set(&self.futex, 0, timeout, Flags::RELATIVE, 0);
89 }
90}
91
92impl Wake for TaskNotify {
93 fn wake(self: Arc<Self>) {
94 self.wake_by_ref();
95 }
96
97 fn wake_by_ref(self: &Arc<Self>) {
98 let _ = futex_wake_or_set(&self.futex, 1, u32::MAX);
99 }
100}
101
102pub(crate) fn run() {
103 let mut cx = Context::from_waker(Waker::noop());
104
105 without_interrupts(|| {
106 async_tasks().retain_mut(|task| {
107 trace!("Run async task {}", task.id());
108
109 match task.poll(&mut cx) {
110 Poll::Ready(()) => false,
111 Poll::Pending => true,
112 }
113 });
114 });
115}
116
117pub(crate) fn spawn<F>(future: F)
119where
120 F: Future<Output = ()> + Send + 'static,
121{
122 without_interrupts(|| async_tasks().push(AsyncTask::new(future)));
123}
124
/// Initializes the executor's optional subsystems.
///
/// Calls the network initialization when the `tcp` or `udp` feature is
/// enabled and the vsock initialization when the `vsock` feature is enabled.
/// Without any of these features this function does nothing.
pub fn init() {
    #[cfg(any(feature = "tcp", feature = "udp"))]
    {
        crate::executor::network::init();
    }

    #[cfg(feature = "vsock")]
    {
        crate::executor::vsock::init();
    }
}
131
132pub(crate) fn poll_on<F, T>(future: F) -> io::Result<T>
134where
135 F: Future<Output = io::Result<T>>,
136{
137 let mut cx = Context::from_waker(Waker::noop());
138 let mut future = pin!(future);
139
140 loop {
141 run();
143
144 if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
145 return t;
146 }
147 }
148}
149
150pub(crate) fn block_on<F, T>(future: F, timeout: Option<Duration>) -> io::Result<T>
152where
153 F: Future<Output = io::Result<T>>,
154{
155 #[cfg(any(feature = "tcp", feature = "udp"))]
156 let device = get_network_driver();
157
158 let backoff = Backoff::new();
159 let start = crate::arch::kernel::systemtime::now_micros();
160 let task_notify = Arc::new(TaskNotify::new());
161 let waker = task_notify.clone().into();
162 let mut cx = Context::from_waker(&waker);
163 let mut future = pin!(future);
164
165 loop {
166 let result = future.as_mut().poll(&mut cx);
168
169 run();
171
172 let now = crate::arch::kernel::systemtime::now_micros();
173 if let Poll::Ready(t) = result {
174 #[cfg(any(feature = "tcp", feature = "udp"))]
176 {
177 let delay = if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
178 nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
179 .map(|d| d.total_micros())
180 } else {
181 None
182 };
183 core_scheduler().add_network_timer(
184 delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
185 );
186
187 if let Some(device) = device {
188 device.lock().set_polling_mode(false);
189 }
190 }
191
192 return t;
193 }
194
195 if let Some(duration) = timeout {
196 if Duration::from_micros(now - start) >= duration {
197 #[cfg(any(feature = "tcp", feature = "udp"))]
199 {
200 let delay = if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
201 nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
202 .map(|d| d.total_micros())
203 } else {
204 None
205 };
206 core_scheduler().add_network_timer(
207 delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
208 );
209
210 if let Some(device) = device {
211 device.lock().set_polling_mode(false);
212 }
213 }
214
215 return Err(io::Error::ETIME);
216 }
217 }
218
219 #[cfg(any(feature = "tcp", feature = "udp"))]
220 if backoff.is_completed() {
221 let delay = if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
222 nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
223 .map(|d| d.total_micros())
224 } else {
225 None
226 };
227
228 if delay.unwrap_or(10_000_000) > 10_000 {
229 core_scheduler().add_network_timer(
230 delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
231 );
232 let wakeup_time =
233 timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
234
235 if let Some(device) = device {
237 device.lock().set_polling_mode(false);
238 }
239
240 task_notify.wait(wakeup_time);
242
243 if let Some(device) = device {
245 device.lock().set_polling_mode(true);
246 }
247
248 backoff.reset();
249 }
250 } else {
251 backoff.snooze();
252 }
253
254 #[cfg(not(any(feature = "tcp", feature = "udp")))]
255 {
256 if backoff.is_completed() {
257 let wakeup_time =
258 timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
259
260 task_notify.wait(wakeup_time);
262
263 backoff.reset();
265 } else {
266 backoff.snooze();
267 }
268 }
269 }
270}