crossbeam_channel/flavors/
zero.rs1use std::boxed::Box;
6use std::cell::UnsafeCell;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10use std::time::Instant;
11use std::{fmt, ptr};
12
13use crossbeam_utils::Backoff;
14
15use crate::context::Context;
16use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
17use crate::select::{Operation, SelectHandle, Selected, Token};
18use crate::waker::Waker;
19
20pub(crate) struct ZeroToken(*mut ());
22
23impl Default for ZeroToken {
24 fn default() -> Self {
25 Self(ptr::null_mut())
26 }
27}
28
29impl fmt::Debug for ZeroToken {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 fmt::Debug::fmt(&(self.0 as usize), f)
32 }
33}
34
35struct Packet<T> {
37 on_stack: bool,
39
40 ready: AtomicBool,
42
43 msg: UnsafeCell<Option<T>>,
45}
46
47impl<T> Packet<T> {
48 fn empty_on_stack() -> Packet<T> {
50 Packet {
51 on_stack: true,
52 ready: AtomicBool::new(false),
53 msg: UnsafeCell::new(None),
54 }
55 }
56
57 fn empty_on_heap() -> Box<Packet<T>> {
59 Box::new(Packet {
60 on_stack: false,
61 ready: AtomicBool::new(false),
62 msg: UnsafeCell::new(None),
63 })
64 }
65
66 fn message_on_stack(msg: T) -> Packet<T> {
68 Packet {
69 on_stack: true,
70 ready: AtomicBool::new(false),
71 msg: UnsafeCell::new(Some(msg)),
72 }
73 }
74
75 fn wait_ready(&self) {
77 let backoff = Backoff::new();
78 while !self.ready.load(Ordering::Acquire) {
79 backoff.snooze();
80 }
81 }
82}
83
84struct Inner {
86 senders: Waker,
88
89 receivers: Waker,
91
92 is_disconnected: bool,
94}
95
96pub(crate) struct Channel<T> {
98 inner: Mutex<Inner>,
100
101 _marker: PhantomData<T>,
103}
104
105impl<T> Channel<T> {
106 pub(crate) fn new() -> Self {
108 Channel {
109 inner: Mutex::new(Inner {
110 senders: Waker::new(),
111 receivers: Waker::new(),
112 is_disconnected: false,
113 }),
114 _marker: PhantomData,
115 }
116 }
117
118 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
120 Receiver(self)
121 }
122
123 pub(crate) fn sender(&self) -> Sender<'_, T> {
125 Sender(self)
126 }
127
128 fn start_send(&self, token: &mut Token) -> bool {
130 let mut inner = self.inner.lock().unwrap();
131
132 if let Some(operation) = inner.receivers.try_select() {
134 token.zero.0 = operation.packet;
135 true
136 } else if inner.is_disconnected {
137 token.zero.0 = ptr::null_mut();
138 true
139 } else {
140 false
141 }
142 }
143
144 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
146 if token.zero.0.is_null() {
148 return Err(msg);
149 }
150
151 let packet = &*(token.zero.0 as *const Packet<T>);
152 packet.msg.get().write(Some(msg));
153 packet.ready.store(true, Ordering::Release);
154 Ok(())
155 }
156
157 fn start_recv(&self, token: &mut Token) -> bool {
159 let mut inner = self.inner.lock().unwrap();
160
161 if let Some(operation) = inner.senders.try_select() {
163 token.zero.0 = operation.packet;
164 true
165 } else if inner.is_disconnected {
166 token.zero.0 = ptr::null_mut();
167 true
168 } else {
169 false
170 }
171 }
172
173 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
175 if token.zero.0.is_null() {
177 return Err(());
178 }
179
180 let packet = &*(token.zero.0 as *const Packet<T>);
181
182 if packet.on_stack {
183 let msg = packet.msg.get().replace(None).unwrap();
187 packet.ready.store(true, Ordering::Release);
188 Ok(msg)
189 } else {
190 packet.wait_ready();
193 let msg = packet.msg.get().replace(None).unwrap();
194 drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
195 Ok(msg)
196 }
197 }
198
199 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201 let token = &mut Token::default();
202 let mut inner = self.inner.lock().unwrap();
203
204 if let Some(operation) = inner.receivers.try_select() {
206 token.zero.0 = operation.packet;
207 drop(inner);
208 unsafe {
209 self.write(token, msg).ok().unwrap();
210 }
211 Ok(())
212 } else if inner.is_disconnected {
213 Err(TrySendError::Disconnected(msg))
214 } else {
215 Err(TrySendError::Full(msg))
216 }
217 }
218
219 pub(crate) fn send(
221 &self,
222 msg: T,
223 deadline: Option<Instant>,
224 ) -> Result<(), SendTimeoutError<T>> {
225 let token = &mut Token::default();
226 let mut inner = self.inner.lock().unwrap();
227
228 if let Some(operation) = inner.receivers.try_select() {
230 token.zero.0 = operation.packet;
231 drop(inner);
232 unsafe {
233 self.write(token, msg).ok().unwrap();
234 }
235 return Ok(());
236 }
237
238 if inner.is_disconnected {
239 return Err(SendTimeoutError::Disconnected(msg));
240 }
241
242 Context::with(|cx| {
243 let oper = Operation::hook(token);
245 let mut packet = Packet::<T>::message_on_stack(msg);
246 inner
247 .senders
248 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
249 inner.receivers.notify();
250 drop(inner);
251
252 let sel = cx.wait_until(deadline);
254
255 match sel {
256 Selected::Waiting => unreachable!(),
257 Selected::Aborted => {
258 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
259 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
260 Err(SendTimeoutError::Timeout(msg))
261 }
262 Selected::Disconnected => {
263 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
264 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
265 Err(SendTimeoutError::Disconnected(msg))
266 }
267 Selected::Operation(_) => {
268 packet.wait_ready();
270 Ok(())
271 }
272 }
273 })
274 }
275
276 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
278 let token = &mut Token::default();
279 let mut inner = self.inner.lock().unwrap();
280
281 if let Some(operation) = inner.senders.try_select() {
283 token.zero.0 = operation.packet;
284 drop(inner);
285 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
286 } else if inner.is_disconnected {
287 Err(TryRecvError::Disconnected)
288 } else {
289 Err(TryRecvError::Empty)
290 }
291 }
292
293 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
295 let token = &mut Token::default();
296 let mut inner = self.inner.lock().unwrap();
297
298 if let Some(operation) = inner.senders.try_select() {
300 token.zero.0 = operation.packet;
301 drop(inner);
302 unsafe {
303 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
304 }
305 }
306
307 if inner.is_disconnected {
308 return Err(RecvTimeoutError::Disconnected);
309 }
310
311 Context::with(|cx| {
312 let oper = Operation::hook(token);
314 let mut packet = Packet::<T>::empty_on_stack();
315 inner.receivers.register_with_packet(
316 oper,
317 &mut packet as *mut Packet<T> as *mut (),
318 cx,
319 );
320 inner.senders.notify();
321 drop(inner);
322
323 let sel = cx.wait_until(deadline);
325
326 match sel {
327 Selected::Waiting => unreachable!(),
328 Selected::Aborted => {
329 self.inner
330 .lock()
331 .unwrap()
332 .receivers
333 .unregister(oper)
334 .unwrap();
335 Err(RecvTimeoutError::Timeout)
336 }
337 Selected::Disconnected => {
338 self.inner
339 .lock()
340 .unwrap()
341 .receivers
342 .unregister(oper)
343 .unwrap();
344 Err(RecvTimeoutError::Disconnected)
345 }
346 Selected::Operation(_) => {
347 packet.wait_ready();
349 unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
350 }
351 }
352 })
353 }
354
355 pub(crate) fn disconnect(&self) -> bool {
359 let mut inner = self.inner.lock().unwrap();
360
361 if !inner.is_disconnected {
362 inner.is_disconnected = true;
363 inner.senders.disconnect();
364 inner.receivers.disconnect();
365 true
366 } else {
367 false
368 }
369 }
370
371 pub(crate) fn len(&self) -> usize {
373 0
374 }
375
376 pub(crate) fn capacity(&self) -> Option<usize> {
378 Some(0)
379 }
380
381 pub(crate) fn is_empty(&self) -> bool {
383 true
384 }
385
386 pub(crate) fn is_full(&self) -> bool {
388 true
389 }
390}
391
392pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
394
395pub(crate) struct Sender<'a, T>(&'a Channel<T>);
397
398impl<T> SelectHandle for Receiver<'_, T> {
399 fn try_select(&self, token: &mut Token) -> bool {
400 self.0.start_recv(token)
401 }
402
403 fn deadline(&self) -> Option<Instant> {
404 None
405 }
406
407 fn register(&self, oper: Operation, cx: &Context) -> bool {
408 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
409
410 let mut inner = self.0.inner.lock().unwrap();
411 inner
412 .receivers
413 .register_with_packet(oper, packet.cast::<()>(), cx);
414 inner.senders.notify();
415 inner.senders.can_select() || inner.is_disconnected
416 }
417
418 fn unregister(&self, oper: Operation) {
419 if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
420 unsafe {
421 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
422 }
423 }
424 }
425
426 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
427 token.zero.0 = cx.wait_packet();
428 true
429 }
430
431 fn is_ready(&self) -> bool {
432 let inner = self.0.inner.lock().unwrap();
433 inner.senders.can_select() || inner.is_disconnected
434 }
435
436 fn watch(&self, oper: Operation, cx: &Context) -> bool {
437 let mut inner = self.0.inner.lock().unwrap();
438 inner.receivers.watch(oper, cx);
439 inner.senders.can_select() || inner.is_disconnected
440 }
441
442 fn unwatch(&self, oper: Operation) {
443 let mut inner = self.0.inner.lock().unwrap();
444 inner.receivers.unwatch(oper);
445 }
446}
447
448impl<T> SelectHandle for Sender<'_, T> {
449 fn try_select(&self, token: &mut Token) -> bool {
450 self.0.start_send(token)
451 }
452
453 fn deadline(&self) -> Option<Instant> {
454 None
455 }
456
457 fn register(&self, oper: Operation, cx: &Context) -> bool {
458 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
459
460 let mut inner = self.0.inner.lock().unwrap();
461 inner
462 .senders
463 .register_with_packet(oper, packet.cast::<()>(), cx);
464 inner.receivers.notify();
465 inner.receivers.can_select() || inner.is_disconnected
466 }
467
468 fn unregister(&self, oper: Operation) {
469 if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
470 unsafe {
471 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
472 }
473 }
474 }
475
476 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
477 token.zero.0 = cx.wait_packet();
478 true
479 }
480
481 fn is_ready(&self) -> bool {
482 let inner = self.0.inner.lock().unwrap();
483 inner.receivers.can_select() || inner.is_disconnected
484 }
485
486 fn watch(&self, oper: Operation, cx: &Context) -> bool {
487 let mut inner = self.0.inner.lock().unwrap();
488 inner.senders.watch(oper, cx);
489 inner.receivers.can_select() || inner.is_disconnected
490 }
491
492 fn unwatch(&self, oper: Operation) {
493 let mut inner = self.0.inner.lock().unwrap();
494 inner.senders.unwatch(oper);
495 }
496}