rayon/iter/
panic_fuse.rs

1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
/// `PanicFuse` is an adaptor that wraps an iterator with a fuse in case
/// of panics, to halt all threads as soon as possible.
///
/// This struct is created by the [`panic_fuse()`] method on [`ParallelIterator`].
///
/// [`panic_fuse()`]: trait.ParallelIterator.html#method.panic_fuse
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct PanicFuse<I: ParallelIterator> {
    // The wrapped parallel iterator. Length queries are forwarded to it, and
    // driving it is delegated after the consumer/producer has been wrapped.
    base: I,
}
18
/// Drop guard over a shared flag: if a `Fuse` is dropped while its thread is
/// unwinding from a panic, the flag is set to `true`.
///
/// Clones refer to the same underlying `AtomicBool`.
#[derive(Clone)]
struct Fuse<'a>(&'a AtomicBool);

impl Drop for Fuse<'_> {
    #[inline]
    fn drop(&mut self) {
        // An ordinary (non-unwinding) drop must leave the flag untouched.
        if !thread::panicking() {
            return;
        }
        self.0.store(true, Ordering::Relaxed);
    }
}

impl Fuse<'_> {
    /// Reports whether the shared flag has been set.
    #[inline]
    fn panicked(&self) -> bool {
        let Fuse(flag) = self;
        flag.load(Ordering::Relaxed)
    }
}
38
39impl<I> PanicFuse<I>
40where
41    I: ParallelIterator,
42{
43    /// Creates a new `PanicFuse` iterator.
44    pub(super) fn new(base: I) -> PanicFuse<I> {
45        PanicFuse { base }
46    }
47}
48
49impl<I> ParallelIterator for PanicFuse<I>
50where
51    I: ParallelIterator,
52{
53    type Item = I::Item;
54
55    fn drive_unindexed<C>(self, consumer: C) -> C::Result
56    where
57        C: UnindexedConsumer<Self::Item>,
58    {
59        let panicked = AtomicBool::new(false);
60        let consumer1 = PanicFuseConsumer {
61            base: consumer,
62            fuse: Fuse(&panicked),
63        };
64        self.base.drive_unindexed(consumer1)
65    }
66
67    fn opt_len(&self) -> Option<usize> {
68        self.base.opt_len()
69    }
70}
71
72impl<I> IndexedParallelIterator for PanicFuse<I>
73where
74    I: IndexedParallelIterator,
75{
76    fn drive<C>(self, consumer: C) -> C::Result
77    where
78        C: Consumer<Self::Item>,
79    {
80        let panicked = AtomicBool::new(false);
81        let consumer1 = PanicFuseConsumer {
82            base: consumer,
83            fuse: Fuse(&panicked),
84        };
85        self.base.drive(consumer1)
86    }
87
88    fn len(&self) -> usize {
89        self.base.len()
90    }
91
92    fn with_producer<CB>(self, callback: CB) -> CB::Output
93    where
94        CB: ProducerCallback<Self::Item>,
95    {
96        return self.base.with_producer(Callback { callback });
97
98        struct Callback<CB> {
99            callback: CB,
100        }
101
102        impl<T, CB> ProducerCallback<T> for Callback<CB>
103        where
104            CB: ProducerCallback<T>,
105        {
106            type Output = CB::Output;
107
108            fn callback<P>(self, base: P) -> CB::Output
109            where
110                P: Producer<Item = T>,
111            {
112                let panicked = AtomicBool::new(false);
113                let producer = PanicFuseProducer {
114                    base,
115                    fuse: Fuse(&panicked),
116                };
117                self.callback.callback(producer)
118            }
119        }
120    }
121}
122
123/// ////////////////////////////////////////////////////////////////////////
124/// Producer implementation
125
126struct PanicFuseProducer<'a, P> {
127    base: P,
128    fuse: Fuse<'a>,
129}
130
131impl<'a, P> Producer for PanicFuseProducer<'a, P>
132where
133    P: Producer,
134{
135    type Item = P::Item;
136    type IntoIter = PanicFuseIter<'a, P::IntoIter>;
137
138    fn into_iter(self) -> Self::IntoIter {
139        PanicFuseIter {
140            base: self.base.into_iter(),
141            fuse: self.fuse,
142        }
143    }
144
145    fn min_len(&self) -> usize {
146        self.base.min_len()
147    }
148    fn max_len(&self) -> usize {
149        self.base.max_len()
150    }
151
152    fn split_at(self, index: usize) -> (Self, Self) {
153        let (left, right) = self.base.split_at(index);
154        (
155            PanicFuseProducer {
156                base: left,
157                fuse: self.fuse.clone(),
158            },
159            PanicFuseProducer {
160                base: right,
161                fuse: self.fuse,
162            },
163        )
164    }
165
166    fn fold_with<G>(self, folder: G) -> G
167    where
168        G: Folder<Self::Item>,
169    {
170        let folder1 = PanicFuseFolder {
171            base: folder,
172            fuse: self.fuse,
173        };
174        self.base.fold_with(folder1).base
175    }
176}
177
178struct PanicFuseIter<'a, I> {
179    base: I,
180    fuse: Fuse<'a>,
181}
182
183impl<'a, I> Iterator for PanicFuseIter<'a, I>
184where
185    I: Iterator,
186{
187    type Item = I::Item;
188
189    fn next(&mut self) -> Option<Self::Item> {
190        if self.fuse.panicked() {
191            None
192        } else {
193            self.base.next()
194        }
195    }
196
197    fn size_hint(&self) -> (usize, Option<usize>) {
198        self.base.size_hint()
199    }
200}
201
202impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I>
203where
204    I: DoubleEndedIterator,
205{
206    fn next_back(&mut self) -> Option<Self::Item> {
207        if self.fuse.panicked() {
208            None
209        } else {
210            self.base.next_back()
211        }
212    }
213}
214
215impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I>
216where
217    I: ExactSizeIterator,
218{
219    fn len(&self) -> usize {
220        self.base.len()
221    }
222}
223
224/// ////////////////////////////////////////////////////////////////////////
225/// Consumer implementation
226
227struct PanicFuseConsumer<'a, C> {
228    base: C,
229    fuse: Fuse<'a>,
230}
231
232impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C>
233where
234    C: Consumer<T>,
235{
236    type Folder = PanicFuseFolder<'a, C::Folder>;
237    type Reducer = PanicFuseReducer<'a, C::Reducer>;
238    type Result = C::Result;
239
240    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
241        let (left, right, reducer) = self.base.split_at(index);
242        (
243            PanicFuseConsumer {
244                base: left,
245                fuse: self.fuse.clone(),
246            },
247            PanicFuseConsumer {
248                base: right,
249                fuse: self.fuse.clone(),
250            },
251            PanicFuseReducer {
252                base: reducer,
253                _fuse: self.fuse,
254            },
255        )
256    }
257
258    fn into_folder(self) -> Self::Folder {
259        PanicFuseFolder {
260            base: self.base.into_folder(),
261            fuse: self.fuse,
262        }
263    }
264
265    fn full(&self) -> bool {
266        self.fuse.panicked() || self.base.full()
267    }
268}
269
270impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C>
271where
272    C: UnindexedConsumer<T>,
273{
274    fn split_off_left(&self) -> Self {
275        PanicFuseConsumer {
276            base: self.base.split_off_left(),
277            fuse: self.fuse.clone(),
278        }
279    }
280
281    fn to_reducer(&self) -> Self::Reducer {
282        PanicFuseReducer {
283            base: self.base.to_reducer(),
284            _fuse: self.fuse.clone(),
285        }
286    }
287}
288
289struct PanicFuseFolder<'a, C> {
290    base: C,
291    fuse: Fuse<'a>,
292}
293
294impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C>
295where
296    C: Folder<T>,
297{
298    type Result = C::Result;
299
300    fn consume(mut self, item: T) -> Self {
301        self.base = self.base.consume(item);
302        self
303    }
304
305    fn consume_iter<I>(mut self, iter: I) -> Self
306    where
307        I: IntoIterator<Item = T>,
308    {
309        fn cool<'a, T>(fuse: &'a Fuse<'_>) -> impl Fn(&T) -> bool + 'a {
310            move |_| !fuse.panicked()
311        }
312
313        self.base = {
314            let fuse = &self.fuse;
315            let iter = iter.into_iter().take_while(cool(fuse));
316            self.base.consume_iter(iter)
317        };
318        self
319    }
320
321    fn complete(self) -> C::Result {
322        self.base.complete()
323    }
324
325    fn full(&self) -> bool {
326        self.fuse.panicked() || self.base.full()
327    }
328}
329
330struct PanicFuseReducer<'a, C> {
331    base: C,
332    _fuse: Fuse<'a>,
333}
334
335impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C>
336where
337    C: Reducer<T>,
338{
339    fn reduce(self, left: T, right: T) -> T {
340        self.base.reduce(left, right)
341    }
342}