crossbeam_channel/flavors/
at.rs

1//! Channel that delivers a message at a certain moment in time.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::thread;
7use std::time::Instant;
8
9use crate::context::Context;
10use crate::err::{RecvTimeoutError, TryRecvError};
11use crate::select::{Operation, SelectHandle, Token};
12use crate::utils;
13
/// Token payload for this channel flavor: `Some(instant)` carries the delivered message (its
/// delivery instant), while `None` means no message is attached.
pub(crate) type AtToken = Option<Instant>;
16
/// A channel holding exactly one message that becomes available at a fixed instant.
///
/// The message itself is the delivery instant; nothing is ever sent into the channel.
pub(crate) struct Channel {
    /// Point in time from which the message can be received.
    delivery_time: Instant,

    /// Flipped to `true` once the single message has been claimed by a receiver.
    received: AtomicBool,
}
25
26impl Channel {
27    /// Creates a channel that delivers a message at a certain instant in time.
28    #[inline]
29    pub(crate) fn new_deadline(when: Instant) -> Self {
30        Channel {
31            delivery_time: when,
32            received: AtomicBool::new(false),
33        }
34    }
35
36    /// Attempts to receive a message without blocking.
37    #[inline]
38    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39        // We use relaxed ordering because this is just an optional optimistic check.
40        if self.received.load(Ordering::Relaxed) {
41            // The message has already been received.
42            return Err(TryRecvError::Empty);
43        }
44
45        if Instant::now() < self.delivery_time {
46            // The message was not delivered yet.
47            return Err(TryRecvError::Empty);
48        }
49
50        // Try receiving the message if it is still available.
51        if !self.received.swap(true, Ordering::SeqCst) {
52            // Success! Return delivery time as the message.
53            Ok(self.delivery_time)
54        } else {
55            // The message was already received.
56            Err(TryRecvError::Empty)
57        }
58    }
59
60    /// Receives a message from the channel.
61    #[inline]
62    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
63        // We use relaxed ordering because this is just an optional optimistic check.
64        if self.received.load(Ordering::Relaxed) {
65            // The message has already been received.
66            utils::sleep_until(deadline);
67            return Err(RecvTimeoutError::Timeout);
68        }
69
70        // Wait until the message is received or the deadline is reached.
71        loop {
72            let now = Instant::now();
73
74            let deadline = match deadline {
75                // Check if we can receive the next message.
76                _ if now >= self.delivery_time => break,
77                // Check if the timeout deadline has been reached.
78                Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
79
80                // Sleep until one of the above happens
81                Some(d) if d < self.delivery_time => d,
82                _ => self.delivery_time,
83            };
84
85            thread::sleep(deadline - now);
86        }
87
88        // Try receiving the message if it is still available.
89        if !self.received.swap(true, Ordering::SeqCst) {
90            // Success! Return the message, which is the instant at which it was delivered.
91            Ok(self.delivery_time)
92        } else {
93            // The message was already received. Block forever.
94            utils::sleep_until(None);
95            unreachable!()
96        }
97    }
98
99    /// Reads a message from the channel.
100    #[inline]
101    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
102        token.at.ok_or(())
103    }
104
105    /// Returns `true` if the channel is empty.
106    #[inline]
107    pub(crate) fn is_empty(&self) -> bool {
108        // We use relaxed ordering because this is just an optional optimistic check.
109        if self.received.load(Ordering::Relaxed) {
110            return true;
111        }
112
113        // If the delivery time hasn't been reached yet, the channel is empty.
114        if Instant::now() < self.delivery_time {
115            return true;
116        }
117
118        // The delivery time has been reached. The channel is empty only if the message has already
119        // been received.
120        self.received.load(Ordering::SeqCst)
121    }
122
123    /// Returns `true` if the channel is full.
124    #[inline]
125    pub(crate) fn is_full(&self) -> bool {
126        !self.is_empty()
127    }
128
129    /// Returns the number of messages in the channel.
130    #[inline]
131    pub(crate) fn len(&self) -> usize {
132        if self.is_empty() {
133            0
134        } else {
135            1
136        }
137    }
138
139    /// Returns the capacity of the channel.
140    #[inline]
141    pub(crate) fn capacity(&self) -> Option<usize> {
142        Some(1)
143    }
144}
145
146impl SelectHandle for Channel {
147    #[inline]
148    fn try_select(&self, token: &mut Token) -> bool {
149        match self.try_recv() {
150            Ok(msg) => {
151                token.at = Some(msg);
152                true
153            }
154            Err(TryRecvError::Disconnected) => {
155                token.at = None;
156                true
157            }
158            Err(TryRecvError::Empty) => false,
159        }
160    }
161
162    #[inline]
163    fn deadline(&self) -> Option<Instant> {
164        // We use relaxed ordering because this is just an optional optimistic check.
165        if self.received.load(Ordering::Relaxed) {
166            None
167        } else {
168            Some(self.delivery_time)
169        }
170    }
171
172    #[inline]
173    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
174        self.is_ready()
175    }
176
177    #[inline]
178    fn unregister(&self, _oper: Operation) {}
179
180    #[inline]
181    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
182        self.try_select(token)
183    }
184
185    #[inline]
186    fn is_ready(&self) -> bool {
187        !self.is_empty()
188    }
189
190    #[inline]
191    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
192        self.is_ready()
193    }
194
195    #[inline]
196    fn unwatch(&self, _oper: Operation) {}
197}