rayon_core/sleep/counters.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3pub(super) struct AtomicCounters {
4    /// Packs together a number of counters. The counters are ordered as
5    /// follows, from least to most significant bits (here, we assuming
6    /// that [`THREADS_BITS`] is equal to 10):
7    ///
8    /// * Bits 0..10: Stores the number of **sleeping threads**
9    /// * Bits 10..20: Stores the number of **inactive threads**
10    /// * Bits 20..: Stores the **job event counter** (JEC)
11    ///
12    /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
13    /// that the total number of bits (and hence the number of bits used for the
14    /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
15    value: AtomicUsize,
16}
17
18#[derive(Copy, Clone)]
19pub(super) struct Counters {
20    word: usize,
21}
22
23/// A value read from the **Jobs Event Counter**.
24/// See the [`README.md`](README.md) for more
25/// coverage of how the jobs event counter works.
26#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
27pub(super) struct JobsEventCounter(usize);
28
29impl JobsEventCounter {
30    pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
31
32    #[inline]
33    pub(super) fn as_usize(self) -> usize {
34        self.0
35    }
36
37    /// The JEC "is sleepy" if the last thread to increment it was in the
38    /// process of becoming sleepy. This is indicated by its value being *even*.
39    /// When new jobs are posted, they check if the JEC is sleepy, and if so
40    /// they incremented it.
41    #[inline]
42    pub(super) fn is_sleepy(self) -> bool {
43        (self.as_usize() & 1) == 0
44    }
45
46    /// The JEC "is active" if the last thread to increment it was posting new
47    /// work. This is indicated by its value being *odd*. When threads get
48    /// sleepy, they will check if the JEC is active, and increment it.
49    #[inline]
50    pub(super) fn is_active(self) -> bool {
51        !self.is_sleepy()
52    }
53}
54
/// Number of bits used for each of the thread counters.
#[cfg(target_pointer_width = "64")]
const THREADS_BITS: usize = 16;

/// Number of bits used for each of the thread counters.
#[cfg(target_pointer_width = "32")]
const THREADS_BITS: usize = 8;

/// Bits to shift to select the sleeping threads
/// (used with `select_thread`).
// `0 * THREADS_BITS` is always zero. It is spelled out so that all three
// shifts read uniformly as "field index * THREADS_BITS", hence the allowance.
#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;

/// Bits to shift to select the inactive threads
/// (used with `select_thread`).
// `1 * THREADS_BITS` is written out for symmetry with the other shifts.
#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;

/// Bits to shift to select the JEC (used with `select_jec`).
/// The JEC occupies every bit from this position up to the top of the word.
const JEC_SHIFT: usize = 2 * THREADS_BITS;

/// Max value for the thread counters (all `THREADS_BITS` low bits set).
pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;

/// Constant that can be added to add one sleeping thread.
const ONE_SLEEPING: usize = 1 << SLEEPING_SHIFT;

/// Constant that can be added to add one inactive thread.
/// An inactive thread is either idle, sleepy, or sleeping.
const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;

