rayon_core/scope/
mod.rs

1//! Methods for custom fork-join scopes, created by the [`scope()`]
2//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
3//!
4//! [`scope()`]: fn.scope.html
5//! [`in_place_scope()`]: fn.in_place_scope.html
6//! [`join()`]: ../join/join.fn.html
7
8use crate::broadcast::BroadcastContext;
9use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
10use crate::latch::{CountLatch, Latch};
11use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
12use crate::unwind;
13use std::any::Any;
14use std::fmt;
15use std::marker::PhantomData;
16use std::mem::ManuallyDrop;
17use std::ptr;
18use std::sync::atomic::{AtomicPtr, Ordering};
19use std::sync::Arc;
20
21#[cfg(test)]
22mod test;
23
24/// Represents a fork-join scope which can be used to spawn any number of tasks.
25/// See [`scope()`] for more information.
26///
27///[`scope()`]: fn.scope.html
28pub struct Scope<'scope> {
29    base: ScopeBase<'scope>,
30}
31
32/// Represents a fork-join scope which can be used to spawn any number of tasks.
33/// Those spawned from the same thread are prioritized in relative FIFO order.
34/// See [`scope_fifo()`] for more information.
35///
36///[`scope_fifo()`]: fn.scope_fifo.html
37pub struct ScopeFifo<'scope> {
38    base: ScopeBase<'scope>,
39    fifos: Vec<JobFifo>,
40}
41
42struct ScopeBase<'scope> {
43    /// thread registry where `scope()` was executed or where `in_place_scope()`
44    /// should spawn jobs.
45    registry: Arc<Registry>,
46
47    /// if some job panicked, the error is stored here; it will be
48    /// propagated to the one who created the scope
49    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
50
51    /// latch to track job counts
52    job_completed_latch: CountLatch,
53
54    /// You can think of a scope as containing a list of closures to execute,
55    /// all of which outlive `'scope`.  They're not actually required to be
56    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
57    /// the closures are only *moved* across threads to be executed.
58    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
59}
60
61/// Creates a "fork-join" scope `s` and invokes the closure with a
62/// reference to `s`. This closure can then spawn asynchronous tasks
63/// into `s`. Those tasks may run asynchronously with respect to the
64/// closure; they may themselves spawn additional tasks into `s`. When
65/// the closure returns, it will block until all tasks that have been
66/// spawned into `s` complete.
67///
68/// `scope()` is a more flexible building block compared to `join()`,
69/// since a loop can be used to spawn any number of tasks without
70/// recursing. However, that flexibility comes at a performance price:
71/// tasks spawned using `scope()` must be allocated onto the heap,
72/// whereas `join()` can make exclusive use of the stack. **Prefer
73/// `join()` (or, even better, parallel iterators) where possible.**
74///
75/// # Example
76///
77/// The Rayon `join()` function launches two closures and waits for them
78/// to stop. One could implement `join()` using a scope like so, although
79/// it would be less efficient than the real implementation:
80///
81/// ```rust
82/// # use rayon_core as rayon;
83/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
84///     where A: FnOnce() -> RA + Send,
85///           B: FnOnce() -> RB + Send,
86///           RA: Send,
87///           RB: Send,
88/// {
89///     let mut result_a: Option<RA> = None;
90///     let mut result_b: Option<RB> = None;
91///     rayon::scope(|s| {
92///         s.spawn(|_| result_a = Some(oper_a()));
93///         s.spawn(|_| result_b = Some(oper_b()));
94///     });
95///     (result_a.unwrap(), result_b.unwrap())
96/// }
97/// ```
98///
99/// # A note on threading
100///
101/// The closure given to `scope()` executes in the Rayon thread-pool,
102/// as do those given to `spawn()`. This means that you can't access
103/// thread-local variables (well, you can, but they may have
104/// unexpected values).
105///
106/// # Task execution
107///
108/// Task execution potentially starts as soon as `spawn()` is called.
109/// The task will end sometime before `scope()` returns. Note that the
110/// *closure* given to scope may return much earlier. In general
111/// the lifetime of a scope created like `scope(body)` goes something like this:
112///
113/// - Scope begins when `scope(body)` is called
114/// - Scope body `body()` is invoked
115///     - Scope tasks may be spawned
116/// - Scope body returns
117/// - Scope tasks execute, possibly spawning more tasks
118/// - Once all tasks are done, scope ends and `scope()` returns
119///
120/// To see how and when tasks are joined, consider this example:
121///
122/// ```rust
123/// # use rayon_core as rayon;
124/// // point start
125/// rayon::scope(|s| {
126///     s.spawn(|s| { // task s.1
127///         s.spawn(|s| { // task s.1.1
128///             rayon::scope(|t| {
129///                 t.spawn(|_| ()); // task t.1
130///                 t.spawn(|_| ()); // task t.2
131///             });
132///         });
133///     });
134///     s.spawn(|s| { // task s.2
135///     });
136///     // point mid
137/// });
138/// // point end
139/// ```
140///
141/// The various tasks that are run will execute roughly like so:
142///
143/// ```notrust
144/// | (start)
145/// |
146/// | (scope `s` created)
147/// +-----------------------------------------------+ (task s.2)
148/// +-------+ (task s.1)                            |
149/// |       |                                       |
150/// |       +---+ (task s.1.1)                      |
151/// |       |   |                                   |
152/// |       |   | (scope `t` created)               |
153/// |       |   +----------------+ (task t.2)       |
154/// |       |   +---+ (task t.1) |                  |
155/// | (mid) |   |   |            |                  |
156/// :       |   + <-+------------+ (scope `t` ends) |
157/// :       |   |                                   |
158/// |<------+---+-----------------------------------+ (scope `s` ends)
159/// |
160/// | (end)
161/// ```
162///
163/// The point here is that everything spawned into scope `s` will
164/// terminate (at latest) at the same point -- right before the
165/// original call to `rayon::scope` returns. This includes new
166/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
167/// scope is created (such as `t`), the things spawned into that scope
168/// will be joined before that scope returns, which in turn occurs
169/// before the creating task (task `s.1.1` in this case) finishes.
170///
171/// There is no guaranteed order of execution for spawns in a scope,
172/// given that other threads may steal tasks at any time. However, they
173/// are generally prioritized in a LIFO order on the thread from which
174/// they were spawned. So in this example, absent any stealing, we can
175/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
176/// threads always steal from the other end of the deque, like FIFO
177/// order.  The idea is that "recent" tasks are most likely to be fresh
178/// in the local CPU's cache, while other threads can steal older
179/// "stale" tasks.  For an alternate approach, consider
180/// [`scope_fifo()`] instead.
181///
182/// [`scope_fifo()`]: fn.scope_fifo.html
183///
184/// # Accessing stack data
185///
186/// In general, spawned tasks may access stack data in place that
187/// outlives the scope itself. Other data must be fully owned by the
188/// spawned task.
189///
190/// ```rust
191/// # use rayon_core as rayon;
192/// let ok: Vec<i32> = vec![1, 2, 3];
193/// rayon::scope(|s| {
194///     let bad: Vec<i32> = vec![4, 5, 6];
195///     s.spawn(|_| {
196///         // We can access `ok` because outlives the scope `s`.
197///         println!("ok: {:?}", ok);
198///
199///         // If we just try to use `bad` here, the closure will borrow `bad`
200///         // (because we are just printing it out, and that only requires a
201///         // borrow), which will result in a compilation error. Read on
202///         // for options.
203///         // println!("bad: {:?}", bad);
204///    });
205/// });
206/// ```
207///
208/// As the comments example above suggest, to reference `bad` we must
209/// take ownership of it. One way to do this is to detach the closure
210/// from the surrounding stack frame, using the `move` keyword. This
211/// will cause it to take ownership of *all* the variables it touches,
212/// in this case including both `ok` *and* `bad`:
213///
214/// ```rust
215/// # use rayon_core as rayon;
216/// let ok: Vec<i32> = vec![1, 2, 3];
217/// rayon::scope(|s| {
218///     let bad: Vec<i32> = vec![4, 5, 6];
219///     s.spawn(move |_| {
220///         println!("ok: {:?}", ok);
221///         println!("bad: {:?}", bad);
222///     });
223///
224///     // That closure is fine, but now we can't use `ok` anywhere else,
225///     // since it is owned by the previous task:
226///     // s.spawn(|_| println!("ok: {:?}", ok));
227/// });
228/// ```
229///
230/// While this works, it could be a problem if we want to use `ok` elsewhere.
231/// There are two choices. We can keep the closure as a `move` closure, but
232/// instead of referencing the variable `ok`, we create a shadowed variable that
233/// is a borrow of `ok` and capture *that*:
234///
235/// ```rust
236/// # use rayon_core as rayon;
237/// let ok: Vec<i32> = vec![1, 2, 3];
238/// rayon::scope(|s| {
239///     let bad: Vec<i32> = vec![4, 5, 6];
240///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
241///     s.spawn(move |_| {
242///         println!("ok: {:?}", ok); // captures the shadowed version
243///         println!("bad: {:?}", bad);
244///     });
245///
246///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
247///     // can be shared freely. Note that we need a `move` closure here though,
248///     // because otherwise we'd be trying to borrow the shadowed `ok`,
249///     // and that doesn't outlive `scope`.
250///     s.spawn(move |_| println!("ok: {:?}", ok));
251/// });
252/// ```
253///
254/// Another option is not to use the `move` keyword but instead to take ownership
255/// of individual variables:
256///
257/// ```rust
258/// # use rayon_core as rayon;
259/// let ok: Vec<i32> = vec![1, 2, 3];
260/// rayon::scope(|s| {
261///     let bad: Vec<i32> = vec![4, 5, 6];
262///     s.spawn(|_| {
263///         // Transfer ownership of `bad` into a local variable (also named `bad`).
264///         // This will force the closure to take ownership of `bad` from the environment.
265///         let bad = bad;
266///         println!("ok: {:?}", ok); // `ok` is only borrowed.
267///         println!("bad: {:?}", bad); // refers to our local variable, above.
268///     });
269///
270///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
271/// });
272/// ```
273///
274/// # Panics
275///
276/// If a panic occurs, either in the closure given to `scope()` or in
277/// any of the spawned jobs, that panic will be propagated and the
278/// call to `scope()` will panic. If multiple panics occurs, it is
279/// non-deterministic which of their panic values will propagate.
280/// Regardless, once a task is spawned using `scope.spawn()`, it will
281/// execute, even if the spawning task should later panic. `scope()`
282/// returns once all spawned jobs have completed, and any panics are
283/// propagated at that point.
284pub fn scope<'scope, OP, R>(op: OP) -> R
285where
286    OP: FnOnce(&Scope<'scope>) -> R + Send,
287    R: Send,
288{
289    in_worker(|owner_thread, _| {
290        let scope = Scope::<'scope>::new(Some(owner_thread), None);
291        scope.base.complete(Some(owner_thread), || op(&scope))
292    })
293}
294
295/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
296/// closure with a reference to `s`. This closure can then spawn
297/// asynchronous tasks into `s`. Those tasks may run asynchronously with
298/// respect to the closure; they may themselves spawn additional tasks
299/// into `s`. When the closure returns, it will block until all tasks
300/// that have been spawned into `s` complete.
301///
302/// # Task execution
303///
304/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
305/// difference in the order of execution. Consider a similar example:
306///
307/// [`scope()`]: fn.scope.html
308///
309/// ```rust
310/// # use rayon_core as rayon;
311/// // point start
312/// rayon::scope_fifo(|s| {
313///     s.spawn_fifo(|s| { // task s.1
314///         s.spawn_fifo(|s| { // task s.1.1
315///             rayon::scope_fifo(|t| {
316///                 t.spawn_fifo(|_| ()); // task t.1
317///                 t.spawn_fifo(|_| ()); // task t.2
318///             });
319///         });
320///     });
321///     s.spawn_fifo(|s| { // task s.2
322///     });
323///     // point mid
324/// });
325/// // point end
326/// ```
327///
328/// The various tasks that are run will execute roughly like so:
329///
330/// ```notrust
331/// | (start)
332/// |
333/// | (FIFO scope `s` created)
334/// +--------------------+ (task s.1)
335/// +-------+ (task s.2) |
336/// |       |            +---+ (task s.1.1)
337/// |       |            |   |
338/// |       |            |   | (FIFO scope `t` created)
339/// |       |            |   +----------------+ (task t.1)
340/// |       |            |   +---+ (task t.2) |
341/// | (mid) |            |   |   |            |
342/// :       |            |   + <-+------------+ (scope `t` ends)
343/// :       |            |   |
344/// |<------+------------+---+ (scope `s` ends)
345/// |
346/// | (end)
347/// ```
348///
349/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
350/// the thread from which they were spawned, as opposed to `scope()`'s
351/// LIFO.  So in this example, we can expect `s.1` to execute before
352/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
353/// FIFO order, as usual. Overall, this has roughly the same order as
354/// the now-deprecated [`breadth_first`] option, except the effect is
355/// isolated to a particular scope. If spawns are intermingled from any
356/// combination of `scope()` and `scope_fifo()`, or from different
357/// threads, their order is only specified with respect to spawns in the
358/// same scope and thread.
359///
360/// For more details on this design, see Rayon [RFC #1].
361///
362/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
363/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
364///
365/// # Panics
366///
367/// If a panic occurs, either in the closure given to `scope_fifo()` or
368/// in any of the spawned jobs, that panic will be propagated and the
369/// call to `scope_fifo()` will panic. If multiple panics occurs, it is
370/// non-deterministic which of their panic values will propagate.
371/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
372/// will execute, even if the spawning task should later panic.
373/// `scope_fifo()` returns once all spawned jobs have completed, and any
374/// panics are propagated at that point.
375pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
376where
377    OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
378    R: Send,
379{
380    in_worker(|owner_thread, _| {
381        let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
382        scope.base.complete(Some(owner_thread), || op(&scope))
383    })
384}
385
386/// Creates a "fork-join" scope `s` and invokes the closure with a
387/// reference to `s`. This closure can then spawn asynchronous tasks
388/// into `s`. Those tasks may run asynchronously with respect to the
389/// closure; they may themselves spawn additional tasks into `s`. When
390/// the closure returns, it will block until all tasks that have been
391/// spawned into `s` complete.
392///
393/// This is just like `scope()` except the closure runs on the same thread
394/// that calls `in_place_scope()`. Only work that it spawns runs in the
395/// thread pool.
396///
397/// # Panics
398///
399/// If a panic occurs, either in the closure given to `in_place_scope()` or in
400/// any of the spawned jobs, that panic will be propagated and the
401/// call to `in_place_scope()` will panic. If multiple panics occurs, it is
402/// non-deterministic which of their panic values will propagate.
403/// Regardless, once a task is spawned using `scope.spawn()`, it will
404/// execute, even if the spawning task should later panic. `in_place_scope()`
405/// returns once all spawned jobs have completed, and any panics are
406/// propagated at that point.
407pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
408where
409    OP: FnOnce(&Scope<'scope>) -> R,
410{
411    do_in_place_scope(None, op)
412}
413
414pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
415where
416    OP: FnOnce(&Scope<'scope>) -> R,
417{
418    let thread = unsafe { WorkerThread::current().as_ref() };
419    let scope = Scope::<'scope>::new(thread, registry);
420    scope.base.complete(thread, || op(&scope))
421}
422
423/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
424/// closure with a reference to `s`. This closure can then spawn
425/// asynchronous tasks into `s`. Those tasks may run asynchronously with
426/// respect to the closure; they may themselves spawn additional tasks
427/// into `s`. When the closure returns, it will block until all tasks
428/// that have been spawned into `s` complete.
429///
430/// This is just like `scope_fifo()` except the closure runs on the same thread
431/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
432/// thread pool.
433///
434/// # Panics
435///
436/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
437/// any of the spawned jobs, that panic will be propagated and the
438/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is
439/// non-deterministic which of their panic values will propagate.
440/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
441/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
442/// returns once all spawned jobs have completed, and any panics are
443/// propagated at that point.
444pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
445where
446    OP: FnOnce(&ScopeFifo<'scope>) -> R,
447{
448    do_in_place_scope_fifo(None, op)
449}
450
451pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
452where
453    OP: FnOnce(&ScopeFifo<'scope>) -> R,
454{
455    let thread = unsafe { WorkerThread::current().as_ref() };
456    let scope = ScopeFifo::<'scope>::new(thread, registry);
457    scope.base.complete(thread, || op(&scope))
458}
459
460impl<'scope> Scope<'scope> {
461    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
462        let base = ScopeBase::new(owner, registry);
463        Scope { base }
464    }
465
466    /// Spawns a job into the fork-join scope `self`. This job will
467    /// execute sometime before the fork-join scope completes.  The
468    /// job is specified as a closure, and this closure receives its
469    /// own reference to the scope `self` as argument. This can be
470    /// used to inject new jobs into `self`.
471    ///
472    /// # Returns
473    ///
474    /// Nothing. The spawned closures cannot pass back values to the
475    /// caller directly, though they can write to local variables on
476    /// the stack (if those variables outlive the scope) or
477    /// communicate through shared channels.
478    ///
479    /// (The intention is to eventually integrate with Rust futures to
480    /// support spawns of functions that compute a value.)
481    ///
482    /// # Examples
483    ///
484    /// ```rust
485    /// # use rayon_core as rayon;
486    /// let mut value_a = None;
487    /// let mut value_b = None;
488    /// let mut value_c = None;
489    /// rayon::scope(|s| {
490    ///     s.spawn(|s1| {
491    ///           // ^ this is the same scope as `s`; this handle `s1`
492    ///           //   is intended for use by the spawned task,
493    ///           //   since scope handles cannot cross thread boundaries.
494    ///
495    ///         value_a = Some(22);
496    ///
497    ///         // the scope `s` will not end until all these tasks are done
498    ///         s1.spawn(|_| {
499    ///             value_b = Some(44);
500    ///         });
501    ///     });
502    ///
503    ///     s.spawn(|_| {
504    ///         value_c = Some(66);
505    ///     });
506    /// });
507    /// assert_eq!(value_a, Some(22));
508    /// assert_eq!(value_b, Some(44));
509    /// assert_eq!(value_c, Some(66));
510    /// ```
511    ///
512    /// # See also
513    ///
514    /// The [`scope` function] has more extensive documentation about
515    /// task spawning.
516    ///
517    /// [`scope` function]: fn.scope.html
518    pub fn spawn<BODY>(&self, body: BODY)
519    where
520        BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
521    {
522        let scope_ptr = ScopePtr(self);
523        let job = HeapJob::new(move || unsafe {
524            // SAFETY: this job will execute before the scope ends.
525            let scope = scope_ptr.as_ref();
526            ScopeBase::execute_job(&scope.base, move || body(scope))
527        });
528        let job_ref = self.base.heap_job_ref(job);
529
530        // Since `Scope` implements `Sync`, we can't be sure that we're still in a
531        // thread of this pool, so we can't just push to the local worker thread.
532        // Also, this might be an in-place scope.
533        self.base.registry.inject_or_push(job_ref);
534    }
535
536    /// Spawns a job into every thread of the fork-join scope `self`. This job will
537    /// execute on each thread sometime before the fork-join scope completes.  The
538    /// job is specified as a closure, and this closure receives its own reference
539    /// to the scope `self` as argument, as well as a `BroadcastContext`.
540    pub fn spawn_broadcast<BODY>(&self, body: BODY)
541    where
542        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
543    {
544        let scope_ptr = ScopePtr(self);
545        let job = ArcJob::new(move || unsafe {
546            // SAFETY: this job will execute before the scope ends.
547            let scope = scope_ptr.as_ref();
548            let body = &body;
549            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
550            ScopeBase::execute_job(&scope.base, func)
551        });
552        self.base.inject_broadcast(job)
553    }
554}
555
556impl<'scope> ScopeFifo<'scope> {
557    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
558        let base = ScopeBase::new(owner, registry);
559        let num_threads = base.registry.num_threads();
560        let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
561        ScopeFifo { base, fifos }
562    }
563
564    /// Spawns a job into the fork-join scope `self`. This job will
565    /// execute sometime before the fork-join scope completes.  The
566    /// job is specified as a closure, and this closure receives its
567    /// own reference to the scope `self` as argument. This can be
568    /// used to inject new jobs into `self`.
569    ///
570    /// # See also
571    ///
572    /// This method is akin to [`Scope::spawn()`], but with a FIFO
573    /// priority.  The [`scope_fifo` function] has more details about
574    /// this distinction.
575    ///
576    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
577    /// [`scope_fifo` function]: fn.scope_fifo.html
578    pub fn spawn_fifo<BODY>(&self, body: BODY)
579    where
580        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
581    {
582        let scope_ptr = ScopePtr(self);
583        let job = HeapJob::new(move || unsafe {
584            // SAFETY: this job will execute before the scope ends.
585            let scope = scope_ptr.as_ref();
586            ScopeBase::execute_job(&scope.base, move || body(scope))
587        });
588        let job_ref = self.base.heap_job_ref(job);
589
590        // If we're in the pool, use our scope's private fifo for this thread to execute
591        // in a locally-FIFO order. Otherwise, just use the pool's global injector.
592        match self.base.registry.current_thread() {
593            Some(worker) => {
594                let fifo = &self.fifos[worker.index()];
595                // SAFETY: this job will execute before the scope ends.
596                unsafe { worker.push(fifo.push(job_ref)) };
597            }
598            None => self.base.registry.inject(job_ref),
599        }
600    }
601
602    /// Spawns a job into every thread of the fork-join scope `self`. This job will
603    /// execute on each thread sometime before the fork-join scope completes.  The
604    /// job is specified as a closure, and this closure receives its own reference
605    /// to the scope `self` as argument, as well as a `BroadcastContext`.
606    pub fn spawn_broadcast<BODY>(&self, body: BODY)
607    where
608        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
609    {
610        let scope_ptr = ScopePtr(self);
611        let job = ArcJob::new(move || unsafe {
612            // SAFETY: this job will execute before the scope ends.
613            let scope = scope_ptr.as_ref();
614            let body = &body;
615            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
616            ScopeBase::execute_job(&scope.base, func)
617        });
618        self.base.inject_broadcast(job)
619    }
620}
621
622impl<'scope> ScopeBase<'scope> {
623    /// Creates the base of a new scope for the given registry
624    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
625        let registry = registry.unwrap_or_else(|| match owner {
626            Some(owner) => owner.registry(),
627            None => global_registry(),
628        });
629
630        ScopeBase {
631            registry: Arc::clone(registry),
632            panic: AtomicPtr::new(ptr::null_mut()),
633            job_completed_latch: CountLatch::new(owner),
634            marker: PhantomData,
635        }
636    }
637
638    fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
639    where
640        FUNC: FnOnce() + Send + 'scope,
641    {
642        unsafe {
643            self.job_completed_latch.increment();
644            job.into_job_ref()
645        }
646    }
647
648    fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
649    where
650        FUNC: Fn() + Send + Sync + 'scope,
651    {
652        let n_threads = self.registry.num_threads();
653        let job_refs = (0..n_threads).map(|_| unsafe {
654            self.job_completed_latch.increment();
655            ArcJob::as_job_ref(&job)
656        });
657
658        self.registry.inject_broadcast(job_refs);
659    }
660
661    /// Executes `func` as a job, either aborting or executing as
662    /// appropriate.
663    fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
664    where
665        FUNC: FnOnce() -> R,
666    {
667        let result = unsafe { Self::execute_job_closure(self, func) };
668        self.job_completed_latch.wait(owner);
669        self.maybe_propagate_panic();
670        result.unwrap() // only None if `op` panicked, and that would have been propagated
671    }
672
673    /// Executes `func` as a job, either aborting or executing as
674    /// appropriate.
675    unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
676    where
677        FUNC: FnOnce(),
678    {
679        let _: Option<()> = Self::execute_job_closure(this, func);
680    }
681
682    /// Executes `func` as a job in scope. Adjusts the "job completed"
683    /// counters and also catches any panic and stores it into
684    /// `scope`.
685    unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
686    where
687        FUNC: FnOnce() -> R,
688    {
689        let result = match unwind::halt_unwinding(func) {
690            Ok(r) => Some(r),
691            Err(err) => {
692                (*this).job_panicked(err);
693                None
694            }
695        };
696        Latch::set(&(*this).job_completed_latch);
697        result
698    }
699
700    fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
701        // capture the first error we see, free the rest
702        if self.panic.load(Ordering::Relaxed).is_null() {
703            let nil = ptr::null_mut();
704            let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
705            let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
706            if self
707                .panic
708                .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
709                .is_ok()
710            {
711                // ownership now transferred into self.panic
712            } else {
713                // another panic raced in ahead of us, so drop ours
714                let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
715            }
716        }
717    }
718
719    fn maybe_propagate_panic(&self) {
720        // propagate panic, if any occurred; at this point, all
721        // outstanding jobs have completed, so we can use a relaxed
722        // ordering:
723        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
724        if !panic.is_null() {
725            let value = unsafe { Box::from_raw(panic) };
726            unwind::resume_unwinding(*value);
727        }
728    }
729}
730
731impl<'scope> fmt::Debug for Scope<'scope> {
732    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
733        fmt.debug_struct("Scope")
734            .field("pool_id", &self.base.registry.id())
735            .field("panic", &self.base.panic)
736            .field("job_completed_latch", &self.base.job_completed_latch)
737            .finish()
738    }
739}
740
741impl<'scope> fmt::Debug for ScopeFifo<'scope> {
742    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
743        fmt.debug_struct("ScopeFifo")
744            .field("num_fifos", &self.fifos.len())
745            .field("pool_id", &self.base.registry.id())
746            .field("panic", &self.base.panic)
747            .field("job_completed_latch", &self.base.job_completed_latch)
748            .finish()
749    }
750}
751
752/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
753///
754/// Unsafe code is still required to dereference the pointer, but that's fine in
755/// scope jobs that are guaranteed to execute before the scope ends.
756struct ScopePtr<T>(*const T);
757
758// SAFETY: !Send for raw pointers is not for safety, just as a lint
759unsafe impl<T: Sync> Send for ScopePtr<T> {}
760
761// SAFETY: !Sync for raw pointers is not for safety, just as a lint
762unsafe impl<T: Sync> Sync for ScopePtr<T> {}
763
764impl<T> ScopePtr<T> {
765    // Helper to avoid disjoint captures of `scope_ptr.0`
766    unsafe fn as_ref(&self) -> &T {
767        &*self.0
768    }
769}