hermit/synch/
futex.rs

1use core::sync::atomic::AtomicU32;
2use core::sync::atomic::Ordering::SeqCst;
3
4use ahash::RandomState;
5use hashbrown::HashMap;
6use hashbrown::hash_map::Entry;
7use hermit_sync::InterruptTicketMutex;
8
9use crate::arch::kernel::core_local::core_scheduler;
10use crate::arch::kernel::processor::get_timer_ticks;
11use crate::errno::{EAGAIN, EINVAL, ETIMEDOUT};
12use crate::scheduler::PerCoreSchedulerExt;
13use crate::scheduler::task::TaskHandlePriorityQueue;
14
15// TODO: Replace with a concurrent hashmap.
16static PARKING_LOT: InterruptTicketMutex<HashMap<usize, TaskHandlePriorityQueue, RandomState>> =
17	InterruptTicketMutex::new(HashMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)));
18
19bitflags! {
20	pub struct Flags: u32 {
21		/// Use a relative timeout
22		const RELATIVE = 0b01;
23	}
24}
25
26fn addr(addr: &AtomicU32) -> usize {
27	let ptr: *const _ = addr;
28	ptr.addr()
29}
30
31/// If the value at address matches the expected value, park the current thread until it is either
32/// woken up with `futex_wake` (returns 0) or the specified timeout elapses (returns -ETIMEDOUT).
33///
34/// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as
35/// relative to the current time. Otherwise it is understood to be an absolute time
36/// (see `get_timer_ticks`).
37pub(crate) fn futex_wait(
38	address: &AtomicU32,
39	expected: u32,
40	timeout: Option<u64>,
41	flags: Flags,
42) -> i32 {
43	let mut parking_lot = PARKING_LOT.lock();
44	// Check the futex value after locking the parking lot so that all changes are observed.
45	if address.load(SeqCst) != expected {
46		return -EAGAIN;
47	}
48
49	let wakeup_time = if flags.contains(Flags::RELATIVE) {
50		timeout.and_then(|t| get_timer_ticks().checked_add(t))
51	} else {
52		timeout
53	};
54
55	let scheduler = core_scheduler();
56	scheduler.block_current_task(wakeup_time);
57	let handle = scheduler.get_current_task_handle();
58	parking_lot.entry(addr(address)).or_default().push(handle);
59	drop(parking_lot);
60
61	loop {
62		scheduler.reschedule();
63
64		let mut parking_lot = PARKING_LOT.lock();
65		if matches!(wakeup_time, Some(t) if t <= get_timer_ticks()) {
66			let mut wakeup = true;
67			// Timeout occurred, try to remove ourselves from the waiting queue.
68			if let Entry::Occupied(mut queue) = parking_lot.entry(addr(address)) {
69				// If we are not in the waking queue, this must have been a wakeup.
70				wakeup = !queue.get_mut().remove(handle);
71				if queue.get().is_empty() {
72					queue.remove();
73				}
74			}
75
76			if wakeup {
77				return 0;
78			} else {
79				return -ETIMEDOUT;
80			}
81		} else {
82			// If we are not in the waking queue, this must have been a wakeup.
83			let wakeup = !matches!(parking_lot
84				.get(&addr(address)), Some(queue) if queue.contains(handle));
85
86			if wakeup {
87				return 0;
88			} else {
89				// A spurious wakeup occurred, sleep again.
90				// Tasks do not change core, so the handle in the parking lot is still current.
91				scheduler.block_current_task(wakeup_time);
92			}
93		}
94		drop(parking_lot);
95	}
96}
97
98/// If the value at address matches the expected value, park the current thread until it is either
99/// woken up with `futex_wake` (returns 0) or the specified timeout elapses (returns -ETIMEDOUT).
100/// In addition, the value `new_value` will stored at address.
101///
102/// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as
103/// relative to the current time. Otherwise it is understood to be an absolute time
104/// (see `get_timer_ticks`).
105pub(crate) fn futex_wait_and_set(
106	address: &AtomicU32,
107	expected: u32,
108	timeout: Option<u64>,
109	flags: Flags,
110	new_value: u32,
111) -> i32 {
112	let mut parking_lot = PARKING_LOT.lock();
113	// Check the futex value after locking the parking lot so that all changes are observed.
114	if address.swap(new_value, SeqCst) != expected {
115		return -EAGAIN;
116	}
117
118	let wakeup_time = if flags.contains(Flags::RELATIVE) {
119		timeout.and_then(|t| get_timer_ticks().checked_add(t))
120	} else {
121		timeout
122	};
123
124	let scheduler = core_scheduler();
125	scheduler.block_current_task(wakeup_time);
126	let handle = scheduler.get_current_task_handle();
127	parking_lot.entry(addr(address)).or_default().push(handle);
128	drop(parking_lot);
129
130	loop {
131		scheduler.reschedule();
132
133		let mut parking_lot = PARKING_LOT.lock();
134		if matches!(wakeup_time, Some(t) if t <= get_timer_ticks()) {
135			let mut wakeup = true;
136			// Timeout occurred, try to remove ourselves from the waiting queue.
137			if let Entry::Occupied(mut queue) = parking_lot.entry(addr(address)) {
138				// If we are not in the waking queue, this must have been a wakeup.
139				wakeup = !queue.get_mut().remove(handle);
140				if queue.get().is_empty() {
141					queue.remove();
142				}
143			}
144
145			if wakeup {
146				return 0;
147			} else {
148				return -ETIMEDOUT;
149			}
150		} else {
151			// If we are not in the waking queue, this must have been a wakeup.
152			let wakeup = !matches!(parking_lot
153				.get(&addr(address)), Some(queue) if queue.contains(handle));
154
155			if wakeup {
156				return 0;
157			} else {
158				// A spurious wakeup occurred, sleep again.
159				// Tasks do not change core, so the handle in the parking lot is still current.
160				scheduler.block_current_task(wakeup_time);
161			}
162		}
163		drop(parking_lot);
164	}
165}
166
167/// Wake `count` threads waiting on the futex at address. Returns the number of threads
168/// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching
169/// waiting threads. If `count` is negative, returns -EINVAL.
170/// `address` is used only for its address.
171/// It is safe to pass a dangling pointer.
172pub(crate) fn futex_wake(address: *const AtomicU32, count: i32) -> i32 {
173	if count < 0 {
174		return -EINVAL;
175	}
176
177	let mut parking_lot = PARKING_LOT.lock();
178	let mut queue = match parking_lot.entry(address.addr()) {
179		Entry::Occupied(entry) => entry,
180		Entry::Vacant(_) => return 0,
181	};
182
183	let scheduler = core_scheduler();
184	let mut woken = 0;
185	while woken != count || count == i32::MAX {
186		match queue.get_mut().pop() {
187			Some(handle) => scheduler.custom_wakeup(handle),
188			None => break,
189		}
190		woken = woken.saturating_add(1);
191	}
192
193	if queue.get().is_empty() {
194		queue.remove();
195	}
196
197	woken
198}
199
200/// Wake `count` threads waiting on the futex at address. Returns the number of threads
201/// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching
202/// waiting threads. If `count` is negative, returns -EINVAL. If no thread is available,
203/// the futex at address will set to `new_value`.
204pub(crate) fn futex_wake_or_set(address: &AtomicU32, count: i32, new_value: u32) -> i32 {
205	if count < 0 {
206		return -EINVAL;
207	}
208
209	let mut parking_lot = PARKING_LOT.lock();
210	let mut queue = match parking_lot.entry(addr(address)) {
211		Entry::Occupied(entry) => entry,
212		Entry::Vacant(_) => {
213			address.store(new_value, SeqCst);
214			return 0;
215		}
216	};
217
218	let scheduler = core_scheduler();
219	let mut woken = 0;
220	while woken != count || count == i32::MAX {
221		match queue.get_mut().pop() {
222			Some(handle) => scheduler.custom_wakeup(handle),
223			None => break,
224		}
225		woken = woken.saturating_add(1);
226	}
227
228	if queue.get().is_empty() {
229		queue.remove();
230	}
231
232	if woken == 0 {
233		address.store(new_value, SeqCst);
234	}
235
236	woken
237}