crossbeam_channel/flavors/
array.rs

//! Bounded channel based on a preallocated array.
//!
//! This flavor has a fixed, positive capacity.
//!
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
//!   - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
10
11use std::boxed::Box;
12use std::cell::UnsafeCell;
13use std::mem::{self, MaybeUninit};
14use std::ptr;
15use std::sync::atomic::{self, AtomicUsize, Ordering};
16use std::time::Instant;
17
18use crossbeam_utils::{Backoff, CachePadded};
19
20use crate::context::Context;
21use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22use crate::select::{Operation, SelectHandle, Selected, Token};
23use crate::waker::SyncWaker;
24
/// One cell of the channel's ring buffer.
struct Slot<T> {
    /// Stamp describing the slot's state. Senders and receivers compare it against the tail
    /// and head stamps to decide whether the slot may be written to or read from.
    stamp: AtomicUsize,

    /// Storage for the message held by this slot, if any.
    msg: UnsafeCell<MaybeUninit<T>>,
}
33
/// Token state for the array flavor: carries a slot reservation from `start_send` /
/// `start_recv` over to the follow-up `write` / `read`.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Type-erased pointer to the slot to read from or write to.
    slot: *const u8,

    /// Stamp to publish in the slot once the read or write has finished.
    stamp: usize,
}
43
44impl Default for ArrayToken {
45    #[inline]
46    fn default() -> Self {
47        ArrayToken {
48            slot: ptr::null(),
49            stamp: 0,
50        }
51    }
52}
53
54/// Bounded channel based on a preallocated array.
55pub(crate) struct Channel<T> {
56    /// The head of the channel.
57    ///
58    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
59    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
60    /// represent the lap. The mark bit in the head is always zero.
61    ///
62    /// Messages are popped from the head of the channel.
63    head: CachePadded<AtomicUsize>,
64
65    /// The tail of the channel.
66    ///
67    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
68    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
69    /// represent the lap. The mark bit indicates that the channel is disconnected.
70    ///
71    /// Messages are pushed into the tail of the channel.
72    tail: CachePadded<AtomicUsize>,
73
74    /// The buffer holding slots.
75    buffer: Box<[Slot<T>]>,
76
77    /// The channel capacity.
78    cap: usize,
79
80    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
81    one_lap: usize,
82
83    /// If this bit is set in the tail, that means the channel is disconnected.
84    mark_bit: usize,
85
86    /// Senders waiting while the channel is full.
87    senders: SyncWaker,
88
89    /// Receivers waiting while the channel is empty and not disconnected.
90    receivers: SyncWaker,
91}
92
93impl<T> Channel<T> {
94    /// Creates a bounded channel of capacity `cap`.
95    pub(crate) fn with_capacity(cap: usize) -> Self {
96        assert!(cap > 0, "capacity must be positive");
97
98        // Compute constants `mark_bit` and `one_lap`.
99        let mark_bit = (cap + 1).next_power_of_two();
100        let one_lap = mark_bit * 2;
101
102        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
103        let head = 0;
104        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
105        let tail = 0;
106
107        // Allocate a buffer of `cap` slots initialized
108        // with stamps.
109        let buffer: Box<[Slot<T>]> = (0..cap)
110            .map(|i| {
111                // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
112                Slot {
113                    stamp: AtomicUsize::new(i),
114                    msg: UnsafeCell::new(MaybeUninit::uninit()),
115                }
116            })
117            .collect();
118
119        Channel {
120            buffer,
121            cap,
122            one_lap,
123            mark_bit,
124            head: CachePadded::new(AtomicUsize::new(head)),
125            tail: CachePadded::new(AtomicUsize::new(tail)),
126            senders: SyncWaker::new(),
127            receivers: SyncWaker::new(),
128        }
129    }
130
131    /// Returns a receiver handle to the channel.
132    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
133        Receiver(self)
134    }
135
136    /// Returns a sender handle to the channel.
137    pub(crate) fn sender(&self) -> Sender<'_, T> {
138        Sender(self)
139    }
140
141    /// Attempts to reserve a slot for sending a message.
142    fn start_send(&self, token: &mut Token) -> bool {
143        let backoff = Backoff::new();
144        let mut tail = self.tail.load(Ordering::Relaxed);
145
146        loop {
147            // Check if the channel is disconnected.
148            if tail & self.mark_bit != 0 {
149                token.array.slot = ptr::null();
150                token.array.stamp = 0;
151                return true;
152            }
153
154            // Deconstruct the tail.
155            let index = tail & (self.mark_bit - 1);
156            let lap = tail & !(self.one_lap - 1);
157
158            // Inspect the corresponding slot.
159            debug_assert!(index < self.buffer.len());
160            let slot = unsafe { self.buffer.get_unchecked(index) };
161            let stamp = slot.stamp.load(Ordering::Acquire);
162
163            // If the tail and the stamp match, we may attempt to push.
164            if tail == stamp {
165                let new_tail = if index + 1 < self.cap {
166                    // Same lap, incremented index.
167                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
168                    tail + 1
169                } else {
170                    // One lap forward, index wraps around to zero.
171                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
172                    lap.wrapping_add(self.one_lap)
173                };
174
175                // Try moving the tail.
176                match self.tail.compare_exchange_weak(
177                    tail,
178                    new_tail,
179                    Ordering::SeqCst,
180                    Ordering::Relaxed,
181                ) {
182                    Ok(_) => {
183                        // Prepare the token for the follow-up call to `write`.
184                        token.array.slot = slot as *const Slot<T> as *const u8;
185                        token.array.stamp = tail + 1;
186                        return true;
187                    }
188                    Err(t) => {
189                        tail = t;
190                        backoff.spin();
191                    }
192                }
193            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
194                atomic::fence(Ordering::SeqCst);
195                let head = self.head.load(Ordering::Relaxed);
196
197                // If the head lags one lap behind the tail as well...
198                if head.wrapping_add(self.one_lap) == tail {
199                    // ...then the channel is full.
200                    return false;
201                }
202
203                backoff.spin();
204                tail = self.tail.load(Ordering::Relaxed);
205            } else {
206                // Snooze because we need to wait for the stamp to get updated.
207                backoff.snooze();
208                tail = self.tail.load(Ordering::Relaxed);
209            }
210        }
211    }
212
213    /// Writes a message into the channel.
214    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
215        // If there is no slot, the channel is disconnected.
216        if token.array.slot.is_null() {
217            return Err(msg);
218        }
219
220        let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
221
222        // Write the message into the slot and update the stamp.
223        slot.msg.get().write(MaybeUninit::new(msg));
224        slot.stamp.store(token.array.stamp, Ordering::Release);
225
226        // Wake a sleeping receiver.
227        self.receivers.notify();
228        Ok(())
229    }
230
231    /// Attempts to reserve a slot for receiving a message.
232    fn start_recv(&self, token: &mut Token) -> bool {
233        let backoff = Backoff::new();
234        let mut head = self.head.load(Ordering::Relaxed);
235
236        loop {
237            // Deconstruct the head.
238            let index = head & (self.mark_bit - 1);
239            let lap = head & !(self.one_lap - 1);
240
241            // Inspect the corresponding slot.
242            debug_assert!(index < self.buffer.len());
243            let slot = unsafe { self.buffer.get_unchecked(index) };
244            let stamp = slot.stamp.load(Ordering::Acquire);
245
246            // If the stamp is ahead of the head by 1, we may attempt to pop.
247            if head + 1 == stamp {
248                let new = if index + 1 < self.cap {
249                    // Same lap, incremented index.
250                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
251                    head + 1
252                } else {
253                    // One lap forward, index wraps around to zero.
254                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
255                    lap.wrapping_add(self.one_lap)
256                };
257
258                // Try moving the head.
259                match self.head.compare_exchange_weak(
260                    head,
261                    new,
262                    Ordering::SeqCst,
263                    Ordering::Relaxed,
264                ) {
265                    Ok(_) => {
266                        // Prepare the token for the follow-up call to `read`.
267                        token.array.slot = slot as *const Slot<T> as *const u8;
268                        token.array.stamp = head.wrapping_add(self.one_lap);
269                        return true;
270                    }
271                    Err(h) => {
272                        head = h;
273                        backoff.spin();
274                    }
275                }
276            } else if stamp == head {
277                atomic::fence(Ordering::SeqCst);
278                let tail = self.tail.load(Ordering::Relaxed);
279
280                // If the tail equals the head, that means the channel is empty.
281                if (tail & !self.mark_bit) == head {
282                    // If the channel is disconnected...
283                    if tail & self.mark_bit != 0 {
284                        // ...then receive an error.
285                        token.array.slot = ptr::null();
286                        token.array.stamp = 0;
287                        return true;
288                    } else {
289                        // Otherwise, the receive operation is not ready.
290                        return false;
291                    }
292                }
293
294                backoff.spin();
295                head = self.head.load(Ordering::Relaxed);
296            } else {
297                // Snooze because we need to wait for the stamp to get updated.
298                backoff.snooze();
299                head = self.head.load(Ordering::Relaxed);
300            }
301        }
302    }
303
304    /// Reads a message from the channel.
305    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
306        if token.array.slot.is_null() {
307            // The channel is disconnected.
308            return Err(());
309        }
310
311        let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
312
313        // Read the message from the slot and update the stamp.
314        let msg = slot.msg.get().read().assume_init();
315        slot.stamp.store(token.array.stamp, Ordering::Release);
316
317        // Wake a sleeping sender.
318        self.senders.notify();
319        Ok(msg)
320    }
321
322    /// Attempts to send a message into the channel.
323    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
324        let token = &mut Token::default();
325        if self.start_send(token) {
326            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
327        } else {
328            Err(TrySendError::Full(msg))
329        }
330    }
331
332    /// Sends a message into the channel.
333    pub(crate) fn send(
334        &self,
335        msg: T,
336        deadline: Option<Instant>,
337    ) -> Result<(), SendTimeoutError<T>> {
338        let token = &mut Token::default();
339        loop {
340            // Try sending a message several times.
341            let backoff = Backoff::new();
342            loop {
343                if self.start_send(token) {
344                    let res = unsafe { self.write(token, msg) };
345                    return res.map_err(SendTimeoutError::Disconnected);
346                }
347
348                if backoff.is_completed() {
349                    break;
350                } else {
351                    backoff.snooze();
352                }
353            }
354
355            if let Some(d) = deadline {
356                if Instant::now() >= d {
357                    return Err(SendTimeoutError::Timeout(msg));
358                }
359            }
360
361            Context::with(|cx| {
362                // Prepare for blocking until a receiver wakes us up.
363                let oper = Operation::hook(token);
364                self.senders.register(oper, cx);
365
366                // Has the channel become ready just now?
367                if !self.is_full() || self.is_disconnected() {
368                    let _ = cx.try_select(Selected::Aborted);
369                }
370
371                // Block the current thread.
372                let sel = cx.wait_until(deadline);
373
374                match sel {
375                    Selected::Waiting => unreachable!(),
376                    Selected::Aborted | Selected::Disconnected => {
377                        self.senders.unregister(oper).unwrap();
378                    }
379                    Selected::Operation(_) => {}
380                }
381            });
382        }
383    }
384
385    /// Attempts to receive a message without blocking.
386    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
387        let token = &mut Token::default();
388
389        if self.start_recv(token) {
390            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
391        } else {
392            Err(TryRecvError::Empty)
393        }
394    }
395
396    /// Receives a message from the channel.
397    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
398        let token = &mut Token::default();
399        loop {
400            // Try receiving a message several times.
401            let backoff = Backoff::new();
402            loop {
403                if self.start_recv(token) {
404                    let res = unsafe { self.read(token) };
405                    return res.map_err(|_| RecvTimeoutError::Disconnected);
406                }
407
408                if backoff.is_completed() {
409                    break;
410                } else {
411                    backoff.snooze();
412                }
413            }
414
415            if let Some(d) = deadline {
416                if Instant::now() >= d {
417                    return Err(RecvTimeoutError::Timeout);
418                }
419            }
420
421            Context::with(|cx| {
422                // Prepare for blocking until a sender wakes us up.
423                let oper = Operation::hook(token);
424                self.receivers.register(oper, cx);
425
426                // Has the channel become ready just now?
427                if !self.is_empty() || self.is_disconnected() {
428                    let _ = cx.try_select(Selected::Aborted);
429                }
430
431                // Block the current thread.
432                let sel = cx.wait_until(deadline);
433
434                match sel {
435                    Selected::Waiting => unreachable!(),
436                    Selected::Aborted | Selected::Disconnected => {
437                        self.receivers.unregister(oper).unwrap();
438                        // If the channel was disconnected, we still have to check for remaining
439                        // messages.
440                    }
441                    Selected::Operation(_) => {}
442                }
443            });
444        }
445    }
446
447    /// Returns the current number of messages inside the channel.
448    pub(crate) fn len(&self) -> usize {
449        loop {
450            // Load the tail, then load the head.
451            let tail = self.tail.load(Ordering::SeqCst);
452            let head = self.head.load(Ordering::SeqCst);
453
454            // If the tail didn't change, we've got consistent values to work with.
455            if self.tail.load(Ordering::SeqCst) == tail {
456                let hix = head & (self.mark_bit - 1);
457                let tix = tail & (self.mark_bit - 1);
458
459                return if hix < tix {
460                    tix - hix
461                } else if hix > tix {
462                    self.cap - hix + tix
463                } else if (tail & !self.mark_bit) == head {
464                    0
465                } else {
466                    self.cap
467                };
468            }
469        }
470    }
471
472    /// Returns the capacity of the channel.
473    pub(crate) fn capacity(&self) -> Option<usize> {
474        Some(self.cap)
475    }
476
477    /// Disconnects the channel and wakes up all blocked senders and receivers.
478    ///
479    /// Returns `true` if this call disconnected the channel.
480    pub(crate) fn disconnect(&self) -> bool {
481        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
482
483        if tail & self.mark_bit == 0 {
484            self.senders.disconnect();
485            self.receivers.disconnect();
486            true
487        } else {
488            false
489        }
490    }
491
492    /// Returns `true` if the channel is disconnected.
493    pub(crate) fn is_disconnected(&self) -> bool {
494        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
495    }
496
497    /// Returns `true` if the channel is empty.
498    pub(crate) fn is_empty(&self) -> bool {
499        let head = self.head.load(Ordering::SeqCst);
500        let tail = self.tail.load(Ordering::SeqCst);
501
502        // Is the tail equal to the head?
503        //
504        // Note: If the head changes just before we load the tail, that means there was a moment
505        // when the channel was not empty, so it is safe to just return `false`.
506        (tail & !self.mark_bit) == head
507    }
508
509    /// Returns `true` if the channel is full.
510    pub(crate) fn is_full(&self) -> bool {
511        let tail = self.tail.load(Ordering::SeqCst);
512        let head = self.head.load(Ordering::SeqCst);
513
514        // Is the head lagging one lap behind tail?
515        //
516        // Note: If the tail changes just before we load the head, that means there was a moment
517        // when the channel was not full, so it is safe to just return `false`.
518        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
519    }
520}
521
522impl<T> Drop for Channel<T> {
523    fn drop(&mut self) {
524        if mem::needs_drop::<T>() {
525            // Get the index of the head.
526            let head = *self.head.get_mut();
527            let tail = *self.tail.get_mut();
528
529            let hix = head & (self.mark_bit - 1);
530            let tix = tail & (self.mark_bit - 1);
531
532            let len = if hix < tix {
533                tix - hix
534            } else if hix > tix {
535                self.cap - hix + tix
536            } else if (tail & !self.mark_bit) == head {
537                0
538            } else {
539                self.cap
540            };
541
542            // Loop over all slots that hold a message and drop them.
543            for i in 0..len {
544                // Compute the index of the next slot holding a message.
545                let index = if hix + i < self.cap {
546                    hix + i
547                } else {
548                    hix + i - self.cap
549                };
550
551                unsafe {
552                    debug_assert!(index < self.buffer.len());
553                    let slot = self.buffer.get_unchecked_mut(index);
554                    (*slot.msg.get()).assume_init_drop();
555                }
556            }
557        }
558    }
559}
560
561/// Receiver handle to a channel.
562pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
563
564/// Sender handle to a channel.
565pub(crate) struct Sender<'a, T>(&'a Channel<T>);
566
567impl<T> SelectHandle for Receiver<'_, T> {
568    fn try_select(&self, token: &mut Token) -> bool {
569        self.0.start_recv(token)
570    }
571
572    fn deadline(&self) -> Option<Instant> {
573        None
574    }
575
576    fn register(&self, oper: Operation, cx: &Context) -> bool {
577        self.0.receivers.register(oper, cx);
578        self.is_ready()
579    }
580
581    fn unregister(&self, oper: Operation) {
582        self.0.receivers.unregister(oper);
583    }
584
585    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
586        self.try_select(token)
587    }
588
589    fn is_ready(&self) -> bool {
590        !self.0.is_empty() || self.0.is_disconnected()
591    }
592
593    fn watch(&self, oper: Operation, cx: &Context) -> bool {
594        self.0.receivers.watch(oper, cx);
595        self.is_ready()
596    }
597
598    fn unwatch(&self, oper: Operation) {
599        self.0.receivers.unwatch(oper);
600    }
601}
602
603impl<T> SelectHandle for Sender<'_, T> {
604    fn try_select(&self, token: &mut Token) -> bool {
605        self.0.start_send(token)
606    }
607
608    fn deadline(&self) -> Option<Instant> {
609        None
610    }
611
612    fn register(&self, oper: Operation, cx: &Context) -> bool {
613        self.0.senders.register(oper, cx);
614        self.is_ready()
615    }
616
617    fn unregister(&self, oper: Operation) {
618        self.0.senders.unregister(oper);
619    }
620
621    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
622        self.try_select(token)
623    }
624
625    fn is_ready(&self) -> bool {
626        !self.0.is_full() || self.0.is_disconnected()
627    }
628
629    fn watch(&self, oper: Operation, cx: &Context) -> bool {
630        self.0.senders.watch(oper, cx);
631        self.is_ready()
632    }
633
634    fn unwatch(&self, oper: Operation) {
635        self.0.senders.unwatch(oper);
636    }
637}