async_lock/rwlock/raw.rs
1//! Raw, unsafe reader-writer locking implementation,
2//! doesn't depend on the data protected by the lock.
3//! [`RwLock`](super::RwLock) is implemented in terms of this.
4//!
5//! Splitting the implementation this way allows instantiating
6//! the locking code only once, and also lets us make
7//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.
8
9use core::marker::PhantomPinned;
10use core::mem::forget;
11use core::pin::Pin;
12use core::task::Poll;
13
14use crate::sync::atomic::{AtomicUsize, Ordering};
15
16use event_listener::{Event, EventListener};
17use event_listener_strategy::{EventListenerFuture, Strategy};
18
19use crate::futures::Lock;
20use crate::Mutex;
21
/// Least significant bit of `RawRwLock::state`: set while a writer is holding the lock or
/// trying to acquire it.
const WRITER_BIT: usize = 1;
/// Amount by which each active reader increments `RawRwLock::state`; the reader count
/// therefore lives in the bits above `WRITER_BIT`.
const ONE_READER: usize = 2;
24
/// A "raw" RwLock that doesn't hold any data.
///
/// All bookkeeping is done through the `state` word plus a mutex and two events; the guard
/// types built on top of this are responsible for pairing every acquisition with the matching
/// unlock/downgrade call.
pub(super) struct RawRwLock {
    /// Acquired by the writer.
    ///
    /// Upgradable readers also take this mutex (see `try_upgradable_read` and
    /// `upgradable_read`), so at most one writer or upgradable reader exists at a time.
    mutex: Mutex<()>,

    /// Event triggered when the last reader is dropped.
    no_readers: Event,

    /// Event triggered when the writer is dropped.
    no_writer: Event,

