crossbeam_channel/
counter.rs

1//! Reference counter for channels.
2
3use std::boxed::Box;
4use std::isize;
5use std::ops;
6use std::process;
7use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8
9/// Reference counter internals.
10struct Counter<C> {
11    /// The number of senders associated with the channel.
12    senders: AtomicUsize,
13
14    /// The number of receivers associated with the channel.
15    receivers: AtomicUsize,
16
17    /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
18    destroy: AtomicBool,
19
20    /// The internal channel.
21    chan: C,
22}
23
24/// Wraps a channel into the reference counter.
25pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
26    let counter = Box::into_raw(Box::new(Counter {
27        senders: AtomicUsize::new(1),
28        receivers: AtomicUsize::new(1),
29        destroy: AtomicBool::new(false),
30        chan,
31    }));
32    let s = Sender { counter };
33    let r = Receiver { counter };
34    (s, r)
35}
36
37/// The sending side.
38pub(crate) struct Sender<C> {
39    counter: *mut Counter<C>,
40}
41
42impl<C> Sender<C> {
43    /// Returns the internal `Counter`.
44    fn counter(&self) -> &Counter<C> {
45        unsafe { &*self.counter }
46    }
47
48    /// Acquires another sender reference.
49    pub(crate) fn acquire(&self) -> Sender<C> {
50        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
51
52        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
53        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
54        // just abort when the count becomes very large.
55        if count > isize::MAX as usize {
56            process::abort();
57        }
58
59        Sender {
60            counter: self.counter,
61        }
62    }
63
64    /// Releases the sender reference.
65    ///
66    /// Function `disconnect` will be called if this is the last sender reference.
67    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
68        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
69            disconnect(&self.counter().chan);
70
71            if self.counter().destroy.swap(true, Ordering::AcqRel) {
72                drop(Box::from_raw(self.counter));
73            }
74        }
75    }
76}
77
78impl<C> ops::Deref for Sender<C> {
79    type Target = C;
80
81    fn deref(&self) -> &C {
82        &self.counter().chan
83    }
84}
85
86impl<C> PartialEq for Sender<C> {
87    fn eq(&self, other: &Sender<C>) -> bool {
88        self.counter == other.counter
89    }
90}
91
92/// The receiving side.
93pub(crate) struct Receiver<C> {
94    counter: *mut Counter<C>,
95}
96
97impl<C> Receiver<C> {
98    /// Returns the internal `Counter`.
99    fn counter(&self) -> &Counter<C> {
100        unsafe { &*self.counter }
101    }
102
103    /// Acquires another receiver reference.
104    pub(crate) fn acquire(&self) -> Receiver<C> {
105        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
106
107        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
108        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
109        // just abort when the count becomes very large.
110        if count > isize::MAX as usize {
111            process::abort();
112        }
113
114        Receiver {
115            counter: self.counter,
116        }
117    }
118
119    /// Releases the receiver reference.
120    ///
121    /// Function `disconnect` will be called if this is the last receiver reference.
122    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
123        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
124            disconnect(&self.counter().chan);
125
126            if self.counter().destroy.swap(true, Ordering::AcqRel) {
127                drop(Box::from_raw(self.counter));
128            }
129        }
130    }
131}
132
133impl<C> ops::Deref for Receiver<C> {
134    type Target = C;
135
136    fn deref(&self) -> &C {
137        &self.counter().chan
138    }
139}
140
141impl<C> PartialEq for Receiver<C> {
142    fn eq(&self, other: &Receiver<C>) -> bool {
143        self.counter == other.counter
144    }
145}