hermit/executor/
mod.rs

1#![allow(dead_code)]
2
3#[cfg(any(feature = "tcp", feature = "udp"))]
4pub(crate) mod device;
5#[cfg(any(feature = "tcp", feature = "udp"))]
6pub(crate) mod network;
7pub(crate) mod task;
8#[cfg(feature = "vsock")]
9pub(crate) mod vsock;
10
11use alloc::sync::Arc;
12use alloc::task::Wake;
13use core::future::Future;
14use core::pin::pin;
15use core::sync::atomic::AtomicU32;
16use core::task::{Context, Poll, Waker};
17use core::time::Duration;
18
19use crossbeam_utils::Backoff;
20use hermit_sync::without_interrupts;
21#[cfg(any(feature = "tcp", feature = "udp"))]
22use smoltcp::time::Instant;
23
24use crate::arch::core_local::*;
25#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "pci")))]
26use crate::drivers::mmio::get_network_driver;
27#[cfg(any(feature = "tcp", feature = "udp"))]
28use crate::drivers::net::NetworkDriver;
29#[cfg(all(any(feature = "tcp", feature = "udp"), feature = "pci"))]
30use crate::drivers::pci::get_network_driver;
31use crate::executor::task::AsyncTask;
32use crate::io;
33#[cfg(any(feature = "tcp", feature = "udp"))]
34use crate::scheduler::PerCoreSchedulerExt;
35use crate::synch::futex::*;
36
/// Slot that stores at most one [`Waker`].
///
/// The design is derived from smoltcp's `WakerRegistration`.
#[derive(Debug)]
pub(crate) struct WakerRegistration {
	/// The waker to notify on the next call to [`WakerRegistration::wake`], if any.
	waker: Option<Waker>,
}

impl WakerRegistration {
	/// Creates a registration that holds no waker.
	pub const fn new() -> Self {
		Self { waker: None }
	}

	/// Stores `w` as the waker to notify, replacing any previously stored waker.
	///
	/// If the stored waker already wakes the same task as `w`, it is kept and
	/// the clone of `w` is skipped.
	pub fn register(&mut self, w: &Waker) {
		let wakes_same_task = self
			.waker
			.as_ref()
			.is_some_and(|registered| registered.will_wake(w));

		// Either nothing is registered yet, or the registered waker belongs to
		// a different task: remember a clone of the new one.
		if !wakes_same_task {
			self.waker = Some(w.clone());
		}
	}

