crossbeam_deque/deque.rs
1use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
2use std::boxed::Box;
3use std::cell::{Cell, UnsafeCell};
4use std::cmp;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::{self, MaybeUninit};
8use std::ptr;
9use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
10use std::sync::Arc;
11
12use crossbeam_epoch::{self as epoch, Atomic, Owned};
13use crossbeam_utils::{Backoff, CachePadded};
14
15// Minimum buffer capacity.
16const MIN_CAP: usize = 64;
17// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
18const MAX_BATCH: usize = 32;
19// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
20// deallocated as soon as possible.
21const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
22
23/// A buffer that holds tasks in a worker queue.
24///
25/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
26/// *not* deallocate the buffer.
27struct Buffer<T> {
28 /// Pointer to the allocated memory.
29 ptr: *mut T,
30
31 /// Capacity of the buffer. Always a power of two.
32 cap: usize,
33}
34
35unsafe impl<T> Send for Buffer<T> {}
36
37impl<T> Buffer<T> {
38 /// Allocates a new buffer with the specified capacity.
39 fn alloc(cap: usize) -> Buffer<T> {
40 debug_assert_eq!(cap, cap.next_power_of_two());
41
42 let ptr = Box::into_raw(
43 (0..cap)
44 .map(|_| MaybeUninit::<T>::uninit())
45 .collect::<Box<[_]>>(),
46 )
47 .cast::<T>();
48
49 Buffer { ptr, cap }
50 }
51
52 /// Deallocates the buffer.
53 unsafe fn dealloc(self) {
54 drop(Box::from_raw(ptr::slice_from_raw_parts_mut(
55 self.ptr.cast::<MaybeUninit<T>>(),
56 self.cap,
57 )));
58 }
59
60 /// Returns a pointer to the task at the specified `index`.
61 unsafe fn at(&self, index: isize) -> *mut T {
62 // `self.cap` is always a power of two.
63 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
64 // don't actually have the right to access this memory.
65 self.ptr.offset(index & (self.cap - 1) as isize)
66 }
67
68 /// Writes `task` into the specified `index`.
69 ///
70 /// This method might be concurrently called with another `read` at the same index, which is
71 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
72 /// that would be more expensive and difficult to implement generically for all types `T`.
73 /// Hence, as a hack, we use a volatile write instead.
74 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
75 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
76 }
77
78 /// Reads a task from the specified `index`.
79 ///
80 /// This method might be concurrently called with another `write` at the same index, which is
81 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
82 /// that would be more expensive and difficult to implement generically for all types `T`.
83 /// Hence, as a hack, we use a volatile load instead.
84 unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
85 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
86 }
87}
88
89impl<T> Clone for Buffer<T> {
90 fn clone(&self) -> Buffer<T> {
91 *self
92 }
93}
94
95impl<T> Copy for Buffer<T> {}
96
97/// Internal queue data shared between the worker and stealers.
98///
99/// The implementation is based on the following work:
100///
101/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
102/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
103/// PPoPP 2013.][weak-mem]
104/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
105/// atomics. OOPSLA 2013.][checker]
106///
107/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
108/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
109/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
110struct Inner<T> {
111 /// The front index.
112 front: AtomicIsize,
113
114 /// The back index.
115 back: AtomicIsize,
116
117 /// The underlying buffer.
118 buffer: CachePadded<Atomic<Buffer<T>>>,
119}
120
121impl<T> Drop for Inner<T> {
122 fn drop(&mut self) {
123 // Load the back index, front index, and buffer.
124 let b = *self.back.get_mut();
125 let f = *self.front.get_mut();
126
127 unsafe {
128 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
129
130 // Go through the buffer from front to back and drop all tasks in the queue.
131 let mut i = f;
132 while i != b {
133 buffer.deref().at(i).drop_in_place();
134 i = i.wrapping_add(1);
135 }
136
137 // Free the memory allocated by the buffer.
138 buffer.into_owned().into_box().dealloc();
139 }
140 }
141}
142
143/// Worker queue flavor: FIFO or LIFO.
144#[derive(Clone, Copy, Debug, Eq, PartialEq)]
145enum Flavor {
146 /// The first-in first-out flavor.
147 Fifo,
148
149 /// The last-in first-out flavor.
150 Lifo,
151}
152
153/// A worker queue.
154///
155/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
156/// tasks from it. Task schedulers typically create a single worker queue per thread.
157///
158/// # Examples
159///
160/// A FIFO worker:
161///
162/// ```
163/// use crossbeam_deque::{Steal, Worker};
164///
165/// let w = Worker::new_fifo();
166/// let s = w.stealer();
167///
168/// w.push(1);
169/// w.push(2);
170/// w.push(3);
171///
172/// assert_eq!(s.steal(), Steal::Success(1));
173/// assert_eq!(w.pop(), Some(2));
174/// assert_eq!(w.pop(), Some(3));
175/// ```
176///
177/// A LIFO worker:
178///
179/// ```
180/// use crossbeam_deque::{Steal, Worker};
181///
182/// let w = Worker::new_lifo();
183/// let s = w.stealer();
184///
185/// w.push(1);
186/// w.push(2);
187/// w.push(3);
188///
189/// assert_eq!(s.steal(), Steal::Success(1));
190/// assert_eq!(w.pop(), Some(3));
191/// assert_eq!(w.pop(), Some(2));
192/// ```
193pub struct Worker<T> {
194 /// A reference to the inner representation of the queue.
195 inner: Arc<CachePadded<Inner<T>>>,
196
197 /// A copy of `inner.buffer` for quick access.
198 buffer: Cell<Buffer<T>>,
199
200 /// The flavor of the queue.
201 flavor: Flavor,
202
203 /// Indicates that the worker cannot be shared among threads.
204 _marker: PhantomData<*mut ()>, // !Send + !Sync
205}
206
207unsafe impl<T: Send> Send for Worker<T> {}
208
209impl<T> Worker<T> {
210 /// Creates a FIFO worker queue.
211 ///
212 /// Tasks are pushed and popped from opposite ends.
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use crossbeam_deque::Worker;
218 ///
219 /// let w = Worker::<i32>::new_fifo();
220 /// ```
221 pub fn new_fifo() -> Worker<T> {
222 let buffer = Buffer::alloc(MIN_CAP);
223
224 let inner = Arc::new(CachePadded::new(Inner {
225 front: AtomicIsize::new(0),
226 back: AtomicIsize::new(0),
227 buffer: CachePadded::new(Atomic::new(buffer)),
228 }));
229
230 Worker {
231 inner,
232 buffer: Cell::new(buffer),
233 flavor: Flavor::Fifo,
234 _marker: PhantomData,
235 }
236 }
237
238 /// Creates a LIFO worker queue.
239 ///
240 /// Tasks are pushed and popped from the same end.
241 ///
242 /// # Examples
243 ///
244 /// ```
245 /// use crossbeam_deque::Worker;
246 ///
247 /// let w = Worker::<i32>::new_lifo();
248 /// ```
249 pub fn new_lifo() -> Worker<T> {
250 let buffer = Buffer::alloc(MIN_CAP);
251
252 let inner = Arc::new(CachePadded::new(Inner {
253 front: AtomicIsize::new(0),
254 back: AtomicIsize::new(0),
255 buffer: CachePadded::new(Atomic::new(buffer)),
256 }));
257
258 Worker {
259 inner,
260 buffer: Cell::new(buffer),
261 flavor: Flavor::Lifo,
262 _marker: PhantomData,
263 }
264 }
265
266 /// Creates a stealer for this queue.
267 ///
268 /// The returned stealer can be shared among threads and cloned.
269 ///
270 /// # Examples
271 ///
272 /// ```
273 /// use crossbeam_deque::Worker;
274 ///
275 /// let w = Worker::<i32>::new_lifo();
276 /// let s = w.stealer();
277 /// ```
278 pub fn stealer(&self) -> Stealer<T> {
279 Stealer {
280 inner: self.inner.clone(),
281 flavor: self.flavor,
282 }
283 }
284
285 /// Resizes the internal buffer to the new capacity of `new_cap`.
286 #[cold]
287 unsafe fn resize(&self, new_cap: usize) {
288 // Load the back index, front index, and buffer.
289 let b = self.inner.back.load(Ordering::Relaxed);
290 let f = self.inner.front.load(Ordering::Relaxed);
291 let buffer = self.buffer.get();
292
293 // Allocate a new buffer and copy data from the old buffer to the new one.
294 let new = Buffer::alloc(new_cap);
295 let mut i = f;
296 while i != b {
297 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
298 i = i.wrapping_add(1);
299 }
300
301 let guard = &epoch::pin();
302
303 // Replace the old buffer with the new one.
304 self.buffer.replace(new);
305 let old =
306 self.inner
307 .buffer
308 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
309
310 // Destroy the old buffer later.
311 guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
312
313 // If the buffer is very large, then flush the thread-local garbage in order to deallocate
314 // it as soon as possible.
315 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
316 guard.flush();
317 }
318 }
319
320 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
321 /// buffer.
322 fn reserve(&self, reserve_cap: usize) {
323 if reserve_cap > 0 {
324 // Compute the current length.
325 let b = self.inner.back.load(Ordering::Relaxed);
326 let f = self.inner.front.load(Ordering::SeqCst);
327 let len = b.wrapping_sub(f) as usize;
328
329 // The current capacity.
330 let cap = self.buffer.get().cap;
331
332 // Is there enough capacity to push `reserve_cap` tasks?
333 if cap - len < reserve_cap {
334 // Keep doubling the capacity as much as is needed.
335 let mut new_cap = cap * 2;
336 while new_cap - len < reserve_cap {
337 new_cap *= 2;
338 }
339
340 // Resize the buffer.
341 unsafe {
342 self.resize(new_cap);
343 }
344 }
345 }
346 }
347
348 /// Returns `true` if the queue is empty.
349 ///
350 /// ```
351 /// use crossbeam_deque::Worker;
352 ///
353 /// let w = Worker::new_lifo();
354 ///
355 /// assert!(w.is_empty());
356 /// w.push(1);
357 /// assert!(!w.is_empty());
358 /// ```
359 pub fn is_empty(&self) -> bool {
360 let b = self.inner.back.load(Ordering::Relaxed);
361 let f = self.inner.front.load(Ordering::SeqCst);
362 b.wrapping_sub(f) <= 0
363 }
364
365 /// Returns the number of tasks in the deque.
366 ///
367 /// ```
368 /// use crossbeam_deque::Worker;
369 ///
370 /// let w = Worker::new_lifo();
371 ///
372 /// assert_eq!(w.len(), 0);
373 /// w.push(1);
374 /// assert_eq!(w.len(), 1);
375 /// w.push(1);
376 /// assert_eq!(w.len(), 2);
377 /// ```
378 pub fn len(&self) -> usize {
379 let b = self.inner.back.load(Ordering::Relaxed);
380 let f = self.inner.front.load(Ordering::SeqCst);
381 b.wrapping_sub(f).max(0) as usize
382 }
383
384 /// Pushes a task into the queue.
385 ///
386 /// # Examples
387 ///
388 /// ```
389 /// use crossbeam_deque::Worker;
390 ///
391 /// let w = Worker::new_lifo();
392 /// w.push(1);
393 /// w.push(2);
394 /// ```
395 pub fn push(&self, task: T) {
396 // Load the back index, front index, and buffer.
397 let b = self.inner.back.load(Ordering::Relaxed);
398 let f = self.inner.front.load(Ordering::Acquire);
399 let mut buffer = self.buffer.get();
400
401 // Calculate the length of the queue.
402 let len = b.wrapping_sub(f);
403
404 // Is the queue full?
405 if len >= buffer.cap as isize {
406 // Yes. Grow the underlying buffer.
407 unsafe {
408 self.resize(2 * buffer.cap);
409 }
410 buffer = self.buffer.get();
411 }
412
413 // Write `task` into the slot.
414 unsafe {
415 buffer.write(b, MaybeUninit::new(task));
416 }
417
418 atomic::fence(Ordering::Release);
419
420 // Increment the back index.
421 //
422 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
423 // races because it doesn't understand fences.
424 self.inner.back.store(b.wrapping_add(1), Ordering::Release);
425 }
426
427 /// Pops a task from the queue.
428 ///
429 /// # Examples
430 ///
431 /// ```
432 /// use crossbeam_deque::Worker;
433 ///
434 /// let w = Worker::new_fifo();
435 /// w.push(1);
436 /// w.push(2);
437 ///
438 /// assert_eq!(w.pop(), Some(1));
439 /// assert_eq!(w.pop(), Some(2));
440 /// assert_eq!(w.pop(), None);
441 /// ```
442 pub fn pop(&self) -> Option<T> {
443 // Load the back and front index.
444 let b = self.inner.back.load(Ordering::Relaxed);
445 let f = self.inner.front.load(Ordering::Relaxed);
446
447 // Calculate the length of the queue.
448 let len = b.wrapping_sub(f);
449
450 // Is the queue empty?
451 if len <= 0 {
452 return None;
453 }
454
455 match self.flavor {
456 // Pop from the front of the queue.
457 Flavor::Fifo => {
458 // Try incrementing the front index to pop the task.
459 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
460 let new_f = f.wrapping_add(1);
461
462 if b.wrapping_sub(new_f) < 0 {
463 self.inner.front.store(f, Ordering::Relaxed);
464 return None;
465 }
466
467 unsafe {
468 // Read the popped task.
469 let buffer = self.buffer.get();
470 let task = buffer.read(f).assume_init();
471
472 // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
473 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
474 self.resize(buffer.cap / 2);
475 }
476
477 Some(task)
478 }
479 }
480
481 // Pop from the back of the queue.
482 Flavor::Lifo => {
483 // Decrement the back index.
484 let b = b.wrapping_sub(1);
485 self.inner.back.store(b, Ordering::Relaxed);
486
487 atomic::fence(Ordering::SeqCst);
488
489 // Load the front index.
490 let f = self.inner.front.load(Ordering::Relaxed);
491
492 // Compute the length after the back index was decremented.
493 let len = b.wrapping_sub(f);
494
495 if len < 0 {
496 // The queue is empty. Restore the back index to the original task.
497 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
498 None
499 } else {
500 // Read the task to be popped.
501 let buffer = self.buffer.get();
502 let mut task = unsafe { Some(buffer.read(b)) };
503
504 // Are we popping the last task from the queue?
505 if len == 0 {
506 // Try incrementing the front index.
507 if self
508 .inner
509 .front
510 .compare_exchange(
511 f,
512 f.wrapping_add(1),
513 Ordering::SeqCst,
514 Ordering::Relaxed,
515 )
516 .is_err()
517 {
518 // Failed. We didn't pop anything. Reset to `None`.
519 task.take();
520 }
521
522 // Restore the back index to the original task.
523 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
524 } else {
525 // Shrink the buffer if `len` is less than one fourth of the capacity.
526 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
527 unsafe {
528 self.resize(buffer.cap / 2);
529 }
530 }
531 }
532
533 task.map(|t| unsafe { t.assume_init() })
534 }
535 }
536 }
537 }
538}
539
540impl<T> fmt::Debug for Worker<T> {
541 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
542 f.pad("Worker { .. }")
543 }
544}
545
546/// A stealer handle of a worker queue.
547///
548/// Stealers can be shared among threads.
549///
550/// Task schedulers typically have a single worker queue per worker thread.
551///
552/// # Examples
553///
554/// ```
555/// use crossbeam_deque::{Steal, Worker};
556///
557/// let w = Worker::new_lifo();
558/// w.push(1);
559/// w.push(2);
560///
561/// let s = w.stealer();
562/// assert_eq!(s.steal(), Steal::Success(1));
563/// assert_eq!(s.steal(), Steal::Success(2));
564/// assert_eq!(s.steal(), Steal::Empty);
565/// ```
566pub struct Stealer<T> {
567 /// A reference to the inner representation of the queue.
568 inner: Arc<CachePadded<Inner<T>>>,
569
570 /// The flavor of the queue.
571 flavor: Flavor,
572}
573
574unsafe impl<T: Send> Send for Stealer<T> {}
575unsafe impl<T: Send> Sync for Stealer<T> {}
576
577impl<T> Stealer<T> {
578 /// Returns `true` if the queue is empty.
579 ///
580 /// ```
581 /// use crossbeam_deque::Worker;
582 ///
583 /// let w = Worker::new_lifo();
584 /// let s = w.stealer();
585 ///
586 /// assert!(s.is_empty());
587 /// w.push(1);
588 /// assert!(!s.is_empty());
589 /// ```
590 pub fn is_empty(&self) -> bool {
591 let f = self.inner.front.load(Ordering::Acquire);
592 atomic::fence(Ordering::SeqCst);
593 let b = self.inner.back.load(Ordering::Acquire);
594 b.wrapping_sub(f) <= 0
595 }
596
597 /// Returns the number of tasks in the deque.
598 ///
599 /// ```
600 /// use crossbeam_deque::Worker;
601 ///
602 /// let w = Worker::new_lifo();
603 /// let s = w.stealer();
604 ///
605 /// assert_eq!(s.len(), 0);
606 /// w.push(1);
607 /// assert_eq!(s.len(), 1);
608 /// w.push(2);
609 /// assert_eq!(s.len(), 2);
610 /// ```
611 pub fn len(&self) -> usize {
612 let f = self.inner.front.load(Ordering::Acquire);
613 atomic::fence(Ordering::SeqCst);
614 let b = self.inner.back.load(Ordering::Acquire);
615 b.wrapping_sub(f).max(0) as usize
616 }
617
618 /// Steals a task from the queue.
619 ///
620 /// # Examples
621 ///
622 /// ```
623 /// use crossbeam_deque::{Steal, Worker};
624 ///
625 /// let w = Worker::new_lifo();
626 /// w.push(1);
627 /// w.push(2);
628 ///
629 /// let s = w.stealer();
630 /// assert_eq!(s.steal(), Steal::Success(1));
631 /// assert_eq!(s.steal(), Steal::Success(2));
632 /// ```
633 pub fn steal(&self) -> Steal<T> {
634 // Load the front index.
635 let f = self.inner.front.load(Ordering::Acquire);
636
637 // A SeqCst fence is needed here.
638 //
639 // If the current thread is already pinned (reentrantly), we must manually issue the
640 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
641 // have to.
642 if epoch::is_pinned() {
643 atomic::fence(Ordering::SeqCst);
644 }
645
646 let guard = &epoch::pin();
647
648 // Load the back index.
649 let b = self.inner.back.load(Ordering::Acquire);
650
651 // Is the queue empty?
652 if b.wrapping_sub(f) <= 0 {
653 return Steal::Empty;
654 }
655
656 // Load the buffer and read the task at the front.
657 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
658 let task = unsafe { buffer.deref().read(f) };
659
660 // Try incrementing the front index to steal the task.
661 // If the buffer has been swapped or the increment fails, we retry.
662 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
663 || self
664 .inner
665 .front
666 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
667 .is_err()
668 {
669 // We didn't steal this task, forget it.
670 return Steal::Retry;
671 }
672
673 // Return the stolen task.
674 Steal::Success(unsafe { task.assume_init() })
675 }
676
677 /// Steals a batch of tasks and pushes them into another worker.
678 ///
679 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
680 /// steal around half of the tasks in the queue, but also not more than some constant limit.
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// use crossbeam_deque::Worker;
686 ///
687 /// let w1 = Worker::new_fifo();
688 /// w1.push(1);
689 /// w1.push(2);
690 /// w1.push(3);
691 /// w1.push(4);
692 ///
693 /// let s = w1.stealer();
694 /// let w2 = Worker::new_fifo();
695 ///
696 /// let _ = s.steal_batch(&w2);
697 /// assert_eq!(w2.pop(), Some(1));
698 /// assert_eq!(w2.pop(), Some(2));
699 /// ```
700 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
701 self.steal_batch_with_limit(dest, MAX_BATCH)
702 }
703
704 /// Steals no more than `limit` of tasks and pushes them into another worker.
705 ///
706 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
707 /// steal around half of the tasks in the queue, but also not more than the given limit.
708 ///
709 /// # Examples
710 ///
711 /// ```
712 /// use crossbeam_deque::Worker;
713 ///
714 /// let w1 = Worker::new_fifo();
715 /// w1.push(1);
716 /// w1.push(2);
717 /// w1.push(3);
718 /// w1.push(4);
719 /// w1.push(5);
720 /// w1.push(6);
721 ///
722 /// let s = w1.stealer();
723 /// let w2 = Worker::new_fifo();
724 ///
725 /// let _ = s.steal_batch_with_limit(&w2, 2);
726 /// assert_eq!(w2.pop(), Some(1));
727 /// assert_eq!(w2.pop(), Some(2));
728 /// assert_eq!(w2.pop(), None);
729 ///
730 /// w1.push(7);
731 /// w1.push(8);
732 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
733 /// // half of the elements are currently popped, but the number of popped elements is considered
734 /// // an implementation detail that may be changed in the future.
735 /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
736 /// assert_eq!(w2.len(), 3);
737 /// ```
738 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
739 assert!(limit > 0);
740 if Arc::ptr_eq(&self.inner, &dest.inner) {
741 if dest.is_empty() {
742 return Steal::Empty;
743 } else {
744 return Steal::Success(());
745 }
746 }
747
748 // Load the front index.
749 let mut f = self.inner.front.load(Ordering::Acquire);
750
751 // A SeqCst fence is needed here.
752 //
753 // If the current thread is already pinned (reentrantly), we must manually issue the
754 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
755 // have to.
756 if epoch::is_pinned() {
757 atomic::fence(Ordering::SeqCst);
758 }
759
760 let guard = &epoch::pin();
761
762 // Load the back index.
763 let b = self.inner.back.load(Ordering::Acquire);
764
765 // Is the queue empty?
766 let len = b.wrapping_sub(f);
767 if len <= 0 {
768 return Steal::Empty;
769 }
770
771 // Reserve capacity for the stolen batch.
772 let batch_size = cmp::min((len as usize + 1) / 2, limit);
773 dest.reserve(batch_size);
774 let mut batch_size = batch_size as isize;
775
776 // Get the destination buffer and back index.
777 let dest_buffer = dest.buffer.get();
778 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
779
780 // Load the buffer.
781 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
782
783 match self.flavor {
784 // Steal a batch of tasks from the front at once.
785 Flavor::Fifo => {
786 // Copy the batch from the source to the destination buffer.
787 match dest.flavor {
788 Flavor::Fifo => {
789 for i in 0..batch_size {
790 unsafe {
791 let task = buffer.deref().read(f.wrapping_add(i));
792 dest_buffer.write(dest_b.wrapping_add(i), task);
793 }
794 }
795 }
796 Flavor::Lifo => {
797 for i in 0..batch_size {
798 unsafe {
799 let task = buffer.deref().read(f.wrapping_add(i));
800 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
801 }
802 }
803 }
804 }
805
806 // Try incrementing the front index to steal the batch.
807 // If the buffer has been swapped or the increment fails, we retry.
808 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
809 || self
810 .inner
811 .front
812 .compare_exchange(
813 f,
814 f.wrapping_add(batch_size),
815 Ordering::SeqCst,
816 Ordering::Relaxed,
817 )
818 .is_err()
819 {
820 return Steal::Retry;
821 }
822
823 dest_b = dest_b.wrapping_add(batch_size);
824 }
825
826 // Steal a batch of tasks from the front one by one.
827 Flavor::Lifo => {
828 // This loop may modify the batch_size, which triggers a clippy lint warning.
829 // Use a new variable to avoid the warning, and to make it clear we aren't
830 // modifying the loop exit condition during iteration.
831 let original_batch_size = batch_size;
832
833 for i in 0..original_batch_size {
834 // If this is not the first steal, check whether the queue is empty.
835 if i > 0 {
836 // We've already got the current front index. Now execute the fence to
837 // synchronize with other threads.
838 atomic::fence(Ordering::SeqCst);
839
840 // Load the back index.
841 let b = self.inner.back.load(Ordering::Acquire);
842
843 // Is the queue empty?
844 if b.wrapping_sub(f) <= 0 {
845 batch_size = i;
846 break;
847 }
848 }
849
850 // Read the task at the front.
851 let task = unsafe { buffer.deref().read(f) };
852
853 // Try incrementing the front index to steal the task.
854 // If the buffer has been swapped or the increment fails, we retry.
855 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
856 || self
857 .inner
858 .front
859 .compare_exchange(
860 f,
861 f.wrapping_add(1),
862 Ordering::SeqCst,
863 Ordering::Relaxed,
864 )
865 .is_err()
866 {
867 // We didn't steal this task, forget it and break from the loop.
868 batch_size = i;
869 break;
870 }
871
872 // Write the stolen task into the destination buffer.
873 unsafe {
874 dest_buffer.write(dest_b, task);
875 }
876
877 // Move the source front index and the destination back index one step forward.
878 f = f.wrapping_add(1);
879 dest_b = dest_b.wrapping_add(1);
880 }
881
882 // If we didn't steal anything, the operation needs to be retried.
883 if batch_size == 0 {
884 return Steal::Retry;
885 }
886
887 // If stealing into a FIFO queue, stolen tasks need to be reversed.
888 if dest.flavor == Flavor::Fifo {
889 for i in 0..batch_size / 2 {
890 unsafe {
891 let i1 = dest_b.wrapping_sub(batch_size - i);
892 let i2 = dest_b.wrapping_sub(i + 1);
893 let t1 = dest_buffer.read(i1);
894 let t2 = dest_buffer.read(i2);
895 dest_buffer.write(i1, t2);
896 dest_buffer.write(i2, t1);
897 }
898 }
899 }
900 }
901 }
902
903 atomic::fence(Ordering::Release);
904
905 // Update the back index in the destination queue.
906 //
907 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
908 // races because it doesn't understand fences.
909 dest.inner.back.store(dest_b, Ordering::Release);
910
911 // Return with success.
912 Steal::Success(())
913 }
914
915 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
916 ///
917 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
918 /// steal around half of the tasks in the queue, but also not more than some constant limit.
919 ///
920 /// # Examples
921 ///
922 /// ```
923 /// use crossbeam_deque::{Steal, Worker};
924 ///
925 /// let w1 = Worker::new_fifo();
926 /// w1.push(1);
927 /// w1.push(2);
928 /// w1.push(3);
929 /// w1.push(4);
930 ///
931 /// let s = w1.stealer();
932 /// let w2 = Worker::new_fifo();
933 ///
934 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
935 /// assert_eq!(w2.pop(), Some(2));
936 /// ```
937 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
938 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
939 }
940
941 /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
942 /// that worker.
943 ///
944 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
945 /// steal around half of the tasks in the queue, but also not more than the given limit.
946 ///
947 /// # Examples
948 ///
949 /// ```
950 /// use crossbeam_deque::{Steal, Worker};
951 ///
952 /// let w1 = Worker::new_fifo();
953 /// w1.push(1);
954 /// w1.push(2);
955 /// w1.push(3);
956 /// w1.push(4);
957 /// w1.push(5);
958 /// w1.push(6);
959 ///
960 /// let s = w1.stealer();
961 /// let w2 = Worker::new_fifo();
962 ///
963 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
964 /// assert_eq!(w2.pop(), Some(2));
965 /// assert_eq!(w2.pop(), None);
966 ///
967 /// w1.push(7);
968 /// w1.push(8);
969 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
970 /// // half of the elements are currently popped, but the number of popped elements is considered
971 /// // an implementation detail that may be changed in the future.
972 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
973 /// assert_eq!(w2.pop(), Some(4));
974 /// assert_eq!(w2.pop(), Some(5));
975 /// assert_eq!(w2.pop(), None);
976 /// ```
977 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
978 assert!(limit > 0);
979 if Arc::ptr_eq(&self.inner, &dest.inner) {
980 match dest.pop() {
981 None => return Steal::Empty,
982 Some(task) => return Steal::Success(task),
983 }
984 }
985
986 // Load the front index.
987 let mut f = self.inner.front.load(Ordering::Acquire);
988
989 // A SeqCst fence is needed here.
990 //
991 // If the current thread is already pinned (reentrantly), we must manually issue the
992 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
993 // have to.
994 if epoch::is_pinned() {
995 atomic::fence(Ordering::SeqCst);
996 }
997
998 let guard = &epoch::pin();
999
1000 // Load the back index.
1001 let b = self.inner.back.load(Ordering::Acquire);
1002
1003 // Is the queue empty?
1004 let len = b.wrapping_sub(f);
1005 if len <= 0 {
1006 return Steal::Empty;
1007 }
1008
1009 // Reserve capacity for the stolen batch.
1010 let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1011 dest.reserve(batch_size);
1012 let mut batch_size = batch_size as isize;
1013
1014 // Get the destination buffer and back index.
1015 let dest_buffer = dest.buffer.get();
1016 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1017
1018 // Load the buffer
1019 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1020
1021 // Read the task at the front.
1022 let mut task = unsafe { buffer.deref().read(f) };
1023
1024 match self.flavor {
1025 // Steal a batch of tasks from the front at once.
1026 Flavor::Fifo => {
1027 // Copy the batch from the source to the destination buffer.
1028 match dest.flavor {
1029 Flavor::Fifo => {
1030 for i in 0..batch_size {
1031 unsafe {
1032 let task = buffer.deref().read(f.wrapping_add(i + 1));
1033 dest_buffer.write(dest_b.wrapping_add(i), task);
1034 }
1035 }
1036 }
1037 Flavor::Lifo => {
1038 for i in 0..batch_size {
1039 unsafe {
1040 let task = buffer.deref().read(f.wrapping_add(i + 1));
1041 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1042 }
1043 }
1044 }
1045 }
1046
1047 // Try incrementing the front index to steal the task.
1048 // If the buffer has been swapped or the increment fails, we retry.
1049 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1050 || self
1051 .inner
1052 .front
1053 .compare_exchange(
1054 f,
1055 f.wrapping_add(batch_size + 1),
1056 Ordering::SeqCst,
1057 Ordering::Relaxed,
1058 )
1059 .is_err()
1060 {
1061 // We didn't steal this task, forget it.
1062 return Steal::Retry;
1063 }
1064
1065 dest_b = dest_b.wrapping_add(batch_size);
1066 }
1067
1068 // Steal a batch of tasks from the front one by one.
1069 Flavor::Lifo => {
1070 // Try incrementing the front index to steal the task.
1071 if self
1072 .inner
1073 .front
1074 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1075 .is_err()
1076 {
1077 // We didn't steal this task, forget it.
1078 return Steal::Retry;
1079 }
1080
1081 // Move the front index one step forward.
1082 f = f.wrapping_add(1);
1083
1084 // Repeat the same procedure for the batch steals.
1085 //
1086 // This loop may modify the batch_size, which triggers a clippy lint warning.
1087 // Use a new variable to avoid the warning, and to make it clear we aren't
1088 // modifying the loop exit condition during iteration.
1089 let original_batch_size = batch_size;
1090 for i in 0..original_batch_size {
1091 // We've already got the current front index. Now execute the fence to
1092 // synchronize with other threads.
1093 atomic::fence(Ordering::SeqCst);
1094
1095 // Load the back index.
1096 let b = self.inner.back.load(Ordering::Acquire);
1097
1098 // Is the queue empty?
1099 if b.wrapping_sub(f) <= 0 {
1100 batch_size = i;
1101 break;
1102 }
1103
1104 // Read the task at the front.
1105 let tmp = unsafe { buffer.deref().read(f) };
1106
1107 // Try incrementing the front index to steal the task.
1108 // If the buffer has been swapped or the increment fails, we retry.
1109 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1110 || self
1111 .inner
1112 .front
1113 .compare_exchange(
1114 f,
1115 f.wrapping_add(1),
1116 Ordering::SeqCst,
1117 Ordering::Relaxed,
1118 )
1119 .is_err()
1120 {
1121 // We didn't steal this task, forget it and break from the loop.
1122 batch_size = i;
1123 break;
1124 }
1125
1126 // Write the previously stolen task into the destination buffer.
1127 unsafe {
1128 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1129 }
1130
1131 // Move the source front index and the destination back index one step forward.
1132 f = f.wrapping_add(1);
1133 dest_b = dest_b.wrapping_add(1);
1134 }
1135
1136 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1137 if dest.flavor == Flavor::Fifo {
1138 for i in 0..batch_size / 2 {
1139 unsafe {
1140 let i1 = dest_b.wrapping_sub(batch_size - i);
1141 let i2 = dest_b.wrapping_sub(i + 1);
1142 let t1 = dest_buffer.read(i1);
1143 let t2 = dest_buffer.read(i2);
1144 dest_buffer.write(i1, t2);
1145 dest_buffer.write(i2, t1);
1146 }
1147 }
1148 }
1149 }
1150 }
1151
1152 atomic::fence(Ordering::Release);
1153
1154 // Update the back index in the destination queue.
1155 //
1156 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1157 // races because it doesn't understand fences.
1158 dest.inner.back.store(dest_b, Ordering::Release);
1159
1160 // Return with success.
1161 Steal::Success(unsafe { task.assume_init() })
1162 }
1163}
1164
1165impl<T> Clone for Stealer<T> {
1166 fn clone(&self) -> Stealer<T> {
1167 Stealer {
1168 inner: self.inner.clone(),
1169 flavor: self.flavor,
1170 }
1171 }
1172}
1173
1174impl<T> fmt::Debug for Stealer<T> {
1175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1176 f.pad("Stealer { .. }")
1177 }
1178}
1179
1180// Bits indicating the state of a slot:
1181// * If a task has been written into the slot, `WRITE` is set.
1182// * If a task has been read from the slot, `READ` is set.
1183// * If the block is being destroyed, `DESTROY` is set.
1184const WRITE: usize = 1;
1185const READ: usize = 2;
1186const DESTROY: usize = 4;
1187
1188// Each block covers one "lap" of indices.
1189const LAP: usize = 64;
1190// The maximum number of values a block can hold.
1191const BLOCK_CAP: usize = LAP - 1;
1192// How many lower bits are reserved for metadata.
1193const SHIFT: usize = 1;
1194// Indicates that the block is not the last one.
1195const HAS_NEXT: usize = 1;
1196
1197/// A slot in a block.
1198struct Slot<T> {
1199 /// The task.
1200 task: UnsafeCell<MaybeUninit<T>>,
1201
1202 /// The state of the slot.
1203 state: AtomicUsize,
1204}
1205
1206impl<T> Slot<T> {
1207 /// Waits until a task is written into the slot.
1208 fn wait_write(&self) {
1209 let backoff = Backoff::new();
1210 while self.state.load(Ordering::Acquire) & WRITE == 0 {
1211 backoff.snooze();
1212 }
1213 }
1214}
1215
1216/// A block in a linked list.
1217///
1218/// Each block in the list can hold up to `BLOCK_CAP` values.
1219struct Block<T> {
1220 /// The next block in the linked list.
1221 next: AtomicPtr<Block<T>>,
1222
1223 /// Slots for values.
1224 slots: [Slot<T>; BLOCK_CAP],
1225}
1226
1227impl<T> Block<T> {
1228 const LAYOUT: Layout = {
1229 let layout = Layout::new::<Self>();
1230 assert!(
1231 layout.size() != 0,
1232 "Block should never be zero-sized, as it has an AtomicPtr field"
1233 );
1234 layout
1235 };
1236
1237 /// Creates an empty block.
1238 fn new() -> Box<Self> {
1239 // SAFETY: layout is not zero-sized
1240 let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
1241 // Handle allocation failure
1242 if ptr.is_null() {
1243 handle_alloc_error(Self::LAYOUT)
1244 }
1245 // SAFETY: This is safe because:
1246 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1247 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1248 // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1249 // holds a MaybeUninit.
1250 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1251 // TODO: unsafe { Box::new_zeroed().assume_init() }
1252 unsafe { Box::from_raw(ptr.cast()) }
1253 }
1254
1255 /// Waits until the next pointer is set.
1256 fn wait_next(&self) -> *mut Block<T> {
1257 let backoff = Backoff::new();
1258 loop {
1259 let next = self.next.load(Ordering::Acquire);
1260 if !next.is_null() {
1261 return next;
1262 }
1263 backoff.snooze();
1264 }
1265 }
1266
1267 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1268 unsafe fn destroy(this: *mut Block<T>, count: usize) {
1269 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1270 // begun destruction of the block.
1271 for i in (0..count).rev() {
1272 let slot = (*this).slots.get_unchecked(i);
1273
1274 // Mark the `DESTROY` bit if a thread is still using the slot.
1275 if slot.state.load(Ordering::Acquire) & READ == 0
1276 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1277 {
1278 // If a thread is still using the slot, it will continue destruction of the block.
1279 return;
1280 }
1281 }
1282
1283 // No thread is using the block, now it is safe to destroy it.
1284 drop(Box::from_raw(this));
1285 }
1286}
1287
1288/// A position in a queue.
1289struct Position<T> {
1290 /// The index in the queue.
1291 index: AtomicUsize,
1292
1293 /// The block in the linked list.
1294 block: AtomicPtr<Block<T>>,
1295}
1296
1297/// An injector queue.
1298///
1299/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1300/// a single injector queue, which is the entry point for new tasks.
1301///
1302/// # Examples
1303///
1304/// ```
1305/// use crossbeam_deque::{Injector, Steal};
1306///
1307/// let q = Injector::new();
1308/// q.push(1);
1309/// q.push(2);
1310///
1311/// assert_eq!(q.steal(), Steal::Success(1));
1312/// assert_eq!(q.steal(), Steal::Success(2));
1313/// assert_eq!(q.steal(), Steal::Empty);
1314/// ```
1315pub struct Injector<T> {
1316 /// The head of the queue.
1317 head: CachePadded<Position<T>>,
1318
1319 /// The tail of the queue.
1320 tail: CachePadded<Position<T>>,
1321
1322 /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1323 _marker: PhantomData<T>,
1324}
1325
1326unsafe impl<T: Send> Send for Injector<T> {}
1327unsafe impl<T: Send> Sync for Injector<T> {}
1328
1329impl<T> Default for Injector<T> {
1330 fn default() -> Self {
1331 let block = Box::into_raw(Block::<T>::new());
1332 Self {
1333 head: CachePadded::new(Position {
1334 block: AtomicPtr::new(block),
1335 index: AtomicUsize::new(0),
1336 }),
1337 tail: CachePadded::new(Position {
1338 block: AtomicPtr::new(block),
1339 index: AtomicUsize::new(0),
1340 }),
1341 _marker: PhantomData,
1342 }
1343 }
1344}
1345
1346impl<T> Injector<T> {
1347 /// Creates a new injector queue.
1348 ///
1349 /// # Examples
1350 ///
1351 /// ```
1352 /// use crossbeam_deque::Injector;
1353 ///
1354 /// let q = Injector::<i32>::new();
1355 /// ```
1356 pub fn new() -> Injector<T> {
1357 Self::default()
1358 }
1359
1360 /// Pushes a task into the queue.
1361 ///
1362 /// # Examples
1363 ///
1364 /// ```
1365 /// use crossbeam_deque::Injector;
1366 ///
1367 /// let w = Injector::new();
1368 /// w.push(1);
1369 /// w.push(2);
1370 /// ```
1371 pub fn push(&self, task: T) {
1372 let backoff = Backoff::new();
1373 let mut tail = self.tail.index.load(Ordering::Acquire);
1374 let mut block = self.tail.block.load(Ordering::Acquire);
1375 let mut next_block = None;
1376
1377 loop {
1378 // Calculate the offset of the index into the block.
1379 let offset = (tail >> SHIFT) % LAP;
1380
1381 // If we reached the end of the block, wait until the next one is installed.
1382 if offset == BLOCK_CAP {
1383 backoff.snooze();
1384 tail = self.tail.index.load(Ordering::Acquire);
1385 block = self.tail.block.load(Ordering::Acquire);
1386 continue;
1387 }
1388
1389 // If we're going to have to install the next block, allocate it in advance in order to
1390 // make the wait for other threads as short as possible.
1391 if offset + 1 == BLOCK_CAP && next_block.is_none() {
1392 next_block = Some(Block::<T>::new());
1393 }
1394
1395 let new_tail = tail + (1 << SHIFT);
1396
1397 // Try advancing the tail forward.
1398 match self.tail.index.compare_exchange_weak(
1399 tail,
1400 new_tail,
1401 Ordering::SeqCst,
1402 Ordering::Acquire,
1403 ) {
1404 Ok(_) => unsafe {
1405 // If we've reached the end of the block, install the next one.
1406 if offset + 1 == BLOCK_CAP {
1407 let next_block = Box::into_raw(next_block.unwrap());
1408 let next_index = new_tail.wrapping_add(1 << SHIFT);
1409
1410 self.tail.block.store(next_block, Ordering::Release);
1411 self.tail.index.store(next_index, Ordering::Release);
1412 (*block).next.store(next_block, Ordering::Release);
1413 }
1414
1415 // Write the task into the slot.
1416 let slot = (*block).slots.get_unchecked(offset);
1417 slot.task.get().write(MaybeUninit::new(task));
1418 slot.state.fetch_or(WRITE, Ordering::Release);
1419
1420 return;
1421 },
1422 Err(t) => {
1423 tail = t;
1424 block = self.tail.block.load(Ordering::Acquire);
1425 backoff.spin();
1426 }
1427 }
1428 }
1429 }
1430
1431 /// Steals a task from the queue.
1432 ///
1433 /// # Examples
1434 ///
1435 /// ```
1436 /// use crossbeam_deque::{Injector, Steal};
1437 ///
1438 /// let q = Injector::new();
1439 /// q.push(1);
1440 /// q.push(2);
1441 ///
1442 /// assert_eq!(q.steal(), Steal::Success(1));
1443 /// assert_eq!(q.steal(), Steal::Success(2));
1444 /// assert_eq!(q.steal(), Steal::Empty);
1445 /// ```
1446 pub fn steal(&self) -> Steal<T> {
1447 let mut head;
1448 let mut block;
1449 let mut offset;
1450
1451 let backoff = Backoff::new();
1452 loop {
1453 head = self.head.index.load(Ordering::Acquire);
1454 block = self.head.block.load(Ordering::Acquire);
1455
1456 // Calculate the offset of the index into the block.
1457 offset = (head >> SHIFT) % LAP;
1458
1459 // If we reached the end of the block, wait until the next one is installed.
1460 if offset == BLOCK_CAP {
1461 backoff.snooze();
1462 } else {
1463 break;
1464 }
1465 }
1466
1467 let mut new_head = head + (1 << SHIFT);
1468
1469 if new_head & HAS_NEXT == 0 {
1470 atomic::fence(Ordering::SeqCst);
1471 let tail = self.tail.index.load(Ordering::Relaxed);
1472
1473 // If the tail equals the head, that means the queue is empty.
1474 if head >> SHIFT == tail >> SHIFT {
1475 return Steal::Empty;
1476 }
1477
1478 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1479 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1480 new_head |= HAS_NEXT;
1481 }
1482 }
1483
1484 // Try moving the head index forward.
1485 if self
1486 .head
1487 .index
1488 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1489 .is_err()
1490 {
1491 return Steal::Retry;
1492 }
1493
1494 unsafe {
1495 // If we've reached the end of the block, move to the next one.
1496 if offset + 1 == BLOCK_CAP {
1497 let next = (*block).wait_next();
1498 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1499 if !(*next).next.load(Ordering::Relaxed).is_null() {
1500 next_index |= HAS_NEXT;
1501 }
1502
1503 self.head.block.store(next, Ordering::Release);
1504 self.head.index.store(next_index, Ordering::Release);
1505 }
1506
1507 // Read the task.
1508 let slot = (*block).slots.get_unchecked(offset);
1509 slot.wait_write();
1510 let task = slot.task.get().read().assume_init();
1511
1512 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1513 // but couldn't because we were busy reading from the slot.
1514 if (offset + 1 == BLOCK_CAP)
1515 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1516 {
1517 Block::destroy(block, offset);
1518 }
1519
1520 Steal::Success(task)
1521 }
1522 }
1523
1524 /// Steals a batch of tasks and pushes them into a worker.
1525 ///
1526 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1527 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1528 ///
1529 /// # Examples
1530 ///
1531 /// ```
1532 /// use crossbeam_deque::{Injector, Worker};
1533 ///
1534 /// let q = Injector::new();
1535 /// q.push(1);
1536 /// q.push(2);
1537 /// q.push(3);
1538 /// q.push(4);
1539 ///
1540 /// let w = Worker::new_fifo();
1541 /// let _ = q.steal_batch(&w);
1542 /// assert_eq!(w.pop(), Some(1));
1543 /// assert_eq!(w.pop(), Some(2));
1544 /// ```
1545 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1546 self.steal_batch_with_limit(dest, MAX_BATCH)
1547 }
1548
1549 /// Steals no more than of tasks and pushes them into a worker.
1550 ///
1551 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1552 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1553 ///
1554 /// # Examples
1555 ///
1556 /// ```
1557 /// use crossbeam_deque::{Injector, Worker};
1558 ///
1559 /// let q = Injector::new();
1560 /// q.push(1);
1561 /// q.push(2);
1562 /// q.push(3);
1563 /// q.push(4);
1564 /// q.push(5);
1565 /// q.push(6);
1566 ///
1567 /// let w = Worker::new_fifo();
1568 /// let _ = q.steal_batch_with_limit(&w, 2);
1569 /// assert_eq!(w.pop(), Some(1));
1570 /// assert_eq!(w.pop(), Some(2));
1571 /// assert_eq!(w.pop(), None);
1572 ///
1573 /// q.push(7);
1574 /// q.push(8);
1575 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1576 /// // half of the elements are currently popped, but the number of popped elements is considered
1577 /// // an implementation detail that may be changed in the future.
1578 /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1579 /// assert_eq!(w.len(), 3);
1580 /// ```
1581 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1582 assert!(limit > 0);
1583 let mut head;
1584 let mut block;
1585 let mut offset;
1586
1587 let backoff = Backoff::new();
1588 loop {
1589 head = self.head.index.load(Ordering::Acquire);
1590 block = self.head.block.load(Ordering::Acquire);
1591
1592 // Calculate the offset of the index into the block.
1593 offset = (head >> SHIFT) % LAP;
1594
1595 // If we reached the end of the block, wait until the next one is installed.
1596 if offset == BLOCK_CAP {
1597 backoff.snooze();
1598 } else {
1599 break;
1600 }
1601 }
1602
1603 let mut new_head = head;
1604 let advance;
1605
1606 if new_head & HAS_NEXT == 0 {
1607 atomic::fence(Ordering::SeqCst);
1608 let tail = self.tail.index.load(Ordering::Relaxed);
1609
1610 // If the tail equals the head, that means the queue is empty.
1611 if head >> SHIFT == tail >> SHIFT {
1612 return Steal::Empty;
1613 }
1614
1615 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1616 // the right batch size to steal.
1617 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1618 new_head |= HAS_NEXT;
1619 // We can steal all tasks till the end of the block.
1620 advance = (BLOCK_CAP - offset).min(limit);
1621 } else {
1622 let len = (tail - head) >> SHIFT;
1623 // Steal half of the available tasks.
1624 advance = ((len + 1) / 2).min(limit);
1625 }
1626 } else {
1627 // We can steal all tasks till the end of the block.
1628 advance = (BLOCK_CAP - offset).min(limit);
1629 }
1630
1631 new_head += advance << SHIFT;
1632 let new_offset = offset + advance;
1633
1634 // Try moving the head index forward.
1635 if self
1636 .head
1637 .index
1638 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1639 .is_err()
1640 {
1641 return Steal::Retry;
1642 }
1643
1644 // Reserve capacity for the stolen batch.
1645 let batch_size = new_offset - offset;
1646 dest.reserve(batch_size);
1647
1648 // Get the destination buffer and back index.
1649 let dest_buffer = dest.buffer.get();
1650 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1651
1652 unsafe {
1653 // If we've reached the end of the block, move to the next one.
1654 if new_offset == BLOCK_CAP {
1655 let next = (*block).wait_next();
1656 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1657 if !(*next).next.load(Ordering::Relaxed).is_null() {
1658 next_index |= HAS_NEXT;
1659 }
1660
1661 self.head.block.store(next, Ordering::Release);
1662 self.head.index.store(next_index, Ordering::Release);
1663 }
1664
1665 // Copy values from the injector into the destination queue.
1666 match dest.flavor {
1667 Flavor::Fifo => {
1668 for i in 0..batch_size {
1669 // Read the task.
1670 let slot = (*block).slots.get_unchecked(offset + i);
1671 slot.wait_write();
1672 let task = slot.task.get().read();
1673
1674 // Write it into the destination queue.
1675 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1676 }
1677 }
1678
1679 Flavor::Lifo => {
1680 for i in 0..batch_size {
1681 // Read the task.
1682 let slot = (*block).slots.get_unchecked(offset + i);
1683 slot.wait_write();
1684 let task = slot.task.get().read();
1685
1686 // Write it into the destination queue.
1687 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1688 }
1689 }
1690 }
1691
1692 atomic::fence(Ordering::Release);
1693
1694 // Update the back index in the destination queue.
1695 //
1696 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1697 // data races because it doesn't understand fences.
1698 dest.inner
1699 .back
1700 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1701
1702 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1703 // but couldn't because we were busy reading from the slot.
1704 if new_offset == BLOCK_CAP {
1705 Block::destroy(block, offset);
1706 } else {
1707 for i in offset..new_offset {
1708 let slot = (*block).slots.get_unchecked(i);
1709
1710 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1711 Block::destroy(block, offset);
1712 break;
1713 }
1714 }
1715 }
1716
1717 Steal::Success(())
1718 }
1719 }
1720
1721 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1722 ///
1723 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1724 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1725 ///
1726 /// # Examples
1727 ///
1728 /// ```
1729 /// use crossbeam_deque::{Injector, Steal, Worker};
1730 ///
1731 /// let q = Injector::new();
1732 /// q.push(1);
1733 /// q.push(2);
1734 /// q.push(3);
1735 /// q.push(4);
1736 ///
1737 /// let w = Worker::new_fifo();
1738 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1739 /// assert_eq!(w.pop(), Some(2));
1740 /// ```
1741 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1742 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1743 // better, but we may change it in the future to be compatible with the same method in Stealer.
1744 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1745 }
1746
1747 /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1748 ///
1749 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1750 /// steal around half of the tasks in the queue, but also not more than the given limit.
1751 ///
1752 /// # Examples
1753 ///
1754 /// ```
1755 /// use crossbeam_deque::{Injector, Steal, Worker};
1756 ///
1757 /// let q = Injector::new();
1758 /// q.push(1);
1759 /// q.push(2);
1760 /// q.push(3);
1761 /// q.push(4);
1762 /// q.push(5);
1763 /// q.push(6);
1764 ///
1765 /// let w = Worker::new_fifo();
1766 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1767 /// assert_eq!(w.pop(), Some(2));
1768 /// assert_eq!(w.pop(), None);
1769 ///
1770 /// q.push(7);
1771 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1772 /// // half of the elements are currently popped, but the number of popped elements is considered
1773 /// // an implementation detail that may be changed in the future.
1774 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1775 /// assert_eq!(w.pop(), Some(4));
1776 /// assert_eq!(w.pop(), Some(5));
1777 /// assert_eq!(w.pop(), None);
1778 /// ```
1779 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1780 assert!(limit > 0);
1781 let mut head;
1782 let mut block;
1783 let mut offset;
1784
1785 let backoff = Backoff::new();
1786 loop {
1787 head = self.head.index.load(Ordering::Acquire);
1788 block = self.head.block.load(Ordering::Acquire);
1789
1790 // Calculate the offset of the index into the block.
1791 offset = (head >> SHIFT) % LAP;
1792
1793 // If we reached the end of the block, wait until the next one is installed.
1794 if offset == BLOCK_CAP {
1795 backoff.snooze();
1796 } else {
1797 break;
1798 }
1799 }
1800
1801 let mut new_head = head;
1802 let advance;
1803
1804 if new_head & HAS_NEXT == 0 {
1805 atomic::fence(Ordering::SeqCst);
1806 let tail = self.tail.index.load(Ordering::Relaxed);
1807
1808 // If the tail equals the head, that means the queue is empty.
1809 if head >> SHIFT == tail >> SHIFT {
1810 return Steal::Empty;
1811 }
1812
1813 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1814 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1815 new_head |= HAS_NEXT;
1816 // We can steal all tasks till the end of the block.
1817 advance = (BLOCK_CAP - offset).min(limit);
1818 } else {
1819 let len = (tail - head) >> SHIFT;
1820 // Steal half of the available tasks.
1821 advance = ((len + 1) / 2).min(limit);
1822 }
1823 } else {
1824 // We can steal all tasks till the end of the block.
1825 advance = (BLOCK_CAP - offset).min(limit);
1826 }
1827
1828 new_head += advance << SHIFT;
1829 let new_offset = offset + advance;
1830
1831 // Try moving the head index forward.
1832 if self
1833 .head
1834 .index
1835 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1836 .is_err()
1837 {
1838 return Steal::Retry;
1839 }
1840
1841 // Reserve capacity for the stolen batch.
1842 let batch_size = new_offset - offset - 1;
1843 dest.reserve(batch_size);
1844
1845 // Get the destination buffer and back index.
1846 let dest_buffer = dest.buffer.get();
1847 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1848
1849 unsafe {
1850 // If we've reached the end of the block, move to the next one.
1851 if new_offset == BLOCK_CAP {
1852 let next = (*block).wait_next();
1853 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1854 if !(*next).next.load(Ordering::Relaxed).is_null() {
1855 next_index |= HAS_NEXT;
1856 }
1857
1858 self.head.block.store(next, Ordering::Release);
1859 self.head.index.store(next_index, Ordering::Release);
1860 }
1861
1862 // Read the task.
1863 let slot = (*block).slots.get_unchecked(offset);
1864 slot.wait_write();
1865 let task = slot.task.get().read();
1866
1867 match dest.flavor {
1868 Flavor::Fifo => {
1869 // Copy values from the injector into the destination queue.
1870 for i in 0..batch_size {
1871 // Read the task.
1872 let slot = (*block).slots.get_unchecked(offset + i + 1);
1873 slot.wait_write();
1874 let task = slot.task.get().read();
1875
1876 // Write it into the destination queue.
1877 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1878 }
1879 }
1880
1881 Flavor::Lifo => {
1882 // Copy values from the injector into the destination queue.
1883 for i in 0..batch_size {
1884 // Read the task.
1885 let slot = (*block).slots.get_unchecked(offset + i + 1);
1886 slot.wait_write();
1887 let task = slot.task.get().read();
1888
1889 // Write it into the destination queue.
1890 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1891 }
1892 }
1893 }
1894
1895 atomic::fence(Ordering::Release);
1896
1897 // Update the back index in the destination queue.
1898 //
1899 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1900 // data races because it doesn't understand fences.
1901 dest.inner
1902 .back
1903 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1904
1905 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1906 // but couldn't because we were busy reading from the slot.
1907 if new_offset == BLOCK_CAP {
1908 Block::destroy(block, offset);
1909 } else {
1910 for i in offset..new_offset {
1911 let slot = (*block).slots.get_unchecked(i);
1912
1913 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1914 Block::destroy(block, offset);
1915 break;
1916 }
1917 }
1918 }
1919
1920 Steal::Success(task.assume_init())
1921 }
1922 }
1923
1924 /// Returns `true` if the queue is empty.
1925 ///
1926 /// # Examples
1927 ///
1928 /// ```
1929 /// use crossbeam_deque::Injector;
1930 ///
1931 /// let q = Injector::new();
1932 ///
1933 /// assert!(q.is_empty());
1934 /// q.push(1);
1935 /// assert!(!q.is_empty());
1936 /// ```
1937 pub fn is_empty(&self) -> bool {
1938 let head = self.head.index.load(Ordering::SeqCst);
1939 let tail = self.tail.index.load(Ordering::SeqCst);
1940 head >> SHIFT == tail >> SHIFT
1941 }
1942
1943 /// Returns the number of tasks in the queue.
1944 ///
1945 /// # Examples
1946 ///
1947 /// ```
1948 /// use crossbeam_deque::Injector;
1949 ///
1950 /// let q = Injector::new();
1951 ///
1952 /// assert_eq!(q.len(), 0);
1953 /// q.push(1);
1954 /// assert_eq!(q.len(), 1);
1955 /// q.push(1);
1956 /// assert_eq!(q.len(), 2);
1957 /// ```
1958 pub fn len(&self) -> usize {
1959 loop {
1960 // Load the tail index, then load the head index.
1961 let mut tail = self.tail.index.load(Ordering::SeqCst);
1962 let mut head = self.head.index.load(Ordering::SeqCst);
1963
1964 // If the tail index didn't change, we've got consistent indices to work with.
1965 if self.tail.index.load(Ordering::SeqCst) == tail {
1966 // Erase the lower bits.
1967 tail &= !((1 << SHIFT) - 1);
1968 head &= !((1 << SHIFT) - 1);
1969
1970 // Fix up indices if they fall onto block ends.
1971 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1972 tail = tail.wrapping_add(1 << SHIFT);
1973 }
1974 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1975 head = head.wrapping_add(1 << SHIFT);
1976 }
1977
1978 // Rotate indices so that head falls into the first block.
1979 let lap = (head >> SHIFT) / LAP;
1980 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1981 head = head.wrapping_sub((lap * LAP) << SHIFT);
1982
1983 // Remove the lower bits.
1984 tail >>= SHIFT;
1985 head >>= SHIFT;
1986
1987 // Return the difference minus the number of blocks between tail and head.
1988 return tail - head - tail / LAP;
1989 }
1990 }
1991 }
1992}
1993
1994impl<T> Drop for Injector<T> {
1995 fn drop(&mut self) {
1996 let mut head = *self.head.index.get_mut();
1997 let mut tail = *self.tail.index.get_mut();
1998 let mut block = *self.head.block.get_mut();
1999
2000 // Erase the lower bits.
2001 head &= !((1 << SHIFT) - 1);
2002 tail &= !((1 << SHIFT) - 1);
2003
2004 unsafe {
2005 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
2006 while head != tail {
2007 let offset = (head >> SHIFT) % LAP;
2008
2009 if offset < BLOCK_CAP {
2010 // Drop the task in the slot.
2011 let slot = (*block).slots.get_unchecked(offset);
2012 (*slot.task.get()).assume_init_drop();
2013 } else {
2014 // Deallocate the block and move to the next one.
2015 let next = *(*block).next.get_mut();
2016 drop(Box::from_raw(block));
2017 block = next;
2018 }
2019
2020 head = head.wrapping_add(1 << SHIFT);
2021 }
2022
2023 // Deallocate the last remaining block.
2024 drop(Box::from_raw(block));
2025 }
2026 }
2027}
2028
2029impl<T> fmt::Debug for Injector<T> {
2030 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2031 f.pad("Worker { .. }")
2032 }
2033}
2034
2035/// Possible outcomes of a steal operation.
2036///
2037/// # Examples
2038///
2039/// There are lots of ways to chain results of steal operations together:
2040///
2041/// ```
2042/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2043///
2044/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2045///
2046/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2047/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2048/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2049///
2050/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2051/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2052/// ```
2053#[must_use]
2054#[derive(PartialEq, Eq, Copy, Clone)]
2055pub enum Steal<T> {
2056 /// The queue was empty at the time of stealing.
2057 Empty,
2058
2059 /// At least one task was successfully stolen.
2060 Success(T),
2061
2062 /// The steal operation needs to be retried.
2063 Retry,
2064}
2065
2066impl<T> Steal<T> {
2067 /// Returns `true` if the queue was empty at the time of stealing.
2068 ///
2069 /// # Examples
2070 ///
2071 /// ```
2072 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2073 ///
2074 /// assert!(!Success(7).is_empty());
2075 /// assert!(!Retry::<i32>.is_empty());
2076 ///
2077 /// assert!(Empty::<i32>.is_empty());
2078 /// ```
2079 pub fn is_empty(&self) -> bool {
2080 match self {
2081 Steal::Empty => true,
2082 _ => false,
2083 }
2084 }
2085
2086 /// Returns `true` if at least one task was stolen.
2087 ///
2088 /// # Examples
2089 ///
2090 /// ```
2091 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2092 ///
2093 /// assert!(!Empty::<i32>.is_success());
2094 /// assert!(!Retry::<i32>.is_success());
2095 ///
2096 /// assert!(Success(7).is_success());
2097 /// ```
2098 pub fn is_success(&self) -> bool {
2099 match self {
2100 Steal::Success(_) => true,
2101 _ => false,
2102 }
2103 }
2104
2105 /// Returns `true` if the steal operation needs to be retried.
2106 ///
2107 /// # Examples
2108 ///
2109 /// ```
2110 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2111 ///
2112 /// assert!(!Empty::<i32>.is_retry());
2113 /// assert!(!Success(7).is_retry());
2114 ///
2115 /// assert!(Retry::<i32>.is_retry());
2116 /// ```
2117 pub fn is_retry(&self) -> bool {
2118 match self {
2119 Steal::Retry => true,
2120 _ => false,
2121 }
2122 }
2123
2124 /// Returns the result of the operation, if successful.
2125 ///
2126 /// # Examples
2127 ///
2128 /// ```
2129 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2130 ///
2131 /// assert_eq!(Empty::<i32>.success(), None);
2132 /// assert_eq!(Retry::<i32>.success(), None);
2133 ///
2134 /// assert_eq!(Success(7).success(), Some(7));
2135 /// ```
2136 pub fn success(self) -> Option<T> {
2137 match self {
2138 Steal::Success(res) => Some(res),
2139 _ => None,
2140 }
2141 }
2142
2143 /// If no task was stolen, attempts another steal operation.
2144 ///
2145 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2146 ///
2147 /// * If the second steal resulted in `Success`, it is returned.
2148 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2149 /// * If both resulted in `None`, then `None` is returned.
2150 ///
2151 /// # Examples
2152 ///
2153 /// ```
2154 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2155 ///
2156 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2157 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2158 ///
2159 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2160 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2161 ///
2162 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2163 /// ```
2164 pub fn or_else<F>(self, f: F) -> Steal<T>
2165 where
2166 F: FnOnce() -> Steal<T>,
2167 {
2168 match self {
2169 Steal::Empty => f(),
2170 Steal::Success(_) => self,
2171 Steal::Retry => {
2172 if let Steal::Success(res) = f() {
2173 Steal::Success(res)
2174 } else {
2175 Steal::Retry
2176 }
2177 }
2178 }
2179 }
2180}
2181
2182impl<T> fmt::Debug for Steal<T> {
2183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2184 match self {
2185 Steal::Empty => f.pad("Empty"),
2186 Steal::Success(_) => f.pad("Success(..)"),
2187 Steal::Retry => f.pad("Retry"),
2188 }
2189 }
2190}
2191
2192impl<T> FromIterator<Steal<T>> for Steal<T> {
2193 /// Consumes items until a `Success` is found and returns it.
2194 ///
2195 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2196 /// Otherwise, `Empty` is returned.
2197 fn from_iter<I>(iter: I) -> Steal<T>
2198 where
2199 I: IntoIterator<Item = Steal<T>>,
2200 {
2201 let mut retry = false;
2202 for s in iter {
2203 match &s {
2204 Steal::Empty => {}
2205 Steal::Success(_) => return s,
2206 Steal::Retry => retry = true,
2207 }
2208 }
2209
2210 if retry {
2211 Steal::Retry
2212 } else {
2213 Steal::Empty
2214 }
2215 }
2216}