//! Bounded channel based on a preallocated array (crossbeam_channel/flavors/array.rs).

use std::boxed::Box;
use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
24
/// A slot in the channel's circular buffer.
struct Slot<T> {
    /// The current stamp.
    ///
    /// Compared against the head/tail stamps to decide whether this slot is
    /// ready to be written to or read from (see `start_send` / `start_recv`).
    stamp: AtomicUsize,

    /// The message in this slot.
    ///
    /// Only initialized while the stamp marks the slot as full; accessed
    /// exclusively by the one thread that won the CAS reserving this slot.
    msg: UnsafeCell<MaybeUninit<T>>,
}
33
/// The token type for the array flavor.
///
/// Produced by `start_send` / `start_recv` and consumed by the follow-up
/// `write` / `read` call.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Slot to read from or write to (null means the channel is disconnected).
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}
43
44impl Default for ArrayToken {
45 #[inline]
46 fn default() -> Self {
47 ArrayToken {
48 slot: ptr::null(),
49 stamp: 0,
50 }
51 }
52}
53
/// Bounded channel based on a preallocated array.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a
    /// lap counter, packed into a single `usize`: the bits below `mark_bit`
    /// are the index, the bits at `one_lap` and above are the lap. The mark
    /// bit in the head is always zero. Messages are popped from the head.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// Packed the same way as `head`, except the mark bit here indicates
    /// that the channel is disconnected (see `disconnect`). Messages are
    /// pushed at the tail.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// The channel capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    one_lap: usize,

    /// If this bit is set in the tail, the channel is disconnected.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,
}
92
impl<T> Channel<T> {
    /// Creates a bounded channel of capacity `cap`.
    ///
    /// # Panics
    ///
    /// Panics if `cap` is zero.
    pub(crate) fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be positive");

        // Compute constants `mark_bit` and `one_lap`. The mark bit is the
        // first power of two that cannot be an index, so index bits and the
        // mark bit never overlap; laps occupy the bits above the mark bit.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots, each stamped with its own index
        // (i.e. `{ lap: 0, mark: 0, index: i }`), meaning "empty, lap 0".
        let buffer: Box<[Slot<T>]> = (0..cap)
            .map(|i| {
                Slot {
                    stamp: AtomicUsize::new(i),
                    msg: UnsafeCell::new(MaybeUninit::uninit()),
                }
            })
            .collect();

        Channel {
            buffer,
            cap,
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            senders: SyncWaker::new(),
            receivers: SyncWaker::new(),
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    ///
    /// On success the token points at the reserved slot (or is null if the
    /// channel is disconnected) and `write` must be called next. Returns
    /// `false` if the channel is full.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the channel is disconnected; a null slot in the token
            // makes the follow-up `write` return `Err(msg)`.
            if tail & self.mark_bit != 0 {
                token.array.slot = ptr::null();
                token.array.stamp = 0;
                return true;
            }

            // Deconstruct the tail into index and lap.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is empty on this lap
            // and we may attempt to push into it.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    lap.wrapping_add(self.one_lap)
                };

                // Try advancing the tail to claim the slot.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token: `write` will store the message
                        // and then publish stamp `tail + 1` ("full").
                        token.array.slot = slot as *const Slot<T> as *const u8;
                        token.array.stamp = tail + 1;
                        return true;
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The stamp is one lap behind "full": the slot still holds a
                // message from the previous lap, so the buffer may be full.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one whole lap behind the tail as well,
                // the channel is full.
                if head.wrapping_add(self.one_lap) == tail {
                    return false;
                }