    /// Current state of the lock.
    ///
    /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
    /// trying to acquire it.
    ///
    /// The upper bits contain the number of currently active readers. Each active reader
    /// increments the state by `ONE_READER`.
    state: AtomicUsize,
}
45
46impl RawRwLock {
47 const_fn! {
48 const_if: #[cfg(not(loom))];
49 #[inline]
50 pub(super) const fn new() -> Self {
51 RawRwLock {
52 mutex: Mutex::new(()),
53 no_readers: Event::new(),
54 no_writer: Event::new(),
55 state: AtomicUsize::new(0),
56 }
57 }
58 }
59
60 /// Returns `true` iff a read lock was successfully acquired.
61 pub(super) fn try_read(&self) -> bool {
62 let mut state = self.state.load(Ordering::Acquire);
63
64 loop {
65 // If there's a writer holding the lock or attempting to acquire it, we cannot acquire
66 // a read lock here.
67 if state & WRITER_BIT != 0 {
68 return false;
69 }
70
71 // Make sure the number of readers doesn't overflow.
72 if state > isize::MAX as usize {
73 crate::abort();
74 }
75
76 // Increment the number of readers.
77 match self.state.compare_exchange(
78 state,
79 state + ONE_READER,
80 Ordering::AcqRel,
81 Ordering::Acquire,
82 ) {
83 Ok(_) => return true,
84 Err(s) => state = s,
85 }
86 }
87 }
88
89 #[inline]
90 pub(super) fn read(&self) -> RawRead<'_> {
91 RawRead {
92 lock: self,
93 state: self.state.load(Ordering::Acquire),
94 listener: None,
95 _pin: PhantomPinned,
96 }
97 }
98
99 /// Returns `true` iff an upgradable read lock was successfully acquired.
100 pub(super) fn try_upgradable_read(&self) -> bool {
101 // First try grabbing the mutex.
102 let lock = if let Some(lock) = self.mutex.try_lock() {
103 lock
104 } else {
105 return false;
106 };
107
108 forget(lock);
109
110 let mut state = self.state.load(Ordering::Acquire);
111
112 // Make sure the number of readers doesn't overflow.
113 if state > isize::MAX as usize {
114 crate::abort();
115 }
116
117 // Increment the number of readers.
118 loop {
119 match self.state.compare_exchange(
120 state,
121 state + ONE_READER,
122 Ordering::AcqRel,
123 Ordering::Acquire,
124 ) {
125 Ok(_) => return true,
126 Err(s) => state = s,
127 }
128 }
129 }
130
131 #[inline]
132 pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> {
133 RawUpgradableRead {
134 lock: self,
135 acquire: self.mutex.lock(),
136 }
137 }
138
139 /// Returns `true` iff a write lock was successfully acquired.
140 pub(super) fn try_write(&self) -> bool {
141 // First try grabbing the mutex.
142 let lock = if let Some(lock) = self.mutex.try_lock() {
143 lock
144 } else {
145 return false;
146 };
147
148 // If there are no readers, grab the write lock.
149 if self
150 .state
151 .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
152 .is_ok()
153 {
154 forget(lock);
155 true
156 } else {
157 drop(lock);
158 false
159 }
160 }
161
162 #[inline]
163 pub(super) fn write(&self) -> RawWrite<'_> {
164 RawWrite {
165 lock: self,
166 no_readers: None,
167 state: WriteState::Acquiring {
168 lock: self.mutex.lock(),
169 },
170 }
171 }
172
173 /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock.
174 ///
175 /// # Safety
176 ///
177 /// Caller must hold an upgradable read lock.
178 /// This will attempt to upgrade it to a write lock.
179 pub(super) unsafe fn try_upgrade(&self) -> bool {
180 self.state
181 .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
182 .is_ok()
183 }
184
185 /// # Safety
186 ///
187 /// Caller must hold an upgradable read lock.
188 /// This will upgrade it to a write lock.
189 pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> {
190 // Set `WRITER_BIT` and decrement the number of readers at the same time.
191 self.state
192 .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
193
194 RawUpgrade {
195 lock: Some(self),
196 listener: None,
197 _pin: PhantomPinned,
198 }
199 }
200
201 /// # Safety
202 ///
203 /// Caller must hold an upgradable read lock.
204 /// This will downgrade it to a standard read lock.
205 #[inline]
206 pub(super) unsafe fn downgrade_upgradable_read(&self) {
207 self.mutex.unlock_unchecked();
208 }
209
210 /// # Safety
211 ///
212 /// Caller must hold a write lock.
213 /// This will downgrade it to a read lock.
214 pub(super) unsafe fn downgrade_write(&self) {
215 // Atomically downgrade state.
216 self.state
217 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
218
219 // Release the writer mutex.
220 self.mutex.unlock_unchecked();
221
222 // Trigger the "no writer" event.
223 self.no_writer.notify(1);
224 }
225
226 /// # Safety
227 ///
228 /// Caller must hold a write lock.
229 /// This will downgrade it to an upgradable read lock.
230 pub(super) unsafe fn downgrade_to_upgradable(&self) {
231 // Atomically downgrade state.
232 self.state
233 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
234 }
235
236 /// # Safety
237 ///
238 /// Caller must hold a read lock .
239 /// This will unlock that lock.
240 pub(super) unsafe fn read_unlock(&self) {
241 // Decrement the number of readers.
242 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
243 // If this was the last reader, trigger the "no readers" event.
244 self.no_readers.notify(1);
245 }
246 }
247
248 /// # Safety
249 ///
250 /// Caller must hold an upgradable read lock.
251 /// This will unlock that lock.
252 pub(super) unsafe fn upgradable_read_unlock(&self) {
253 // Decrement the number of readers.
254 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
255 // If this was the last reader, trigger the "no readers" event.
256 self.no_readers.notify(1);
257 }
258
259 // SAFETY: upgradable read guards acquire the writer mutex upon creation.
260 self.mutex.unlock_unchecked();
261 }
262
263 /// # Safety
264 ///
265 /// Caller must hold a write lock.
266 /// This will unlock that lock.
267 pub(super) unsafe fn write_unlock(&self) {
268 // Unset `WRITER_BIT`.
269 self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
270 // Trigger the "no writer" event.
271 self.no_writer.notify(1);
272
273 // Release the writer lock.
274 // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
275 self.mutex.unlock_unchecked();
276 }
277}
278
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::read`].
    ///
    /// Resolves once the reader count has been incremented while no writer holds or is
    /// acquiring the lock.
    pub(super) struct RawRead<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // The last-observed state of the lock. Used as the expected value of the next
        // compare-exchange and refreshed whenever that fails or after waiting for a writer.
        state: usize,

        // The listener for the "no writers" event. `None` until the future first observes
        // a writer.
        listener: Option<EventListener>,

        // Making this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
297
298impl EventListenerFuture for RawRead<'_> {
299 type Output = ();
300
301 fn poll_with_strategy<'x, S: Strategy<'x>>(
302 self: Pin<&mut Self>,
303 strategy: &mut S,
304 cx: &mut S::Context,
305 ) -> Poll<()> {
306 let this = self.project();
307
308 loop {
309 if *this.state & WRITER_BIT == 0 {
310 // Make sure the number of readers doesn't overflow.
311 if *this.state > isize::MAX as usize {
312 crate::abort();
313 }
314
315 // If nobody is holding a write lock or attempting to acquire it, increment the
316 // number of readers.
317 match this.lock.state.compare_exchange(
318 *this.state,
319 *this.state + ONE_READER,
320 Ordering::AcqRel,
321 Ordering::Acquire,
322 ) {
323 Ok(_) => return Poll::Ready(()),
324 Err(s) => *this.state = s,
325 }
326 } else {
327 // Start listening for "no writer" events.
328 let load_ordering = if this.listener.is_none() {
329 *this.listener = Some(this.lock.no_writer.listen());
330
331 // Make sure there really is no writer.
332 Ordering::SeqCst
333 } else {
334 // Wait for the writer to finish.
335 ready!(strategy.poll(this.listener, cx));
336
337 // Notify the next reader waiting in list.
338 this.lock.no_writer.notify(1);
339
340 // Check the state again.
341 Ordering::Acquire
342 };
343
344 // Reload the state.
345 *this.state = this.lock.state.load(load_ordering);
346 }
347 }
348 }
349}
350
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::upgradable_read`].
    ///
    /// Resolves once the writer mutex has been acquired and the reader count incremented.
    pub(super) struct RawUpgradableRead<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // The mutex we are trying to acquire. This is the only step that can wait.
        #[pin]
        acquire: Lock<'a, ()>,
    }
}
362
363impl EventListenerFuture for RawUpgradableRead<'_> {
364 type Output = ();
365
366 fn poll_with_strategy<'x, S: Strategy<'x>>(
367 self: Pin<&mut Self>,
368 strategy: &mut S,
369 cx: &mut S::Context,
370 ) -> Poll<()> {
371 let this = self.project();
372
373 // Acquire the mutex.
374 let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx));
375 forget(mutex_guard);
376
377 // Load the current state.
378 let mut state = this.lock.state.load(Ordering::Acquire);
379
380 // Make sure the number of readers doesn't overflow.
381 if state > isize::MAX as usize {
382 crate::abort();
383 }
384
385 // Increment the number of readers.
386 loop {
387 match this.lock.state.compare_exchange(
388 state,
389 state + ONE_READER,
390 Ordering::AcqRel,
391 Ordering::Acquire,
392 ) {
393 Ok(_) => {
394 return Poll::Ready(());
395 }
396 Err(s) => state = s,
397 }
398 }
399 }
400}
401
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::write`].
    ///
    /// Acquires the writer mutex first, then sets `WRITER_BIT` and waits until no readers
    /// remain.
    pub(super) struct RawWrite<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // Our listener for the "no readers" event.
        no_readers: Option<EventListener>,

        // Current state of this future.
        #[pin]
        state: WriteState<'a>,
    }

    impl PinnedDrop for RawWrite<'_> {
        // Rolls back a half-finished acquisition when the future is dropped early.
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();

            // Only `WaitingReaders` needs cleanup here: in `Acquired` the lock now belongs to
            // the caller, and in `Acquiring` this future has not set `WRITER_BIT`.
            if matches!(this.state.project(), WriteStateProj::WaitingReaders) {
                // SAFETY: in `WaitingReaders` this future has locked the writer mutex and set
                // `WRITER_BIT`, which is exactly what `write_unlock` undoes.
                unsafe {
                    this.lock.write_unlock();
                }
            }
        }
    }
}
430
pin_project_lite::pin_project! {
    // State machine of `RawWrite`. It advances `Acquiring` -> (`WaitingReaders` ->) `Acquired`.
    #[project = WriteStateProj]
    #[project_replace = WriteStateProjReplace]
    enum WriteState<'a> {
        // We are currently acquiring the inner mutex.
        Acquiring { #[pin] lock: Lock<'a, ()> },

        // We are currently waiting for readers to finish. The mutex is held and `WRITER_BIT`
        // is set while in this state.
        WaitingReaders,

        // The future has completed. Polling `RawWrite` again in this state panics.
        Acquired,
    }
}
445
446impl EventListenerFuture for RawWrite<'_> {
447 type Output = ();
448
449 fn poll_with_strategy<'x, S: Strategy<'x>>(
450 self: Pin<&mut Self>,
451 strategy: &mut S,
452 cx: &mut S::Context,
453 ) -> Poll<()> {
454 let mut this = self.project();
455
456 loop {
457 match this.state.as_mut().project() {
458 WriteStateProj::Acquiring { lock } => {
459 // First grab the mutex.
460 let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx));
461 forget(mutex_guard);
462
463 // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
464 let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
465
466 // If we just acquired the lock, return.
467 if new_state == WRITER_BIT {
468 this.state.as_mut().set(WriteState::Acquired);
469 return Poll::Ready(());
470 }
471
472 // Start waiting for the readers to finish.
473 *this.no_readers = Some(this.lock.no_readers.listen());
474 this.state.as_mut().set(WriteState::WaitingReaders);
475 }
476
477 WriteStateProj::WaitingReaders => {
478 let load_ordering = if this.no_readers.is_some() {
479 Ordering::Acquire
480 } else {
481 Ordering::SeqCst
482 };
483
484 // Check the state again.
485 if this.lock.state.load(load_ordering) == WRITER_BIT {
486 // We are the only ones holding the lock, return `Ready`.
487 this.state.as_mut().set(WriteState::Acquired);
488 return Poll::Ready(());
489 }
490
491 // Wait for the readers to finish.
492 if this.no_readers.is_none() {
493 // Register a listener.
494 *this.no_readers = Some(this.lock.no_readers.listen());
495 } else {
496 // Wait for the readers to finish.
497 ready!(strategy.poll(this.no_readers, cx));
498 };
499 }
500 WriteStateProj::Acquired => panic!("Write lock already acquired"),
501 }
502 }
503 }
504}
505
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::upgrade`].
    ///
    /// `WRITER_BIT` is already set when this future is created; it resolves once the
    /// remaining readers are gone.
    pub(super) struct RawUpgrade<'a> {
        // The lock being upgraded. Set to `None` once the future has returned `Ready`,
        // which also disarms the cleanup in `drop`.
        lock: Option<&'a RawRwLock>,

        // The event listener we are waiting on.
        listener: Option<EventListener>,

        // Keeping this future `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }

    impl PinnedDrop for RawUpgrade<'_> {
        // Releases the pending write lock if the future is dropped before completing.
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();
            if let Some(lock) = this.lock {
                // SAFETY: we are dropping the future that would give us a write lock,
                // so we don't need said lock anymore.
                unsafe {
                    lock.write_unlock();
                }
            }
        }
    }
}
533
534impl<'a> EventListenerFuture for RawUpgrade<'a> {
535 type Output = &'a RawRwLock;
536
537 fn poll_with_strategy<'x, S: Strategy<'x>>(
538 self: Pin<&mut Self>,
539 strategy: &mut S,
540 cx: &mut S::Context,
541 ) -> Poll<&'a RawRwLock> {
542 let this = self.project();
543 let lock = this.lock.expect("cannot poll future after completion");
544
545 // If there are readers, we need to wait for them to finish.
546 loop {
547 let load_ordering = if this.listener.is_some() {
548 Ordering::Acquire
549 } else {
550 Ordering::SeqCst
551 };
552
553 // See if the number of readers is zero.
554 let state = lock.state.load(load_ordering);
555 if state == WRITER_BIT {
556 break;
557 }
558
559 // If there are readers, wait for them to finish.
560 if this.listener.is_none() {
561 // Start listening for "no readers" events.
562 *this.listener = Some(lock.no_readers.listen());
563 } else {
564 // Wait for the readers to finish.
565 ready!(strategy.poll(this.listener, cx));
566 };
567 }
568
569 // We are done.
570 Poll::Ready(this.lock.take().unwrap())
571 }
572}
573
574impl RawUpgrade<'_> {
575 /// Whether the future returned `Poll::Ready(..)` at some point.
576 #[inline]
577 pub(super) fn is_ready(&self) -> bool {
578 self.lock.is_none()
579 }
580}