crossbeam_utils/sync/
parker.rs

1use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
2use crate::primitive::sync::{Arc, Condvar, Mutex};
3use std::fmt;
4use std::marker::PhantomData;
5use std::time::{Duration, Instant};
6
/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token.
///
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
///   at most a specified amount of time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. A call to
///   [`unpark`] followed by a call to [`park`] therefore results in the second call returning
///   immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    // Handle to the shared parking state; `Parker::unparker` hands it out by reference.
    unparker: Unparker,
    // A raw-pointer marker opts the type out of the automatic `Send`/`Sync` impls. `Send` is
    // added back by an explicit `unsafe impl`, which leaves `Parker` `!Sync`: a `&Parker`
    // cannot be shared across threads.
    _marker: PhantomData<*const ()>,
}
57
// SAFETY: the only field that is not automatically `Send` is the `PhantomData<*const ()>`
// marker, which carries no data; the contained `Unparker` is itself declared `Send`.
unsafe impl Send for Parker {}
59
60impl Default for Parker {
61    fn default() -> Self {
62        Self {
63            unparker: Unparker {
64                inner: Arc::new(Inner {
65                    state: AtomicUsize::new(EMPTY),
66                    lock: Mutex::new(()),
67                    cvar: Condvar::new(),
68                }),
69            },
70            _marker: PhantomData,
71        }
72    }
73}
74
75impl Parker {
76    /// Creates a new `Parker`.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use crossbeam_utils::sync::Parker;
82    ///
83    /// let p = Parker::new();
84    /// ```
85    ///
86    pub fn new() -> Parker {
87        Self::default()
88    }
89
90    /// Blocks the current thread until the token is made available.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use crossbeam_utils::sync::Parker;
96    ///
97    /// let p = Parker::new();
98    /// let u = p.unparker().clone();
99    ///
100    /// // Make the token available.
101    /// u.unpark();
102    ///
103    /// // Wakes up immediately and consumes the token.
104    /// p.park();
105    /// ```
106    pub fn park(&self) {
107        self.unparker.inner.park(None);
108    }
109
110    /// Blocks the current thread until the token is made available, but only for a limited time.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// use std::time::Duration;
116    /// use crossbeam_utils::sync::Parker;
117    ///
118    /// let p = Parker::new();
119    ///
120    /// // Waits for the token to become available, but will not wait longer than 500 ms.
121    /// p.park_timeout(Duration::from_millis(500));
122    /// ```
123    pub fn park_timeout(&self, timeout: Duration) {
124        match Instant::now().checked_add(timeout) {
125            Some(deadline) => self.park_deadline(deadline),
126            None => self.park(),
127        }
128    }
129
130    /// Blocks the current thread until the token is made available, or until a certain deadline.
131    ///
132    /// # Examples
133    ///
134    /// ```
135    /// use std::time::{Duration, Instant};
136    /// use crossbeam_utils::sync::Parker;
137    ///
138    /// let p = Parker::new();
139    /// let deadline = Instant::now() + Duration::from_millis(500);
140    ///
141    /// // Waits for the token to become available, but will not wait longer than 500 ms.
142    /// p.park_deadline(deadline);
143    /// ```
144    pub fn park_deadline(&self, deadline: Instant) {
145        self.unparker.inner.park(Some(deadline))
146    }
147
148    /// Returns a reference to an associated [`Unparker`].
149    ///
150    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
151    ///
152    /// # Examples
153    ///
154    /// ```
155    /// use crossbeam_utils::sync::Parker;
156    ///
157    /// let p = Parker::new();
158    /// let u = p.unparker().clone();
159    ///
160    /// // Make the token available.
161    /// u.unpark();
162    /// // Wakes up immediately and consumes the token.
163    /// p.park();
164    /// ```
165    ///
166    /// [`park`]: Parker::park
167    /// [`park_timeout`]: Parker::park_timeout
168    pub fn unparker(&self) -> &Unparker {
169        &self.unparker
170    }
171
172    /// Converts a `Parker` into a raw pointer.
173    ///
174    /// # Examples
175    ///
176    /// ```
177    /// use crossbeam_utils::sync::Parker;
178    ///
179    /// let p = Parker::new();
180    /// let raw = Parker::into_raw(p);
181    /// # let _ = unsafe { Parker::from_raw(raw) };
182    /// ```
183    pub fn into_raw(this: Parker) -> *const () {
184        Unparker::into_raw(this.unparker)
185    }
186
187    /// Converts a raw pointer into a `Parker`.
188    ///
189    /// # Safety
190    ///
191    /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
192    ///
193    /// # Examples
194    ///
195    /// ```
196    /// use crossbeam_utils::sync::Parker;
197    ///
198    /// let p = Parker::new();
199    /// let raw = Parker::into_raw(p);
200    /// let p = unsafe { Parker::from_raw(raw) };
201    /// ```
202    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
203        Parker {
204            unparker: Unparker::from_raw(ptr),
205            _marker: PhantomData,
206        }
207    }
208}
209
210impl fmt::Debug for Parker {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        f.pad("Parker { .. }")
213    }
214}
215
/// Unparks a thread parked by the associated [`Parker`].
///
/// An `Unparker` is obtained from [`Parker::unparker`] and can be cloned; every clone refers to
/// the same shared parking state.
pub struct Unparker {
    // Reference-counted state shared with the `Parker` and with every clone of this handle.
    inner: Arc<Inner>,
}
220
// SAFETY: an `Unparker` only holds an `Arc<Inner>`, and `Inner` is accessed exclusively through
// its atomic, mutex and condition variable.
// NOTE(review): with the std primitives `Arc<Inner>` would presumably already be `Send + Sync`;
// these explicit impls look like they exist for alternative `crate::primitive` backends — confirm.
unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}
223
224impl Unparker {
225    /// Atomically makes the token available if it is not already.
226    ///
227    /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
228    /// any.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use std::thread;
234    /// use std::time::Duration;
235    /// use crossbeam_utils::sync::Parker;
236    ///
237    /// let p = Parker::new();
238    /// let u = p.unparker().clone();
239    ///
240    /// thread::spawn(move || {
241    ///     thread::sleep(Duration::from_millis(500));
242    ///     u.unpark();
243    /// });
244    ///
245    /// // Wakes up when `u.unpark()` provides the token.
246    /// p.park();
247    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
248    /// ```
249    ///
250    /// [`park`]: Parker::park
251    /// [`park_timeout`]: Parker::park_timeout
252    pub fn unpark(&self) {
253        self.inner.unpark()
254    }
255
256    /// Converts an `Unparker` into a raw pointer.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use crossbeam_utils::sync::{Parker, Unparker};
262    ///
263    /// let p = Parker::new();
264    /// let u = p.unparker().clone();
265    /// let raw = Unparker::into_raw(u);
266    /// # let _ = unsafe { Unparker::from_raw(raw) };
267    /// ```
268    pub fn into_raw(this: Unparker) -> *const () {
269        Arc::into_raw(this.inner).cast::<()>()
270    }
271
272    /// Converts a raw pointer into an `Unparker`.
273    ///
274    /// # Safety
275    ///
276    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// use crossbeam_utils::sync::{Parker, Unparker};
282    ///
283    /// let p = Parker::new();
284    /// let u = p.unparker().clone();
285    ///
286    /// let raw = Unparker::into_raw(u);
287    /// let u = unsafe { Unparker::from_raw(raw) };
288    /// ```
289    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
290        Unparker {
291            inner: Arc::from_raw(ptr.cast::<Inner>()),
292        }
293    }
294}
295
296impl fmt::Debug for Unparker {
297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298        f.pad("Unparker { .. }")
299    }
300}
301
302impl Clone for Unparker {
303    fn clone(&self) -> Unparker {
304        Unparker {
305            inner: self.inner.clone(),
306        }
307    }
308}
309
// Values stored in `Inner::state`.
// No token is available and no thread has committed to blocking; this is the initial state.
const EMPTY: usize = 0;
// A thread holds `Inner::lock` and is waiting, or about to wait, on `Inner::cvar`.
const PARKED: usize = 1;
// `unpark` has made the token available; `park` resets the state to `EMPTY` when it consumes it.
const NOTIFIED: usize = 2;
313
/// State shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
    // One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    // Guards no data. The parking thread takes it before publishing `PARKED` and keeps it until
    // it is waiting on `cvar`; `unpark` briefly acquires it before notifying.
    lock: Mutex<()>,
    // Where the parking thread blocks until it is notified or its deadline passes.
    cvar: Condvar,
}
319
320impl Inner {
321    fn park(&self, deadline: Option<Instant>) {
322        // If we were previously notified then we consume this notification and return quickly.
323        if self
324            .state
325            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
326            .is_ok()
327        {
328            return;
329        }
330
331        // If the timeout is zero, then there is no need to actually block.
332        if let Some(deadline) = deadline {
333            if deadline <= Instant::now() {
334                return;
335            }
336        }
337
338        // Otherwise we need to coordinate going to sleep.
339        let mut m = self.lock.lock().unwrap();
340
341        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
342            Ok(_) => {}
343            // Consume this notification to avoid spurious wakeups in the next park.
344            Err(NOTIFIED) => {
345                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
346                // because `unpark` may have been called again since we read `NOTIFIED` in the
347                // `compare_exchange` above. We must perform an acquire operation that synchronizes
348                // with that `unpark` to observe any writes it made before the call to `unpark`. To
349                // do that we must read from the write it made to `state`.
350                let old = self.state.swap(EMPTY, SeqCst);
351                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
352                return;
353            }
354            Err(n) => panic!("inconsistent park_timeout state: {}", n),
355        }
356
357        loop {
358            // Block the current thread on the conditional variable.
359            m = match deadline {
360                None => self.cvar.wait(m).unwrap(),
361                Some(deadline) => {
362                    let now = Instant::now();
363                    if now < deadline {
364                        // We could check for a timeout here, in the return value of wait_timeout,
365                        // but in the case that a timeout and an unpark arrive simultaneously, we
366                        // prefer to report the former.
367                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
368                    } else {
369                        // We've timed out; swap out the state back to empty on our way out
370                        match self.state.swap(EMPTY, SeqCst) {
371                            NOTIFIED | PARKED => return,
372                            n => panic!("inconsistent park_timeout state: {}", n),
373                        };
374                    }
375                }
376            };
377
378            if self
379                .state
380                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
381                .is_ok()
382            {
383                // got a notification
384                return;
385            }
386
387            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
388            // in the branch above, when we discover the deadline is in the past
389        }
390    }
391
392    pub(crate) fn unpark(&self) {
393        // To ensure the unparked thread will observe any writes we made before this call, we must
394        // perform a release operation that `park` can synchronize with. To do that we must write
395        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
396        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
397        match self.state.swap(NOTIFIED, SeqCst) {
398            EMPTY => return,    // no one was waiting
399            NOTIFIED => return, // already unparked
400            PARKED => {}        // gotta go wake someone up
401            _ => panic!("inconsistent state in unpark"),
402        }
403
404        // There is a period between when the parked thread sets `state` to `PARKED` (or last
405        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
406        // If we were to notify during this period it would be ignored and then when the parked
407        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
408        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
409        //
410        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
411        // it doesn't get woken only to have to wait for us to release `lock`.
412        drop(self.lock.lock().unwrap());
413        self.cvar.notify_one();
414    }
415}