// rayon_core/latch.rs

use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::registry::{Registry, WorkerThread};
use crate::sync::{Condvar, Mutex};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
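///
/// ## Protocol sketch
///
/// An illustrative sketch of the intended one-shot protocol, using a
/// hypothetical `MyLatch` implementor (not a doctest; this trait is
/// crate-internal):
///
/// ```ignore
/// let latch = MyLatch::new();
///
/// // Signaling side: publish results, then set the latch exactly once.
/// // SAFETY: the latch outlives this call.
/// unsafe { Latch::set(&latch) };
///
/// // Observing side: `probe()` may be called any number of times; once
/// // it returns true, everything written before `set()` is visible.
/// while !latch.probe() {
///     std::hint::spin_loop();
/// }
/// ```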
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
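
// A rough sketch of the state machine, added for orientation: the owning
// thread drives the left-to-right transitions below, while any thread may
// jump the latch to `SET` at any point via `CoreLatch::set`.
//
//   UNSET --get_sleepy()--> SLEEPY --fall_asleep()--> SLEEPING
//     ^                                                  |
//     +------------- wake_up(), if not yet SET ----------+
//
//   (any state) --set()--> SET    [terminal]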

/// The core of any latch: an atomic state machine that tracks both whether
/// the latch has been set and the sleep state of the owning thread (the
/// `UNSET`/`SLEEPY`/`SLEEPING`/`SET` constants above). Several of the latch
/// types below embed a `CoreLatch` and layer their wakeup logic on top; it
/// does not itself support a blocking `wait()` operation.
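///
/// A sketch of the owner-side sleep protocol (illustrative only; the real
/// driver lives in the sleep module, and `block_until_notified` here is a
/// hypothetical stand-in):
///
/// ```ignore
/// if core_latch.get_sleepy() {          // UNSET -> SLEEPY
///     if core_latch.fall_asleep() {     // SLEEPY -> SLEEPING
///         block_until_notified();       // e.g. wait on a condvar
///     }
///     core_latch.wake_up();             // SLEEPING -> UNSET, unless SET won
/// }
/// ```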
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it wakes up. If the latch was not
    /// set in the meantime, resets the state from `SLEEPING` back to
    /// `UNSET` so the latch can be slept on again.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
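///
/// Illustrative use (a sketch, not a doctest; this type is crate-internal):
///
/// ```ignore
/// let latch = LockLatch::new();
/// // Some other thread, once its work is published:
/// //     unsafe { Latch::set(&latch) };
/// latch.wait(); // blocks on the condvar until the flag is true
/// ```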
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until latch is set, then reset this lock latch so it can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}

/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if CoreLatch::set(&(*this).core_latch) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
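///
/// Illustrative accounting (a sketch, not a doctest): a scope starts with a
/// count of 1 for itself, calls `increment()` once per spawned job, and each
/// completion calls `set()`; only the final decrement, from 1 to 0, actually
/// sets the inner latch.
///
/// ```ignore
/// let latch = CountLatch::with_count(1, owner); // the scope itself
/// latch.increment();                            // one spawned job
/// // job finishes:   unsafe { Latch::set(&latch) }; // 2 -> 1, not yet set
/// // scope finishes: unsafe { Latch::set(&latch) }; // 1 -> 0, now set
/// ```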
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}

impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}

/// `&L` without any implication of `dereferenceable` for `Latch::set`
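///
/// Illustrative use (a sketch, not a doctest): wrap a borrowed latch so it
/// can be handed to code that wants an owned `Latch` value, while `set`
/// still operates on a raw pointer rather than a full reference.
///
/// ```ignore
/// let latch = LockLatch::new();
/// let latch_ref = LatchRef::new(&latch);
/// unsafe { Latch::set(&latch_ref) }; // forwards to `LockLatch::set`
/// latch.wait();
/// ```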
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
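
#[cfg(test)]
mod tests {
    // A minimal sanity-check sketch for the blocking latches (added by the
    // editor; upstream's real tests live elsewhere). Only `LockLatch` and
    // the blocking flavor of `CountLatch` are exercised here, since the
    // spinning kinds require a live worker thread.
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn lock_latch_wait_and_set() {
        let latch = Arc::new(LockLatch::new());
        let waiter = {
            let latch = Arc::clone(&latch);
            thread::spawn(move || latch.wait())
        };
        // SAFETY: the `Arc` keeps the latch alive on both sides of the call.
        unsafe { Latch::set(&*latch) };
        waiter.join().unwrap();
    }

    #[test]
    fn count_latch_counts_down() {
        // With no owner, `CountLatch` uses the blocking `LockLatch` kind.
        let latch = Arc::new(CountLatch::with_count(2, None));
        // First decrement: 2 -> 1, the latch must not be set yet.
        unsafe { Latch::set(&*latch) };
        let waiter = {
            let latch = Arc::clone(&latch);
            thread::spawn(move || latch.wait(None))
        };
        // Final decrement: 1 -> 0, which sets the inner latch.
        unsafe { Latch::set(&*latch) };
        waiter.join().unwrap();
    }
}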