	/// Wakes the stored waker, if any, and leaves the registration empty.
	pub fn wake(&mut self) {
		let Some(registered) = self.waker.take() else {
			return;
		};
		registered.wake();
	}
}
70
71struct TaskNotify {
72	/// Futex to wakeup a single task
73	futex: AtomicU32,
74}
75
76impl TaskNotify {
77	pub const fn new() -> Self {
78		Self {
79			futex: AtomicU32::new(0),
80		}
81	}
82
83	pub fn wait(&self, timeout: Option<u64>) {
84		// Wait for a futex and reset the value to zero. If the value
85		// is not zero, someone already wanted to wakeup a task and stored another
86		// value to the futex address. In this case, the function directly returns
87		// and doesn't block.
88		let _ = futex_wait_and_set(&self.futex, 0, timeout, Flags::RELATIVE, 0);
89	}
90}
91
92impl Wake for TaskNotify {
93	fn wake(self: Arc<Self>) {
94		self.wake_by_ref();
95	}
96
97	fn wake_by_ref(self: &Arc<Self>) {
98		let _ = futex_wake_or_set(&self.futex, 1, u32::MAX);
99	}
100}
101
102pub(crate) fn run() {
103	let mut cx = Context::from_waker(Waker::noop());
104
105	without_interrupts(|| {
106		async_tasks().retain_mut(|task| {
107			trace!("Run async task {}", task.id());
108
109			match task.poll(&mut cx) {
110				Poll::Ready(()) => false,
111				Poll::Pending => true,
112			}
113		});
114	});
115}
116
117/// Spawns a future on the executor.
118pub(crate) fn spawn<F>(future: F)
119where
120	F: Future<Output = ()> + Send + 'static,
121{
122	without_interrupts(|| async_tasks().push(AsyncTask::new(future)));
123}
124
/// Initializes the feature-gated executor subsystems.
///
/// Calls `network::init()` when the `tcp` or `udp` feature is enabled and
/// `vsock::init()` when the `vsock` feature is enabled; otherwise does nothing.
pub fn init() {
	#[cfg(any(feature = "tcp", feature = "udp"))]
	self::network::init();

	#[cfg(feature = "vsock")]
	self::vsock::init();
}
131
132/// Blocks the current thread on `f`, running the executor when idling.
133pub(crate) fn poll_on<F, T>(future: F) -> io::Result<T>
134where
135	F: Future<Output = io::Result<T>>,
136{
137	let mut cx = Context::from_waker(Waker::noop());
138	let mut future = pin!(future);
139
140	loop {
141		// run background tasks
142		run();
143
144		if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
145			return t;
146		}
147	}
148}
149
150/// Blocks the current thread on `f`, running the executor when idling.
151pub(crate) fn block_on<F, T>(future: F, timeout: Option<Duration>) -> io::Result<T>
152where
153	F: Future<Output = io::Result<T>>,
154{
155	#[cfg(any(feature = "tcp", feature = "udp"))]
156	let device = get_network_driver();
157
158	let backoff = Backoff::new();
159	let start = crate::arch::kernel::systemtime::now_micros();
160	let task_notify = Arc::new(TaskNotify::new());
161	let waker = task_notify.clone().into();
162	let mut cx = Context::from_waker(&waker);
163	let mut future = pin!(future);
164
165	loop {
166		// check future
167		let result = future.as_mut().poll(&mut cx);
168
169		// run background all tasks, which poll also the network device
170		run();
171
172		let now = crate::arch::kernel::systemtime::now_micros();
173		if let Poll::Ready(t) = result {
174			// allow network interrupts
175			#[cfg(any(feature = "tcp", feature = "udp"))]
176			{
177				let delay = if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
178					nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
179						.map(|d| d.total_micros())
180				} else {
181					None
182				};
183				core_scheduler().add_network_timer(
184					delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
185				);
186
187				if let Some(device) = device {
188					device.lock().set_polling_mode(false);
189				}
190			}
191
192			return t;
193		}
194
195		if let Some(duration) = timeout {
196			if Duration::from_micros(now - start) >= duration {
197				// allow network interrupts
198				#[cfg(any(feature = "tcp", feature = "udp"))]
199				{
200					let delay = if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
201						nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
202							.map(|d| d.total_micros())
203					} else {
204						None
205					};
206					core_scheduler().add_network_timer(
207						delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
208					);
209
210					if let Some(device) = device {
211						device.lock().set_polling_mode(false);
212					}
213				}
214
215				return Err(io::Error::ETIME);
216			}
217		}
218
219		#[cfg(any(feature = "tcp", feature = "udp"))]
220		if backoff.is_completed() {
221			let delay = if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
222				nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
223					.map(|d| d.total_micros())
224			} else {
225				None
226			};
227
228			if delay.unwrap_or(10_000_000) > 10_000 {
229				core_scheduler().add_network_timer(
230					delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
231				);
232				let wakeup_time =
233					timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
234
235				// allow network interrupts
236				if let Some(device) = device {
237					device.lock().set_polling_mode(false);
238				}
239
240				// switch to another task
241				task_notify.wait(wakeup_time);
242
243				// restore default values
244				if let Some(device) = device {
245					device.lock().set_polling_mode(true);
246				}
247
248				backoff.reset();
249			}
250		} else {
251			backoff.snooze();
252		}
253
254		#[cfg(not(any(feature = "tcp", feature = "udp")))]
255		{
256			if backoff.is_completed() {
257				let wakeup_time =
258					timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
259
260				// switch to another task
261				task_notify.wait(wakeup_time);
262
263				// restore default values
264				backoff.reset();
265			} else {
266				backoff.snooze();
267			}
268		}
269	}
270}