rayon/iter/plumbing/mod.rs
//! Traits and functions used to implement parallel iteration. These are
//! low-level details -- users of parallel iterators should not need to
//! interact with them directly. See [the `plumbing` README][r] for a general overview.
//!
//! [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
6
7use crate::join_context;
8
9use super::IndexedParallelIterator;
10
11use std::usize;
12
13/// The `ProducerCallback` trait is a kind of generic closure,
14/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in
15/// the plumbing README][r] for more details.
16///
17/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md#producer-callback
18/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
19pub trait ProducerCallback<T> {
20 /// The type of value returned by this callback. Analogous to
21 /// [`Output` from the `FnOnce` trait][Output].
22 ///
23 /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output
24 type Output;
25
26 /// Invokes the callback with the given producer as argument. The
27 /// key point of this trait is that this method is generic over
28 /// `P`, and hence implementors must be defined for any producer.
29 fn callback<P>(self, producer: P) -> Self::Output
30 where
31 P: Producer<Item = T>;
32}
33
34/// A `Producer` is effectively a "splittable `IntoIterator`". That
35/// is, a producer is a value which can be converted into an iterator
36/// at any time: at that point, it simply produces items on demand,
37/// like any iterator. But what makes a `Producer` special is that,
38/// *before* we convert to an iterator, we can also **split** it at a
39/// particular point using the `split_at` method. This will yield up
40/// two producers, one producing the items before that point, and one
41/// producing the items after that point (these two producers can then
42/// independently be split further, or be converted into iterators).
43/// In Rayon, this splitting is used to divide between threads.
44/// See [the `plumbing` README][r] for further details.
45///
46/// Note that each producer will always produce a fixed number of
47/// items N. However, this number N is not queryable through the API;
48/// the consumer is expected to track it.
49///
50/// NB. You might expect `Producer` to extend the `IntoIterator`
51/// trait. However, [rust-lang/rust#20671][20671] prevents us from
52/// declaring the DoubleEndedIterator and ExactSizeIterator
53/// constraints on a required IntoIterator trait, so we inline
54/// IntoIterator here until that issue is fixed.
55///
56/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
57/// [20671]: https://github.com/rust-lang/rust/issues/20671
58pub trait Producer: Send + Sized {
59 /// The type of item that will be produced by this producer once
60 /// it is converted into an iterator.
61 type Item;
62
63 /// The type of iterator we will become.
64 type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator;
65
66 /// Convert `self` into an iterator; at this point, no more parallel splits
67 /// are possible.
68 fn into_iter(self) -> Self::IntoIter;
69
70 /// The minimum number of items that we will process
71 /// sequentially. Defaults to 1, which means that we will split
72 /// all the way down to a single item. This can be raised higher
73 /// using the [`with_min_len`] method, which will force us to
74 /// create sequential tasks at a larger granularity. Note that
75 /// Rayon automatically normally attempts to adjust the size of
76 /// parallel splits to reduce overhead, so this should not be
77 /// needed.
78 ///
79 /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len
80 fn min_len(&self) -> usize {
81 1
82 }
83
84 /// The maximum number of items that we will process
85 /// sequentially. Defaults to MAX, which means that we can choose
86 /// not to split at all. This can be lowered using the
87 /// [`with_max_len`] method, which will force us to create more
88 /// parallel tasks. Note that Rayon automatically normally
89 /// attempts to adjust the size of parallel splits to reduce
90 /// overhead, so this should not be needed.
91 ///
92 /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len
93 fn max_len(&self) -> usize {
94 usize::MAX
95 }
96
97 /// Split into two producers; one produces items `0..index`, the
98 /// other `index..N`. Index must be less than or equal to `N`.
99 fn split_at(self, index: usize) -> (Self, Self);
100
101 /// Iterate the producer, feeding each element to `folder`, and
102 /// stop when the folder is full (or all elements have been consumed).
103 ///
104 /// The provided implementation is sufficient for most iterables.
105 fn fold_with<F>(self, folder: F) -> F
106 where
107 F: Folder<Self::Item>,
108 {
109 folder.consume_iter(self.into_iter())
110 }
111}
112
113/// A consumer is effectively a [generalized "fold" operation][fold],
114/// and in fact each consumer will eventually be converted into a
115/// [`Folder`]. What makes a consumer special is that, like a
116/// [`Producer`], it can be **split** into multiple consumers using
117/// the `split_at` method. When a consumer is split, it produces two
118/// consumers, as well as a **reducer**. The two consumers can be fed
119/// items independently, and when they are done the reducer is used to
120/// combine their two results into one. See [the `plumbing`
121/// README][r] for further details.
122///
123/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
124/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
125/// [`Folder`]: trait.Folder.html
126/// [`Producer`]: trait.Producer.html
127pub trait Consumer<Item>: Send + Sized {
128 /// The type of folder that this consumer can be converted into.
129 type Folder: Folder<Item, Result = Self::Result>;
130
131 /// The type of reducer that is produced if this consumer is split.
132 type Reducer: Reducer<Self::Result>;
133
134 /// The type of result that this consumer will ultimately produce.
135 type Result: Send;
136
137 /// Divide the consumer into two consumers, one processing items
138 /// `0..index` and one processing items from `index..`. Also
139 /// produces a reducer that can be used to reduce the results at
140 /// the end.
141 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
142
143 /// Convert the consumer into a folder that can consume items
144 /// sequentially, eventually producing a final result.
145 fn into_folder(self) -> Self::Folder;
146
147 /// Hint whether this `Consumer` would like to stop processing
148 /// further items, e.g. if a search has been completed.
149 fn full(&self) -> bool;
150}
151
/// The `Folder` trait captures [the standard fold operation][fold].
/// Items are pushed into it one at a time through `consume`; when no
/// more items are coming, `complete` turns the accumulated state into
/// the final value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The result type the folder ultimately yields.
    type Result;

    /// Takes the next item and returns the updated sequential state.
    fn consume(self, item: Item) -> Self;

    /// Takes items from `iter` until the folder is full, returning the
    /// updated sequential state.
    ///
    /// Implementing this method is **optional**. The default feeds the
    /// items to `consume` one by one and asks `full` after every item
    /// whether to stop; fullness is therefore only checked *after* an
    /// item has been consumed.
    ///
    /// Override it when a more specialized, more efficient
    /// implementation is available.
    fn consume_iter<I>(self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        // `Err` is used purely as the "stop now" signal for `try_fold`;
        // both variants carry the current folder state.
        let outcome = iter.into_iter().try_fold(self, |folder, item| {
            let folder = folder.consume(item);
            if folder.full() {
                Err(folder)
            } else {
                Ok(folder)
            }
        });
        match outcome {
            Ok(folder) | Err(folder) => folder,
        }
    }

    /// Ends consumption and yields the final result.
    fn complete(self) -> Self::Result;

    /// Hints that this `Folder` has no interest in further items,
    /// e.g. because a search has already finished.
    fn full(&self) -> bool;
}
193
/// A reducer is the last stage of a `Consumer`. Once a consumer has
/// been split in two and both parts have run to completion, there are
/// two results; the reducer merges them into a single one.
/// [The `plumbing` README][r] has more details.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Merges the two final results of a split into one. `left` and
    /// `right` are the results of the two halves.
    fn reduce(self, left: Result, right: Result) -> Result;
}
206
207/// A stateless consumer can be freely copied. These consumers can be
208/// used like regular consumers, but they also support a
209/// `split_off_left` method that does not take an index to split, but
210/// simply splits at some arbitrary point (`for_each`, for example,
211/// produces an unindexed consumer).
212pub trait UnindexedConsumer<I>: Consumer<I> {
213 /// Splits off a "left" consumer and returns it. The `self`
214 /// consumer should then be used to consume the "right" portion of
215 /// the data. (The ordering matters for methods like find_first --
216 /// values produced by the returned value are given precedence
217 /// over values produced by `self`.) Once the left and right
218 /// halves have been fully consumed, you should reduce the results
219 /// with the result of `to_reducer`.
220 fn split_off_left(&self) -> Self;
221
222 /// Creates a reducer that can be used to combine the results from
223 /// a split consumer.
224 fn to_reducer(&self) -> Self::Reducer;
225}
226
227/// A variant on `Producer` which does not know its exact length or
228/// cannot represent it in a `usize`. These producers act like
229/// ordinary producers except that they cannot be told to split at a
230/// particular point. Instead, you just ask them to split 'somewhere'.
231///
232/// (In principle, `Producer` could extend this trait; however, it
233/// does not because to do so would require producers to carry their
234/// own length with them.)
235pub trait UnindexedProducer: Send + Sized {
236 /// The type of item returned by this producer.
237 type Item;
238
239 /// Split midway into a new producer if possible, otherwise return `None`.
240 fn split(self) -> (Self, Option<Self>);
241
242 /// Iterate the producer, feeding each element to `folder`, and
243 /// stop when the folder is full (or all elements have been consumed).
244 fn fold_with<F>(self, folder: F) -> F
245 where
246 F: Folder<Self::Item>;
247}
248
249/// A splitter controls the policy for splitting into smaller work items.
250///
251/// Thief-splitting is an adaptive policy that starts by splitting into
252/// enough jobs for every worker thread, and then resets itself whenever a
253/// job is actually stolen into a different thread.
254#[derive(Clone, Copy)]
255struct Splitter {
256 /// The `splits` tell us approximately how many remaining times we'd
257 /// like to split this job. We always just divide it by two though, so
258 /// the effective number of pieces will be `next_power_of_two()`.
259 splits: usize,
260}
261
262impl Splitter {
263 #[inline]
264 fn new() -> Splitter {
265 Splitter {
266 splits: crate::current_num_threads(),
267 }
268 }
269
270 #[inline]
271 fn try_split(&mut self, stolen: bool) -> bool {
272 let Splitter { splits } = *self;
273
274 if stolen {
275 // This job was stolen! Reset the number of desired splits to the
276 // thread count, if that's more than we had remaining anyway.
277 self.splits = Ord::max(crate::current_num_threads(), self.splits / 2);
278 true
279 } else if splits > 0 {
280 // We have splits remaining, make it so.
281 self.splits /= 2;
282 true
283 } else {
284 // Not stolen, and no more splits -- we're done!
285 false
286 }
287 }
288}
289
290/// The length splitter is built on thief-splitting, but additionally takes
291/// into account the remaining length of the iterator.
292#[derive(Clone, Copy)]
293struct LengthSplitter {
294 inner: Splitter,
295
296 /// The smallest we're willing to divide into. Usually this is just 1,
297 /// but you can choose a larger working size with `with_min_len()`.
298 min: usize,
299}
300
301impl LengthSplitter {
302 /// Creates a new splitter based on lengths.
303 ///
304 /// The `min` is a hard lower bound. We'll never split below that, but
305 /// of course an iterator might start out smaller already.
306 ///
307 /// The `max` is an upper bound on the working size, used to determine
308 /// the minimum number of times we need to split to get under that limit.
309 /// The adaptive algorithm may very well split even further, but never
310 /// smaller than the `min`.
311 #[inline]
312 fn new(min: usize, max: usize, len: usize) -> LengthSplitter {
313 let mut splitter = LengthSplitter {
314 inner: Splitter::new(),
315 min: Ord::max(min, 1),
316 };
317
318 // Divide the given length by the max working length to get the minimum
319 // number of splits we need to get under that max. This rounds down,
320 // but the splitter actually gives `next_power_of_two()` pieces anyway.
321 // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces.
322 let min_splits = len / Ord::max(max, 1);
323
324 // Only update the value if it's not splitting enough already.
325 if min_splits > splitter.inner.splits {
326 splitter.inner.splits = min_splits;
327 }
328
329 splitter
330 }
331
332 #[inline]
333 fn try_split(&mut self, len: usize, stolen: bool) -> bool {
334 // If splitting wouldn't make us too small, try the inner splitter.
335 len / 2 >= self.min && self.inner.try_split(stolen)
336 }
337}
338
339/// This helper function is used to "connect" a parallel iterator to a
340/// consumer. It will convert the `par_iter` into a producer P and
341/// then pull items from P and feed them to `consumer`, splitting and
342/// creating parallel threads as needed.
343///
344/// This is useful when you are implementing your own parallel
345/// iterators: it is often used as the definition of the
346/// [`drive_unindexed`] or [`drive`] methods.
347///
348/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
349/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
350pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result
351where
352 I: IndexedParallelIterator,
353 C: Consumer<I::Item>,
354{
355 let len = par_iter.len();
356 return par_iter.with_producer(Callback { len, consumer });
357
358 struct Callback<C> {
359 len: usize,
360 consumer: C,
361 }
362
363 impl<C, I> ProducerCallback<I> for Callback<C>
364 where
365 C: Consumer<I>,
366 {
367 type Output = C::Result;
368 fn callback<P>(self, producer: P) -> C::Result
369 where
370 P: Producer<Item = I>,
371 {
372 bridge_producer_consumer(self.len, producer, self.consumer)
373 }
374 }
375}
376
377/// This helper function is used to "connect" a producer and a
378/// consumer. You may prefer to call [`bridge`], which wraps this
379/// function. This function will draw items from `producer` and feed
380/// them to `consumer`, splitting and creating parallel tasks when
381/// needed.
382///
383/// This is useful when you are implementing your own parallel
384/// iterators: it is often used as the definition of the
385/// [`drive_unindexed`] or [`drive`] methods.
386///
387/// [`bridge`]: fn.bridge.html
388/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
389/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
390pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result
391where
392 P: Producer,
393 C: Consumer<P::Item>,
394{
395 let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len);
396 return helper(len, false, splitter, producer, consumer);
397
398 fn helper<P, C>(
399 len: usize,
400 migrated: bool,
401 mut splitter: LengthSplitter,
402 producer: P,
403 consumer: C,
404 ) -> C::Result
405 where
406 P: Producer,
407 C: Consumer<P::Item>,
408 {
409 if consumer.full() {
410 consumer.into_folder().complete()
411 } else if splitter.try_split(len, migrated) {
412 let mid = len / 2;
413 let (left_producer, right_producer) = producer.split_at(mid);
414 let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
415 let (left_result, right_result) = join_context(
416 |context| {
417 helper(
418 mid,
419 context.migrated(),
420 splitter,
421 left_producer,
422 left_consumer,
423 )
424 },
425 |context| {
426 helper(
427 len - mid,
428 context.migrated(),
429 splitter,
430 right_producer,
431 right_consumer,
432 )
433 },
434 );
435 reducer.reduce(left_result, right_result)
436 } else {
437 producer.fold_with(consumer.into_folder()).complete()
438 }
439 }
440}
441
442/// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer.
443///
444/// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html
445pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result
446where
447 P: UnindexedProducer,
448 C: UnindexedConsumer<P::Item>,
449{
450 let splitter = Splitter::new();
451 bridge_unindexed_producer_consumer(false, splitter, producer, consumer)
452}
453
454fn bridge_unindexed_producer_consumer<P, C>(
455 migrated: bool,
456 mut splitter: Splitter,
457 producer: P,
458 consumer: C,
459) -> C::Result
460where
461 P: UnindexedProducer,
462 C: UnindexedConsumer<P::Item>,
463{
464 if consumer.full() {
465 consumer.into_folder().complete()
466 } else if splitter.try_split(migrated) {
467 match producer.split() {
468 (left_producer, Some(right_producer)) => {
469 let (reducer, left_consumer, right_consumer) =
470 (consumer.to_reducer(), consumer.split_off_left(), consumer);
471 let bridge = bridge_unindexed_producer_consumer;
472 let (left_result, right_result) = join_context(
473 |context| bridge(context.migrated(), splitter, left_producer, left_consumer),
474 |context| bridge(context.migrated(), splitter, right_producer, right_consumer),
475 );
476 reducer.reduce(left_result, right_result)
477 }
478 (producer, None) => producer.fold_with(consumer.into_folder()).complete(),
479 }
480 } else {
481 producer.fold_with(consumer.into_folder()).complete()
482 }
483}