crossbeam_utils/sync/sharded_lock.rs
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread::{self, ThreadId};
use std::vec::Vec;

use crate::sync::once_lock::OnceLock;
use crate::CachePadded;

/// The number of shards per sharded lock. Must be a power of two.
const NUM_SHARDS: usize = 8;
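
// A sanity-check sketch (added here; not in the original source): read operations
// select a shard with `index & (NUM_SHARDS - 1)`, which only distributes indices
// uniformly when `NUM_SHARDS` is a power of two, so assert that at compile time.
const _: () = assert!(NUM_SHARDS.is_power_of_two());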

/// A shard containing a single reader-writer lock.
struct Shard {
    /// The inner reader-writer lock.
    lock: RwLock<()>,

    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,
}

/// A sharded reader-writer lock.
///
/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
/// are slower.
///
/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
/// single cache line. Read operations will pick one of the shards depending on the current thread
/// and lock it. Write operations need to lock all shards in succession.
///
/// By splitting the lock into shards, concurrent read operations will in most cases choose
/// different shards and thus update different cache lines, which is good for scalability. However,
/// write operations need to do more work and are therefore slower than usual.
///
/// The priority policy of the lock is dependent on the underlying operating system's
/// implementation, and this type does not guarantee that any particular policy will be used.
///
/// # Poisoning
///
/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
/// read operation, the lock will not be poisoned.
///
/// # Examples
///
/// ```
/// use crossbeam_utils::sync::ShardedLock;
///
/// let lock = ShardedLock::new(5);
///
/// // Any number of read locks can be held at once.
/// {
///     let r1 = lock.read().unwrap();
///     let r2 = lock.read().unwrap();
///     assert_eq!(*r1, 5);
///     assert_eq!(*r2, 5);
/// } // Read locks are dropped at this point.
///
/// // However, only one write lock may be held.
/// {
///     let mut w = lock.write().unwrap();
///     *w += 1;
///     assert_eq!(*w, 6);
/// } // Write lock is dropped here.
/// ```
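///
/// An additional sketch (illustrative, not part of the original docs) of the read-side
/// scalability story: readers on several threads can hold the lock at the same time, each
/// typically through a different shard.
///
/// ```
/// use crossbeam_utils::sync::ShardedLock;
/// use std::sync::Arc;
/// use std::thread;
///
/// let lock = Arc::new(ShardedLock::new(0));
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let lock = Arc::clone(&lock);
///         // Each thread's read tends to land on its own shard.
///         thread::spawn(move || assert_eq!(*lock.read().unwrap(), 0))
///     })
///     .collect();
/// for handle in handles {
///     handle.join().unwrap();
/// }
/// ```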
///
/// [`RwLock`]: std::sync::RwLock
pub struct ShardedLock<T: ?Sized> {
    /// A list of locks protecting the internal data.
    shards: Box<[CachePadded<Shard>]>,

    /// The internal data.
    value: UnsafeCell<T>,
}

unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}

impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}

impl<T> ShardedLock<T> {
    /// Creates a new sharded reader-writer lock.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(5);
    /// ```
    pub fn new(value: T) -> ShardedLock<T> {
        ShardedLock {
            shards: (0..NUM_SHARDS)
                .map(|_| {
                    CachePadded::new(Shard {
                        lock: RwLock::new(()),
                        write_guard: UnsafeCell::new(None),
                    })
                })
                .collect::<Box<[_]>>(),
            value: UnsafeCell::new(value),
        }
    }

    /// Consumes this lock, returning the underlying data.
    ///
    /// # Errors
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(String::new());
    /// {
    ///     let mut s = lock.write().unwrap();
    ///     *s = "modified".to_owned();
    /// }
    /// assert_eq!(lock.into_inner().unwrap(), "modified");
    /// ```
    pub fn into_inner(self) -> LockResult<T> {
        let is_poisoned = self.is_poisoned();
        let inner = self.value.into_inner();

        if is_poisoned {
            Err(PoisonError::new(inner))
        } else {
            Ok(inner)
        }
    }
}

impl<T: ?Sized> ShardedLock<T> {
    /// Returns `true` if the lock is poisoned.
    ///
    /// If another thread can still access the lock, it may become poisoned at any time. A `false`
    /// result should not be trusted without additional synchronization.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// let lock = Arc::new(ShardedLock::new(0));
    /// let c_lock = lock.clone();
    ///
    /// let _ = thread::spawn(move || {
    ///     let _lock = c_lock.write().unwrap();
    ///     panic!(); // the lock gets poisoned
    /// }).join();
    /// assert_eq!(lock.is_poisoned(), true);
    /// ```
    pub fn is_poisoned(&self) -> bool {
        self.shards[0].lock.is_poisoned()
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the lock mutably, no actual locking needs to take place.
    ///
    /// # Errors
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let mut lock = ShardedLock::new(0);
    /// *lock.get_mut().unwrap() = 10;
    /// assert_eq!(*lock.read().unwrap(), 10);
    /// ```
    pub fn get_mut(&mut self) -> LockResult<&mut T> {
        let is_poisoned = self.is_poisoned();
        let inner = unsafe { &mut *self.value.get() };

        if is_poisoned {
            Err(PoisonError::new(inner))
        } else {
            Ok(inner)
        }
    }

    /// Attempts to acquire this lock with shared read access.
    ///
    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
    /// is returned which will release the shared access when it is dropped. This method does not
    /// provide any guarantees with respect to the ordering of whether contentious readers or
    /// writers will acquire the lock first.
    ///
    /// # Errors
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(1);
    ///
    /// match lock.try_read() {
    ///     Ok(n) => assert_eq!(*n, 1),
    ///     Err(_) => unreachable!(),
    /// };
    /// ```
    pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
        // Take the current thread index and map it to a shard index. Thread indices will tend to
        // distribute shards among threads equally, thus reducing contention due to read-locking.
        let current_index = current_index().unwrap_or(0);
        let shard_index = current_index & (self.shards.len() - 1);

        match self.shards[shard_index].lock.try_read() {
            Ok(guard) => Ok(ShardedLockReadGuard {
                lock: self,
                _guard: guard,
                _marker: PhantomData,
            }),
            Err(TryLockError::Poisoned(err)) => {
                let guard = ShardedLockReadGuard {
                    lock: self,
                    _guard: err.into_inner(),
                    _marker: PhantomData,
                };
                Err(TryLockError::Poisoned(PoisonError::new(guard)))
            }
            Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
        }
    }

    /// Locks with shared read access, blocking the current thread until it can be acquired.
    ///
    /// The calling thread will be blocked until there are no more writers which hold the lock.
    /// There may be other readers currently inside the lock when this method returns. This method
    /// does not provide any guarantees with respect to the ordering of whether contentious readers
    /// or writers will acquire the lock first.
    ///
    /// Returns a guard which will release the shared access when dropped.
    ///
    /// # Errors
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Panics
    ///
    /// This method might panic when called if the lock is already held by the current thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// let lock = Arc::new(ShardedLock::new(1));
    /// let c_lock = lock.clone();
    ///
    /// let n = lock.read().unwrap();
    /// assert_eq!(*n, 1);
    ///
    /// thread::spawn(move || {
    ///     let r = c_lock.read();
    ///     assert!(r.is_ok());
    /// }).join().unwrap();
    /// ```
    pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
        // Take the current thread index and map it to a shard index. Thread indices will tend to
        // distribute shards among threads equally, thus reducing contention due to read-locking.
        let current_index = current_index().unwrap_or(0);
        let shard_index = current_index & (self.shards.len() - 1);

        match self.shards[shard_index].lock.read() {
            Ok(guard) => Ok(ShardedLockReadGuard {
                lock: self,
                _guard: guard,
                _marker: PhantomData,
            }),
            Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
                lock: self,
                _guard: err.into_inner(),
                _marker: PhantomData,
            })),
        }
    }

    /// Attempts to acquire this lock with exclusive write access.
    ///
    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
    /// is returned which will release the exclusive access when it is dropped. This method does
    /// not provide any guarantees with respect to the ordering of whether contentious readers or
    /// writers will acquire the lock first.
    ///
    /// # Errors
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(1);
    ///
    /// let n = lock.read().unwrap();
    /// assert_eq!(*n, 1);
    ///
    /// assert!(lock.try_write().is_err());
    /// ```
    pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
        let mut poisoned = false;
        let mut blocked = None;

        // Write-lock each shard in succession.
        for (i, shard) in self.shards.iter().enumerate() {
            let guard = match shard.lock.try_write() {
                Ok(guard) => guard,
                Err(TryLockError::Poisoned(err)) => {
                    poisoned = true;
                    err.into_inner()
                }
                Err(TryLockError::WouldBlock) => {
                    blocked = Some(i);
                    break;
                }
            };

            // Store the guard into the shard.
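            // SAFETY (note added for clarity; not in the original source): the guard borrows
            // `shard.lock`, which lives as long as `self`. Extending its lifetime to `'static`
            // is sound because the guard is always dropped — by the rollback below or by
            // `ShardedLockWriteGuard`'s `Drop` — before that borrow can end.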
            unsafe {
                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
                let dest: *mut _ = shard.write_guard.get();
                *dest = Some(guard);
            }
        }

        if let Some(i) = blocked {
            // Unlock the shards in reverse order of locking.
            for shard in self.shards[0..i].iter().rev() {
                unsafe {
                    let dest: *mut _ = shard.write_guard.get();
                    let guard = (*dest).take();
                    drop(guard);
                }
            }
            Err(TryLockError::WouldBlock)
        } else if poisoned {
            let guard = ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            };
            Err(TryLockError::Poisoned(PoisonError::new(guard)))
        } else {
            Ok(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            })
        }
    }

    /// Locks with exclusive write access, blocking the current thread until it can be acquired.
    ///
    /// The calling thread will be blocked until there are no other writers or readers which hold
    /// the lock. This method does not provide any guarantees with respect to the ordering of
    /// whether contentious readers or writers will acquire the lock first.
    ///
    /// Returns a guard which will release the exclusive access when dropped.
    ///
    /// # Errors
    ///
    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
    /// operation panics.
    ///
    /// # Panics
    ///
    /// This method might panic when called if the lock is already held by the current thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::ShardedLock;
    ///
    /// let lock = ShardedLock::new(1);
    ///
    /// let mut n = lock.write().unwrap();
    /// *n = 2;
    ///
    /// assert!(lock.try_read().is_err());
    /// ```
    pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
        let mut poisoned = false;

        // Write-lock each shard in succession.
        for shard in self.shards.iter() {
            let guard = match shard.lock.write() {
                Ok(guard) => guard,
                Err(err) => {
                    poisoned = true;
                    err.into_inner()
                }
            };

            // Store the guard into the shard.
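            // SAFETY (note added for clarity; not in the original source): as in `try_write`,
            // the `'static` lifetime is a fiction that holds because
            // `ShardedLockWriteGuard::drop` clears every slot before the borrow of `self` ends.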
            unsafe {
                let guard: RwLockWriteGuard<'_, ()> = guard;
                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
                let dest: *mut _ = shard.write_guard.get();
                *dest = Some(guard);
            }
        }

        if poisoned {
            Err(PoisonError::new(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            }))
        } else {
            Ok(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            })
        }
    }
}

impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.try_read() {
            Ok(guard) => f
                .debug_struct("ShardedLock")
                .field("data", &&*guard)
                .finish(),
            Err(TryLockError::Poisoned(err)) => f
                .debug_struct("ShardedLock")
                .field("data", &&**err.get_ref())
                .finish(),
            Err(TryLockError::WouldBlock) => {
                struct LockedPlaceholder;
                impl fmt::Debug for LockedPlaceholder {
                    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                        f.write_str("<locked>")
                    }
                }
                f.debug_struct("ShardedLock")
                    .field("data", &LockedPlaceholder)
                    .finish()
            }
        }
    }
}

impl<T: Default> Default for ShardedLock<T> {
    fn default() -> ShardedLock<T> {
        ShardedLock::new(Default::default())
    }
}

impl<T> From<T> for ShardedLock<T> {
    fn from(t: T) -> Self {
        ShardedLock::new(t)
    }
}

/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
#[clippy::has_significant_drop]
pub struct ShardedLockReadGuard<'a, T: ?Sized> {
    lock: &'a ShardedLock<T>,
    _guard: RwLockReadGuard<'a, ()>,
    _marker: PhantomData<RwLockReadGuard<'a, T>>,
}

unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}

impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.value.get() }
    }
}

impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ShardedLockReadGuard")
            .field("lock", &self.lock)
            .finish()
    }
}

impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        (**self).fmt(f)
    }
}

/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
#[clippy::has_significant_drop]
pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
    lock: &'a ShardedLock<T>,
    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
}

unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}

impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
    fn drop(&mut self) {
        // Unlock the shards in reverse order of locking.
        for shard in self.lock.shards.iter().rev() {
            unsafe {
                let dest: *mut _ = shard.write_guard.get();
                let guard = (*dest).take();
                drop(guard);
            }
        }
    }
}

impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ShardedLockWriteGuard")
            .field("lock", &self.lock)
            .finish()
    }
}

impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        (**self).fmt(f)
    }
}

impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.value.get() }
    }
}

impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.value.get() }
    }
}

/// Returns a `usize` that identifies the current thread.
///
/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
/// usually tend to be consecutive numbers between 0 and the number of running threads.
///
/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
/// tearing down.
#[inline]
fn current_index() -> Option<usize> {
    REGISTRATION.try_with(|reg| reg.index).ok()
}

/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}

fn thread_indices() -> &'static Mutex<ThreadIndices> {
    static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
    fn init() -> Mutex<ThreadIndices> {
        Mutex::new(ThreadIndices {
            mapping: HashMap::new(),
            free_list: Vec::new(),
            next_index: 0,
        })
    }
    THREAD_INDICES.get_or_init(init)
}

/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
    index: usize,
    thread_id: ThreadId,
}

impl Drop for Registration {
    fn drop(&mut self) {
        let mut indices = thread_indices().lock().unwrap();
        indices.mapping.remove(&self.thread_id);
        indices.free_list.push(self.index);
    }
}

std::thread_local! {
    static REGISTRATION: Registration = {
        let thread_id = thread::current().id();
        let mut indices = thread_indices().lock().unwrap();

        let index = match indices.free_list.pop() {
            Some(i) => i,
            None => {
                let i = indices.next_index;
                indices.next_index += 1;
                i
            }
        };
        indices.mapping.insert(thread_id, index);

        Registration {
            index,
            thread_id,
        }
    };
}
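
// A minimal test sketch (added for illustration; not part of the original file) exercising the
// thread-index registry and the basic lock API documented above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn current_index_is_stable_within_a_thread() {
        // A thread's `Registration` lives for the whole thread, so repeated
        // lookups observe the same index.
        let first = current_index().unwrap();
        let second = current_index().unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn write_then_read() {
        // A write locks every shard; once the guard is dropped, readers see the new value.
        let lock = ShardedLock::new(0);
        *lock.write().unwrap() = 10;
        assert_eq!(*lock.read().unwrap(), 10);
    }
}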