crossbeam_utils/sync/
sharded_lock.rs

1use std::boxed::Box;
2use std::cell::UnsafeCell;
3use std::collections::HashMap;
4use std::fmt;
5use std::marker::PhantomData;
6use std::mem;
7use std::ops::{Deref, DerefMut};
8use std::panic::{RefUnwindSafe, UnwindSafe};
9use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
10use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::thread::{self, ThreadId};
12use std::vec::Vec;
13
14use crate::sync::once_lock::OnceLock;
15use crate::CachePadded;
16
/// The number of shards per sharded lock. Must be a power of two.
///
/// Readers pick their shard with `index & (shards.len() - 1)`. That mask only reaches every
/// shard when the shard count is a power of two, and it underflows for a count of zero.
const NUM_SHARDS: usize = 8;

// Enforce the documented invariant at compile time, so that an invalid value fails the build
// instead of silently leaving some shards unused by readers.
const _: () = assert!(NUM_SHARDS.is_power_of_two());
19
/// A shard containing a single reader-writer lock.
struct Shard {
    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    ///
    /// This field is declared before `lock` on purpose: struct fields are dropped in declaration
    /// order, and the stored guard borrows `lock` (its lifetime is erased to `'static`). If the
    /// big write guard was leaked (e.g. with `mem::forget`), the guard is still stored here when
    /// the shard is dropped, so it must be released before the lock itself is dropped.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,

