async_lock/rwlock/
raw.rs

1//! Raw, unsafe reader-writer locking implementation,
2//! doesn't depend on the data protected by the lock.
3//! [`RwLock`](super::RwLock) is implemented in terms of this.
4//!
5//! Splitting the implementation this way allows instantiating
6//! the locking code only once, and also lets us make
7//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.
8
9use core::marker::PhantomPinned;
10use core::mem::forget;
11use core::pin::Pin;
12use core::task::Poll;
13
14use crate::sync::atomic::{AtomicUsize, Ordering};
15
16use event_listener::{Event, EventListener};
17use event_listener_strategy::{EventListenerFuture, Strategy};
18
19use crate::futures::Lock;
20use crate::Mutex;
21
22const WRITER_BIT: usize = 1;
23const ONE_READER: usize = 2;
24
25/// A "raw" RwLock that doesn't hold any data.
26pub(super) struct RawRwLock {
27    /// Acquired by the writer.
28    mutex: Mutex<()>,
29
30    /// Event triggered when the last reader is dropped.
31    no_readers: Event,
32
33    /// Event triggered when the writer is dropped.
34    no_writer: Event,
35
36    /// Current state of the lock.
37    ///
38    /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
39    /// trying to acquire it.
40    ///
41    /// The upper bits contain the number of currently active readers. Each active reader
42    /// increments the state by `ONE_READER`.
43    state: AtomicUsize,
44}
45
46impl RawRwLock {
47    const_fn! {
48        const_if: #[cfg(not(loom))];
49        #[inline]
50        pub(super) const fn new() -> Self {
51            RawRwLock {
52                mutex: Mutex::new(()),
53                no_readers: Event::new(),
54                no_writer: Event::new(),
55                state: AtomicUsize::new(0),
56            }
57        }
58    }
59
60    /// Returns `true` iff a read lock was successfully acquired.
61    pub(super) fn try_read(&self) -> bool {
62        let mut state = self.state.load(Ordering::Acquire);
63
64        loop {
65            // If there's a writer holding the lock or attempting to acquire it, we cannot acquire
66            // a read lock here.
67            if state & WRITER_BIT != 0 {
68                return false;
69            }
70
71            // Make sure the number of readers doesn't overflow.
72            if state > core::isize::MAX as usize {
73                crate::abort();
74            }
75
76            // Increment the number of readers.
77            match self.state.compare_exchange(
78                state,
79                state + ONE_READER,
80                Ordering::AcqRel,
81                Ordering::Acquire,
82            ) {
83                Ok(_) => return true,
84                Err(s) => state = s,
85            }
86        }
87    }
88
89    #[inline]
90    pub(super) fn read(&self) -> RawRead<'_> {
91        RawRead {
92            lock: self,
93            state: self.state.load(Ordering::Acquire),
94            listener: None,
95            _pin: PhantomPinned,
96        }
97    }
98
99    /// Returns `true` iff an upgradable read lock was successfully acquired.
100
101    pub(super) fn try_upgradable_read(&self) -> bool {
102        // First try grabbing the mutex.
103        let lock = if let Some(lock) = self.mutex.try_lock() {
104            lock
105        } else {
106            return false;
107        };
108
109        forget(lock);
110
111        let mut state = self.state.load(Ordering::Acquire);
112
113        // Make sure the number of readers doesn't overflow.
114        if state > core::isize::MAX as usize {
115            crate::abort();
116        }
117
118        // Increment the number of readers.
119        loop {
120            match self.state.compare_exchange(
121                state,
122                state + ONE_READER,
123                Ordering::AcqRel,
124                Ordering::Acquire,
125            ) {
126                Ok(_) => return true,
127                Err(s) => state = s,
128            }
129        }
130    }
131
132    #[inline]
133
134    pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> {
135        RawUpgradableRead {
136            lock: self,
137            acquire: self.mutex.lock(),
138        }
139    }
140
141    /// Returs `true` iff a write lock was successfully acquired.
142
143    pub(super) fn try_write(&self) -> bool {
144        // First try grabbing the mutex.
145        let lock = if let Some(lock) = self.mutex.try_lock() {
146            lock
147        } else {
148            return false;
149        };
150
151        // If there are no readers, grab the write lock.
152        if self
153            .state
154            .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
155            .is_ok()
156        {
157            forget(lock);
158            true
159        } else {
160            drop(lock);
161            false
162        }
163    }
164
165    #[inline]
166
167    pub(super) fn write(&self) -> RawWrite<'_> {
168        RawWrite {
169            lock: self,
170            no_readers: None,
171            state: WriteState::Acquiring {
172                lock: self.mutex.lock(),
173            },
174        }
175    }
176
177    /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock.
178    ///
179    /// # Safety
180    ///
181    /// Caller must hold an upgradable read lock.
182    /// This will attempt to upgrade it to a write lock.
183
184    pub(super) unsafe fn try_upgrade(&self) -> bool {
185        self.state
186            .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
187            .is_ok()
188    }
189
190    /// # Safety
191    ///
192    /// Caller must hold an upgradable read lock.
193    /// This will upgrade it to a write lock.
194
195    pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> {
196        // Set `WRITER_BIT` and decrement the number of readers at the same time.
197        self.state
198            .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
199
200        RawUpgrade {
201            lock: Some(self),
202            listener: None,
203            _pin: PhantomPinned,
204        }
205    }
206
207    /// # Safety
208    ///
209    /// Caller must hold an upgradable read lock.
210    /// This will downgrade it to a stadard read lock.
211    #[inline]
212
213    pub(super) unsafe fn downgrade_upgradable_read(&self) {
214        self.mutex.unlock_unchecked();
215    }
216
217    /// # Safety
218    ///
219    /// Caller must hold a write lock.
220    /// This will downgrade it to a read lock.
221
222    pub(super) unsafe fn downgrade_write(&self) {
223        // Atomically downgrade state.
224        self.state
225            .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
226
227        // Release the writer mutex.
228        self.mutex.unlock_unchecked();
229
230        // Trigger the "no writer" event.
231        self.no_writer.notify(1);
232    }
233
234    /// # Safety
235    ///
236    /// Caller must hold a write lock.
237    /// This will downgrade it to an upgradable read lock.
238
239    pub(super) unsafe fn downgrade_to_upgradable(&self) {
240        // Atomically downgrade state.
241        self.state
242            .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
243    }
244
245    /// # Safety
246    ///
247    /// Caller must hold a read lock .
248    /// This will unlock that lock.
249
250    pub(super) unsafe fn read_unlock(&self) {
251        // Decrement the number of readers.
252        if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
253            // If this was the last reader, trigger the "no readers" event.
254            self.no_readers.notify(1);
255        }
256    }
257
258    /// # Safety
259    ///
260    /// Caller must hold an upgradable read lock.
261    /// This will unlock that lock.
262
263    pub(super) unsafe fn upgradable_read_unlock(&self) {
264        // Decrement the number of readers.
265        if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
266            // If this was the last reader, trigger the "no readers" event.
267            self.no_readers.notify(1);
268        }
269
270        // SAFETY: upgradable read guards acquire the writer mutex upon creation.
271        self.mutex.unlock_unchecked();
272    }
273
274    /// # Safety
275    ///
276    /// Caller must hold a write lock.
277    /// This will unlock that lock.
278
279    pub(super) unsafe fn write_unlock(&self) {
280        // Unset `WRITER_BIT`.
281        self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
282        // Trigger the "no writer" event.
283        self.no_writer.notify(1);
284
285        // Release the writer lock.
286        // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
287        self.mutex.unlock_unchecked();
288    }
289}
290
291pin_project_lite::pin_project! {
292    /// The future returned by [`RawRwLock::read`].
293
294    pub(super) struct RawRead<'a> {
295        // The lock that is being acquired.
296        pub(super) lock: &'a RawRwLock,
297
298        // The last-observed state of the lock.
299        state: usize,
300
301        // The listener for the "no writers" event.
302        listener: Option<EventListener>,
303
304        // Making this type `!Unpin` enables future optimizations.
305        #[pin]
306        _pin: PhantomPinned
307    }
308}
309
310impl<'a> EventListenerFuture for RawRead<'a> {
311    type Output = ();
312
313    fn poll_with_strategy<'x, S: Strategy<'x>>(
314        self: Pin<&mut Self>,
315        strategy: &mut S,
316        cx: &mut S::Context,
317    ) -> Poll<()> {
318        let this = self.project();
319
320        loop {
321            if *this.state & WRITER_BIT == 0 {
322                // Make sure the number of readers doesn't overflow.
323                if *this.state > core::isize::MAX as usize {
324                    crate::abort();
325                }
326
327                // If nobody is holding a write lock or attempting to acquire it, increment the
328                // number of readers.
329                match this.lock.state.compare_exchange(
330                    *this.state,
331                    *this.state + ONE_READER,
332                    Ordering::AcqRel,
333                    Ordering::Acquire,
334                ) {
335                    Ok(_) => return Poll::Ready(()),
336                    Err(s) => *this.state = s,
337                }
338            } else {
339                // Start listening for "no writer" events.
340                let load_ordering = if this.listener.is_none() {
341                    *this.listener = Some(this.lock.no_writer.listen());
342
343                    // Make sure there really is no writer.
344                    Ordering::SeqCst
345                } else {
346                    // Wait for the writer to finish.
347                    ready!(strategy.poll(this.listener, cx));
348
349                    // Notify the next reader waiting in list.
350                    this.lock.no_writer.notify(1);
351
352                    // Check the state again.
353                    Ordering::Acquire
354                };
355
356                // Reload the state.
357                *this.state = this.lock.state.load(load_ordering);
358            }
359        }
360    }
361}
362
363pin_project_lite::pin_project! {
364    /// The future returned by [`RawRwLock::upgradable_read`].
365    pub(super) struct RawUpgradableRead<'a> {
366        // The lock that is being acquired.
367        pub(super) lock: &'a RawRwLock,
368
369        // The mutex we are trying to acquire.
370        #[pin]
371        acquire: Lock<'a, ()>,
372    }
373}
374
375impl<'a> EventListenerFuture for RawUpgradableRead<'a> {
376    type Output = ();
377
378    fn poll_with_strategy<'x, S: Strategy<'x>>(
379        self: Pin<&mut Self>,
380        strategy: &mut S,
381        cx: &mut S::Context,
382    ) -> Poll<()> {
383        let this = self.project();
384
385        // Acquire the mutex.
386        let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx));
387        forget(mutex_guard);
388
389        // Load the current state.
390        let mut state = this.lock.state.load(Ordering::Acquire);
391
392        // Make sure the number of readers doesn't overflow.
393        if state > core::isize::MAX as usize {
394            crate::abort();
395        }
396
397        // Increment the number of readers.
398        loop {
399            match this.lock.state.compare_exchange(
400                state,
401                state + ONE_READER,
402                Ordering::AcqRel,
403                Ordering::Acquire,
404            ) {
405                Ok(_) => {
406                    return Poll::Ready(());
407                }
408                Err(s) => state = s,
409            }
410        }
411    }
412}
413
414pin_project_lite::pin_project! {
415    /// The future returned by [`RawRwLock::write`].
416
417    pub(super) struct RawWrite<'a> {
418        // The lock that is being acquired.
419        pub(super) lock: &'a RawRwLock,
420
421        // Our listener for the "no readers" event.
422        no_readers: Option<EventListener>,
423
424        // Current state fof this future.
425        #[pin]
426        state: WriteState<'a>,
427    }
428
429    impl PinnedDrop for RawWrite<'_> {
430        fn drop(this: Pin<&mut Self>) {
431            let this = this.project();
432
433            if matches!(this.state.project(), WriteStateProj::WaitingReaders) {
434                // Safety: we hold a write lock, more or less.
435                unsafe {
436                    this.lock.write_unlock();
437                }
438            }
439        }
440    }
441}
442
443pin_project_lite::pin_project! {
444    #[project = WriteStateProj]
445    #[project_replace = WriteStateProjReplace]
446    enum WriteState<'a> {
447        // We are currently acquiring the inner mutex.
448        Acquiring { #[pin] lock: Lock<'a, ()> },
449
450        // We are currently waiting for readers to finish.
451        WaitingReaders,
452
453        // The future has completed.
454        Acquired,
455    }
456}
457
458impl<'a> EventListenerFuture for RawWrite<'a> {
459    type Output = ();
460
461    fn poll_with_strategy<'x, S: Strategy<'x>>(
462        self: Pin<&mut Self>,
463        strategy: &mut S,
464        cx: &mut S::Context,
465    ) -> Poll<()> {
466        let mut this = self.project();
467
468        loop {
469            match this.state.as_mut().project() {
470                WriteStateProj::Acquiring { lock } => {
471                    // First grab the mutex.
472                    let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx));
473                    forget(mutex_guard);
474
475                    // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
476                    let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
477
478                    // If we just acquired the lock, return.
479                    if new_state == WRITER_BIT {
480                        this.state.as_mut().set(WriteState::Acquired);
481                        return Poll::Ready(());
482                    }
483
484                    // Start waiting for the readers to finish.
485                    *this.no_readers = Some(this.lock.no_readers.listen());
486                    this.state.as_mut().set(WriteState::WaitingReaders);
487                }
488
489                WriteStateProj::WaitingReaders => {
490                    let load_ordering = if this.no_readers.is_some() {
491                        Ordering::Acquire
492                    } else {
493                        Ordering::SeqCst
494                    };
495
496                    // Check the state again.
497                    if this.lock.state.load(load_ordering) == WRITER_BIT {
498                        // We are the only ones holding the lock, return `Ready`.
499                        this.state.as_mut().set(WriteState::Acquired);
500                        return Poll::Ready(());
501                    }
502
503                    // Wait for the readers to finish.
504                    if this.no_readers.is_none() {
505                        // Register a listener.
506                        *this.no_readers = Some(this.lock.no_readers.listen());
507                    } else {
508                        // Wait for the readers to finish.
509                        ready!(strategy.poll(this.no_readers, cx));
510                    };
511                }
512                WriteStateProj::Acquired => panic!("Write lock already acquired"),
513            }
514        }
515    }
516}
517
518pin_project_lite::pin_project! {
519    /// The future returned by [`RawRwLock::upgrade`].
520
521    pub(super) struct RawUpgrade<'a> {
522        lock: Option<&'a RawRwLock>,
523
524        // The event listener we are waiting on.
525        listener: Option<EventListener>,
526
527        // Keeping this future `!Unpin` enables future optimizations.
528        #[pin]
529        _pin: PhantomPinned
530    }
531
532    impl PinnedDrop for RawUpgrade<'_> {
533        fn drop(this: Pin<&mut Self>) {
534            let this = this.project();
535            if let Some(lock) = this.lock {
536                // SAFETY: we are dropping the future that would give us a write lock,
537                // so we don't need said lock anymore.
538                unsafe {
539                    lock.write_unlock();
540                }
541            }
542        }
543    }
544}
545
546impl<'a> EventListenerFuture for RawUpgrade<'a> {
547    type Output = &'a RawRwLock;
548
549    fn poll_with_strategy<'x, S: Strategy<'x>>(
550        self: Pin<&mut Self>,
551        strategy: &mut S,
552        cx: &mut S::Context,
553    ) -> Poll<&'a RawRwLock> {
554        let this = self.project();
555        let lock = this.lock.expect("cannot poll future after completion");
556
557        // If there are readers, we need to wait for them to finish.
558        loop {
559            let load_ordering = if this.listener.is_some() {
560                Ordering::Acquire
561            } else {
562                Ordering::SeqCst
563            };
564
565            // See if the number of readers is zero.
566            let state = lock.state.load(load_ordering);
567            if state == WRITER_BIT {
568                break;
569            }
570
571            // If there are readers, wait for them to finish.
572            if this.listener.is_none() {
573                // Start listening for "no readers" events.
574                *this.listener = Some(lock.no_readers.listen());
575            } else {
576                // Wait for the readers to finish.
577                ready!(strategy.poll(this.listener, cx));
578            };
579        }
580
581        // We are done.
582        Poll::Ready(this.lock.take().unwrap())
583    }
584}
585
586impl<'a> RawUpgrade<'a> {
587    /// Whether the future returned `Poll::Ready(..)` at some point.
588    #[inline]
589    pub(super) fn is_ready(&self) -> bool {
590        self.lock.is_none()
591    }
592}