// crossbeam_channel/flavors/at.rs
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant;

use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
use crate::utils;

/// Result slot of a receive operation on this channel flavor.
///
/// `Some(instant)` carries the delivered message (the delivery instant itself); `None` means
/// there is no message to read.
pub(crate) type AtToken = Option<Instant>;

/// Channel that delivers a single message at a fixed moment in time.
///
/// Nothing is ever sent into this channel: the message is the delivery instant itself, and it
/// becomes available once `delivery_time` has passed. It can be received at most once.
pub(crate) struct Channel {
    /// The instant at which the message becomes available.
    delivery_time: Instant,

    /// `true` once the message has been received.
    received: AtomicBool,
}

26impl Channel {
27 #[inline]
29 pub(crate) fn new_deadline(when: Instant) -> Self {
30 Channel {
31 delivery_time: when,
32 received: AtomicBool::new(false),
33 }
34 }
35
36 #[inline]
38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39 if self.received.load(Ordering::Relaxed) {
41 return Err(TryRecvError::Empty);
43 }
44
45 if Instant::now() < self.delivery_time {
46 return Err(TryRecvError::Empty);
48 }
49
50 if !self.received.swap(true, Ordering::SeqCst) {
52 Ok(self.delivery_time)
54 } else {
55 Err(TryRecvError::Empty)
57 }
58 }
59
60 #[inline]
62 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
63 if self.received.load(Ordering::Relaxed) {
65 utils::sleep_until(deadline);
67 return Err(RecvTimeoutError::Timeout);
68 }
69
70 loop {
72 let now = Instant::now();
73
74 let deadline = match deadline {
75 _ if now >= self.delivery_time => break,
77 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
79
80 Some(d) if d < self.delivery_time => d,
82 _ => self.delivery_time,
83 };
84
85 thread::sleep(deadline - now);
86 }
87
88 if !self.received.swap(true, Ordering::SeqCst) {
90 Ok(self.delivery_time)
92 } else {
93 utils::sleep_until(None);
95 unreachable!()
96 }
97 }
98
99 #[inline]
101 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
102 token.at.ok_or(())
103 }
104
105 #[inline]
107 pub(crate) fn is_empty(&self) -> bool {
108 if self.received.load(Ordering::Relaxed) {
110 return true;
111 }
112
113 if Instant::now() < self.delivery_time {
115 return true;
116 }
117
118 self.received.load(Ordering::SeqCst)
121 }
122
123 #[inline]
125 pub(crate) fn is_full(&self) -> bool {
126 !self.is_empty()
127 }
128
129 #[inline]
131 pub(crate) fn len(&self) -> usize {
132 if self.is_empty() {
133 0
134 } else {
135 1
136 }
137 }
138
139 #[inline]
141 pub(crate) fn capacity(&self) -> Option<usize> {
142 Some(1)
143 }
144}
145
146impl SelectHandle for Channel {
147 #[inline]
148 fn try_select(&self, token: &mut Token) -> bool {
149 match self.try_recv() {
150 Ok(msg) => {
151 token.at = Some(msg);
152 true
153 }
154 Err(TryRecvError::Disconnected) => {
155 token.at = None;
156 true
157 }
158 Err(TryRecvError::Empty) => false,
159 }
160 }
161
162 #[inline]
163 fn deadline(&self) -> Option<Instant> {
164 if self.received.load(Ordering::Relaxed) {
166 None
167 } else {
168 Some(self.delivery_time)
169 }
170 }
171
172 #[inline]
173 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
174 self.is_ready()
175 }
176
177 #[inline]
178 fn unregister(&self, _oper: Operation) {}
179
180 #[inline]
181 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
182 self.try_select(token)
183 }
184
185 #[inline]
186 fn is_ready(&self) -> bool {
187 !self.is_empty()
188 }
189
190 #[inline]
191 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
192 self.is_ready()
193 }
194
195 #[inline]
196 fn unwatch(&self, _oper: Operation) {}
197}