/// Constant that can be added to add one to the JEC.
const ONE_JEC: usize = 1 << JEC_SHIFT;
88
89impl AtomicCounters {
90    #[inline]
91    pub(super) fn new() -> AtomicCounters {
92        AtomicCounters {
93            value: AtomicUsize::new(0),
94        }
95    }
96
97    /// Load and return the current value of the various counters.
98    /// This value can then be given to other method which will
99    /// attempt to update the counters via compare-and-swap.
100    #[inline]
101    pub(super) fn load(&self, ordering: Ordering) -> Counters {
102        Counters::new(self.value.load(ordering))
103    }
104
105    #[inline]
106    fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
107        self.value
108            .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
109            .is_ok()
110    }
111
112    /// Adds an inactive thread. This cannot fail.
113    ///
114    /// This should be invoked when a thread enters its idle loop looking
115    /// for work. It is decremented when work is found. Note that it is
116    /// not decremented if the thread transitions from idle to sleepy or sleeping;
117    /// so the number of inactive threads is always greater-than-or-equal
118    /// to the number of sleeping threads.
119    #[inline]
120    pub(super) fn add_inactive_thread(&self) {
121        self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
122    }
123
124    /// Increments the jobs event counter if `increment_when`, when applied to
125    /// the current value, is true. Used to toggle the JEC from even (sleepy) to
126    /// odd (active) or vice versa. Returns the final value of the counters, for
127    /// which `increment_when` is guaranteed to return false.
128    pub(super) fn increment_jobs_event_counter_if(
129        &self,
130        increment_when: impl Fn(JobsEventCounter) -> bool,
131    ) -> Counters {
132        loop {
133            let old_value = self.load(Ordering::SeqCst);
134            if increment_when(old_value.jobs_counter()) {
135                let new_value = old_value.increment_jobs_counter();
136                if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
137                    return new_value;
138                }
139            } else {
140                return old_value;
141            }
142        }
143    }
144
145    /// Subtracts an inactive thread. This cannot fail. It is invoked
146    /// when a thread finds work and hence becomes active. It returns the
147    /// number of sleeping threads to wake up (if any).
148    ///
149    /// See `add_inactive_thread`.
150    #[inline]
151    pub(super) fn sub_inactive_thread(&self) -> usize {
152        let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
153        debug_assert!(
154            old_value.inactive_threads() > 0,
155            "sub_inactive_thread: old_value {:?} has no inactive threads",
156            old_value,
157        );
158        debug_assert!(
159            old_value.sleeping_threads() <= old_value.inactive_threads(),
160            "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
161            old_value,
162            old_value.sleeping_threads(),
163            old_value.inactive_threads(),
164        );
165
166        // Current heuristic: whenever an inactive thread goes away, if
167        // there are any sleeping threads, wake 'em up.
168        let sleeping_threads = old_value.sleeping_threads();
169        std::cmp::min(sleeping_threads, 2)
170    }
171
172    /// Subtracts a sleeping thread. This cannot fail, but it is only
173    /// safe to do if you you know the number of sleeping threads is
174    /// non-zero (i.e., because you have just awoken a sleeping
175    /// thread).
176    #[inline]
177    pub(super) fn sub_sleeping_thread(&self) {
178        let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
179        debug_assert!(
180            old_value.sleeping_threads() > 0,
181            "sub_sleeping_thread: old_value {:?} had no sleeping threads",
182            old_value,
183        );
184        debug_assert!(
185            old_value.sleeping_threads() <= old_value.inactive_threads(),
186            "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
187            old_value,
188            old_value.sleeping_threads(),
189            old_value.inactive_threads(),
190        );
191    }
192
193    #[inline]
194    pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
195        debug_assert!(
196            old_value.inactive_threads() > 0,
197            "try_add_sleeping_thread: old_value {:?} has no inactive threads",
198            old_value,
199        );
200        debug_assert!(
201            old_value.sleeping_threads() < THREADS_MAX,
202            "try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
203            old_value,
204        );
205
206        let mut new_value = old_value;
207        new_value.word += ONE_SLEEPING;
208
209        self.try_exchange(old_value, new_value, Ordering::SeqCst)
210    }
211}
212
213#[inline]
214fn select_thread(word: usize, shift: usize) -> usize {
215    (word >> shift) & THREADS_MAX
216}
217
218#[inline]
219fn select_jec(word: usize) -> usize {
220    word >> JEC_SHIFT
221}
222
223impl Counters {
224    #[inline]
225    fn new(word: usize) -> Counters {
226        Counters { word }
227    }
228
229    #[inline]
230    fn increment_jobs_counter(self) -> Counters {
231        // We can freely add to JEC because it occupies the most significant bits.
232        // Thus it doesn't overflow into the other counters, just wraps itself.
233        Counters {
234            word: self.word.wrapping_add(ONE_JEC),
235        }
236    }
237
238    #[inline]
239    pub(super) fn jobs_counter(self) -> JobsEventCounter {
240        JobsEventCounter(select_jec(self.word))
241    }
242
243    /// The number of threads that are not actively
244    /// executing work. They may be idle, sleepy, or asleep.
245    #[inline]
246    pub(super) fn inactive_threads(self) -> usize {
247        select_thread(self.word, INACTIVE_SHIFT)
248    }
249
250    #[inline]
251    pub(super) fn awake_but_idle_threads(self) -> usize {
252        debug_assert!(
253            self.sleeping_threads() <= self.inactive_threads(),
254            "sleeping threads: {} > raw idle threads {}",
255            self.sleeping_threads(),
256            self.inactive_threads()
257        );
258        self.inactive_threads() - self.sleeping_threads()
259    }
260
261    #[inline]
262    pub(super) fn sleeping_threads(self) -> usize {
263        select_thread(self.word, SLEEPING_SHIFT)
264    }
265}
266
267impl std::fmt::Debug for Counters {
268    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        let word = format!("{:016x}", self.word);
270        fmt.debug_struct("Counters")
271            .field("word", &word)
272            .field("jobs", &self.jobs_counter().0)
273            .field("inactive", &self.inactive_threads())
274            .field("sleeping", &self.sleeping_threads())
275            .finish()
276    }
277}