    /// The inner reader-writer lock.
    lock: RwLock<()>,
}
31
32/// A sharded reader-writer lock.
33///
34/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
35/// are slower.
36///
37/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
38/// single cache line. Read operations will pick one of the shards depending on the current thread
39/// and lock it. Write operations need to lock all shards in succession.
40///
41/// By splitting the lock into shards, concurrent read operations will in most cases choose
42/// different shards and thus update different cache lines, which is good for scalability. However,
43/// write operations need to do more work and are therefore slower than usual.
44///
45/// The priority policy of the lock is dependent on the underlying operating system's
46/// implementation, and this type does not guarantee that any particular policy will be used.
47///
48/// # Poisoning
49///
50/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
51/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
52/// read operation, the lock will not be poisoned.
53///
54/// # Examples
55///
56/// ```
57/// use crossbeam_utils::sync::ShardedLock;
58///
59/// let lock = ShardedLock::new(5);
60///
61/// // Any number of read locks can be held at once.
62/// {
63///     let r1 = lock.read().unwrap();
64///     let r2 = lock.read().unwrap();
65///     assert_eq!(*r1, 5);
66///     assert_eq!(*r2, 5);
67/// } // Read locks are dropped at this point.
68///
69/// // However, only one write lock may be held.
70/// {
71///     let mut w = lock.write().unwrap();
72///     *w += 1;
73///     assert_eq!(*w, 6);
74/// } // Write lock is dropped here.
75/// ```
76///
77/// [`RwLock`]: std::sync::RwLock
78pub struct ShardedLock<T: ?Sized> {
79    /// A list of locks protecting the internal data.
80    shards: Box<[CachePadded<Shard>]>,
81
82    /// The internal data.
83    value: UnsafeCell<T>,
84}
85
86unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
87unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}
88
89impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
90impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
91
92impl<T> ShardedLock<T> {
93    /// Creates a new sharded reader-writer lock.
94    ///
95    /// # Examples
96    ///
97    /// ```
98    /// use crossbeam_utils::sync::ShardedLock;
99    ///
100    /// let lock = ShardedLock::new(5);
101    /// ```
102    pub fn new(value: T) -> ShardedLock<T> {
103        ShardedLock {
104            shards: (0..NUM_SHARDS)
105                .map(|_| {
106                    CachePadded::new(Shard {
107                        lock: RwLock::new(()),
108                        write_guard: UnsafeCell::new(None),
109                    })
110                })
111                .collect::<Box<[_]>>(),
112            value: UnsafeCell::new(value),
113        }
114    }
115
116    /// Consumes this lock, returning the underlying data.
117    ///
118    /// # Errors
119    ///
120    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
121    /// operation panics.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// use crossbeam_utils::sync::ShardedLock;
127    ///
128    /// let lock = ShardedLock::new(String::new());
129    /// {
130    ///     let mut s = lock.write().unwrap();
131    ///     *s = "modified".to_owned();
132    /// }
133    /// assert_eq!(lock.into_inner().unwrap(), "modified");
134    /// ```
135    pub fn into_inner(self) -> LockResult<T> {
136        let is_poisoned = self.is_poisoned();
137        let inner = self.value.into_inner();
138
139        if is_poisoned {
140            Err(PoisonError::new(inner))
141        } else {
142            Ok(inner)
143        }
144    }
145}
146
147impl<T: ?Sized> ShardedLock<T> {
148    /// Returns `true` if the lock is poisoned.
149    ///
150    /// If another thread can still access the lock, it may become poisoned at any time. A `false`
151    /// result should not be trusted without additional synchronization.
152    ///
153    /// # Examples
154    ///
155    /// ```
156    /// use crossbeam_utils::sync::ShardedLock;
157    /// use std::sync::Arc;
158    /// use std::thread;
159    ///
160    /// let lock = Arc::new(ShardedLock::new(0));
161    /// let c_lock = lock.clone();
162    ///
163    /// let _ = thread::spawn(move || {
164    ///     let _lock = c_lock.write().unwrap();
165    ///     panic!(); // the lock gets poisoned
166    /// }).join();
167    /// assert_eq!(lock.is_poisoned(), true);
168    /// ```
169    pub fn is_poisoned(&self) -> bool {
170        self.shards[0].lock.is_poisoned()
171    }
172
173    /// Returns a mutable reference to the underlying data.
174    ///
175    /// Since this call borrows the lock mutably, no actual locking needs to take place.
176    ///
177    /// # Errors
178    ///
179    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
180    /// operation panics.
181    ///
182    /// # Examples
183    ///
184    /// ```
185    /// use crossbeam_utils::sync::ShardedLock;
186    ///
187    /// let mut lock = ShardedLock::new(0);
188    /// *lock.get_mut().unwrap() = 10;
189    /// assert_eq!(*lock.read().unwrap(), 10);
190    /// ```
191    pub fn get_mut(&mut self) -> LockResult<&mut T> {
192        let is_poisoned = self.is_poisoned();
193        let inner = unsafe { &mut *self.value.get() };
194
195        if is_poisoned {
196            Err(PoisonError::new(inner))
197        } else {
198            Ok(inner)
199        }
200    }
201
202    /// Attempts to acquire this lock with shared read access.
203    ///
204    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
205    /// is returned which will release the shared access when it is dropped. This method does not
206    /// provide any guarantees with respect to the ordering of whether contentious readers or
207    /// writers will acquire the lock first.
208    ///
209    /// # Errors
210    ///
211    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
212    /// operation panics.
213    ///
214    /// # Examples
215    ///
216    /// ```
217    /// use crossbeam_utils::sync::ShardedLock;
218    ///
219    /// let lock = ShardedLock::new(1);
220    ///
221    /// match lock.try_read() {
222    ///     Ok(n) => assert_eq!(*n, 1),
223    ///     Err(_) => unreachable!(),
224    /// };
225    /// ```
226    pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
227        // Take the current thread index and map it to a shard index. Thread indices will tend to
228        // distribute shards among threads equally, thus reducing contention due to read-locking.
229        let current_index = current_index().unwrap_or(0);
230        let shard_index = current_index & (self.shards.len() - 1);
231
232        match self.shards[shard_index].lock.try_read() {
233            Ok(guard) => Ok(ShardedLockReadGuard {
234                lock: self,
235                _guard: guard,
236                _marker: PhantomData,
237            }),
238            Err(TryLockError::Poisoned(err)) => {
239                let guard = ShardedLockReadGuard {
240                    lock: self,
241                    _guard: err.into_inner(),
242                    _marker: PhantomData,
243                };
244                Err(TryLockError::Poisoned(PoisonError::new(guard)))
245            }
246            Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
247        }
248    }
249
250    /// Locks with shared read access, blocking the current thread until it can be acquired.
251    ///
252    /// The calling thread will be blocked until there are no more writers which hold the lock.
253    /// There may be other readers currently inside the lock when this method returns. This method
254    /// does not provide any guarantees with respect to the ordering of whether contentious readers
255    /// or writers will acquire the lock first.
256    ///
257    /// Returns a guard which will release the shared access when dropped.
258    ///
259    /// # Errors
260    ///
261    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
262    /// operation panics.
263    ///
264    /// # Panics
265    ///
266    /// This method might panic when called if the lock is already held by the current thread.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use crossbeam_utils::sync::ShardedLock;
272    /// use std::sync::Arc;
273    /// use std::thread;
274    ///
275    /// let lock = Arc::new(ShardedLock::new(1));
276    /// let c_lock = lock.clone();
277    ///
278    /// let n = lock.read().unwrap();
279    /// assert_eq!(*n, 1);
280    ///
281    /// thread::spawn(move || {
282    ///     let r = c_lock.read();
283    ///     assert!(r.is_ok());
284    /// }).join().unwrap();
285    /// ```
286    pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
287        // Take the current thread index and map it to a shard index. Thread indices will tend to
288        // distribute shards among threads equally, thus reducing contention due to read-locking.
289        let current_index = current_index().unwrap_or(0);
290        let shard_index = current_index & (self.shards.len() - 1);
291
292        match self.shards[shard_index].lock.read() {
293            Ok(guard) => Ok(ShardedLockReadGuard {
294                lock: self,
295                _guard: guard,
296                _marker: PhantomData,
297            }),
298            Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
299                lock: self,
300                _guard: err.into_inner(),
301                _marker: PhantomData,
302            })),
303        }
304    }
305
306    /// Attempts to acquire this lock with exclusive write access.
307    ///
308    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
309    /// is returned which will release the exclusive access when it is dropped. This method does
310    /// not provide any guarantees with respect to the ordering of whether contentious readers or
311    /// writers will acquire the lock first.
312    ///
313    /// # Errors
314    ///
315    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
316    /// operation panics.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// use crossbeam_utils::sync::ShardedLock;
322    ///
323    /// let lock = ShardedLock::new(1);
324    ///
325    /// let n = lock.read().unwrap();
326    /// assert_eq!(*n, 1);
327    ///
328    /// assert!(lock.try_write().is_err());
329    /// ```
330    pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
331        let mut poisoned = false;
332        let mut blocked = None;
333
334        // Write-lock each shard in succession.
335        for (i, shard) in self.shards.iter().enumerate() {
336            let guard = match shard.lock.try_write() {
337                Ok(guard) => guard,
338                Err(TryLockError::Poisoned(err)) => {
339                    poisoned = true;
340                    err.into_inner()
341                }
342                Err(TryLockError::WouldBlock) => {
343                    blocked = Some(i);
344                    break;
345                }
346            };
347
348            // Store the guard into the shard.
349            unsafe {
350                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
351                let dest: *mut _ = shard.write_guard.get();
352                *dest = Some(guard);
353            }
354        }
355
356        if let Some(i) = blocked {
357            // Unlock the shards in reverse order of locking.
358            for shard in self.shards[0..i].iter().rev() {
359                unsafe {
360                    let dest: *mut _ = shard.write_guard.get();
361                    let guard = (*dest).take();
362                    drop(guard);
363                }
364            }
365            Err(TryLockError::WouldBlock)
366        } else if poisoned {
367            let guard = ShardedLockWriteGuard {
368                lock: self,
369                _marker: PhantomData,
370            };
371            Err(TryLockError::Poisoned(PoisonError::new(guard)))
372        } else {
373            Ok(ShardedLockWriteGuard {
374                lock: self,
375                _marker: PhantomData,
376            })
377        }
378    }
379
380    /// Locks with exclusive write access, blocking the current thread until it can be acquired.
381    ///
382    /// The calling thread will be blocked until there are no more writers which hold the lock.
383    /// There may be other readers currently inside the lock when this method returns. This method
384    /// does not provide any guarantees with respect to the ordering of whether contentious readers
385    /// or writers will acquire the lock first.
386    ///
387    /// Returns a guard which will release the exclusive access when dropped.
388    ///
389    /// # Errors
390    ///
391    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
392    /// operation panics.
393    ///
394    /// # Panics
395    ///
396    /// This method might panic when called if the lock is already held by the current thread.
397    ///
398    /// # Examples
399    ///
400    /// ```
401    /// use crossbeam_utils::sync::ShardedLock;
402    ///
403    /// let lock = ShardedLock::new(1);
404    ///
405    /// let mut n = lock.write().unwrap();
406    /// *n = 2;
407    ///
408    /// assert!(lock.try_read().is_err());
409    /// ```
410    pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
411        let mut poisoned = false;
412
413        // Write-lock each shard in succession.
414        for shard in self.shards.iter() {
415            let guard = match shard.lock.write() {
416                Ok(guard) => guard,
417                Err(err) => {
418                    poisoned = true;
419                    err.into_inner()
420                }
421            };
422
423            // Store the guard into the shard.
424            unsafe {
425                let guard: RwLockWriteGuard<'_, ()> = guard;
426                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
427                let dest: *mut _ = shard.write_guard.get();
428                *dest = Some(guard);
429            }
430        }
431
432        if poisoned {
433            Err(PoisonError::new(ShardedLockWriteGuard {
434                lock: self,
435                _marker: PhantomData,
436            }))
437        } else {
438            Ok(ShardedLockWriteGuard {
439                lock: self,
440                _marker: PhantomData,
441            })
442        }
443    }
444}
445
446impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
447    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448        match self.try_read() {
449            Ok(guard) => f
450                .debug_struct("ShardedLock")
451                .field("data", &&*guard)
452                .finish(),
453            Err(TryLockError::Poisoned(err)) => f
454                .debug_struct("ShardedLock")
455                .field("data", &&**err.get_ref())
456                .finish(),
457            Err(TryLockError::WouldBlock) => {
458                struct LockedPlaceholder;
459                impl fmt::Debug for LockedPlaceholder {
460                    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461                        f.write_str("<locked>")
462                    }
463                }
464                f.debug_struct("ShardedLock")
465                    .field("data", &LockedPlaceholder)
466                    .finish()
467            }
468        }
469    }
470}
471
472impl<T: Default> Default for ShardedLock<T> {
473    fn default() -> ShardedLock<T> {
474        ShardedLock::new(Default::default())
475    }
476}
477
478impl<T> From<T> for ShardedLock<T> {
479    fn from(t: T) -> Self {
480        ShardedLock::new(t)
481    }
482}
483
484/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
485#[clippy::has_significant_drop]
486pub struct ShardedLockReadGuard<'a, T: ?Sized> {
487    lock: &'a ShardedLock<T>,
488    _guard: RwLockReadGuard<'a, ()>,
489    _marker: PhantomData<RwLockReadGuard<'a, T>>,
490}
491
492unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}
493
494impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
495    type Target = T;
496
497    fn deref(&self) -> &T {
498        unsafe { &*self.lock.value.get() }
499    }
500}
501
502impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
503    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
504        f.debug_struct("ShardedLockReadGuard")
505            .field("lock", &self.lock)
506            .finish()
507    }
508}
509
510impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
511    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512        (**self).fmt(f)
513    }
514}
515
516/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
517#[clippy::has_significant_drop]
518pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
519    lock: &'a ShardedLock<T>,
520    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
521}
522
523unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}
524
525impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
526    fn drop(&mut self) {
527        // Unlock the shards in reverse order of locking.
528        for shard in self.lock.shards.iter().rev() {
529            unsafe {
530                let dest: *mut _ = shard.write_guard.get();
531                let guard = (*dest).take();
532                drop(guard);
533            }
534        }
535    }
536}
537
538impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
539    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540        f.debug_struct("ShardedLockWriteGuard")
541            .field("lock", &self.lock)
542            .finish()
543    }
544}
545
546impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
547    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548        (**self).fmt(f)
549    }
550}
551
552impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
553    type Target = T;
554
555    fn deref(&self) -> &T {
556        unsafe { &*self.lock.value.get() }
557    }
558}
559
560impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
561    fn deref_mut(&mut self) -> &mut T {
562        unsafe { &mut *self.lock.value.get() }
563    }
564}
565
566/// Returns a `usize` that identifies the current thread.
567///
568/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
569/// usually tend to be consecutive numbers between 0 and the number of running threads.
570///
571/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
572/// tearing down.
573#[inline]
574fn current_index() -> Option<usize> {
575    REGISTRATION.try_with(|reg| reg.index).ok()
576}
577
/// The global registry that tracks registered threads and the indices assigned to them.
struct ThreadIndices {
    /// Maps the `ThreadId` of every registered thread to its thread index.
    mapping: HashMap<ThreadId, usize>,

