crossbeam_channel/flavors/
tick.rs

1//! Channel that delivers messages periodically.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::atomic::AtomicCell;
9
10use crate::context::Context;
11use crate::err::{RecvTimeoutError, TryRecvError};
12use crate::select::{Operation, SelectHandle, Token};
13
14/// Result of a receive operation.
15pub(crate) type TickToken = Option<Instant>;
16
17/// Channel that delivers messages periodically.
18pub(crate) struct Channel {
19    /// The instant at which the next message will be delivered.
20    delivery_time: AtomicCell<Instant>,
21
22    /// The time interval in which messages get delivered.
23    duration: Duration,
24}
25
26impl Channel {
27    /// Creates a channel that delivers messages periodically.
28    #[inline]
29    pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
30        Channel {
31            delivery_time: AtomicCell::new(delivery_time),
32            duration: dur,
33        }
34    }
35
36    /// Attempts to receive a message without blocking.
37    #[inline]
38    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39        loop {
40            let now = Instant::now();
41            let delivery_time = self.delivery_time.load();
42
43            if now < delivery_time {
44                return Err(TryRecvError::Empty);
45            }
46
47            if self
48                .delivery_time
49                .compare_exchange(delivery_time, now + self.duration)
50                .is_ok()
51            {
52                return Ok(delivery_time);
53            }
54        }
55    }
56
57    /// Receives a message from the channel.
58    #[inline]
59    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60        loop {
61            let delivery_time = self.delivery_time.load();
62            let now = Instant::now();
63
64            if let Some(d) = deadline {
65                if d < delivery_time {
66                    if now < d {
67                        thread::sleep(d - now);
68                    }
69                    return Err(RecvTimeoutError::Timeout);
70                }
71            }
72
73            if self
74                .delivery_time
75                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76                .is_ok()
77            {
78                if now < delivery_time {
79                    thread::sleep(delivery_time - now);
80                }
81                return Ok(delivery_time);
82            }
83        }
84    }
85
86    /// Reads a message from the channel.
87    #[inline]
88    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89        token.tick.ok_or(())
90    }
91
92    /// Returns `true` if the channel is empty.
93    #[inline]
94    pub(crate) fn is_empty(&self) -> bool {
95        Instant::now() < self.delivery_time.load()
96    }
97
98    /// Returns `true` if the channel is full.
99    #[inline]
100    pub(crate) fn is_full(&self) -> bool {
101        !self.is_empty()
102    }
103
104    /// Returns the number of messages in the channel.
105    #[inline]
106    pub(crate) fn len(&self) -> usize {
107        if self.is_empty() {
108            0
109        } else {
110            1
111        }
112    }
113
114    /// Returns the capacity of the channel.
115    #[inline]
116    pub(crate) fn capacity(&self) -> Option<usize> {
117        Some(1)
118    }
119}
120
121impl SelectHandle for Channel {
122    #[inline]
123    fn try_select(&self, token: &mut Token) -> bool {
124        match self.try_recv() {
125            Ok(msg) => {
126                token.tick = Some(msg);
127                true
128            }
129            Err(TryRecvError::Disconnected) => {
130                token.tick = None;
131                true
132            }
133            Err(TryRecvError::Empty) => false,
134        }
135    }
136
137    #[inline]
138    fn deadline(&self) -> Option<Instant> {
139        Some(self.delivery_time.load())
140    }
141
142    #[inline]
143    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
144        self.is_ready()
145    }
146
147    #[inline]
148    fn unregister(&self, _oper: Operation) {}
149
150    #[inline]
151    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
152        self.try_select(token)
153    }
154
155    #[inline]
156    fn is_ready(&self) -> bool {
157        !self.is_empty()
158    }
159
160    #[inline]
161    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
162        self.is_ready()
163    }
164
165    #[inline]
166    fn unwatch(&self, _oper: Operation) {}
167}