async_lock/rwlock/
raw.rs

//! Raw, unsafe reader-writer locking implementation that
//! doesn't depend on the data protected by the lock.
//! [`RwLock`](super::RwLock) is implemented in terms of this.
//!
//! Splitting the implementation this way allows instantiating
//! the locking code only once, and also lets us make
//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.
8
9use core::marker::PhantomPinned;
10use core::mem::forget;
11use core::pin::Pin;
12use core::task::Poll;
13
14use crate::sync::atomic::{AtomicUsize, Ordering};
15
16use event_listener::{Event, EventListener};
17use event_listener_strategy::{EventListenerFuture, Strategy};
18
19use crate::futures::Lock;
20use crate::Mutex;
21
/// Lowest bit of the lock state: set while a writer holds the lock or is waiting to take it.
const WRITER_BIT: usize = 1 << 0;
/// Amount added to the lock state for every active reader (the count lives above `WRITER_BIT`).
const ONE_READER: usize = 1 << 1;
24
25/// A "raw" RwLock that doesn't hold any data.
26pub(super) struct RawRwLock {
27    /// Acquired by the writer.
28    mutex: Mutex<()>,
29
30    /// Event triggered when the last reader is dropped.
31    no_readers: Event,
32
33    /// Event triggered when the writer is dropped.
34    no_writer: Event,
35
36    /// Current state of the lock.
37    ///
38    /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
39    /// trying to acquire it.
40    ///
41    /// The upper bits contain the number of currently active readers. Each active reader
42    /// increments the state by `ONE_READER`.
43    state: AtomicUsize,
44}
45
46impl RawRwLock {
47    const_fn! {
48        const_if: #[cfg(not(loom))];
49        #[inline]
50        pub(super) const fn new() -> Self {
51            RawRwLock {
52                mutex: Mutex::new(()),
53                no_readers: Event::new(),
54                no_writer: Event::new(),
55                state: AtomicUsize::new(0),
56            }
57        }
58    }
59
60    /// Returns `true` iff a read lock was successfully acquired.
61    pub(super) fn try_read(&self) -> bool {
62        let mut state = self.state.load(Ordering::Acquire);
63
64        loop {
65            // If there's a writer holding the lock or attempting to acquire it, we cannot acquire
66            // a read lock here.
67            if state & WRITER_BIT != 0 {
68                return false;
69            }
70
71            // Make sure the number of readers doesn't overflow.
72            if state > isize::MAX as usize {
73                crate::abort();
74            }
75
76            // Increment the number of readers.
77            match self.state.compare_exchange(
78                state,
79                state + ONE_READER,
80                Ordering::AcqRel,
81                Ordering::Acquire,
82            ) {
83                Ok(_) => return true,
84                Err(s) => state = s,
85            }
86        }
87    }
88
89    #[inline]
90    pub(super) fn read(&self) -> RawRead<'_> {
91        RawRead {
92            lock: self,
93            state: self.state.load(Ordering::Acquire),
94            listener: None,
95            _pin: PhantomPinned,
96        }
97    }
98
99    /// Returns `true` iff an upgradable read lock was successfully acquired.
100    pub(super) fn try_upgradable_read(&self) -> bool {
101        // First try grabbing the mutex.
102        let lock = if let Some(lock) = self.mutex.try_lock() {
103            lock
104        } else {
105            return false;
106        };
107
108        forget(lock);
109
110        let mut state = self.state.load(Ordering::Acquire);
111
112        // Make sure the number of readers doesn't overflow.
113        if state > isize::MAX as usize {
114            crate::abort();
115        }
116
117        // Increment the number of readers.
118        loop {
119            match self.state.compare_exchange(
120                state,
121                state + ONE_READER,
122                Ordering::AcqRel,
123                Ordering::Acquire,
124            ) {
125                Ok(_) => return true,
126                Err(s) => state = s,
127            }
128        }
129    }
130
131    #[inline]
132    pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> {
133        RawUpgradableRead {
134            lock: self,
135            acquire: self.mutex.lock(),
136        }
137    }
138
139    /// Returns `true` iff a write lock was successfully acquired.
140    pub(super) fn try_write(&self) -> bool {
141        // First try grabbing the mutex.
142        let lock = if let Some(lock) = self.mutex.try_lock() {
143            lock
144        } else {
145            return false;
146        };
147
148        // If there are no readers, grab the write lock.
149        if self
150            .state
151            .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
152            .is_ok()
153        {
154            forget(lock);
155            true
156        } else {
157            drop(lock);
158            false
159        }
160    }
161
162    #[inline]
163    pub(super) fn write(&self) -> RawWrite<'_> {
164        RawWrite {
165            lock: self,
166            no_readers: None,
167            state: WriteState::Acquiring {
168                lock: self.mutex.lock(),
169            },
170        }
171    }
172
173    /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock.
174    ///
175    /// # Safety
176    ///
177    /// Caller must hold an upgradable read lock.
178    /// This will attempt to upgrade it to a write lock.
179    pub(super) unsafe fn try_upgrade(&self) -> bool {
180        self.state
181            .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
182            .is_ok()
183    }
184
185    /// # Safety
186    ///
187    /// Caller must hold an upgradable read lock.
188    /// This will upgrade it to a write lock.
189    pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> {
190        // Set `WRITER_BIT` and decrement the number of readers at the same time.
191        self.state
192            .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
193
194        RawUpgrade {
195            lock: Some(self),
196            listener: None,
197            _pin: PhantomPinned,
198        }
199    }
200
201    /// # Safety
202    ///
203    /// Caller must hold an upgradable read lock.
204    /// This will downgrade it to a standard read lock.
205    #[inline]
206    pub(super) unsafe fn downgrade_upgradable_read(&self) {
207        self.mutex.unlock_unchecked();
208    }
209
210    /// # Safety
211    ///
212    /// Caller must hold a write lock.
213    /// This will downgrade it to a read lock.
214    pub(super) unsafe fn downgrade_write(&self) {
215        // Atomically downgrade state.
216        self.state
217            .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
218
219        // Release the writer mutex.
220        self.mutex.unlock_unchecked();
221
222        // Trigger the "no writer" event.
223        self.no_writer.notify(1);
224    }
225
226    /// # Safety
227    ///
228    /// Caller must hold a write lock.
229    /// This will downgrade it to an upgradable read lock.
230    pub(super) unsafe fn downgrade_to_upgradable(&self) {
231        // Atomically downgrade state.
232        self.state
233            .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
234    }
235
236    /// # Safety
237    ///
238    /// Caller must hold a read lock .
239    /// This will unlock that lock.
240    pub(super) unsafe fn read_unlock(&self) {
241        // Decrement the number of readers.
242        if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
243            // If this was the last reader, trigger the "no readers" event.
244            self.no_readers.notify(1);
245        }
246    }
247
248    /// # Safety
249    ///
250    /// Caller must hold an upgradable read lock.
251    /// This will unlock that lock.
252    pub(super) unsafe fn upgradable_read_unlock(&self) {
253        // Decrement the number of readers.
254        if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
255            // If this was the last reader, trigger the "no readers" event.
256            self.no_readers.notify(1);
257        }
258
259        // SAFETY: upgradable read guards acquire the writer mutex upon creation.
260        self.mutex.unlock_unchecked();
261    }
262
263    /// # Safety
264    ///
265    /// Caller must hold a write lock.
266    /// This will unlock that lock.
267    pub(super) unsafe fn write_unlock(&self) {
268        // Unset `WRITER_BIT`.
269        self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
270        // Trigger the "no writer" event.
271        self.no_writer.notify(1);
272
273        // Release the writer lock.
274        // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
275        self.mutex.unlock_unchecked();
276    }
277}
278
279pin_project_lite::pin_project! {
280    /// The future returned by [`RawRwLock::read`].
281
282    pub(super) struct RawRead<'a> {
283        // The lock that is being acquired.
284        pub(super) lock: &'a RawRwLock,
285
286        // The last-observed state of the lock.
287        state: usize,
288
289        // The listener for the "no writers" event.
290        listener: Option<EventListener>,
291
292        // Making this type `!Unpin` enables future optimizations.
293        #[pin]
294        _pin: PhantomPinned
295    }
296}
297
298impl EventListenerFuture for RawRead<'_> {
299    type Output = ();
300
301    fn poll_with_strategy<'x, S: Strategy<'x>>(
302        self: Pin<&mut Self>,
303        strategy: &mut S,
304        cx: &mut S::Context,
305    ) -> Poll<()> {
306        let this = self.project();
307
308        loop {
309            if *this.state & WRITER_BIT == 0 {
310                // Make sure the number of readers doesn't overflow.
311                if *this.state > isize::MAX as usize {
312                    crate::abort();
313                }
314
315                // If nobody is holding a write lock or attempting to acquire it, increment the
316                // number of readers.
317                match this.lock.state.compare_exchange(
318                    *this.state,
319                    *this.state + ONE_READER,
320                    Ordering::AcqRel,
321                    Ordering::Acquire,
322                ) {
323                    Ok(_) => return Poll::Ready(()),
324                    Err(s) => *this.state = s,
325                }
326            } else {
327                // Start listening for "no writer" events.
328                let load_ordering = if this.listener.is_none() {
329                    *this.listener = Some(this.lock.no_writer.listen());
330
331                    // Make sure there really is no writer.
332                    Ordering::SeqCst
333                } else {
334                    // Wait for the writer to finish.
335                    ready!(strategy.poll(this.listener, cx));
336
337                    // Notify the next reader waiting in list.
338                    this.lock.no_writer.notify(1);
339
340                    // Check the state again.
341                    Ordering::Acquire
342                };
343
344                // Reload the state.
345                *this.state = this.lock.state.load(load_ordering);
346            }
347        }
348    }
349}
350
351pin_project_lite::pin_project! {
352    /// The future returned by [`RawRwLock::upgradable_read`].
353    pub(super) struct RawUpgradableRead<'a> {
354        // The lock that is being acquired.
355        pub(super) lock: &'a RawRwLock,
356
357        // The mutex we are trying to acquire.
358        #[pin]
359        acquire: Lock<'a, ()>,
360    }
361}
362
363impl EventListenerFuture for RawUpgradableRead<'_> {
364    type Output = ();
365
366    fn poll_with_strategy<'x, S: Strategy<'x>>(
367        self: Pin<&mut Self>,
368        strategy: &mut S,
369        cx: &mut S::Context,
370    ) -> Poll<()> {
371        let this = self.project();
372
373        // Acquire the mutex.
374        let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx));
375        forget(mutex_guard);
376
377        // Load the current state.
378        let mut state = this.lock.state.load(Ordering::Acquire);
379
380        // Make sure the number of readers doesn't overflow.
381        if state > isize::MAX as usize {
382            crate::abort();
383        }
384
385        // Increment the number of readers.
386        loop {
387            match this.lock.state.compare_exchange(
388                state,
389                state + ONE_READER,
390                Ordering::AcqRel,
391                Ordering::Acquire,
392            ) {
393                Ok(_) => {
394                    return Poll::Ready(());
395                }
396                Err(s) => state = s,
397            }
398        }
399    }
400}
401
402pin_project_lite::pin_project! {
403    /// The future returned by [`RawRwLock::write`].
404
405    pub(super) struct RawWrite<'a> {
406        // The lock that is being acquired.
407        pub(super) lock: &'a RawRwLock,
408
409        // Our listener for the "no readers" event.
410        no_readers: Option<EventListener>,
411
412        // Current state of this future.
413        #[pin]
414        state: WriteState<'a>,
415    }
416
417    impl PinnedDrop for RawWrite<'_> {
418        fn drop(this: Pin<&mut Self>) {
419            let this = this.project();
420
421            if matches!(this.state.project(), WriteStateProj::WaitingReaders) {
422                // Safety: we hold a write lock, more or less.
423                unsafe {
424                    this.lock.write_unlock();
425                }
426            }
427        }
428    }
429}
430
431pin_project_lite::pin_project! {
432    #[project = WriteStateProj]
433    #[project_replace = WriteStateProjReplace]
434    enum WriteState<'a> {
435        // We are currently acquiring the inner mutex.
436        Acquiring { #[pin] lock: Lock<'a, ()> },
437
438        // We are currently waiting for readers to finish.
439        WaitingReaders,
440
441        // The future has completed.
442        Acquired,
443    }
444}
445
446impl EventListenerFuture for RawWrite<'_> {
447    type Output = ();
448
449    fn poll_with_strategy<'x, S: Strategy<'x>>(
450        self: Pin<&mut Self>,
451        strategy: &mut S,
452        cx: &mut S::Context,
453    ) -> Poll<()> {
454        let mut this = self.project();
455
456        loop {
457            match this.state.as_mut().project() {
458                WriteStateProj::Acquiring { lock } => {
459                    // First grab the mutex.
460                    let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx));
461                    forget(mutex_guard);
462
463                    // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
464                    let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
465
466                    // If we just acquired the lock, return.
467                    if new_state == WRITER_BIT {
468                        this.state.as_mut().set(WriteState::Acquired);
469                        return Poll::Ready(());
470                    }
471
472                    // Start waiting for the readers to finish.
473                    *this.no_readers = Some(this.lock.no_readers.listen());
474                    this.state.as_mut().set(WriteState::WaitingReaders);
475                }
476
477                WriteStateProj::WaitingReaders => {
478                    let load_ordering = if this.no_readers.is_some() {
479                        Ordering::Acquire
480                    } else {
481                        Ordering::SeqCst
482                    };
483
484                    // Check the state again.
485                    if this.lock.state.load(load_ordering) == WRITER_BIT {
486                        // We are the only ones holding the lock, return `Ready`.
487                        this.state.as_mut().set(WriteState::Acquired);
488                        return Poll::Ready(());
489                    }
490
491                    // Wait for the readers to finish.
492                    if this.no_readers.is_none() {
493                        // Register a listener.
494                        *this.no_readers = Some(this.lock.no_readers.listen());
495                    } else {
496                        // Wait for the readers to finish.
497                        ready!(strategy.poll(this.no_readers, cx));
498                    };
499                }
500                WriteStateProj::Acquired => panic!("Write lock already acquired"),
501            }
502        }
503    }
504}
505
506pin_project_lite::pin_project! {
507    /// The future returned by [`RawRwLock::upgrade`].
508
509    pub(super) struct RawUpgrade<'a> {
510        lock: Option<&'a RawRwLock>,
511
512        // The event listener we are waiting on.
513        listener: Option<EventListener>,
514
515        // Keeping this future `!Unpin` enables future optimizations.
516        #[pin]
517        _pin: PhantomPinned
518    }
519
520    impl PinnedDrop for RawUpgrade<'_> {
521        fn drop(this: Pin<&mut Self>) {
522            let this = this.project();
523            if let Some(lock) = this.lock {
524                // SAFETY: we are dropping the future that would give us a write lock,
525                // so we don't need said lock anymore.
526                unsafe {
527                    lock.write_unlock();
528                }
529            }
530        }
531    }
532}
533
534impl<'a> EventListenerFuture for RawUpgrade<'a> {
535    type Output = &'a RawRwLock;
536
537    fn poll_with_strategy<'x, S: Strategy<'x>>(
538        self: Pin<&mut Self>,
539        strategy: &mut S,
540        cx: &mut S::Context,
541    ) -> Poll<&'a RawRwLock> {
542        let this = self.project();
543        let lock = this.lock.expect("cannot poll future after completion");
544
545        // If there are readers, we need to wait for them to finish.
546        loop {
547            let load_ordering = if this.listener.is_some() {
548                Ordering::Acquire
549            } else {
550                Ordering::SeqCst
551            };
552
553            // See if the number of readers is zero.
554            let state = lock.state.load(load_ordering);
555            if state == WRITER_BIT {
556                break;
557            }
558
559            // If there are readers, wait for them to finish.
560            if this.listener.is_none() {
561                // Start listening for "no readers" events.
562                *this.listener = Some(lock.no_readers.listen());
563            } else {
564                // Wait for the readers to finish.
565                ready!(strategy.poll(this.listener, cx));
566            };
567        }
568
569        // We are done.
570        Poll::Ready(this.lock.take().unwrap())
571    }
572}
573
574impl RawUpgrade<'_> {
575    /// Whether the future returned `Poll::Ready(..)` at some point.
576    #[inline]
577    pub(super) fn is_ready(&self) -> bool {
578        self.lock.is_none()
579    }
580}