    /// Indices given back by unregistered threads, available for reuse.
    free_list: Vec<usize>,

    /// The index handed out next when the free list is empty.
    next_index: usize,
}
589
590fn thread_indices() -> &'static Mutex<ThreadIndices> {
591    static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
592    fn init() -> Mutex<ThreadIndices> {
593        Mutex::new(ThreadIndices {
594            mapping: HashMap::new(),
595            free_list: Vec::new(),
596            next_index: 0,
597        })
598    }
599    THREAD_INDICES.get_or_init(init)
600}
601
/// The registration of one thread together with its index.
///
/// Dropping a registration unregisters the thread and frees the index reserved for it.
struct Registration {
    /// The index reserved for the thread.
    index: usize,
    /// The id of the thread that owns this registration.
    thread_id: ThreadId,
}
609
610impl Drop for Registration {
611    fn drop(&mut self) {
612        let mut indices = thread_indices().lock().unwrap();
613        indices.mapping.remove(&self.thread_id);
614        indices.free_list.push(self.index);
615    }
616}
617
618std::thread_local! {
619    static REGISTRATION: Registration = {
620        let thread_id = thread::current().id();
621        let mut indices = thread_indices().lock().unwrap();
622
623        let index = match indices.free_list.pop() {
624            Some(i) => i,
625            None => {
626                let i = indices.next_index;
627                indices.next_index += 1;
628                i
629            }
630        };
631        indices.mapping.insert(thread_id, index);
632
633        Registration {
634            index,
635            thread_id,
636        }
637    };
638}