// crossbeam_channel/flavors/list.rs
//
// Unbounded channel implemented as a linked list of blocks.

use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
4use std::boxed::Box;
5use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::mem::MaybeUninit;
8use std::ptr;
9use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
10use std::time::Instant;
11
12use crossbeam_utils::{Backoff, CachePadded};
13
14use crate::context::Context;
15use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
16use crate::select::{Operation, SelectHandle, Selected, Token};
17use crate::waker::SyncWaker;
18
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of index positions.
const LAP: usize = 32;
// The maximum number of messages a block can hold (one position per lap is
// reserved as the block-boundary marker).
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits of the head/tail indices are reserved for metadata.
const SHIFT: usize = 1;
// The bit (within the reserved lower bits of the tail index) that marks the
// channel as disconnected.
const MARK_BIT: usize = 1;
43
/// A slot in a block: one message plus its state bits.
struct Slot<T> {
    // The message; only valid once `state` has the `WRITE` bit set.
    msg: UnsafeCell<MaybeUninit<T>>,

    // The state of the slot: a combination of `WRITE`, `READ`, and `DESTROY`.
    state: AtomicUsize,
}
52
53impl<T> Slot<T> {
54 fn wait_write(&self) {
56 let backoff = Backoff::new();
57 while self.state.load(Ordering::Acquire) & WRITE == 0 {
58 backoff.snooze();
59 }
60 }
61}
62
/// A block in the linked list, holding up to `BLOCK_CAP` messages.
struct Block<T> {
    // The next block in the list; null until a successor is installed.
    next: AtomicPtr<Block<T>>,

    // The slots containing messages for this block's lap of indices.
    slots: [Slot<T>; BLOCK_CAP],
}
73
impl<T> Block<T> {
    // Layout used for raw allocation of a block; checked at compile time to be
    // non-zero-sized so `alloc_zeroed` is sound.
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    ///
    /// The block is allocated zeroed, which yields a valid initial state:
    /// a null `next` pointer, slot states of `0`, and `MaybeUninit` message
    /// storage (valid for any bit pattern).
    fn new() -> Box<Self> {
        // SAFETY: the layout is non-zero-sized (checked above).
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: `ptr` was just allocated with the global allocator using the
        // layout of `Self`, and all-zero bytes form a valid `Block<T>`.
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Spins until the `next` pointer is set, then returns it.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the
    /// block once no thread is still using a slot.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot
        // because that slot is the one that began destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will see `DESTROY`
                // when it finishes and will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block; it is now safe to free it.
        drop(Box::from_raw(this));
    }
}
134
/// A position in the channel (either head or tail).
#[derive(Debug)]
struct Position<T> {
    // The index in the channel; advances by `1 << SHIFT` per message, with the
    // lower `SHIFT` bits reserved for metadata (see `MARK_BIT`).
    index: AtomicUsize,

    // The block in the linked list that this position currently points into.
    block: AtomicPtr<Block<T>>,
}
144
/// Token for a reserved slot, passed between `start_send`/`write` and
/// `start_recv`/`read`.
#[derive(Debug)]
pub(crate) struct ListToken {
    // The block of slots; null signals a disconnected channel.
    block: *const u8,

    // The offset of the reserved slot within the block.
    offset: usize,
}
154
155impl Default for ListToken {
156 #[inline]
157 fn default() -> Self {
158 ListToken {
159 block: ptr::null(),
160 offset: 0,
161 }
162 }
163}
164
/// Unbounded channel implemented as a linked list of blocks.
pub(crate) struct Channel<T> {
    // The head of the channel (where messages are read from), padded to avoid
    // false sharing with the tail.
    head: CachePadded<Position<T>>,

    // The tail of the channel (where messages are written to).
    tail: CachePadded<Position<T>>,

    // Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    // Marks ownership of messages of type `T` (they are dropped in `Drop`).
    _marker: PhantomData<T>,
}
185
impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    ///
    /// Both head and tail start at index 0 with null block pointers; the first
    /// block is allocated lazily by the first sender.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle borrowing this channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle borrowing this channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    ///
    /// On success, `token` identifies the reserved slot (or holds a null block
    /// pointer if the channel is disconnected). Always returns `true` because
    /// the channel is unbounded.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is
            // installed by the thread that claimed the last slot.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're about to claim the last slot of this block, preallocate
            // the next block so installation after the CAS is quick.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first message, we need to allocate the first
            // block and install it as both tail and head block.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another sender installed a block first; keep ours as the
                    // preallocated next block and retry.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward to claim the slot at `offset`.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've claimed the last slot, install the next block
                    // and bump the tail index past the block-boundary marker.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Writes a message into the slot reserved by `token` and wakes a
    /// receiver. Returns `Err(msg)` if the channel is disconnected.
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a preceding `start_send` on this
    /// channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot, then publish it with `WRITE`.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver, if any.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    ///
    /// Returns `false` if the channel is empty (and not disconnected). On
    /// `true`, `token` identifies the slot to read (or holds a null block
    /// pointer if the channel is disconnected and drained).
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is
            // installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                // Order the head load below against senders' tail CAS.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the channel is empty...
                if head >> SHIFT == tail >> SHIFT {
                    // ...and disconnected, signal disconnection via a null
                    // block pointer; otherwise report "empty".
                    if tail & MARK_BIT != 0 {
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        return false;
                    }
                }

                // If head and tail are in different blocks, set `MARK_BIT` in
                // the head to carry forward the fact that a next block exists.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if a sender is mid-install of
            // the first block; wait for it to appear.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try advancing the head forward to claim the slot at `offset`.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've claimed the last slot, move the head position
                    // to the next block.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        // Skip the block-boundary marker position, and keep
                        // `MARK_BIT` set if yet another block already exists.
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Reads the message from the slot reserved by `token`.
    ///
    /// Returns `Err(())` if the channel is disconnected and drained.
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a preceding successful `start_recv`
    /// on this channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Wait until the sender has finished writing, then take the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached its end, or if someone started
        // destroying it already (`DESTROY` set) and we were the last user.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

    /// Attempts to send a message without blocking. Never reports "full";
    /// only failure mode is disconnection.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            // The unbounded send below never times out.
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message. The deadline is ignored because an unbounded send
    /// never blocks.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // `start_send` always returns `true` for an unbounded channel.
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message, blocking until one arrives, the channel
    /// disconnects, or the optional deadline passes.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // First, spin briefly in the hope a message arrives soon.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare to park the current thread until a sender notifies us.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Re-check after registering to avoid a lost wakeup: a message
                // may have arrived (or the channel disconnected) in between.
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block until selected or until the deadline.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages in the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail, then the head.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // Only use the values if the tail didn't change in the meantime
            // (a consistent snapshot).
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower metadata bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices sitting on a block-boundary marker position.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate both indices so that head lands in the first lap,
                // guarding against wraparound.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // The subtracted `tail / LAP` accounts for one unusable
                // boundary position per lap.
                return tail - head - tail / LAP;
            }
        }
    }

    /// The channel is unbounded, so it has no capacity.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders; returns `true` if this call did the disconnect.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // Wake blocked receivers so they observe the disconnection.
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers; returns `true` if this call did the disconnect.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, undelivered messages must be
            // dropped here so their destructors run.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Drops all messages currently in the channel and frees their blocks.
    ///
    /// Called after `MARK_BIT` has been set on the tail, so no new slot can
    /// be reserved past the tail value observed below.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by MARK_BIT and aborted;
            // wait for a sender installing the next block to finish.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // Swap (rather than load) so a sender racing to install the very first
        // block cannot hand us a block we'd leak or double-free; a late
        // allocation is cleaned up by `Drop`.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If there are messages to discard, the head block must exist; wait
        // until the installing sender publishes it.
        if head >> SHIFT != tail >> SHIFT {
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }

        unsafe {
            // Drop all messages between head and tail, freeing each block as
            // we cross its boundary marker.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Wait for the sender to finish writing, then drop.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Block boundary: move on and free the exhausted block.
                    (*block).wait_next();
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Free the final (possibly partially used) block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// An unbounded channel is never full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}
665
impl<T> Drop for Channel<T> {
    /// Drops all remaining messages and frees all remaining blocks.
    ///
    /// `&mut self` guarantees exclusive access, so plain `get_mut` reads are
    /// sufficient here.
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower metadata bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail, freeing each block at
            // its boundary marker.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Block boundary: free this block and move to the next.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Free the final block (null if no message was ever sent).
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
702
/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
705
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
708
impl<T> SelectHandle for Receiver<'_, T> {
    /// Attempts to reserve a message for receiving; fills `token` on success.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    /// Receive operations impose no deadline of their own.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    /// Registers the operation with the channel's waker; readiness is
    /// re-checked after registering to avoid a lost wakeup.
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    /// Accepting simply retries the reservation.
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// Ready when a message is available or the channel is disconnected.
    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    /// Starts watching for readiness without committing to an operation.
    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
744
impl<T> SelectHandle for Sender<'_, T> {
    /// Reserves a slot for sending; `start_send` never blocks on an unbounded
    /// channel.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    /// Send operations impose no deadline of their own.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    /// Senders are always ready, so there is nothing to register.
    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    /// Accepting simply retries the reservation.
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// An unbounded channel always has room, so a send is always ready.
    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}