crossbeam_channel/flavors/
zero.rs

1//! Zero-capacity channel.
2//!
3//! This kind of channel is also known as *rendezvous* channel.
4
5use std::boxed::Box;
6use std::cell::UnsafeCell;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10use std::time::Instant;
11use std::{fmt, ptr};
12
13use crossbeam_utils::Backoff;
14
15use crate::context::Context;
16use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
17use crate::select::{Operation, SelectHandle, Selected, Token};
18use crate::waker::Waker;
19
20/// A pointer to a packet.
21pub(crate) struct ZeroToken(*mut ());
22
23impl Default for ZeroToken {
24    fn default() -> Self {
25        Self(ptr::null_mut())
26    }
27}
28
29impl fmt::Debug for ZeroToken {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        fmt::Debug::fmt(&(self.0 as usize), f)
32    }
33}
34
35/// A slot for passing one message from a sender to a receiver.
36struct Packet<T> {
37    /// Equals `true` if the packet is allocated on the stack.
38    on_stack: bool,
39
40    /// Equals `true` once the packet is ready for reading or writing.
41    ready: AtomicBool,
42
43    /// The message.
44    msg: UnsafeCell<Option<T>>,
45}
46
47impl<T> Packet<T> {
48    /// Creates an empty packet on the stack.
49    fn empty_on_stack() -> Packet<T> {
50        Packet {
51            on_stack: true,
52            ready: AtomicBool::new(false),
53            msg: UnsafeCell::new(None),
54        }
55    }
56
57    /// Creates an empty packet on the heap.
58    fn empty_on_heap() -> Box<Packet<T>> {
59        Box::new(Packet {
60            on_stack: false,
61            ready: AtomicBool::new(false),
62            msg: UnsafeCell::new(None),
63        })
64    }
65
66    /// Creates a packet on the stack, containing a message.
67    fn message_on_stack(msg: T) -> Packet<T> {
68        Packet {
69            on_stack: true,
70            ready: AtomicBool::new(false),
71            msg: UnsafeCell::new(Some(msg)),
72        }
73    }
74
75    /// Waits until the packet becomes ready for reading or writing.
76    fn wait_ready(&self) {
77        let backoff = Backoff::new();
78        while !self.ready.load(Ordering::Acquire) {
79            backoff.snooze();
80        }
81    }
82}
83
84/// Inner representation of a zero-capacity channel.
85struct Inner {
86    /// Senders waiting to pair up with a receive operation.
87    senders: Waker,
88
89    /// Receivers waiting to pair up with a send operation.
90    receivers: Waker,
91
92    /// Equals `true` when the channel is disconnected.
93    is_disconnected: bool,
94}
95
96/// Zero-capacity channel.
97pub(crate) struct Channel<T> {
98    /// Inner representation of the channel.
99    inner: Mutex<Inner>,
100
101    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
102    _marker: PhantomData<T>,
103}
104
105impl<T> Channel<T> {
106    /// Constructs a new zero-capacity channel.
107    pub(crate) fn new() -> Self {
108        Channel {
109            inner: Mutex::new(Inner {
110                senders: Waker::new(),
111                receivers: Waker::new(),
112                is_disconnected: false,
113            }),
114            _marker: PhantomData,
115        }
116    }
117
118    /// Returns a receiver handle to the channel.
119    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
120        Receiver(self)
121    }
122
123    /// Returns a sender handle to the channel.
124    pub(crate) fn sender(&self) -> Sender<'_, T> {
125        Sender(self)
126    }
127
128    /// Attempts to reserve a slot for sending a message.
129    fn start_send(&self, token: &mut Token) -> bool {
130        let mut inner = self.inner.lock().unwrap();
131
132        // If there's a waiting receiver, pair up with it.
133        if let Some(operation) = inner.receivers.try_select() {
134            token.zero.0 = operation.packet;
135            true
136        } else if inner.is_disconnected {
137            token.zero.0 = ptr::null_mut();
138            true
139        } else {
140            false
141        }
142    }
143
144    /// Writes a message into the packet.
145    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
146        // If there is no packet, the channel is disconnected.
147        if token.zero.0.is_null() {
148            return Err(msg);
149        }
150
151        let packet = &*(token.zero.0 as *const Packet<T>);
152        packet.msg.get().write(Some(msg));
153        packet.ready.store(true, Ordering::Release);
154        Ok(())
155    }
156
157    /// Attempts to pair up with a sender.
158    fn start_recv(&self, token: &mut Token) -> bool {
159        let mut inner = self.inner.lock().unwrap();
160
161        // If there's a waiting sender, pair up with it.
162        if let Some(operation) = inner.senders.try_select() {
163            token.zero.0 = operation.packet;
164            true
165        } else if inner.is_disconnected {
166            token.zero.0 = ptr::null_mut();
167            true
168        } else {
169            false
170        }
171    }
172
173    /// Reads a message from the packet.
174    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
175        // If there is no packet, the channel is disconnected.
176        if token.zero.0.is_null() {
177            return Err(());
178        }
179
180        let packet = &*(token.zero.0 as *const Packet<T>);
181
182        if packet.on_stack {
183            // The message has been in the packet from the beginning, so there is no need to wait
184            // for it. However, after reading the message, we need to set `ready` to `true` in
185            // order to signal that the packet can be destroyed.
186            let msg = packet.msg.get().replace(None).unwrap();
187            packet.ready.store(true, Ordering::Release);
188            Ok(msg)
189        } else {
190            // Wait until the message becomes available, then read it and destroy the
191            // heap-allocated packet.
192            packet.wait_ready();
193            let msg = packet.msg.get().replace(None).unwrap();
194            drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
195            Ok(msg)
196        }
197    }
198
199    /// Attempts to send a message into the channel.
200    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201        let token = &mut Token::default();
202        let mut inner = self.inner.lock().unwrap();
203
204        // If there's a waiting receiver, pair up with it.
205        if let Some(operation) = inner.receivers.try_select() {
206            token.zero.0 = operation.packet;
207            drop(inner);
208            unsafe {
209                self.write(token, msg).ok().unwrap();
210            }
211            Ok(())
212        } else if inner.is_disconnected {
213            Err(TrySendError::Disconnected(msg))
214        } else {
215            Err(TrySendError::Full(msg))
216        }
217    }
218
219    /// Sends a message into the channel.
220    pub(crate) fn send(
221        &self,
222        msg: T,
223        deadline: Option<Instant>,
224    ) -> Result<(), SendTimeoutError<T>> {
225        let token = &mut Token::default();
226        let mut inner = self.inner.lock().unwrap();
227
228        // If there's a waiting receiver, pair up with it.
229        if let Some(operation) = inner.receivers.try_select() {
230            token.zero.0 = operation.packet;
231            drop(inner);
232            unsafe {
233                self.write(token, msg).ok().unwrap();
234            }
235            return Ok(());
236        }
237
238        if inner.is_disconnected {
239            return Err(SendTimeoutError::Disconnected(msg));
240        }
241
242        Context::with(|cx| {
243            // Prepare for blocking until a receiver wakes us up.
244            let oper = Operation::hook(token);
245            let mut packet = Packet::<T>::message_on_stack(msg);
246            inner
247                .senders
248                .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
249            inner.receivers.notify();
250            drop(inner);
251
252            // Block the current thread.
253            let sel = cx.wait_until(deadline);
254
255            match sel {
256                Selected::Waiting => unreachable!(),
257                Selected::Aborted => {
258                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
259                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
260                    Err(SendTimeoutError::Timeout(msg))
261                }
262                Selected::Disconnected => {
263                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
264                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
265                    Err(SendTimeoutError::Disconnected(msg))
266                }
267                Selected::Operation(_) => {
268                    // Wait until the message is read, then drop the packet.
269                    packet.wait_ready();
270                    Ok(())
271                }
272            }
273        })
274    }
275
276    /// Attempts to receive a message without blocking.
277    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
278        let token = &mut Token::default();
279        let mut inner = self.inner.lock().unwrap();
280
281        // If there's a waiting sender, pair up with it.
282        if let Some(operation) = inner.senders.try_select() {
283            token.zero.0 = operation.packet;
284            drop(inner);
285            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
286        } else if inner.is_disconnected {
287            Err(TryRecvError::Disconnected)
288        } else {
289            Err(TryRecvError::Empty)
290        }
291    }
292
293    /// Receives a message from the channel.
294    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
295        let token = &mut Token::default();
296        let mut inner = self.inner.lock().unwrap();
297
298        // If there's a waiting sender, pair up with it.
299        if let Some(operation) = inner.senders.try_select() {
300            token.zero.0 = operation.packet;
301            drop(inner);
302            unsafe {
303                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
304            }
305        }
306
307        if inner.is_disconnected {
308            return Err(RecvTimeoutError::Disconnected);
309        }
310
311        Context::with(|cx| {
312            // Prepare for blocking until a sender wakes us up.
313            let oper = Operation::hook(token);
314            let mut packet = Packet::<T>::empty_on_stack();
315            inner.receivers.register_with_packet(
316                oper,
317                &mut packet as *mut Packet<T> as *mut (),
318                cx,
319            );
320            inner.senders.notify();
321            drop(inner);
322
323            // Block the current thread.
324            let sel = cx.wait_until(deadline);
325
326            match sel {
327                Selected::Waiting => unreachable!(),
328                Selected::Aborted => {
329                    self.inner
330                        .lock()
331                        .unwrap()
332                        .receivers
333                        .unregister(oper)
334                        .unwrap();
335                    Err(RecvTimeoutError::Timeout)
336                }
337                Selected::Disconnected => {
338                    self.inner
339                        .lock()
340                        .unwrap()
341                        .receivers
342                        .unregister(oper)
343                        .unwrap();
344                    Err(RecvTimeoutError::Disconnected)
345                }
346                Selected::Operation(_) => {
347                    // Wait until the message is provided, then read it.
348                    packet.wait_ready();
349                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
350                }
351            }
352        })
353    }
354
355    /// Disconnects the channel and wakes up all blocked senders and receivers.
356    ///
357    /// Returns `true` if this call disconnected the channel.
358    pub(crate) fn disconnect(&self) -> bool {
359        let mut inner = self.inner.lock().unwrap();
360
361        if !inner.is_disconnected {
362            inner.is_disconnected = true;
363            inner.senders.disconnect();
364            inner.receivers.disconnect();
365            true
366        } else {
367            false
368        }
369    }
370
371    /// Returns the current number of messages inside the channel.
372    pub(crate) fn len(&self) -> usize {
373        0
374    }
375
376    /// Returns the capacity of the channel.
377    pub(crate) fn capacity(&self) -> Option<usize> {
378        Some(0)
379    }
380
381    /// Returns `true` if the channel is empty.
382    pub(crate) fn is_empty(&self) -> bool {
383        true
384    }
385
386    /// Returns `true` if the channel is full.
387    pub(crate) fn is_full(&self) -> bool {
388        true
389    }
390}
391
392/// Receiver handle to a channel.
393pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
394
395/// Sender handle to a channel.
396pub(crate) struct Sender<'a, T>(&'a Channel<T>);
397
398impl<T> SelectHandle for Receiver<'_, T> {
399    fn try_select(&self, token: &mut Token) -> bool {
400        self.0.start_recv(token)
401    }
402
403    fn deadline(&self) -> Option<Instant> {
404        None
405    }
406
407    fn register(&self, oper: Operation, cx: &Context) -> bool {
408        let packet = Box::into_raw(Packet::<T>::empty_on_heap());
409
410        let mut inner = self.0.inner.lock().unwrap();
411        inner
412            .receivers
413            .register_with_packet(oper, packet.cast::<()>(), cx);
414        inner.senders.notify();
415        inner.senders.can_select() || inner.is_disconnected
416    }
417
418    fn unregister(&self, oper: Operation) {
419        if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
420            unsafe {
421                drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
422            }
423        }
424    }
425
426    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
427        token.zero.0 = cx.wait_packet();
428        true
429    }
430
431    fn is_ready(&self) -> bool {
432        let inner = self.0.inner.lock().unwrap();
433        inner.senders.can_select() || inner.is_disconnected
434    }
435
436    fn watch(&self, oper: Operation, cx: &Context) -> bool {
437        let mut inner = self.0.inner.lock().unwrap();
438        inner.receivers.watch(oper, cx);
439        inner.senders.can_select() || inner.is_disconnected
440    }
441
442    fn unwatch(&self, oper: Operation) {
443        let mut inner = self.0.inner.lock().unwrap();
444        inner.receivers.unwatch(oper);
445    }
446}
447
448impl<T> SelectHandle for Sender<'_, T> {
449    fn try_select(&self, token: &mut Token) -> bool {
450        self.0.start_send(token)
451    }
452
453    fn deadline(&self) -> Option<Instant> {
454        None
455    }
456
457    fn register(&self, oper: Operation, cx: &Context) -> bool {
458        let packet = Box::into_raw(Packet::<T>::empty_on_heap());
459
460        let mut inner = self.0.inner.lock().unwrap();
461        inner
462            .senders
463            .register_with_packet(oper, packet.cast::<()>(), cx);
464        inner.receivers.notify();
465        inner.receivers.can_select() || inner.is_disconnected
466    }
467
468    fn unregister(&self, oper: Operation) {
469        if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
470            unsafe {
471                drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
472            }
473        }
474    }
475
476    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
477        token.zero.0 = cx.wait_packet();
478        true
479    }
480
481    fn is_ready(&self) -> bool {
482        let inner = self.0.inner.lock().unwrap();
483        inner.receivers.can_select() || inner.is_disconnected
484    }
485
486    fn watch(&self, oper: Operation, cx: &Context) -> bool {
487        let mut inner = self.0.inner.lock().unwrap();
488        inner.senders.watch(oper, cx);
489        inner.receivers.can_select() || inner.is_disconnected
490    }
491
492    fn unwatch(&self, oper: Operation) {
493        let mut inner = self.0.inner.lock().unwrap();
494        inner.senders.unwatch(oper);
495    }
496}