                // The head moved concurrently — retry.
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Another sender claimed this slot but hasn't published the
                // stamp yet; snooze while waiting for it to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Writes a message into the slot reserved by `start_send`.
    ///
    /// Returns `Err(msg)` if the channel was disconnected (null slot).
    ///
    /// # Safety
    ///
    /// `token` must come from a just-successful `start_send` on this channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.array.slot.is_null() {
            return Err(msg);
        }

        let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();

        // Write the message into the slot, then publish it by updating the
        // stamp (Release pairs with the Acquire load in `start_recv`).
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping receiver, if any.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    ///
    /// On success the token points at the reserved slot (or is null if the
    /// channel is disconnected and drained) and `read` must be called next.
    /// Returns `false` if the channel is empty.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head into index and lap.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is one ahead of the head, the slot holds a
            // message from this lap and we may attempt to pop it.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    lap.wrapping_add(self.one_lap)
                };

                // Try advancing the head to claim the slot.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token: `read` will take the message and
                        // publish stamp `head + one_lap` ("empty, next lap").
                        token.array.slot = slot as *const Slot<T> as *const u8;
                        token.array.stamp = head.wrapping_add(self.one_lap);
                        return true;
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot is still empty on this lap — the channel may be empty.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail (ignoring the mark bit) equals the head,
                // the channel holds no messages.
                if (tail & !self.mark_bit) == head {
                    if tail & self.mark_bit != 0 {
                        // Disconnected and drained: a null slot makes the
                        // follow-up `read` return `Err(())`.
                        token.array.slot = ptr::null();
                        token.array.stamp = 0;
                        return true;
                    } else {
                        // Merely empty.
                        return false;
                    }
                }

                // The tail moved concurrently — retry.
                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // A sender claimed this slot but hasn't published the stamp
                // yet; snooze while waiting for it to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Reads a message from the slot reserved by `start_recv`.
    ///
    /// Returns `Err(())` if the channel was disconnected (null slot).
    ///
    /// # Safety
    ///
    /// `token` must come from a just-successful `start_recv` on this channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        // If there is no slot, the channel is disconnected.
        if token.array.slot.is_null() {
            return Err(());
        }

        let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();

        // Take the message out of the slot, then mark the slot empty for the
        // next lap by updating the stamp (Release pairs with `start_send`).
        let msg = slot.msg.get().read().assume_init();
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping sender, if any.
        self.senders.notify();
        Ok(msg)
    }

    /// Attempts to send a message into the channel without blocking.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        let token = &mut Token::default();
        if self.start_send(token) {
            // `write` only fails when the channel is disconnected.
            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
        } else {
            Err(TrySendError::Full(msg))
        }
    }

    /// Sends a message into the channel, blocking until there is room or the
    /// optional `deadline` passes.
    pub(crate) fn send(
        &self,
        msg: T,
        deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        loop {
            // Try sending a message several times with exponential backoff
            // before parking the thread.
            let backoff = Backoff::new();
            loop {
                if self.start_send(token) {
                    let res = unsafe { self.write(token, msg) };
                    return res.map_err(SendTimeoutError::Disconnected);
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(SendTimeoutError::Timeout(msg));
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a receiver wakes us up.
                let oper = Operation::hook(token);
                self.senders.register(oper, cx);

                // If the channel became ready between the last attempt and
                // registration, abort immediately to avoid a lost wakeup.
                if !self.is_full() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        // We woke ourselves (or were disconnected), so our
                        // registration is still present and must be removed.
                        self.senders.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            // `read` only fails when the channel is disconnected and drained.
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel, blocking until one arrives or the
    /// optional `deadline` passes.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times with exponential backoff
            // before parking the thread.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    let res = unsafe { self.read(token) };
                    return res.map_err(|_| RecvTimeoutError::Disconnected);
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a sender wakes us up.
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // If the channel became ready between the last attempt and
                // registration, abort immediately to avoid a lost wakeup.
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        // We woke ourselves (or were disconnected), so our
                        // registration is still present and must be removed.
                        self.receivers.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail, then the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we have a consistent snapshot.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    // Equal indices and equal stamps: empty.
                    0
                } else {
                    // Equal indices but tail is a lap ahead: full.
                    self.cap
                };
            }
        }
    }

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        Some(self.cap)
    }

    /// Disconnects the channel and wakes up all blocked senders and receivers.
    ///
    /// Returns `true` if this call disconnected the channel (i.e. it was not
    /// already disconnected).
    pub(crate) fn disconnect(&self) -> bool {
        // Setting the mark bit in the tail marks the channel disconnected.
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);

        if tail & self.mark_bit == 0 {
            self.senders.disconnect();
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }

    /// Returns `true` if the channel is empty.
    ///
    /// Note: this is a racy snapshot — the answer may be stale by the time
    /// the caller acts on it.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Head and tail (ignoring the mark bit) coincide only when there are
        // no messages.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the channel is full.
    ///
    /// Note: this is a racy snapshot — the answer may be stale by the time
    /// the caller acts on it.
    pub(crate) fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // The channel is full when the head lags exactly one lap behind the
        // tail (ignoring the mark bit).
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }
}
521
impl<T> Drop for Channel<T> {
    /// Drops any messages still sitting in the buffer.
    ///
    /// Runs with exclusive access (`&mut self`), so plain `get_mut` reads of
    /// the atomics are sufficient.
    fn drop(&mut self) {
        if mem::needs_drop::<T>() {
            let head = *self.head.get_mut();
            let tail = *self.tail.get_mut();

            let hix = head & (self.mark_bit - 1);
            let tix = tail & (self.mark_bit - 1);

            // Number of initialized messages (same case analysis as `len`).
            let len = if hix < tix {
                tix - hix
            } else if hix > tix {
                self.cap - hix + tix
            } else if (tail & !self.mark_bit) == head {
                0
            } else {
                self.cap
            };

            // Loop over all slots that hold a message and drop them in place.
            for i in 0..len {
                // Index of the next slot holding a message, wrapping around
                // the end of the buffer.
                let index = if hix + i < self.cap {
                    hix + i
                } else {
                    hix + i - self.cap
                };

                unsafe {
                    // SAFETY: `index < cap` by construction, and the `len`
                    // slots starting at `hix` are exactly the initialized ones.
                    debug_assert!(index < self.buffer.len());
                    let slot = self.buffer.get_unchecked_mut(index);
                    (*slot.msg.get()).assume_init_drop();
                }
            }
        }
    }
}
560
/// Receiver handle to a channel, used by the `select!` machinery.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel, used by the `select!` machinery.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
566
impl<T> SelectHandle for Receiver<'_, T> {
    /// Attempts to reserve a message for receiving without blocking.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    /// Receive operations have no inherent deadline.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    /// Registers this operation with the channel's receiver waker, then
    /// re-checks readiness to avoid a missed wakeup.
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    /// After being selected, completing the operation is just another try.
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// A receive is ready when there is a message or the channel is
    /// disconnected (so the receive can fail promptly).
    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
602
impl<T> SelectHandle for Sender<'_, T> {
    /// Attempts to reserve a slot for sending without blocking.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    /// Send operations have no inherent deadline.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    /// Registers this operation with the channel's sender waker, then
    /// re-checks readiness to avoid a missed wakeup.
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.senders.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.senders.unregister(oper);
    }

    /// After being selected, completing the operation is just another try.
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// A send is ready when there is room or the channel is disconnected
    /// (so the send can fail promptly).
    fn is_ready(&self) -> bool {
        !self.0.is_full() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.senders.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.senders.unwatch(oper);
    }
}