hermit/executor/
mod.rs

1#[cfg(feature = "net")]
2pub(crate) mod device;
3#[cfg(feature = "net")]
4pub(crate) mod network;
5pub(crate) mod task;
6#[cfg(feature = "vsock")]
7pub(crate) mod vsock;
8
9use alloc::sync::Arc;
10use alloc::task::Wake;
11use core::future::Future;
12use core::pin::pin;
13use core::sync::atomic::AtomicU32;
14use core::task::{Context, Poll, Waker};
15use core::time::Duration;
16
17use crossbeam_utils::Backoff;
18use hermit_sync::without_interrupts;
19#[cfg(feature = "net")]
20use smoltcp::time::Instant;
21
22use crate::arch::core_local;
23use crate::errno::Errno;
24use crate::executor::task::AsyncTask;
25use crate::io;
26#[cfg(feature = "net")]
27use crate::scheduler::PerCoreSchedulerExt;
28use crate::synch::futex::*;
29
30/// WakerRegistration is derived from smoltcp's
31/// implementation.
32#[derive(Debug)]
33pub(crate) struct WakerRegistration {
34	waker: Option<Waker>,
35}
36
37impl WakerRegistration {
38	pub const fn new() -> Self {
39		Self { waker: None }
40	}
41
42	/// Register a waker. Overwrites the previous waker, if any.
43	pub fn register(&mut self, w: &Waker) {
44		match self.waker {
45			// Optimization: If both the old and new Wakers wake the same task, we can simply
46			// keep the old waker, skipping the clone.
47			Some(ref w2) if (w2.will_wake(w)) => {}
48			// In all other cases
49			// - we have no waker registered
50			// - we have a waker registered but it's for a different task.
51			// then clone the new waker and store it
52			_ => self.waker = Some(w.clone()),
53		}
54	}
55
56	/// Wake the registered waker, if any.
57	#[allow(dead_code)]
58	pub fn wake(&mut self) {
59		if let Some(w) = self.waker.take() {
60			w.wake();
61		}
62	}
63}
64
65struct TaskNotify {
66	/// Futex to wakeup a single task
67	futex: AtomicU32,
68}
69
70impl TaskNotify {
71	pub const fn new() -> Self {
72		Self {
73			futex: AtomicU32::new(0),
74		}
75	}
76
77	pub fn wait(&self, timeout: Option<u64>) {
78		// Wait for a futex and reset the value to zero. If the value
79		// is not zero, someone already wanted to wakeup a task and stored another
80		// value to the futex address. In this case, the function directly returns
81		// and doesn't block.
82		let _ = futex_wait_and_set(&self.futex, 0, timeout, Flags::RELATIVE, 0);
83	}
84}
85
86impl Wake for TaskNotify {
87	fn wake(self: Arc<Self>) {
88		self.wake_by_ref();
89	}
90
91	fn wake_by_ref(self: &Arc<Self>) {
92		let _ = futex_wake_or_set(&self.futex, 1, u32::MAX);
93	}
94}
95
96pub(crate) fn run() {
97	without_interrupts(|| {
98		// FIXME: We currently have no more than 3 tasks at a time, so this is fine.
99		// Ideally, we would set this value to 200, but the network task currently immediately wakes up again.
100		// This would lead to the network task being polled 200 times back to back, slowing things down considerably.
101		for _ in 0..3 {
102			if !core_local::ex().try_tick() {
103				break;
104			}
105		}
106	});
107}
108
109/// Spawns a future on the executor.
110#[cfg_attr(
111	not(any(feature = "shell", feature = "net", feature = "vsock")),
112	expect(dead_code)
113)]
114pub(crate) fn spawn<F>(future: F)
115where
116	F: Future<Output = ()> + Send + 'static,
117{
118	core_local::ex().spawn(AsyncTask::new(future)).detach();
119}
120
121pub fn init() {
122	#[cfg(feature = "net")]
123	crate::executor::network::init();
124	#[cfg(feature = "vsock")]
125	crate::executor::vsock::init();
126}
127
128/// Blocks the current thread on `f`, running the executor when idling.
129pub(crate) fn poll_on<F, T>(future: F) -> io::Result<T>
130where
131	F: Future<Output = io::Result<T>>,
132{
133	let mut cx = Context::from_waker(Waker::noop());
134	let mut future = pin!(future);
135
136	loop {
137		// run background tasks
138		run();
139
140		if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
141			return t;
142		}
143	}
144}
145
146/// Blocks the current thread on `f`, running the executor when idling.
147pub(crate) fn block_on<F, T>(future: F, timeout: Option<Duration>) -> io::Result<T>
148where
149	F: Future<Output = io::Result<T>>,
150{
151	let backoff = Backoff::new();
152	let start = crate::arch::kernel::systemtime::now_micros();
153	let task_notify = Arc::new(TaskNotify::new());
154	let waker = task_notify.clone().into();
155	let mut cx = Context::from_waker(&waker);
156	let mut future = pin!(future);
157
158	loop {
159		// check future
160		let result = future.as_mut().poll(&mut cx);
161
162		// run background all tasks, which poll also the network device
163		run();
164
165		let now = crate::arch::kernel::systemtime::now_micros();
166		if let Poll::Ready(t) = result {
167			// allow network interrupts
168			#[cfg(feature = "net")]
169			{
170				if let Some(mut guard) = crate::executor::network::NIC.try_lock() {
171					let delay = if let Ok(nic) = guard.as_nic_mut() {
172						nic.set_polling_mode(false);
173
174						nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
175							.map(|d| d.total_micros())
176					} else {
177						None
178					};
179					core_local::core_scheduler().add_network_timer(
180						delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
181					);
182				}
183			}
184
185			return t;
186		}
187
188		if let Some(duration) = timeout
189			&& Duration::from_micros(now - start) >= duration
190		{
191			// allow network interrupts
192			#[cfg(feature = "net")]
193			{
194				if let Some(mut guard) = crate::executor::network::NIC.try_lock() {
195					let delay = if let Ok(nic) = guard.as_nic_mut() {
196						nic.set_polling_mode(false);
197
198						nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
199							.map(|d| d.total_micros())
200					} else {
201						None
202					};
203					core_local::core_scheduler().add_network_timer(
204						delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
205					);
206				}
207			}
208
209			return Err(Errno::Time);
210		}
211
212		#[cfg(feature = "net")]
213		if backoff.is_completed() {
214			let delay = if let Some(mut guard) = crate::executor::network::NIC.try_lock() {
215				if let Ok(nic) = guard.as_nic_mut() {
216					nic.set_polling_mode(false);
217
218					nic.poll_delay(Instant::from_micros_const(now.try_into().unwrap()))
219						.map(|d| d.total_micros())
220				} else {
221					None
222				}
223			} else {
224				None
225			};
226
227			if delay.unwrap_or(10_000_000) > 10_000 {
228				core_local::core_scheduler().add_network_timer(
229					delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
230				);
231				let wakeup_time =
232					timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
233
234				// switch to another task
235				task_notify.wait(wakeup_time);
236
237				// restore default values
238				if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
239					nic.set_polling_mode(true);
240				}
241
242				backoff.reset();
243			}
244		} else {
245			backoff.snooze();
246		}
247
248		#[cfg(not(feature = "net"))]
249		{
250			if backoff.is_completed() {
251				let wakeup_time =
252					timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
253
254				// switch to another task
255				task_notify.wait(wakeup_time);
256
257				// restore default values
258				backoff.reset();
259			} else {
260				backoff.snooze();
261			}
262		}
263	}
264}