crossbeam_deque/
deque.rs

1use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
2use std::boxed::Box;
3use std::cell::{Cell, UnsafeCell};
4use std::cmp;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::{self, MaybeUninit};
8use std::ptr;
9use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
10use std::sync::Arc;
11
12use crossbeam_epoch::{self as epoch, Atomic, Owned};
13use crossbeam_utils::{Backoff, CachePadded};
14
15// Minimum buffer capacity.
16const MIN_CAP: usize = 64;
17// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
18const MAX_BATCH: usize = 32;
19// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
20// deallocated as soon as possible.
21const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
22
23/// A buffer that holds tasks in a worker queue.
24///
25/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
26/// *not* deallocate the buffer.
27struct Buffer<T> {
28    /// Pointer to the allocated memory.
29    ptr: *mut T,
30
31    /// Capacity of the buffer. Always a power of two.
32    cap: usize,
33}
34
35unsafe impl<T> Send for Buffer<T> {}
36
37impl<T> Buffer<T> {
38    /// Allocates a new buffer with the specified capacity.
39    fn alloc(cap: usize) -> Buffer<T> {
40        debug_assert_eq!(cap, cap.next_power_of_two());
41
42        let ptr = Box::into_raw(
43            (0..cap)
44                .map(|_| MaybeUninit::<T>::uninit())
45                .collect::<Box<[_]>>(),
46        )
47        .cast::<T>();
48
49        Buffer { ptr, cap }
50    }
51
52    /// Deallocates the buffer.
53    unsafe fn dealloc(self) {
54        drop(Box::from_raw(ptr::slice_from_raw_parts_mut(
55            self.ptr.cast::<MaybeUninit<T>>(),
56            self.cap,
57        )));
58    }
59
60    /// Returns a pointer to the task at the specified `index`.
61    unsafe fn at(&self, index: isize) -> *mut T {
62        // `self.cap` is always a power of two.
63        // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
64        // don't actually have the right to access this memory.
65        self.ptr.offset(index & (self.cap - 1) as isize)
66    }
67
68    /// Writes `task` into the specified `index`.
69    ///
70    /// This method might be concurrently called with another `read` at the same index, which is
71    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
72    /// that would be more expensive and difficult to implement generically for all types `T`.
73    /// Hence, as a hack, we use a volatile write instead.
74    unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
75        ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
76    }
77
78    /// Reads a task from the specified `index`.
79    ///
80    /// This method might be concurrently called with another `write` at the same index, which is
81    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
82    /// that would be more expensive and difficult to implement generically for all types `T`.
83    /// Hence, as a hack, we use a volatile load instead.
84    unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
85        ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
86    }
87}
88
89impl<T> Clone for Buffer<T> {
90    fn clone(&self) -> Buffer<T> {
91        *self
92    }
93}
94
95impl<T> Copy for Buffer<T> {}
96
97/// Internal queue data shared between the worker and stealers.
98///
99/// The implementation is based on the following work:
100///
101/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
102/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
103///    PPoPP 2013.][weak-mem]
104/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
105///    atomics. OOPSLA 2013.][checker]
106///
107/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
108/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
109/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
110struct Inner<T> {
111    /// The front index.
112    front: AtomicIsize,
113
114    /// The back index.
115    back: AtomicIsize,
116
117    /// The underlying buffer.
118    buffer: CachePadded<Atomic<Buffer<T>>>,
119}
120
121impl<T> Drop for Inner<T> {
122    fn drop(&mut self) {
123        // Load the back index, front index, and buffer.
124        let b = *self.back.get_mut();
125        let f = *self.front.get_mut();
126
127        unsafe {
128            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
129
130            // Go through the buffer from front to back and drop all tasks in the queue.
131            let mut i = f;
132            while i != b {
133                buffer.deref().at(i).drop_in_place();
134                i = i.wrapping_add(1);
135            }
136
137            // Free the memory allocated by the buffer.
138            buffer.into_owned().into_box().dealloc();
139        }
140    }
141}
142
143/// Worker queue flavor: FIFO or LIFO.
144#[derive(Clone, Copy, Debug, Eq, PartialEq)]
145enum Flavor {
146    /// The first-in first-out flavor.
147    Fifo,
148
149    /// The last-in first-out flavor.
150    Lifo,
151}
152
153/// A worker queue.
154///
155/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
156/// tasks from it. Task schedulers typically create a single worker queue per thread.
157///
158/// # Examples
159///
160/// A FIFO worker:
161///
162/// ```
163/// use crossbeam_deque::{Steal, Worker};
164///
165/// let w = Worker::new_fifo();
166/// let s = w.stealer();
167///
168/// w.push(1);
169/// w.push(2);
170/// w.push(3);
171///
172/// assert_eq!(s.steal(), Steal::Success(1));
173/// assert_eq!(w.pop(), Some(2));
174/// assert_eq!(w.pop(), Some(3));
175/// ```
176///
177/// A LIFO worker:
178///
179/// ```
180/// use crossbeam_deque::{Steal, Worker};
181///
182/// let w = Worker::new_lifo();
183/// let s = w.stealer();
184///
185/// w.push(1);
186/// w.push(2);
187/// w.push(3);
188///
189/// assert_eq!(s.steal(), Steal::Success(1));
190/// assert_eq!(w.pop(), Some(3));
191/// assert_eq!(w.pop(), Some(2));
192/// ```
193pub struct Worker<T> {
194    /// A reference to the inner representation of the queue.
195    inner: Arc<CachePadded<Inner<T>>>,
196
197    /// A copy of `inner.buffer` for quick access.
198    buffer: Cell<Buffer<T>>,
199
200    /// The flavor of the queue.
201    flavor: Flavor,
202
203    /// Indicates that the worker cannot be shared among threads.
204    _marker: PhantomData<*mut ()>, // !Send + !Sync
205}
206
207unsafe impl<T: Send> Send for Worker<T> {}
208
209impl<T> Worker<T> {
210    /// Creates a FIFO worker queue.
211    ///
212    /// Tasks are pushed and popped from opposite ends.
213    ///
214    /// # Examples
215    ///
216    /// ```
217    /// use crossbeam_deque::Worker;
218    ///
219    /// let w = Worker::<i32>::new_fifo();
220    /// ```
221    pub fn new_fifo() -> Worker<T> {
222        let buffer = Buffer::alloc(MIN_CAP);
223
224        let inner = Arc::new(CachePadded::new(Inner {
225            front: AtomicIsize::new(0),
226            back: AtomicIsize::new(0),
227            buffer: CachePadded::new(Atomic::new(buffer)),
228        }));
229
230        Worker {
231            inner,
232            buffer: Cell::new(buffer),
233            flavor: Flavor::Fifo,
234            _marker: PhantomData,
235        }
236    }
237
238    /// Creates a LIFO worker queue.
239    ///
240    /// Tasks are pushed and popped from the same end.
241    ///
242    /// # Examples
243    ///
244    /// ```
245    /// use crossbeam_deque::Worker;
246    ///
247    /// let w = Worker::<i32>::new_lifo();
248    /// ```
249    pub fn new_lifo() -> Worker<T> {
250        let buffer = Buffer::alloc(MIN_CAP);
251
252        let inner = Arc::new(CachePadded::new(Inner {
253            front: AtomicIsize::new(0),
254            back: AtomicIsize::new(0),
255            buffer: CachePadded::new(Atomic::new(buffer)),
256        }));
257
258        Worker {
259            inner,
260            buffer: Cell::new(buffer),
261            flavor: Flavor::Lifo,
262            _marker: PhantomData,
263        }
264    }
265
266    /// Creates a stealer for this queue.
267    ///
268    /// The returned stealer can be shared among threads and cloned.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// use crossbeam_deque::Worker;
274    ///
275    /// let w = Worker::<i32>::new_lifo();
276    /// let s = w.stealer();
277    /// ```
278    pub fn stealer(&self) -> Stealer<T> {
279        Stealer {
280            inner: self.inner.clone(),
281            flavor: self.flavor,
282        }
283    }
284
285    /// Resizes the internal buffer to the new capacity of `new_cap`.
286    #[cold]
287    unsafe fn resize(&self, new_cap: usize) {
288        // Load the back index, front index, and buffer.
289        let b = self.inner.back.load(Ordering::Relaxed);
290        let f = self.inner.front.load(Ordering::Relaxed);
291        let buffer = self.buffer.get();
292
293        // Allocate a new buffer and copy data from the old buffer to the new one.
294        let new = Buffer::alloc(new_cap);
295        let mut i = f;
296        while i != b {
297            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
298            i = i.wrapping_add(1);
299        }
300
301        let guard = &epoch::pin();
302
303        // Replace the old buffer with the new one.
304        self.buffer.replace(new);
305        let old =
306            self.inner
307                .buffer
308                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
309
310        // Destroy the old buffer later.
311        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
312
313        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
314        // it as soon as possible.
315        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
316            guard.flush();
317        }
318    }
319
320    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
321    /// buffer.
322    fn reserve(&self, reserve_cap: usize) {
323        if reserve_cap > 0 {
324            // Compute the current length.
325            let b = self.inner.back.load(Ordering::Relaxed);
326            let f = self.inner.front.load(Ordering::SeqCst);
327            let len = b.wrapping_sub(f) as usize;
328
329            // The current capacity.
330            let cap = self.buffer.get().cap;
331
332            // Is there enough capacity to push `reserve_cap` tasks?
333            if cap - len < reserve_cap {
334                // Keep doubling the capacity as much as is needed.
335                let mut new_cap = cap * 2;
336                while new_cap - len < reserve_cap {
337                    new_cap *= 2;
338                }
339
340                // Resize the buffer.
341                unsafe {
342                    self.resize(new_cap);
343                }
344            }
345        }
346    }
347
348    /// Returns `true` if the queue is empty.
349    ///
350    /// ```
351    /// use crossbeam_deque::Worker;
352    ///
353    /// let w = Worker::new_lifo();
354    ///
355    /// assert!(w.is_empty());
356    /// w.push(1);
357    /// assert!(!w.is_empty());
358    /// ```
359    pub fn is_empty(&self) -> bool {
360        let b = self.inner.back.load(Ordering::Relaxed);
361        let f = self.inner.front.load(Ordering::SeqCst);
362        b.wrapping_sub(f) <= 0
363    }
364
365    /// Returns the number of tasks in the deque.
366    ///
367    /// ```
368    /// use crossbeam_deque::Worker;
369    ///
370    /// let w = Worker::new_lifo();
371    ///
372    /// assert_eq!(w.len(), 0);
373    /// w.push(1);
374    /// assert_eq!(w.len(), 1);
375    /// w.push(1);
376    /// assert_eq!(w.len(), 2);
377    /// ```
378    pub fn len(&self) -> usize {
379        let b = self.inner.back.load(Ordering::Relaxed);
380        let f = self.inner.front.load(Ordering::SeqCst);
381        b.wrapping_sub(f).max(0) as usize
382    }
383
384    /// Pushes a task into the queue.
385    ///
386    /// # Examples
387    ///
388    /// ```
389    /// use crossbeam_deque::Worker;
390    ///
391    /// let w = Worker::new_lifo();
392    /// w.push(1);
393    /// w.push(2);
394    /// ```
395    pub fn push(&self, task: T) {
396        // Load the back index, front index, and buffer.
397        let b = self.inner.back.load(Ordering::Relaxed);
398        let f = self.inner.front.load(Ordering::Acquire);
399        let mut buffer = self.buffer.get();
400
401        // Calculate the length of the queue.
402        let len = b.wrapping_sub(f);
403
404        // Is the queue full?
405        if len >= buffer.cap as isize {
406            // Yes. Grow the underlying buffer.
407            unsafe {
408                self.resize(2 * buffer.cap);
409            }
410            buffer = self.buffer.get();
411        }
412
413        // Write `task` into the slot.
414        unsafe {
415            buffer.write(b, MaybeUninit::new(task));
416        }
417
418        atomic::fence(Ordering::Release);
419
420        // Increment the back index.
421        //
422        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
423        // races because it doesn't understand fences.
424        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
425    }
426
427    /// Pops a task from the queue.
428    ///
429    /// # Examples
430    ///
431    /// ```
432    /// use crossbeam_deque::Worker;
433    ///
434    /// let w = Worker::new_fifo();
435    /// w.push(1);
436    /// w.push(2);
437    ///
438    /// assert_eq!(w.pop(), Some(1));
439    /// assert_eq!(w.pop(), Some(2));
440    /// assert_eq!(w.pop(), None);
441    /// ```
442    pub fn pop(&self) -> Option<T> {
443        // Load the back and front index.
444        let b = self.inner.back.load(Ordering::Relaxed);
445        let f = self.inner.front.load(Ordering::Relaxed);
446
447        // Calculate the length of the queue.
448        let len = b.wrapping_sub(f);
449
450        // Is the queue empty?
451        if len <= 0 {
452            return None;
453        }
454
455        match self.flavor {
456            // Pop from the front of the queue.
457            Flavor::Fifo => {
458                // Try incrementing the front index to pop the task.
459                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
460                let new_f = f.wrapping_add(1);
461
462                if b.wrapping_sub(new_f) < 0 {
463                    self.inner.front.store(f, Ordering::Relaxed);
464                    return None;
465                }
466
467                unsafe {
468                    // Read the popped task.
469                    let buffer = self.buffer.get();
470                    let task = buffer.read(f).assume_init();
471
472                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
473                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
474                        self.resize(buffer.cap / 2);
475                    }
476
477                    Some(task)
478                }
479            }
480
481            // Pop from the back of the queue.
482            Flavor::Lifo => {
483                // Decrement the back index.
484                let b = b.wrapping_sub(1);
485                self.inner.back.store(b, Ordering::Relaxed);
486
487                atomic::fence(Ordering::SeqCst);
488
489                // Load the front index.
490                let f = self.inner.front.load(Ordering::Relaxed);
491
492                // Compute the length after the back index was decremented.
493                let len = b.wrapping_sub(f);
494
495                if len < 0 {
496                    // The queue is empty. Restore the back index to the original task.
497                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
498                    None
499                } else {
500                    // Read the task to be popped.
501                    let buffer = self.buffer.get();
502                    let mut task = unsafe { Some(buffer.read(b)) };
503
504                    // Are we popping the last task from the queue?
505                    if len == 0 {
506                        // Try incrementing the front index.
507                        if self
508                            .inner
509                            .front
510                            .compare_exchange(
511                                f,
512                                f.wrapping_add(1),
513                                Ordering::SeqCst,
514                                Ordering::Relaxed,
515                            )
516                            .is_err()
517                        {
518                            // Failed. We didn't pop anything. Reset to `None`.
519                            task.take();
520                        }
521
522                        // Restore the back index to the original task.
523                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
524                    } else {
525                        // Shrink the buffer if `len` is less than one fourth of the capacity.
526                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
527                            unsafe {
528                                self.resize(buffer.cap / 2);
529                            }
530                        }
531                    }
532
533                    task.map(|t| unsafe { t.assume_init() })
534                }
535            }
536        }
537    }
538}
539
540impl<T> fmt::Debug for Worker<T> {
541    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
542        f.pad("Worker { .. }")
543    }
544}
545
546/// A stealer handle of a worker queue.
547///
548/// Stealers can be shared among threads.
549///
550/// Task schedulers typically have a single worker queue per worker thread.
551///
552/// # Examples
553///
554/// ```
555/// use crossbeam_deque::{Steal, Worker};
556///
557/// let w = Worker::new_lifo();
558/// w.push(1);
559/// w.push(2);
560///
561/// let s = w.stealer();
562/// assert_eq!(s.steal(), Steal::Success(1));
563/// assert_eq!(s.steal(), Steal::Success(2));
564/// assert_eq!(s.steal(), Steal::Empty);
565/// ```
566pub struct Stealer<T> {
567    /// A reference to the inner representation of the queue.
568    inner: Arc<CachePadded<Inner<T>>>,
569
570    /// The flavor of the queue.
571    flavor: Flavor,
572}
573
574unsafe impl<T: Send> Send for Stealer<T> {}
575unsafe impl<T: Send> Sync for Stealer<T> {}
576
577impl<T> Stealer<T> {
578    /// Returns `true` if the queue is empty.
579    ///
580    /// ```
581    /// use crossbeam_deque::Worker;
582    ///
583    /// let w = Worker::new_lifo();
584    /// let s = w.stealer();
585    ///
586    /// assert!(s.is_empty());
587    /// w.push(1);
588    /// assert!(!s.is_empty());
589    /// ```
590    pub fn is_empty(&self) -> bool {
591        let f = self.inner.front.load(Ordering::Acquire);
592        atomic::fence(Ordering::SeqCst);
593        let b = self.inner.back.load(Ordering::Acquire);
594        b.wrapping_sub(f) <= 0
595    }
596
597    /// Returns the number of tasks in the deque.
598    ///
599    /// ```
600    /// use crossbeam_deque::Worker;
601    ///
602    /// let w = Worker::new_lifo();
603    /// let s = w.stealer();
604    ///
605    /// assert_eq!(s.len(), 0);
606    /// w.push(1);
607    /// assert_eq!(s.len(), 1);
608    /// w.push(2);
609    /// assert_eq!(s.len(), 2);
610    /// ```
611    pub fn len(&self) -> usize {
612        let f = self.inner.front.load(Ordering::Acquire);
613        atomic::fence(Ordering::SeqCst);
614        let b = self.inner.back.load(Ordering::Acquire);
615        b.wrapping_sub(f).max(0) as usize
616    }
617
618    /// Steals a task from the queue.
619    ///
620    /// # Examples
621    ///
622    /// ```
623    /// use crossbeam_deque::{Steal, Worker};
624    ///
625    /// let w = Worker::new_lifo();
626    /// w.push(1);
627    /// w.push(2);
628    ///
629    /// let s = w.stealer();
630    /// assert_eq!(s.steal(), Steal::Success(1));
631    /// assert_eq!(s.steal(), Steal::Success(2));
632    /// ```
633    pub fn steal(&self) -> Steal<T> {
634        // Load the front index.
635        let f = self.inner.front.load(Ordering::Acquire);
636
637        // A SeqCst fence is needed here.
638        //
639        // If the current thread is already pinned (reentrantly), we must manually issue the
640        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
641        // have to.
642        if epoch::is_pinned() {
643            atomic::fence(Ordering::SeqCst);
644        }
645
646        let guard = &epoch::pin();
647
648        // Load the back index.
649        let b = self.inner.back.load(Ordering::Acquire);
650
651        // Is the queue empty?
652        if b.wrapping_sub(f) <= 0 {
653            return Steal::Empty;
654        }
655
656        // Load the buffer and read the task at the front.
657        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
658        let task = unsafe { buffer.deref().read(f) };
659
660        // Try incrementing the front index to steal the task.
661        // If the buffer has been swapped or the increment fails, we retry.
662        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
663            || self
664                .inner
665                .front
666                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
667                .is_err()
668        {
669            // We didn't steal this task, forget it.
670            return Steal::Retry;
671        }
672
673        // Return the stolen task.
674        Steal::Success(unsafe { task.assume_init() })
675    }
676
677    /// Steals a batch of tasks and pushes them into another worker.
678    ///
679    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
680    /// steal around half of the tasks in the queue, but also not more than some constant limit.
681    ///
682    /// # Examples
683    ///
684    /// ```
685    /// use crossbeam_deque::Worker;
686    ///
687    /// let w1 = Worker::new_fifo();
688    /// w1.push(1);
689    /// w1.push(2);
690    /// w1.push(3);
691    /// w1.push(4);
692    ///
693    /// let s = w1.stealer();
694    /// let w2 = Worker::new_fifo();
695    ///
696    /// let _ = s.steal_batch(&w2);
697    /// assert_eq!(w2.pop(), Some(1));
698    /// assert_eq!(w2.pop(), Some(2));
699    /// ```
700    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
701        self.steal_batch_with_limit(dest, MAX_BATCH)
702    }
703
704    /// Steals no more than `limit` of tasks and pushes them into another worker.
705    ///
706    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
707    /// steal around half of the tasks in the queue, but also not more than the given limit.
708    ///
709    /// # Examples
710    ///
711    /// ```
712    /// use crossbeam_deque::Worker;
713    ///
714    /// let w1 = Worker::new_fifo();
715    /// w1.push(1);
716    /// w1.push(2);
717    /// w1.push(3);
718    /// w1.push(4);
719    /// w1.push(5);
720    /// w1.push(6);
721    ///
722    /// let s = w1.stealer();
723    /// let w2 = Worker::new_fifo();
724    ///
725    /// let _ = s.steal_batch_with_limit(&w2, 2);
726    /// assert_eq!(w2.pop(), Some(1));
727    /// assert_eq!(w2.pop(), Some(2));
728    /// assert_eq!(w2.pop(), None);
729    ///
730    /// w1.push(7);
731    /// w1.push(8);
732    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
733    /// // half of the elements are currently popped, but the number of popped elements is considered
734    /// // an implementation detail that may be changed in the future.
735    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
736    /// assert_eq!(w2.len(), 3);
737    /// ```
738    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
739        assert!(limit > 0);
740        if Arc::ptr_eq(&self.inner, &dest.inner) {
741            if dest.is_empty() {
742                return Steal::Empty;
743            } else {
744                return Steal::Success(());
745            }
746        }
747
748        // Load the front index.
749        let mut f = self.inner.front.load(Ordering::Acquire);
750
751        // A SeqCst fence is needed here.
752        //
753        // If the current thread is already pinned (reentrantly), we must manually issue the
754        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
755        // have to.
756        if epoch::is_pinned() {
757            atomic::fence(Ordering::SeqCst);
758        }
759
760        let guard = &epoch::pin();
761
762        // Load the back index.
763        let b = self.inner.back.load(Ordering::Acquire);
764
765        // Is the queue empty?
766        let len = b.wrapping_sub(f);
767        if len <= 0 {
768            return Steal::Empty;
769        }
770
771        // Reserve capacity for the stolen batch.
772        let batch_size = cmp::min((len as usize + 1) / 2, limit);
773        dest.reserve(batch_size);
774        let mut batch_size = batch_size as isize;
775
776        // Get the destination buffer and back index.
777        let dest_buffer = dest.buffer.get();
778        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
779
780        // Load the buffer.
781        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
782
783        match self.flavor {
784            // Steal a batch of tasks from the front at once.
785            Flavor::Fifo => {
786                // Copy the batch from the source to the destination buffer.
787                match dest.flavor {
788                    Flavor::Fifo => {
789                        for i in 0..batch_size {
790                            unsafe {
791                                let task = buffer.deref().read(f.wrapping_add(i));
792                                dest_buffer.write(dest_b.wrapping_add(i), task);
793                            }
794                        }
795                    }
796                    Flavor::Lifo => {
797                        for i in 0..batch_size {
798                            unsafe {
799                                let task = buffer.deref().read(f.wrapping_add(i));
800                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
801                            }
802                        }
803                    }
804                }
805
806                // Try incrementing the front index to steal the batch.
807                // If the buffer has been swapped or the increment fails, we retry.
808                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
809                    || self
810                        .inner
811                        .front
812                        .compare_exchange(
813                            f,
814                            f.wrapping_add(batch_size),
815                            Ordering::SeqCst,
816                            Ordering::Relaxed,
817                        )
818                        .is_err()
819                {
820                    return Steal::Retry;
821                }
822
823                dest_b = dest_b.wrapping_add(batch_size);
824            }
825
826            // Steal a batch of tasks from the front one by one.
827            Flavor::Lifo => {
828                // This loop may modify the batch_size, which triggers a clippy lint warning.
829                // Use a new variable to avoid the warning, and to make it clear we aren't
830                // modifying the loop exit condition during iteration.
831                let original_batch_size = batch_size;
832
833                for i in 0..original_batch_size {
834                    // If this is not the first steal, check whether the queue is empty.
835                    if i > 0 {
836                        // We've already got the current front index. Now execute the fence to
837                        // synchronize with other threads.
838                        atomic::fence(Ordering::SeqCst);
839
840                        // Load the back index.
841                        let b = self.inner.back.load(Ordering::Acquire);
842
843                        // Is the queue empty?
844                        if b.wrapping_sub(f) <= 0 {
845                            batch_size = i;
846                            break;
847                        }
848                    }
849
850                    // Read the task at the front.
851                    let task = unsafe { buffer.deref().read(f) };
852
853                    // Try incrementing the front index to steal the task.
854                    // If the buffer has been swapped or the increment fails, we retry.
855                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
856                        || self
857                            .inner
858                            .front
859                            .compare_exchange(
860                                f,
861                                f.wrapping_add(1),
862                                Ordering::SeqCst,
863                                Ordering::Relaxed,
864                            )
865                            .is_err()
866                    {
867                        // We didn't steal this task, forget it and break from the loop.
868                        batch_size = i;
869                        break;
870                    }
871
872                    // Write the stolen task into the destination buffer.
873                    unsafe {
874                        dest_buffer.write(dest_b, task);
875                    }
876
877                    // Move the source front index and the destination back index one step forward.
878                    f = f.wrapping_add(1);
879                    dest_b = dest_b.wrapping_add(1);
880                }
881
882                // If we didn't steal anything, the operation needs to be retried.
883                if batch_size == 0 {
884                    return Steal::Retry;
885                }
886
887                // If stealing into a FIFO queue, stolen tasks need to be reversed.
888                if dest.flavor == Flavor::Fifo {
889                    for i in 0..batch_size / 2 {
890                        unsafe {
891                            let i1 = dest_b.wrapping_sub(batch_size - i);
892                            let i2 = dest_b.wrapping_sub(i + 1);
893                            let t1 = dest_buffer.read(i1);
894                            let t2 = dest_buffer.read(i2);
895                            dest_buffer.write(i1, t2);
896                            dest_buffer.write(i2, t1);
897                        }
898                    }
899                }
900            }
901        }
902
903        atomic::fence(Ordering::Release);
904
905        // Update the back index in the destination queue.
906        //
907        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
908        // races because it doesn't understand fences.
909        dest.inner.back.store(dest_b, Ordering::Release);
910
911        // Return with success.
912        Steal::Success(())
913    }
914
915    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
916    ///
917    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
918    /// steal around half of the tasks in the queue, but also not more than some constant limit.
919    ///
920    /// # Examples
921    ///
922    /// ```
923    /// use crossbeam_deque::{Steal, Worker};
924    ///
925    /// let w1 = Worker::new_fifo();
926    /// w1.push(1);
927    /// w1.push(2);
928    /// w1.push(3);
929    /// w1.push(4);
930    ///
931    /// let s = w1.stealer();
932    /// let w2 = Worker::new_fifo();
933    ///
934    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
935    /// assert_eq!(w2.pop(), Some(2));
936    /// ```
937    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
938        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
939    }
940
941    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
942    /// that worker.
943    ///
944    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
945    /// steal around half of the tasks in the queue, but also not more than the given limit.
946    ///
947    /// # Examples
948    ///
949    /// ```
950    /// use crossbeam_deque::{Steal, Worker};
951    ///
952    /// let w1 = Worker::new_fifo();
953    /// w1.push(1);
954    /// w1.push(2);
955    /// w1.push(3);
956    /// w1.push(4);
957    /// w1.push(5);
958    /// w1.push(6);
959    ///
960    /// let s = w1.stealer();
961    /// let w2 = Worker::new_fifo();
962    ///
963    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
964    /// assert_eq!(w2.pop(), Some(2));
965    /// assert_eq!(w2.pop(), None);
966    ///
967    /// w1.push(7);
968    /// w1.push(8);
969    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
970    /// // half of the elements are currently popped, but the number of popped elements is considered
971    /// // an implementation detail that may be changed in the future.
972    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
973    /// assert_eq!(w2.pop(), Some(4));
974    /// assert_eq!(w2.pop(), Some(5));
975    /// assert_eq!(w2.pop(), None);
976    /// ```
977    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
978        assert!(limit > 0);
979        if Arc::ptr_eq(&self.inner, &dest.inner) {
980            match dest.pop() {
981                None => return Steal::Empty,
982                Some(task) => return Steal::Success(task),
983            }
984        }
985
986        // Load the front index.
987        let mut f = self.inner.front.load(Ordering::Acquire);
988
989        // A SeqCst fence is needed here.
990        //
991        // If the current thread is already pinned (reentrantly), we must manually issue the
992        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
993        // have to.
994        if epoch::is_pinned() {
995            atomic::fence(Ordering::SeqCst);
996        }
997
998        let guard = &epoch::pin();
999
1000        // Load the back index.
1001        let b = self.inner.back.load(Ordering::Acquire);
1002
1003        // Is the queue empty?
1004        let len = b.wrapping_sub(f);
1005        if len <= 0 {
1006            return Steal::Empty;
1007        }
1008
1009        // Reserve capacity for the stolen batch.
1010        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1011        dest.reserve(batch_size);
1012        let mut batch_size = batch_size as isize;
1013
1014        // Get the destination buffer and back index.
1015        let dest_buffer = dest.buffer.get();
1016        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1017
1018        // Load the buffer
1019        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1020
1021        // Read the task at the front.
1022        let mut task = unsafe { buffer.deref().read(f) };
1023
1024        match self.flavor {
1025            // Steal a batch of tasks from the front at once.
1026            Flavor::Fifo => {
1027                // Copy the batch from the source to the destination buffer.
1028                match dest.flavor {
1029                    Flavor::Fifo => {
1030                        for i in 0..batch_size {
1031                            unsafe {
1032                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1033                                dest_buffer.write(dest_b.wrapping_add(i), task);
1034                            }
1035                        }
1036                    }
1037                    Flavor::Lifo => {
1038                        for i in 0..batch_size {
1039                            unsafe {
1040                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1041                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1042                            }
1043                        }
1044                    }
1045                }
1046
1047                // Try incrementing the front index to steal the task.
1048                // If the buffer has been swapped or the increment fails, we retry.
1049                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1050                    || self
1051                        .inner
1052                        .front
1053                        .compare_exchange(
1054                            f,
1055                            f.wrapping_add(batch_size + 1),
1056                            Ordering::SeqCst,
1057                            Ordering::Relaxed,
1058                        )
1059                        .is_err()
1060                {
1061                    // We didn't steal this task, forget it.
1062                    return Steal::Retry;
1063                }
1064
1065                dest_b = dest_b.wrapping_add(batch_size);
1066            }
1067
1068            // Steal a batch of tasks from the front one by one.
1069            Flavor::Lifo => {
1070                // Try incrementing the front index to steal the task.
1071                if self
1072                    .inner
1073                    .front
1074                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1075                    .is_err()
1076                {
1077                    // We didn't steal this task, forget it.
1078                    return Steal::Retry;
1079                }
1080
1081                // Move the front index one step forward.
1082                f = f.wrapping_add(1);
1083
1084                // Repeat the same procedure for the batch steals.
1085                //
1086                // This loop may modify the batch_size, which triggers a clippy lint warning.
1087                // Use a new variable to avoid the warning, and to make it clear we aren't
1088                // modifying the loop exit condition during iteration.
1089                let original_batch_size = batch_size;
1090                for i in 0..original_batch_size {
1091                    // We've already got the current front index. Now execute the fence to
1092                    // synchronize with other threads.
1093                    atomic::fence(Ordering::SeqCst);
1094
1095                    // Load the back index.
1096                    let b = self.inner.back.load(Ordering::Acquire);
1097
1098                    // Is the queue empty?
1099                    if b.wrapping_sub(f) <= 0 {
1100                        batch_size = i;
1101                        break;
1102                    }
1103
1104                    // Read the task at the front.
1105                    let tmp = unsafe { buffer.deref().read(f) };
1106
1107                    // Try incrementing the front index to steal the task.
1108                    // If the buffer has been swapped or the increment fails, we retry.
1109                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1110                        || self
1111                            .inner
1112                            .front
1113                            .compare_exchange(
1114                                f,
1115                                f.wrapping_add(1),
1116                                Ordering::SeqCst,
1117                                Ordering::Relaxed,
1118                            )
1119                            .is_err()
1120                    {
1121                        // We didn't steal this task, forget it and break from the loop.
1122                        batch_size = i;
1123                        break;
1124                    }
1125
1126                    // Write the previously stolen task into the destination buffer.
1127                    unsafe {
1128                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1129                    }
1130
1131                    // Move the source front index and the destination back index one step forward.
1132                    f = f.wrapping_add(1);
1133                    dest_b = dest_b.wrapping_add(1);
1134                }
1135
1136                // If stealing into a FIFO queue, stolen tasks need to be reversed.
1137                if dest.flavor == Flavor::Fifo {
1138                    for i in 0..batch_size / 2 {
1139                        unsafe {
1140                            let i1 = dest_b.wrapping_sub(batch_size - i);
1141                            let i2 = dest_b.wrapping_sub(i + 1);
1142                            let t1 = dest_buffer.read(i1);
1143                            let t2 = dest_buffer.read(i2);
1144                            dest_buffer.write(i1, t2);
1145                            dest_buffer.write(i2, t1);
1146                        }
1147                    }
1148                }
1149            }
1150        }
1151
1152        atomic::fence(Ordering::Release);
1153
1154        // Update the back index in the destination queue.
1155        //
1156        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1157        // races because it doesn't understand fences.
1158        dest.inner.back.store(dest_b, Ordering::Release);
1159
1160        // Return with success.
1161        Steal::Success(unsafe { task.assume_init() })
1162    }
1163}
1164
1165impl<T> Clone for Stealer<T> {
1166    fn clone(&self) -> Stealer<T> {
1167        Stealer {
1168            inner: self.inner.clone(),
1169            flavor: self.flavor,
1170        }
1171    }
1172}
1173
1174impl<T> fmt::Debug for Stealer<T> {
1175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1176        f.pad("Stealer { .. }")
1177    }
1178}
1179
1180// Bits indicating the state of a slot:
1181// * If a task has been written into the slot, `WRITE` is set.
1182// * If a task has been read from the slot, `READ` is set.
1183// * If the block is being destroyed, `DESTROY` is set.
1184const WRITE: usize = 1;
1185const READ: usize = 2;
1186const DESTROY: usize = 4;
1187
1188// Each block covers one "lap" of indices.
1189const LAP: usize = 64;
1190// The maximum number of values a block can hold.
1191const BLOCK_CAP: usize = LAP - 1;
1192// How many lower bits are reserved for metadata.
1193const SHIFT: usize = 1;
1194// Indicates that the block is not the last one.
1195const HAS_NEXT: usize = 1;
1196
1197/// A slot in a block.
1198struct Slot<T> {
1199    /// The task.
1200    task: UnsafeCell<MaybeUninit<T>>,
1201
1202    /// The state of the slot.
1203    state: AtomicUsize,
1204}
1205
1206impl<T> Slot<T> {
1207    /// Waits until a task is written into the slot.
1208    fn wait_write(&self) {
1209        let backoff = Backoff::new();
1210        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1211            backoff.snooze();
1212        }
1213    }
1214}
1215
1216/// A block in a linked list.
1217///
1218/// Each block in the list can hold up to `BLOCK_CAP` values.
1219struct Block<T> {
1220    /// The next block in the linked list.
1221    next: AtomicPtr<Block<T>>,
1222
1223    /// Slots for values.
1224    slots: [Slot<T>; BLOCK_CAP],
1225}
1226
1227impl<T> Block<T> {
1228    const LAYOUT: Layout = {
1229        let layout = Layout::new::<Self>();
1230        assert!(
1231            layout.size() != 0,
1232            "Block should never be zero-sized, as it has an AtomicPtr field"
1233        );
1234        layout
1235    };
1236
1237    /// Creates an empty block.
1238    fn new() -> Box<Self> {
1239        // SAFETY: layout is not zero-sized
1240        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
1241        // Handle allocation failure
1242        if ptr.is_null() {
1243            handle_alloc_error(Self::LAYOUT)
1244        }
1245        // SAFETY: This is safe because:
1246        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1247        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1248        //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1249        //       holds a MaybeUninit.
1250        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1251        // TODO: unsafe { Box::new_zeroed().assume_init() }
1252        unsafe { Box::from_raw(ptr.cast()) }
1253    }
1254
1255    /// Waits until the next pointer is set.
1256    fn wait_next(&self) -> *mut Block<T> {
1257        let backoff = Backoff::new();
1258        loop {
1259            let next = self.next.load(Ordering::Acquire);
1260            if !next.is_null() {
1261                return next;
1262            }
1263            backoff.snooze();
1264        }
1265    }
1266
1267    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1268    unsafe fn destroy(this: *mut Block<T>, count: usize) {
1269        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1270        // begun destruction of the block.
1271        for i in (0..count).rev() {
1272            let slot = (*this).slots.get_unchecked(i);
1273
1274            // Mark the `DESTROY` bit if a thread is still using the slot.
1275            if slot.state.load(Ordering::Acquire) & READ == 0
1276                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1277            {
1278                // If a thread is still using the slot, it will continue destruction of the block.
1279                return;
1280            }
1281        }
1282
1283        // No thread is using the block, now it is safe to destroy it.
1284        drop(Box::from_raw(this));
1285    }
1286}
1287
1288/// A position in a queue.
1289struct Position<T> {
1290    /// The index in the queue.
1291    index: AtomicUsize,
1292
1293    /// The block in the linked list.
1294    block: AtomicPtr<Block<T>>,
1295}
1296
1297/// An injector queue.
1298///
1299/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1300/// a single injector queue, which is the entry point for new tasks.
1301///
1302/// # Examples
1303///
1304/// ```
1305/// use crossbeam_deque::{Injector, Steal};
1306///
1307/// let q = Injector::new();
1308/// q.push(1);
1309/// q.push(2);
1310///
1311/// assert_eq!(q.steal(), Steal::Success(1));
1312/// assert_eq!(q.steal(), Steal::Success(2));
1313/// assert_eq!(q.steal(), Steal::Empty);
1314/// ```
1315pub struct Injector<T> {
1316    /// The head of the queue.
1317    head: CachePadded<Position<T>>,
1318
1319    /// The tail of the queue.
1320    tail: CachePadded<Position<T>>,
1321
1322    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1323    _marker: PhantomData<T>,
1324}
1325
1326unsafe impl<T: Send> Send for Injector<T> {}
1327unsafe impl<T: Send> Sync for Injector<T> {}
1328
1329impl<T> Default for Injector<T> {
1330    fn default() -> Self {
1331        let block = Box::into_raw(Block::<T>::new());
1332        Self {
1333            head: CachePadded::new(Position {
1334                block: AtomicPtr::new(block),
1335                index: AtomicUsize::new(0),
1336            }),
1337            tail: CachePadded::new(Position {
1338                block: AtomicPtr::new(block),
1339                index: AtomicUsize::new(0),
1340            }),
1341            _marker: PhantomData,
1342        }
1343    }
1344}
1345
1346impl<T> Injector<T> {
1347    /// Creates a new injector queue.
1348    ///
1349    /// # Examples
1350    ///
1351    /// ```
1352    /// use crossbeam_deque::Injector;
1353    ///
1354    /// let q = Injector::<i32>::new();
1355    /// ```
1356    pub fn new() -> Injector<T> {
1357        Self::default()
1358    }
1359
1360    /// Pushes a task into the queue.
1361    ///
1362    /// # Examples
1363    ///
1364    /// ```
1365    /// use crossbeam_deque::Injector;
1366    ///
1367    /// let w = Injector::new();
1368    /// w.push(1);
1369    /// w.push(2);
1370    /// ```
1371    pub fn push(&self, task: T) {
1372        let backoff = Backoff::new();
1373        let mut tail = self.tail.index.load(Ordering::Acquire);
1374        let mut block = self.tail.block.load(Ordering::Acquire);
1375        let mut next_block = None;
1376
1377        loop {
1378            // Calculate the offset of the index into the block.
1379            let offset = (tail >> SHIFT) % LAP;
1380
1381            // If we reached the end of the block, wait until the next one is installed.
1382            if offset == BLOCK_CAP {
1383                backoff.snooze();
1384                tail = self.tail.index.load(Ordering::Acquire);
1385                block = self.tail.block.load(Ordering::Acquire);
1386                continue;
1387            }
1388
1389            // If we're going to have to install the next block, allocate it in advance in order to
1390            // make the wait for other threads as short as possible.
1391            if offset + 1 == BLOCK_CAP && next_block.is_none() {
1392                next_block = Some(Block::<T>::new());
1393            }
1394
1395            let new_tail = tail + (1 << SHIFT);
1396
1397            // Try advancing the tail forward.
1398            match self.tail.index.compare_exchange_weak(
1399                tail,
1400                new_tail,
1401                Ordering::SeqCst,
1402                Ordering::Acquire,
1403            ) {
1404                Ok(_) => unsafe {
1405                    // If we've reached the end of the block, install the next one.
1406                    if offset + 1 == BLOCK_CAP {
1407                        let next_block = Box::into_raw(next_block.unwrap());
1408                        let next_index = new_tail.wrapping_add(1 << SHIFT);
1409
1410                        self.tail.block.store(next_block, Ordering::Release);
1411                        self.tail.index.store(next_index, Ordering::Release);
1412                        (*block).next.store(next_block, Ordering::Release);
1413                    }
1414
1415                    // Write the task into the slot.
1416                    let slot = (*block).slots.get_unchecked(offset);
1417                    slot.task.get().write(MaybeUninit::new(task));
1418                    slot.state.fetch_or(WRITE, Ordering::Release);
1419
1420                    return;
1421                },
1422                Err(t) => {
1423                    tail = t;
1424                    block = self.tail.block.load(Ordering::Acquire);
1425                    backoff.spin();
1426                }
1427            }
1428        }
1429    }
1430
1431    /// Steals a task from the queue.
1432    ///
1433    /// # Examples
1434    ///
1435    /// ```
1436    /// use crossbeam_deque::{Injector, Steal};
1437    ///
1438    /// let q = Injector::new();
1439    /// q.push(1);
1440    /// q.push(2);
1441    ///
1442    /// assert_eq!(q.steal(), Steal::Success(1));
1443    /// assert_eq!(q.steal(), Steal::Success(2));
1444    /// assert_eq!(q.steal(), Steal::Empty);
1445    /// ```
1446    pub fn steal(&self) -> Steal<T> {
1447        let mut head;
1448        let mut block;
1449        let mut offset;
1450
1451        let backoff = Backoff::new();
1452        loop {
1453            head = self.head.index.load(Ordering::Acquire);
1454            block = self.head.block.load(Ordering::Acquire);
1455
1456            // Calculate the offset of the index into the block.
1457            offset = (head >> SHIFT) % LAP;
1458
1459            // If we reached the end of the block, wait until the next one is installed.
1460            if offset == BLOCK_CAP {
1461                backoff.snooze();
1462            } else {
1463                break;
1464            }
1465        }
1466
1467        let mut new_head = head + (1 << SHIFT);
1468
1469        if new_head & HAS_NEXT == 0 {
1470            atomic::fence(Ordering::SeqCst);
1471            let tail = self.tail.index.load(Ordering::Relaxed);
1472
1473            // If the tail equals the head, that means the queue is empty.
1474            if head >> SHIFT == tail >> SHIFT {
1475                return Steal::Empty;
1476            }
1477
1478            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1479            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1480                new_head |= HAS_NEXT;
1481            }
1482        }
1483
1484        // Try moving the head index forward.
1485        if self
1486            .head
1487            .index
1488            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1489            .is_err()
1490        {
1491            return Steal::Retry;
1492        }
1493
1494        unsafe {
1495            // If we've reached the end of the block, move to the next one.
1496            if offset + 1 == BLOCK_CAP {
1497                let next = (*block).wait_next();
1498                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1499                if !(*next).next.load(Ordering::Relaxed).is_null() {
1500                    next_index |= HAS_NEXT;
1501                }
1502
1503                self.head.block.store(next, Ordering::Release);
1504                self.head.index.store(next_index, Ordering::Release);
1505            }
1506
1507            // Read the task.
1508            let slot = (*block).slots.get_unchecked(offset);
1509            slot.wait_write();
1510            let task = slot.task.get().read().assume_init();
1511
1512            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1513            // but couldn't because we were busy reading from the slot.
1514            if (offset + 1 == BLOCK_CAP)
1515                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1516            {
1517                Block::destroy(block, offset);
1518            }
1519
1520            Steal::Success(task)
1521        }
1522    }
1523
1524    /// Steals a batch of tasks and pushes them into a worker.
1525    ///
1526    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1527    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1528    ///
1529    /// # Examples
1530    ///
1531    /// ```
1532    /// use crossbeam_deque::{Injector, Worker};
1533    ///
1534    /// let q = Injector::new();
1535    /// q.push(1);
1536    /// q.push(2);
1537    /// q.push(3);
1538    /// q.push(4);
1539    ///
1540    /// let w = Worker::new_fifo();
1541    /// let _ = q.steal_batch(&w);
1542    /// assert_eq!(w.pop(), Some(1));
1543    /// assert_eq!(w.pop(), Some(2));
1544    /// ```
1545    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1546        self.steal_batch_with_limit(dest, MAX_BATCH)
1547    }
1548
1549    /// Steals no more than of tasks and pushes them into a worker.
1550    ///
1551    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1552    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1553    ///
1554    /// # Examples
1555    ///
1556    /// ```
1557    /// use crossbeam_deque::{Injector, Worker};
1558    ///
1559    /// let q = Injector::new();
1560    /// q.push(1);
1561    /// q.push(2);
1562    /// q.push(3);
1563    /// q.push(4);
1564    /// q.push(5);
1565    /// q.push(6);
1566    ///
1567    /// let w = Worker::new_fifo();
1568    /// let _ = q.steal_batch_with_limit(&w, 2);
1569    /// assert_eq!(w.pop(), Some(1));
1570    /// assert_eq!(w.pop(), Some(2));
1571    /// assert_eq!(w.pop(), None);
1572    ///
1573    /// q.push(7);
1574    /// q.push(8);
1575    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1576    /// // half of the elements are currently popped, but the number of popped elements is considered
1577    /// // an implementation detail that may be changed in the future.
1578    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1579    /// assert_eq!(w.len(), 3);
1580    /// ```
1581    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1582        assert!(limit > 0);
1583        let mut head;
1584        let mut block;
1585        let mut offset;
1586
1587        let backoff = Backoff::new();
1588        loop {
1589            head = self.head.index.load(Ordering::Acquire);
1590            block = self.head.block.load(Ordering::Acquire);
1591
1592            // Calculate the offset of the index into the block.
1593            offset = (head >> SHIFT) % LAP;
1594
1595            // If we reached the end of the block, wait until the next one is installed.
1596            if offset == BLOCK_CAP {
1597                backoff.snooze();
1598            } else {
1599                break;
1600            }
1601        }
1602
1603        let mut new_head = head;
1604        let advance;
1605
1606        if new_head & HAS_NEXT == 0 {
1607            atomic::fence(Ordering::SeqCst);
1608            let tail = self.tail.index.load(Ordering::Relaxed);
1609
1610            // If the tail equals the head, that means the queue is empty.
1611            if head >> SHIFT == tail >> SHIFT {
1612                return Steal::Empty;
1613            }
1614
1615            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1616            // the right batch size to steal.
1617            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1618                new_head |= HAS_NEXT;
1619                // We can steal all tasks till the end of the block.
1620                advance = (BLOCK_CAP - offset).min(limit);
1621            } else {
1622                let len = (tail - head) >> SHIFT;
1623                // Steal half of the available tasks.
1624                advance = ((len + 1) / 2).min(limit);
1625            }
1626        } else {
1627            // We can steal all tasks till the end of the block.
1628            advance = (BLOCK_CAP - offset).min(limit);
1629        }
1630
1631        new_head += advance << SHIFT;
1632        let new_offset = offset + advance;
1633
1634        // Try moving the head index forward.
1635        if self
1636            .head
1637            .index
1638            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1639            .is_err()
1640        {
1641            return Steal::Retry;
1642        }
1643
1644        // Reserve capacity for the stolen batch.
1645        let batch_size = new_offset - offset;
1646        dest.reserve(batch_size);
1647
1648        // Get the destination buffer and back index.
1649        let dest_buffer = dest.buffer.get();
1650        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1651
1652        unsafe {
1653            // If we've reached the end of the block, move to the next one.
1654            if new_offset == BLOCK_CAP {
1655                let next = (*block).wait_next();
1656                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1657                if !(*next).next.load(Ordering::Relaxed).is_null() {
1658                    next_index |= HAS_NEXT;
1659                }
1660
1661                self.head.block.store(next, Ordering::Release);
1662                self.head.index.store(next_index, Ordering::Release);
1663            }
1664
1665            // Copy values from the injector into the destination queue.
1666            match dest.flavor {
1667                Flavor::Fifo => {
1668                    for i in 0..batch_size {
1669                        // Read the task.
1670                        let slot = (*block).slots.get_unchecked(offset + i);
1671                        slot.wait_write();
1672                        let task = slot.task.get().read();
1673
1674                        // Write it into the destination queue.
1675                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1676                    }
1677                }
1678
1679                Flavor::Lifo => {
1680                    for i in 0..batch_size {
1681                        // Read the task.
1682                        let slot = (*block).slots.get_unchecked(offset + i);
1683                        slot.wait_write();
1684                        let task = slot.task.get().read();
1685
1686                        // Write it into the destination queue.
1687                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1688                    }
1689                }
1690            }
1691
1692            atomic::fence(Ordering::Release);
1693
1694            // Update the back index in the destination queue.
1695            //
1696            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1697            // data races because it doesn't understand fences.
1698            dest.inner
1699                .back
1700                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1701
1702            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1703            // but couldn't because we were busy reading from the slot.
1704            if new_offset == BLOCK_CAP {
1705                Block::destroy(block, offset);
1706            } else {
1707                for i in offset..new_offset {
1708                    let slot = (*block).slots.get_unchecked(i);
1709
1710                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1711                        Block::destroy(block, offset);
1712                        break;
1713                    }
1714                }
1715            }
1716
1717            Steal::Success(())
1718        }
1719    }
1720
1721    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1722    ///
1723    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1724    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1725    ///
1726    /// # Examples
1727    ///
1728    /// ```
1729    /// use crossbeam_deque::{Injector, Steal, Worker};
1730    ///
1731    /// let q = Injector::new();
1732    /// q.push(1);
1733    /// q.push(2);
1734    /// q.push(3);
1735    /// q.push(4);
1736    ///
1737    /// let w = Worker::new_fifo();
1738    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1739    /// assert_eq!(w.pop(), Some(2));
1740    /// ```
1741    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1742        // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1743        // better, but we may change it in the future to be compatible with the same method in Stealer.
1744        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1745    }
1746
1747    /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1748    ///
1749    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1750    /// steal around half of the tasks in the queue, but also not more than the given limit.
1751    ///
1752    /// # Examples
1753    ///
1754    /// ```
1755    /// use crossbeam_deque::{Injector, Steal, Worker};
1756    ///
1757    /// let q = Injector::new();
1758    /// q.push(1);
1759    /// q.push(2);
1760    /// q.push(3);
1761    /// q.push(4);
1762    /// q.push(5);
1763    /// q.push(6);
1764    ///
1765    /// let w = Worker::new_fifo();
1766    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1767    /// assert_eq!(w.pop(), Some(2));
1768    /// assert_eq!(w.pop(), None);
1769    ///
1770    /// q.push(7);
1771    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1772    /// // half of the elements are currently popped, but the number of popped elements is considered
1773    /// // an implementation detail that may be changed in the future.
1774    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1775    /// assert_eq!(w.pop(), Some(4));
1776    /// assert_eq!(w.pop(), Some(5));
1777    /// assert_eq!(w.pop(), None);
1778    /// ```
1779    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1780        assert!(limit > 0);
1781        let mut head;
1782        let mut block;
1783        let mut offset;
1784
1785        let backoff = Backoff::new();
1786        loop {
1787            head = self.head.index.load(Ordering::Acquire);
1788            block = self.head.block.load(Ordering::Acquire);
1789
1790            // Calculate the offset of the index into the block.
1791            offset = (head >> SHIFT) % LAP;
1792
1793            // If we reached the end of the block, wait until the next one is installed.
1794            if offset == BLOCK_CAP {
1795                backoff.snooze();
1796            } else {
1797                break;
1798            }
1799        }
1800
1801        let mut new_head = head;
1802        let advance;
1803
1804        if new_head & HAS_NEXT == 0 {
1805            atomic::fence(Ordering::SeqCst);
1806            let tail = self.tail.index.load(Ordering::Relaxed);
1807
1808            // If the tail equals the head, that means the queue is empty.
1809            if head >> SHIFT == tail >> SHIFT {
1810                return Steal::Empty;
1811            }
1812
1813            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1814            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1815                new_head |= HAS_NEXT;
1816                // We can steal all tasks till the end of the block.
1817                advance = (BLOCK_CAP - offset).min(limit);
1818            } else {
1819                let len = (tail - head) >> SHIFT;
1820                // Steal half of the available tasks.
1821                advance = ((len + 1) / 2).min(limit);
1822            }
1823        } else {
1824            // We can steal all tasks till the end of the block.
1825            advance = (BLOCK_CAP - offset).min(limit);
1826        }
1827
1828        new_head += advance << SHIFT;
1829        let new_offset = offset + advance;
1830
1831        // Try moving the head index forward.
1832        if self
1833            .head
1834            .index
1835            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1836            .is_err()
1837        {
1838            return Steal::Retry;
1839        }
1840
1841        // Reserve capacity for the stolen batch.
1842        let batch_size = new_offset - offset - 1;
1843        dest.reserve(batch_size);
1844
1845        // Get the destination buffer and back index.
1846        let dest_buffer = dest.buffer.get();
1847        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1848
1849        unsafe {
1850            // If we've reached the end of the block, move to the next one.
1851            if new_offset == BLOCK_CAP {
1852                let next = (*block).wait_next();
1853                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1854                if !(*next).next.load(Ordering::Relaxed).is_null() {
1855                    next_index |= HAS_NEXT;
1856                }
1857
1858                self.head.block.store(next, Ordering::Release);
1859                self.head.index.store(next_index, Ordering::Release);
1860            }
1861
1862            // Read the task.
1863            let slot = (*block).slots.get_unchecked(offset);
1864            slot.wait_write();
1865            let task = slot.task.get().read();
1866
1867            match dest.flavor {
1868                Flavor::Fifo => {
1869                    // Copy values from the injector into the destination queue.
1870                    for i in 0..batch_size {
1871                        // Read the task.
1872                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1873                        slot.wait_write();
1874                        let task = slot.task.get().read();
1875
1876                        // Write it into the destination queue.
1877                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1878                    }
1879                }
1880
1881                Flavor::Lifo => {
1882                    // Copy values from the injector into the destination queue.
1883                    for i in 0..batch_size {
1884                        // Read the task.
1885                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1886                        slot.wait_write();
1887                        let task = slot.task.get().read();
1888
1889                        // Write it into the destination queue.
1890                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1891                    }
1892                }
1893            }
1894
1895            atomic::fence(Ordering::Release);
1896
1897            // Update the back index in the destination queue.
1898            //
1899            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1900            // data races because it doesn't understand fences.
1901            dest.inner
1902                .back
1903                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1904
1905            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1906            // but couldn't because we were busy reading from the slot.
1907            if new_offset == BLOCK_CAP {
1908                Block::destroy(block, offset);
1909            } else {
1910                for i in offset..new_offset {
1911                    let slot = (*block).slots.get_unchecked(i);
1912
1913                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1914                        Block::destroy(block, offset);
1915                        break;
1916                    }
1917                }
1918            }
1919
1920            Steal::Success(task.assume_init())
1921        }
1922    }
1923
1924    /// Returns `true` if the queue is empty.
1925    ///
1926    /// # Examples
1927    ///
1928    /// ```
1929    /// use crossbeam_deque::Injector;
1930    ///
1931    /// let q = Injector::new();
1932    ///
1933    /// assert!(q.is_empty());
1934    /// q.push(1);
1935    /// assert!(!q.is_empty());
1936    /// ```
1937    pub fn is_empty(&self) -> bool {
1938        let head = self.head.index.load(Ordering::SeqCst);
1939        let tail = self.tail.index.load(Ordering::SeqCst);
1940        head >> SHIFT == tail >> SHIFT
1941    }
1942
1943    /// Returns the number of tasks in the queue.
1944    ///
1945    /// # Examples
1946    ///
1947    /// ```
1948    /// use crossbeam_deque::Injector;
1949    ///
1950    /// let q = Injector::new();
1951    ///
1952    /// assert_eq!(q.len(), 0);
1953    /// q.push(1);
1954    /// assert_eq!(q.len(), 1);
1955    /// q.push(1);
1956    /// assert_eq!(q.len(), 2);
1957    /// ```
1958    pub fn len(&self) -> usize {
1959        loop {
1960            // Load the tail index, then load the head index.
1961            let mut tail = self.tail.index.load(Ordering::SeqCst);
1962            let mut head = self.head.index.load(Ordering::SeqCst);
1963
1964            // If the tail index didn't change, we've got consistent indices to work with.
1965            if self.tail.index.load(Ordering::SeqCst) == tail {
1966                // Erase the lower bits.
1967                tail &= !((1 << SHIFT) - 1);
1968                head &= !((1 << SHIFT) - 1);
1969
1970                // Fix up indices if they fall onto block ends.
1971                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1972                    tail = tail.wrapping_add(1 << SHIFT);
1973                }
1974                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1975                    head = head.wrapping_add(1 << SHIFT);
1976                }
1977
1978                // Rotate indices so that head falls into the first block.
1979                let lap = (head >> SHIFT) / LAP;
1980                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1981                head = head.wrapping_sub((lap * LAP) << SHIFT);
1982
1983                // Remove the lower bits.
1984                tail >>= SHIFT;
1985                head >>= SHIFT;
1986
1987                // Return the difference minus the number of blocks between tail and head.
1988                return tail - head - tail / LAP;
1989            }
1990        }
1991    }
1992}
1993
1994impl<T> Drop for Injector<T> {
1995    fn drop(&mut self) {
1996        let mut head = *self.head.index.get_mut();
1997        let mut tail = *self.tail.index.get_mut();
1998        let mut block = *self.head.block.get_mut();
1999
2000        // Erase the lower bits.
2001        head &= !((1 << SHIFT) - 1);
2002        tail &= !((1 << SHIFT) - 1);
2003
2004        unsafe {
2005            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
2006            while head != tail {
2007                let offset = (head >> SHIFT) % LAP;
2008
2009                if offset < BLOCK_CAP {
2010                    // Drop the task in the slot.
2011                    let slot = (*block).slots.get_unchecked(offset);
2012                    (*slot.task.get()).assume_init_drop();
2013                } else {
2014                    // Deallocate the block and move to the next one.
2015                    let next = *(*block).next.get_mut();
2016                    drop(Box::from_raw(block));
2017                    block = next;
2018                }
2019
2020                head = head.wrapping_add(1 << SHIFT);
2021            }
2022
2023            // Deallocate the last remaining block.
2024            drop(Box::from_raw(block));
2025        }
2026    }
2027}
2028
2029impl<T> fmt::Debug for Injector<T> {
2030    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2031        f.pad("Worker { .. }")
2032    }
2033}
2034
2035/// Possible outcomes of a steal operation.
2036///
2037/// # Examples
2038///
2039/// There are lots of ways to chain results of steal operations together:
2040///
2041/// ```
2042/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2043///
2044/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2045///
2046/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2047/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2048/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2049///
2050/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2051/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2052/// ```
2053#[must_use]
2054#[derive(PartialEq, Eq, Copy, Clone)]
2055pub enum Steal<T> {
2056    /// The queue was empty at the time of stealing.
2057    Empty,
2058
2059    /// At least one task was successfully stolen.
2060    Success(T),
2061
2062    /// The steal operation needs to be retried.
2063    Retry,
2064}
2065
2066impl<T> Steal<T> {
2067    /// Returns `true` if the queue was empty at the time of stealing.
2068    ///
2069    /// # Examples
2070    ///
2071    /// ```
2072    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2073    ///
2074    /// assert!(!Success(7).is_empty());
2075    /// assert!(!Retry::<i32>.is_empty());
2076    ///
2077    /// assert!(Empty::<i32>.is_empty());
2078    /// ```
2079    pub fn is_empty(&self) -> bool {
2080        match self {
2081            Steal::Empty => true,
2082            _ => false,
2083        }
2084    }
2085
2086    /// Returns `true` if at least one task was stolen.
2087    ///
2088    /// # Examples
2089    ///
2090    /// ```
2091    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2092    ///
2093    /// assert!(!Empty::<i32>.is_success());
2094    /// assert!(!Retry::<i32>.is_success());
2095    ///
2096    /// assert!(Success(7).is_success());
2097    /// ```
2098    pub fn is_success(&self) -> bool {
2099        match self {
2100            Steal::Success(_) => true,
2101            _ => false,
2102        }
2103    }
2104
2105    /// Returns `true` if the steal operation needs to be retried.
2106    ///
2107    /// # Examples
2108    ///
2109    /// ```
2110    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2111    ///
2112    /// assert!(!Empty::<i32>.is_retry());
2113    /// assert!(!Success(7).is_retry());
2114    ///
2115    /// assert!(Retry::<i32>.is_retry());
2116    /// ```
2117    pub fn is_retry(&self) -> bool {
2118        match self {
2119            Steal::Retry => true,
2120            _ => false,
2121        }
2122    }
2123
2124    /// Returns the result of the operation, if successful.
2125    ///
2126    /// # Examples
2127    ///
2128    /// ```
2129    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2130    ///
2131    /// assert_eq!(Empty::<i32>.success(), None);
2132    /// assert_eq!(Retry::<i32>.success(), None);
2133    ///
2134    /// assert_eq!(Success(7).success(), Some(7));
2135    /// ```
2136    pub fn success(self) -> Option<T> {
2137        match self {
2138            Steal::Success(res) => Some(res),
2139            _ => None,
2140        }
2141    }
2142
2143    /// If no task was stolen, attempts another steal operation.
2144    ///
2145    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2146    ///
2147    /// * If the second steal resulted in `Success`, it is returned.
2148    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2149    /// * If both resulted in `None`, then `None` is returned.
2150    ///
2151    /// # Examples
2152    ///
2153    /// ```
2154    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2155    ///
2156    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2157    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2158    ///
2159    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2160    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2161    ///
2162    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2163    /// ```
2164    pub fn or_else<F>(self, f: F) -> Steal<T>
2165    where
2166        F: FnOnce() -> Steal<T>,
2167    {
2168        match self {
2169            Steal::Empty => f(),
2170            Steal::Success(_) => self,
2171            Steal::Retry => {
2172                if let Steal::Success(res) = f() {
2173                    Steal::Success(res)
2174                } else {
2175                    Steal::Retry
2176                }
2177            }
2178        }
2179    }
2180}
2181
2182impl<T> fmt::Debug for Steal<T> {
2183    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2184        match self {
2185            Steal::Empty => f.pad("Empty"),
2186            Steal::Success(_) => f.pad("Success(..)"),
2187            Steal::Retry => f.pad("Retry"),
2188        }
2189    }
2190}
2191
2192impl<T> FromIterator<Steal<T>> for Steal<T> {
2193    /// Consumes items until a `Success` is found and returns it.
2194    ///
2195    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2196    /// Otherwise, `Empty` is returned.
2197    fn from_iter<I>(iter: I) -> Steal<T>
2198    where
2199        I: IntoIterator<Item = Steal<T>>,
2200    {
2201        let mut retry = false;
2202        for s in iter {
2203            match &s {
2204                Steal::Empty => {}
2205                Steal::Success(_) => return s,
2206                Steal::Retry => retry = true,
2207            }
2208        }
2209
2210        if retry {
2211            Steal::Retry
2212        } else {
2213            Steal::Empty
2214        }
2215    }
2216}