async_lock/rwlock/raw.rs
1//! Raw, unsafe reader-writer locking implementation,
2//! doesn't depend on the data protected by the lock.
3//! [`RwLock`](super::RwLock) is implemented in terms of this.
4//!
5//! Splitting the implementation this way allows instantiating
6//! the locking code only once, and also lets us make
7//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.
8
9use core::marker::PhantomPinned;
10use core::mem::forget;
11use core::pin::Pin;
12use core::task::Poll;
13
14use crate::sync::atomic::{AtomicUsize, Ordering};
15
16use event_listener::{Event, EventListener};
17use event_listener_strategy::{EventListenerFuture, Strategy};
18
19use crate::futures::Lock;
20use crate::Mutex;
21
22const WRITER_BIT: usize = 1;
23const ONE_READER: usize = 2;
24
25/// A "raw" RwLock that doesn't hold any data.
26pub(super) struct RawRwLock {
27 /// Acquired by the writer.
28 mutex: Mutex<()>,
29
30 /// Event triggered when the last reader is dropped.
31 no_readers: Event,
32
33 /// Event triggered when the writer is dropped.
34 no_writer: Event,
35
36 /// Current state of the lock.
37 ///
38 /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
39 /// trying to acquire it.
40 ///
41 /// The upper bits contain the number of currently active readers. Each active reader
42 /// increments the state by `ONE_READER`.
43 state: AtomicUsize,
44}
45
46impl RawRwLock {
47 const_fn! {
48 const_if: #[cfg(not(loom))];
49 #[inline]
50 pub(super) const fn new() -> Self {
51 RawRwLock {
52 mutex: Mutex::new(()),
53 no_readers: Event::new(),
54 no_writer: Event::new(),
55 state: AtomicUsize::new(0),
56 }
57 }
58 }
59
60 /// Returns `true` iff a read lock was successfully acquired.
61 pub(super) fn try_read(&self) -> bool {
62 let mut state = self.state.load(Ordering::Acquire);
63
64 loop {
65 // If there's a writer holding the lock or attempting to acquire it, we cannot acquire
66 // a read lock here.
67 if state & WRITER_BIT != 0 {
68 return false;
69 }
70
71 // Make sure the number of readers doesn't overflow.
72 if state > core::isize::MAX as usize {
73 crate::abort();
74 }
75
76 // Increment the number of readers.
77 match self.state.compare_exchange(
78 state,
79 state + ONE_READER,
80 Ordering::AcqRel,
81 Ordering::Acquire,
82 ) {
83 Ok(_) => return true,
84 Err(s) => state = s,
85 }
86 }
87 }
88
89 #[inline]
90 pub(super) fn read(&self) -> RawRead<'_> {
91 RawRead {
92 lock: self,
93 state: self.state.load(Ordering::Acquire),
94 listener: None,
95 _pin: PhantomPinned,
96 }
97 }
98
99 /// Returns `true` iff an upgradable read lock was successfully acquired.
100
101 pub(super) fn try_upgradable_read(&self) -> bool {
102 // First try grabbing the mutex.
103 let lock = if let Some(lock) = self.mutex.try_lock() {
104 lock
105 } else {
106 return false;
107 };
108
109 forget(lock);
110
111 let mut state = self.state.load(Ordering::Acquire);
112
113 // Make sure the number of readers doesn't overflow.
114 if state > core::isize::MAX as usize {
115 crate::abort();
116 }
117
118 // Increment the number of readers.
119 loop {
120 match self.state.compare_exchange(
121 state,
122 state + ONE_READER,
123 Ordering::AcqRel,
124 Ordering::Acquire,
125 ) {
126 Ok(_) => return true,
127 Err(s) => state = s,
128 }
129 }
130 }
131
132 #[inline]
133
134 pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> {
135 RawUpgradableRead {
136 lock: self,
137 acquire: self.mutex.lock(),
138 }
139 }
140
141 /// Returs `true` iff a write lock was successfully acquired.
142
143 pub(super) fn try_write(&self) -> bool {
144 // First try grabbing the mutex.
145 let lock = if let Some(lock) = self.mutex.try_lock() {
146 lock
147 } else {
148 return false;
149 };
150
151 // If there are no readers, grab the write lock.
152 if self
153 .state
154 .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
155 .is_ok()
156 {
157 forget(lock);
158 true
159 } else {
160 drop(lock);
161 false
162 }
163 }
164
165 #[inline]
166
167 pub(super) fn write(&self) -> RawWrite<'_> {
168 RawWrite {
169 lock: self,
170 no_readers: None,
171 state: WriteState::Acquiring {
172 lock: self.mutex.lock(),
173 },
174 }
175 }
176
177 /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock.
178 ///
179 /// # Safety
180 ///
181 /// Caller must hold an upgradable read lock.
182 /// This will attempt to upgrade it to a write lock.
183
184 pub(super) unsafe fn try_upgrade(&self) -> bool {
185 self.state
186 .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
187 .is_ok()
188 }
189
190 /// # Safety
191 ///
192 /// Caller must hold an upgradable read lock.
193 /// This will upgrade it to a write lock.
194
195 pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> {
196 // Set `WRITER_BIT` and decrement the number of readers at the same time.
197 self.state
198 .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
199
200 RawUpgrade {
201 lock: Some(self),
202 listener: None,
203 _pin: PhantomPinned,
204 }
205 }
206
207 /// # Safety
208 ///
209 /// Caller must hold an upgradable read lock.
210 /// This will downgrade it to a stadard read lock.
211 #[inline]
212
213 pub(super) unsafe fn downgrade_upgradable_read(&self) {
214 self.mutex.unlock_unchecked();
215 }
216
217 /// # Safety
218 ///
219 /// Caller must hold a write lock.
220 /// This will downgrade it to a read lock.
221
222 pub(super) unsafe fn downgrade_write(&self) {
223 // Atomically downgrade state.
224 self.state
225 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
226
227 // Release the writer mutex.
228 self.mutex.unlock_unchecked();
229
230 // Trigger the "no writer" event.
231 self.no_writer.notify(1);
232 }
233
234 /// # Safety
235 ///
236 /// Caller must hold a write lock.
237 /// This will downgrade it to an upgradable read lock.
238
239 pub(super) unsafe fn downgrade_to_upgradable(&self) {
240 // Atomically downgrade state.
241 self.state
242 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
243 }
244
245 /// # Safety
246 ///
247 /// Caller must hold a read lock .
248 /// This will unlock that lock.
249
250 pub(super) unsafe fn read_unlock(&self) {
251 // Decrement the number of readers.
252 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
253 // If this was the last reader, trigger the "no readers" event.
254 self.no_readers.notify(1);
255 }
256 }
257
258 /// # Safety
259 ///
260 /// Caller must hold an upgradable read lock.
261 /// This will unlock that lock.
262
263 pub(super) unsafe fn upgradable_read_unlock(&self) {
264 // Decrement the number of readers.
265 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
266 // If this was the last reader, trigger the "no readers" event.
267 self.no_readers.notify(1);
268 }
269
270 // SAFETY: upgradable read guards acquire the writer mutex upon creation.
271 self.mutex.unlock_unchecked();
272 }
273
274 /// # Safety
275 ///
276 /// Caller must hold a write lock.
277 /// This will unlock that lock.
278
279 pub(super) unsafe fn write_unlock(&self) {
280 // Unset `WRITER_BIT`.
281 self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
282 // Trigger the "no writer" event.
283 self.no_writer.notify(1);
284
285 // Release the writer lock.
286 // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
287 self.mutex.unlock_unchecked();
288 }
289}
290
291pin_project_lite::pin_project! {
292 /// The future returned by [`RawRwLock::read`].
293
294 pub(super) struct RawRead<'a> {
295 // The lock that is being acquired.
296 pub(super) lock: &'a RawRwLock,
297
298 // The last-observed state of the lock.
299 state: usize,
300
301 // The listener for the "no writers" event.
302 listener: Option<EventListener>,
303
304 // Making this type `!Unpin` enables future optimizations.
305 #[pin]
306 _pin: PhantomPinned
307 }
308}
309
310impl<'a> EventListenerFuture for RawRead<'a> {
311 type Output = ();
312
313 fn poll_with_strategy<'x, S: Strategy<'x>>(
314 self: Pin<&mut Self>,
315 strategy: &mut S,
316 cx: &mut S::Context,
317 ) -> Poll<()> {
318 let this = self.project();
319
320 loop {
321 if *this.state & WRITER_BIT == 0 {
322 // Make sure the number of readers doesn't overflow.
323 if *this.state > core::isize::MAX as usize {
324 crate::abort();
325 }
326
327 // If nobody is holding a write lock or attempting to acquire it, increment the
328 // number of readers.
329 match this.lock.state.compare_exchange(
330 *this.state,
331 *this.state + ONE_READER,
332 Ordering::AcqRel,
333 Ordering::Acquire,
334 ) {
335 Ok(_) => return Poll::Ready(()),
336 Err(s) => *this.state = s,
337 }
338 } else {
339 // Start listening for "no writer" events.
340 let load_ordering = if this.listener.is_none() {
341 *this.listener = Some(this.lock.no_writer.listen());
342
343 // Make sure there really is no writer.
344 Ordering::SeqCst
345 } else {
346 // Wait for the writer to finish.
347 ready!(strategy.poll(this.listener, cx));
348
349 // Notify the next reader waiting in list.
350 this.lock.no_writer.notify(1);
351
352 // Check the state again.
353 Ordering::Acquire
354 };
355
356 // Reload the state.
357 *this.state = this.lock.state.load(load_ordering);
358 }
359 }
360 }
361}
362
363pin_project_lite::pin_project! {
364 /// The future returned by [`RawRwLock::upgradable_read`].
365 pub(super) struct RawUpgradableRead<'a> {
366 // The lock that is being acquired.
367 pub(super) lock: &'a RawRwLock,
368
369 // The mutex we are trying to acquire.
370 #[pin]
371 acquire: Lock<'a, ()>,
372 }
373}
374
375impl<'a> EventListenerFuture for RawUpgradableRead<'a> {
376 type Output = ();
377
378 fn poll_with_strategy<'x, S: Strategy<'x>>(
379 self: Pin<&mut Self>,
380 strategy: &mut S,
381 cx: &mut S::Context,
382 ) -> Poll<()> {
383 let this = self.project();
384
385 // Acquire the mutex.
386 let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx));
387 forget(mutex_guard);
388
389 // Load the current state.
390 let mut state = this.lock.state.load(Ordering::Acquire);
391
392 // Make sure the number of readers doesn't overflow.
393 if state > core::isize::MAX as usize {
394 crate::abort();
395 }
396
397 // Increment the number of readers.
398 loop {
399 match this.lock.state.compare_exchange(
400 state,
401 state + ONE_READER,
402 Ordering::AcqRel,
403 Ordering::Acquire,
404 ) {
405 Ok(_) => {
406 return Poll::Ready(());
407 }
408 Err(s) => state = s,
409 }
410 }
411 }
412}
413
414pin_project_lite::pin_project! {
415 /// The future returned by [`RawRwLock::write`].
416
417 pub(super) struct RawWrite<'a> {
418 // The lock that is being acquired.
419 pub(super) lock: &'a RawRwLock,
420
421 // Our listener for the "no readers" event.
422 no_readers: Option<EventListener>,
423
424 // Current state fof this future.
425 #[pin]
426 state: WriteState<'a>,
427 }
428
429 impl PinnedDrop for RawWrite<'_> {
430 fn drop(this: Pin<&mut Self>) {
431 let this = this.project();
432
433 if matches!(this.state.project(), WriteStateProj::WaitingReaders) {
434 // Safety: we hold a write lock, more or less.
435 unsafe {
436 this.lock.write_unlock();
437 }
438 }
439 }
440 }
441}
442
443pin_project_lite::pin_project! {
444 #[project = WriteStateProj]
445 #[project_replace = WriteStateProjReplace]
446 enum WriteState<'a> {
447 // We are currently acquiring the inner mutex.
448 Acquiring { #[pin] lock: Lock<'a, ()> },
449
450 // We are currently waiting for readers to finish.
451 WaitingReaders,
452
453 // The future has completed.
454 Acquired,
455 }
456}
457
458impl<'a> EventListenerFuture for RawWrite<'a> {
459 type Output = ();
460
461 fn poll_with_strategy<'x, S: Strategy<'x>>(
462 self: Pin<&mut Self>,
463 strategy: &mut S,
464 cx: &mut S::Context,
465 ) -> Poll<()> {
466 let mut this = self.project();
467
468 loop {
469 match this.state.as_mut().project() {
470 WriteStateProj::Acquiring { lock } => {
471 // First grab the mutex.
472 let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx));
473 forget(mutex_guard);
474
475 // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
476 let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
477
478 // If we just acquired the lock, return.
479 if new_state == WRITER_BIT {
480 this.state.as_mut().set(WriteState::Acquired);
481 return Poll::Ready(());
482 }
483
484 // Start waiting for the readers to finish.
485 *this.no_readers = Some(this.lock.no_readers.listen());
486 this.state.as_mut().set(WriteState::WaitingReaders);
487 }
488
489 WriteStateProj::WaitingReaders => {
490 let load_ordering = if this.no_readers.is_some() {
491 Ordering::Acquire
492 } else {
493 Ordering::SeqCst
494 };
495
496 // Check the state again.
497 if this.lock.state.load(load_ordering) == WRITER_BIT {
498 // We are the only ones holding the lock, return `Ready`.
499 this.state.as_mut().set(WriteState::Acquired);
500 return Poll::Ready(());
501 }
502
503 // Wait for the readers to finish.
504 if this.no_readers.is_none() {
505 // Register a listener.
506 *this.no_readers = Some(this.lock.no_readers.listen());
507 } else {
508 // Wait for the readers to finish.
509 ready!(strategy.poll(this.no_readers, cx));
510 };
511 }
512 WriteStateProj::Acquired => panic!("Write lock already acquired"),
513 }
514 }
515 }
516}
517
518pin_project_lite::pin_project! {
519 /// The future returned by [`RawRwLock::upgrade`].
520
521 pub(super) struct RawUpgrade<'a> {
522 lock: Option<&'a RawRwLock>,
523
524 // The event listener we are waiting on.
525 listener: Option<EventListener>,
526
527 // Keeping this future `!Unpin` enables future optimizations.
528 #[pin]
529 _pin: PhantomPinned
530 }
531
532 impl PinnedDrop for RawUpgrade<'_> {
533 fn drop(this: Pin<&mut Self>) {
534 let this = this.project();
535 if let Some(lock) = this.lock {
536 // SAFETY: we are dropping the future that would give us a write lock,
537 // so we don't need said lock anymore.
538 unsafe {
539 lock.write_unlock();
540 }
541 }
542 }
543 }
544}
545
546impl<'a> EventListenerFuture for RawUpgrade<'a> {
547 type Output = &'a RawRwLock;
548
549 fn poll_with_strategy<'x, S: Strategy<'x>>(
550 self: Pin<&mut Self>,
551 strategy: &mut S,
552 cx: &mut S::Context,
553 ) -> Poll<&'a RawRwLock> {
554 let this = self.project();
555 let lock = this.lock.expect("cannot poll future after completion");
556
557 // If there are readers, we need to wait for them to finish.
558 loop {
559 let load_ordering = if this.listener.is_some() {
560 Ordering::Acquire
561 } else {
562 Ordering::SeqCst
563 };
564
565 // See if the number of readers is zero.
566 let state = lock.state.load(load_ordering);
567 if state == WRITER_BIT {
568 break;
569 }
570
571 // If there are readers, wait for them to finish.
572 if this.listener.is_none() {
573 // Start listening for "no readers" events.
574 *this.listener = Some(lock.no_readers.listen());
575 } else {
576 // Wait for the readers to finish.
577 ready!(strategy.poll(this.listener, cx));
578 };
579 }
580
581 // We are done.
582 Poll::Ready(this.lock.take().unwrap())
583 }
584}
585
586impl<'a> RawUpgrade<'a> {
587 /// Whether the future returned `Poll::Ready(..)` at some point.
588 #[inline]
589 pub(super) fn is_ready(&self) -> bool {
590 self.lock.is_none()
591 }
592}