crossbeam_utils/
thread.rs

1//! Threads that can borrow variables from the stack.
2//!
3//! Create a scope when spawned threads need to access variables on the stack:
4//!
5//! ```
6//! use crossbeam_utils::thread;
7//!
8//! let people = vec![
9//!     "Alice".to_string(),
10//!     "Bob".to_string(),
11//!     "Carol".to_string(),
12//! ];
13//!
14//! thread::scope(|s| {
15//!     for person in &people {
16//!         s.spawn(move |_| {
17//!             println!("Hello, {}!", person);
18//!         });
19//!     }
20//! }).unwrap();
21//! ```
22//!
23//! # Why scoped threads?
24//!
25//! Suppose we wanted to re-write the previous example using plain threads:
26//!
27//! ```compile_fail,E0597
28//! use std::thread;
29//!
30//! let people = vec![
31//!     "Alice".to_string(),
32//!     "Bob".to_string(),
33//!     "Carol".to_string(),
34//! ];
35//!
36//! let mut threads = Vec::new();
37//!
38//! for person in &people {
39//!     threads.push(thread::spawn(move || {
40//!         println!("Hello, {}!", person);
41//!     }));
42//! }
43//!
44//! for thread in threads {
45//!     thread.join().unwrap();
46//! }
47//! ```
48//!
49//! This doesn't work because the borrow checker complains about `people` not living long enough:
50//!
51//! ```text
52//! error[E0597]: `people` does not live long enough
53//!   --> src/main.rs:12:20
54//!    |
55//! 12 |     for person in &people {
56//!    |                    ^^^^^^ borrowed value does not live long enough
57//! ...
58//! 21 | }
59//!    | - borrowed value only lives until here
60//!    |
61//!    = note: borrowed value must be valid for the static lifetime...
62//! ```
63//!
64//! The problem here is that spawned threads are not allowed to borrow variables on stack because
65//! the compiler cannot prove they will be joined before `people` is destroyed.
66//!
67//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
68//! before the scope ends.
69//!
70//! # How scoped threads work
71//!
72//! If a variable is borrowed by a thread, the thread must complete before the variable is
73//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
74//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
75//!
76//! A scope creates a clear boundary between variables outside the scope and threads inside the
77//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
78//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
79//! can safely access variables outside it.
80//!
81//! # Nesting scoped threads
82//!
83//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
84//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
85//! cannot be borrowed by scoped threads:
86//!
87//! ```compile_fail,E0521
88//! use crossbeam_utils::thread;
89//!
90//! thread::scope(|s| {
91//!     s.spawn(|_| {
92//!         // Not going to compile because we're trying to borrow `s`,
93//!         // which lives *inside* the scope! :(
94//!         s.spawn(|_| println!("nested thread"));
95//!     });
96//! });
97//! ```
98//!
99//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
100//! argument, which can be used for spawning nested threads:
101//!
102//! ```
103//! use crossbeam_utils::thread;
104//!
105//! thread::scope(|s| {
106//!     // Note the `|s|` here.
107//!     s.spawn(|s| {
108//!         // Yay, this works because we're using a fresh argument `s`! :)
109//!         s.spawn(|_| println!("nested thread"));
110//!     });
111//! }).unwrap();
112//! ```
113
114use std::boxed::Box;
115use std::fmt;
116use std::io;
117use std::marker::PhantomData;
118use std::mem;
119use std::panic;
120use std::string::String;
121use std::sync::{Arc, Mutex};
122use std::thread;
123use std::vec::Vec;
124
125use crate::sync::WaitGroup;
126
127type SharedVec<T> = Arc<Mutex<Vec<T>>>;
128type SharedOption<T> = Arc<Mutex<Option<T>>>;
129
130/// Creates a new scope for spawning threads.
131///
132/// All child threads that haven't been manually joined will be automatically joined just before
133/// this function invocation ends. If all joined threads have successfully completed, `Ok` is
134/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
135/// returned containing errors from panicked threads. Note that if panics are implemented by
136/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
137///
138/// **Note:** Since Rust 1.63, this function is soft-deprecated in favor of the more efficient [`std::thread::scope`].
139///
140/// # Examples
141///
142/// ```
143/// use crossbeam_utils::thread;
144///
145/// let var = vec![1, 2, 3];
146///
147/// thread::scope(|s| {
148///     s.spawn(|_| {
149///         println!("A child thread borrowing `var`: {:?}", var);
150///     });
151/// }).unwrap();
152/// ```
153pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
154where
155    F: FnOnce(&Scope<'env>) -> R,
156{
157    struct AbortOnPanic;
158    impl Drop for AbortOnPanic {
159        fn drop(&mut self) {
160            if thread::panicking() {
161                std::process::abort();
162            }
163        }
164    }
165
166    let wg = WaitGroup::new();
167    let scope = Scope::<'env> {
168        handles: SharedVec::default(),
169        wait_group: wg.clone(),
170        _marker: PhantomData,
171    };
172
173    // Execute the scoped function, but catch any panics.
174    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
175
176    // If an unwinding panic occurs before all threads are joined
177    // promote it to an aborting panic to prevent any threads from escaping the scope.
178    let guard = AbortOnPanic;
179
180    // Wait until all nested scopes are dropped.
181    drop(scope.wait_group);
182    wg.wait();
183
184    // Join all remaining spawned threads.
185    let panics: Vec<_> = scope
186        .handles
187        .lock()
188        .unwrap()
189        // Filter handles that haven't been joined, join them, and collect errors.
190        .drain(..)
191        .filter_map(|handle| handle.lock().unwrap().take())
192        .filter_map(|handle| handle.join().err())
193        .collect();
194
195    mem::forget(guard);
196
197    // If `f` has panicked, resume unwinding.
198    // If any of the child threads have panicked, return the panic errors.
199    // Otherwise, everything is OK and return the result of `f`.
200    match result {
201        Err(err) => panic::resume_unwind(err),
202        Ok(res) => {
203            if panics.is_empty() {
204                Ok(res)
205            } else {
206                Err(Box::new(panics))
207            }
208        }
209    }
210}
211
212/// A scope for spawning threads.
213pub struct Scope<'env> {
214    /// The list of the thread join handles.
215    handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
216
217    /// Used to wait until all subscopes all dropped.
218    wait_group: WaitGroup,
219
220    /// Borrows data with invariant lifetime `'env`.
221    _marker: PhantomData<&'env mut &'env ()>,
222}
223
224unsafe impl Sync for Scope<'_> {}
225
226impl<'env> Scope<'env> {
227    /// Spawns a scoped thread.
228    ///
229    /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
230    /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
231    /// allowing it to reference variables outside the scope.
232    ///
233    /// The scoped thread is passed a reference to this scope as an argument, which can be used for
234    /// spawning nested threads.
235    ///
236    /// The returned [handle](ScopedJoinHandle) can be used to manually
237    /// [join](ScopedJoinHandle::join) the thread before the scope exits.
238    ///
239    /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
240    /// stack size or the name of the thread, use this API instead.
241    ///
242    /// [`spawn`]: std::thread::spawn
243    ///
244    /// # Panics
245    ///
246    /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
247    /// to recover from such errors.
248    ///
249    /// # Examples
250    ///
251    /// ```
252    /// use crossbeam_utils::thread;
253    ///
254    /// thread::scope(|s| {
255    ///     let handle = s.spawn(|_| {
256    ///         println!("A child thread is running");
257    ///         42
258    ///     });
259    ///
260    ///     // Join the thread and retrieve its result.
261    ///     let res = handle.join().unwrap();
262    ///     assert_eq!(res, 42);
263    /// }).unwrap();
264    /// ```
265    pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
266    where
267        F: FnOnce(&Scope<'env>) -> T,
268        F: Send + 'env,
269        T: Send + 'env,
270    {
271        self.builder()
272            .spawn(f)
273            .expect("failed to spawn scoped thread")
274    }
275
276    /// Creates a builder that can configure a thread before spawning.
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// use crossbeam_utils::thread;
282    ///
283    /// thread::scope(|s| {
284    ///     s.builder()
285    ///         .spawn(|_| println!("A child thread is running"))
286    ///         .unwrap();
287    /// }).unwrap();
288    /// ```
289    pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
290        ScopedThreadBuilder {
291            scope: self,
292            builder: thread::Builder::new(),
293        }
294    }
295}
296
297impl fmt::Debug for Scope<'_> {
298    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299        f.pad("Scope { .. }")
300    }
301}
302
303/// Configures the properties of a new thread.
304///
305/// The two configurable properties are:
306///
307/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
308/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
309///
310/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
311/// thread handle with the given configuration.
312///
313/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
314/// value. You may want to use this builder when you want to recover from a failure to launch a
315/// thread.
316///
317/// # Examples
318///
319/// ```
320/// use crossbeam_utils::thread;
321///
322/// thread::scope(|s| {
323///     s.builder()
324///         .spawn(|_| println!("Running a child thread"))
325///         .unwrap();
326/// }).unwrap();
327/// ```
328///
329/// [`name`]: ScopedThreadBuilder::name
330/// [`stack_size`]: ScopedThreadBuilder::stack_size
331/// [`spawn`]: ScopedThreadBuilder::spawn
332/// [`io::Result`]: std::io::Result
333/// [naming-threads]: std::thread#naming-threads
334/// [stack-size]: std::thread#stack-size
335#[derive(Debug)]
336pub struct ScopedThreadBuilder<'scope, 'env> {
337    scope: &'scope Scope<'env>,
338    builder: thread::Builder,
339}
340
341impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
342    /// Sets the name for the new thread.
343    ///
344    /// The name must not contain null bytes (`\0`).
345    ///
346    /// For more information about named threads, see [here][naming-threads].
347    ///
348    /// # Examples
349    ///
350    /// ```
351    /// use crossbeam_utils::thread;
352    /// use std::thread::current;
353    ///
354    /// thread::scope(|s| {
355    ///     s.builder()
356    ///         .name("my thread".to_string())
357    ///         .spawn(|_| assert_eq!(current().name(), Some("my thread")))
358    ///         .unwrap();
359    /// }).unwrap();
360    /// ```
361    ///
362    /// [naming-threads]: std::thread#naming-threads
363    pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
364        self.builder = self.builder.name(name);
365        self
366    }
367
368    /// Sets the size of the stack for the new thread.
369    ///
370    /// The stack size is measured in bytes.
371    ///
372    /// For more information about the stack size for threads, see [here][stack-size].
373    ///
374    /// # Examples
375    ///
376    /// ```
377    /// use crossbeam_utils::thread;
378    ///
379    /// thread::scope(|s| {
380    ///     s.builder()
381    ///         .stack_size(32 * 1024)
382    ///         .spawn(|_| println!("Running a child thread"))
383    ///         .unwrap();
384    /// }).unwrap();
385    /// ```
386    ///
387    /// [stack-size]: std::thread#stack-size
388    pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
389        self.builder = self.builder.stack_size(size);
390        self
391    }
392
393    /// Spawns a scoped thread with this configuration.
394    ///
395    /// The scoped thread is passed a reference to this scope as an argument, which can be used for
396    /// spawning nested threads.
397    ///
398    /// The returned handle can be used to manually join the thread before the scope exits.
399    ///
400    /// # Errors
401    ///
402    /// Unlike the [`Scope::spawn`] method, this method yields an
403    /// [`io::Result`] to capture any failure to create the thread at
404    /// the OS level.
405    ///
406    /// [`io::Result`]: std::io::Result
407    ///
408    /// # Panics
409    ///
410    /// Panics if a thread name was set and it contained null bytes.
411    ///
412    /// # Examples
413    ///
414    /// ```
415    /// use crossbeam_utils::thread;
416    ///
417    /// thread::scope(|s| {
418    ///     let handle = s.builder()
419    ///         .spawn(|_| {
420    ///             println!("A child thread is running");
421    ///             42
422    ///         })
423    ///         .unwrap();
424    ///
425    ///     // Join the thread and retrieve its result.
426    ///     let res = handle.join().unwrap();
427    ///     assert_eq!(res, 42);
428    /// }).unwrap();
429    /// ```
430    pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
431    where
432        F: FnOnce(&Scope<'env>) -> T,
433        F: Send + 'env,
434        T: Send + 'env,
435    {
436        // The result of `f` will be stored here.
437        let result = SharedOption::default();
438
439        // Spawn the thread and grab its join handle and thread handle.
440        let (handle, thread) = {
441            let result = Arc::clone(&result);
442
443            // A clone of the scope that will be moved into the new thread.
444            let scope = Scope::<'env> {
445                handles: Arc::clone(&self.scope.handles),
446                wait_group: self.scope.wait_group.clone(),
447                _marker: PhantomData,
448            };
449
450            // Spawn the thread.
451            let handle = {
452                let closure = move || {
453                    // Make sure the scope is inside the closure with the proper `'env` lifetime.
454                    let scope: Scope<'env> = scope;
455
456                    // Run the closure.
457                    let res = f(&scope);
458
459                    // Store the result if the closure didn't panic.
460                    *result.lock().unwrap() = Some(res);
461                };
462
463                // Allocate `closure` on the heap and erase the `'env` bound.
464                let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
465                let closure: Box<dyn FnOnce() + Send + 'static> =
466                    unsafe { mem::transmute(closure) };
467
468                // Finally, spawn the closure.
469                self.builder.spawn(closure)?
470            };
471
472            let thread = handle.thread().clone();
473            let handle = Arc::new(Mutex::new(Some(handle)));
474            (handle, thread)
475        };
476
477        // Add the handle to the shared list of join handles.
478        self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
479
480        Ok(ScopedJoinHandle {
481            handle,
482            result,
483            thread,
484            _marker: PhantomData,
485        })
486    }
487}
488
489unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
490unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
491
492/// A handle that can be used to join its scoped thread.
493///
494/// This struct is created by the [`Scope::spawn`] method and the
495/// [`ScopedThreadBuilder::spawn`] method.
496pub struct ScopedJoinHandle<'scope, T> {
497    /// A join handle to the spawned thread.
498    handle: SharedOption<thread::JoinHandle<()>>,
499
500    /// Holds the result of the inner closure.
501    result: SharedOption<T>,
502
503    /// A handle to the spawned thread.
504    thread: thread::Thread,
505
506    /// Borrows the parent scope with lifetime `'scope`.
507    _marker: PhantomData<&'scope ()>,
508}
509
510impl<T> ScopedJoinHandle<'_, T> {
511    /// Waits for the thread to finish and returns its result.
512    ///
513    /// If the child thread panics, an error is returned. Note that if panics are implemented by
514    /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
515    ///
516    /// # Panics
517    ///
518    /// This function may panic on some platforms if a thread attempts to join itself or otherwise
519    /// may create a deadlock with joining threads.
520    ///
521    /// # Examples
522    ///
523    /// ```
524    /// use crossbeam_utils::thread;
525    ///
526    /// thread::scope(|s| {
527    ///     let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
528    ///     let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
529    ///
530    ///     // Join the first thread and verify that it succeeded.
531    ///     let res = handle1.join();
532    ///     assert!(res.is_ok());
533    ///
534    ///     // Join the second thread and verify that it panicked.
535    ///     let res = handle2.join();
536    ///     assert!(res.is_err());
537    /// }).unwrap();
538    /// ```
539    pub fn join(self) -> thread::Result<T> {
540        // Take out the handle. The handle will surely be available because the root scope waits
541        // for nested scopes before joining remaining threads.
542        let handle = self.handle.lock().unwrap().take().unwrap();
543
544        // Join the thread and then take the result out of its inner closure.
545        handle
546            .join()
547            .map(|()| self.result.lock().unwrap().take().unwrap())
548    }
549
550    /// Returns a handle to the underlying thread.
551    ///
552    /// # Examples
553    ///
554    /// ```
555    /// use crossbeam_utils::thread;
556    ///
557    /// thread::scope(|s| {
558    ///     let handle = s.spawn(|_| println!("A child thread is running"));
559    ///     println!("The child thread ID: {:?}", handle.thread().id());
560    /// }).unwrap();
561    /// ```
562    pub fn thread(&self) -> &thread::Thread {
563        &self.thread
564    }
565}
566
567/// Unix-specific extensions.
568#[cfg(unix)]
569mod unix {
570    use super::ScopedJoinHandle;
571    use std::os::unix::thread::{JoinHandleExt, RawPthread};
572
573    impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
574        fn as_pthread_t(&self) -> RawPthread {
575            // Borrow the handle. The handle will surely be available because the root scope waits
576            // for nested scopes before joining remaining threads.
577            let handle = self.handle.lock().unwrap();
578            handle.as_ref().unwrap().as_pthread_t()
579        }
580        fn into_pthread_t(self) -> RawPthread {
581            self.as_pthread_t()
582        }
583    }
584}
585/// Windows-specific extensions.
586#[cfg(windows)]
587mod windows {
588    use super::ScopedJoinHandle;
589    use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
590
591    impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
592        fn as_raw_handle(&self) -> RawHandle {
593            // Borrow the handle. The handle will surely be available because the root scope waits
594            // for nested scopes before joining remaining threads.
595            let handle = self.handle.lock().unwrap();
596            handle.as_ref().unwrap().as_raw_handle()
597        }
598    }
599
600    impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
601        fn into_raw_handle(self) -> RawHandle {
602            self.as_raw_handle()
603        }
604    }
605}
606
607impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
608    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
609        f.pad("ScopedJoinHandle { .. }")
610    }
611}