crossbeam_channel/flavors/list.rs

//! Unbounded channel implemented as a linked list.

use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
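
// A minimal sketch of how an index decomposes under this encoding (an
// illustrative helper, not part of the channel; note that `offset == BLOCK_CAP`
// is not a valid slot, since the last position of each lap is reserved for
// installing the next block):
#[allow(dead_code)]
fn index_parts(index: usize) -> (usize, usize, bool) {
    let mark = index & MARK_BIT != 0; // meaning depends on whether this is head or tail
    let offset = (index >> SHIFT) % LAP; // slot within the current block
    let lap = (index >> SHIFT) / LAP; // how many full laps precede this index
    (lap, offset, mark)
}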

/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
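
// For orientation, a sketch of a slot's `state` life cycle (descriptive, not
// normative): blocks come out of `alloc_zeroed`, so every slot starts at 0; the
// sender ORs in `WRITE` once the message is stored; the receiver ORs in `READ`
// once the message is taken (except on a block's last occupied slot, where it
// starts destruction directly); and a thread tearing the block down ORs in
// `DESTROY` when it finds a slot whose reader has not finished yet.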

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
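
// An illustrative walk-through of the destruction hand-off above (a sketch of
// the protocol, not normative): suppose `destroy(block, 0)` finds slot 5 with
// `READ` still unset. It ORs `DESTROY` into slot 5 and returns without freeing
// anything. When the reader of slot 5 finishes, its `fetch_or(READ, ...)` in
// `read` below observes `DESTROY` and calls `Block::destroy(block, 6)`,
// continuing from the next slot, so exactly one thread eventually frees the
// block.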

/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
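///
/// A rough usage sketch of the crate-internal API (illustrative only):
///
/// ```ignore
/// let chan = Channel::new();
/// chan.try_send("hello").unwrap();
/// assert_eq!(chan.try_recv().unwrap(), "hello");
/// assert!(chan.try_recv().is_err()); // now `Empty`
/// ```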
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
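
    // Note: for this unbounded flavor, `start_send` never returns `false`: it either
    // reserves a slot or observes disconnection and hands back a null block. The
    // `assert!(self.start_send(token))` in `send` below relies on this.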

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
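
    // A worked example of the computation above, assuming LAP == 32 and SHIFT == 1:
    // after 31 sends on a fresh channel (one full block), the sender that filled the
    // last slot also bumped the tail past the block boundary, so `tail >> SHIFT` is
    // 32 while `head >> SHIFT` is 0, and `len` returns 32 - 0 - 32 / 32 == 31.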

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to the tail will be rejected by `MARK_BIT` and aborted unless they
            // happen at a block boundary. We need to wait for such in-flight updates to take
            // effect; otherwise memory can be leaked.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's
        // attempt to initialize the first block before it notices that the receivers have
        // disconnected. A late allocation will be deallocated by the sender in `Drop`.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages, we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}
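
// A minimal sanity check of the send/receive paths above (a sketch; this test
// module is an addition for illustration and assumes the crate's usual `cargo
// test` setup).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn send_recv_in_order() {
        let chan = Channel::<usize>::new();
        for i in 0..100 {
            chan.try_send(i).unwrap();
        }
        for i in 0..100 {
            assert_eq!(chan.try_recv().unwrap(), i);
        }
        // The channel is drained now.
        assert!(chan.try_recv().is_err());
    }
}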