crossbeam_channel/
context.rs

1//! Thread-local context used in select.
2
3use std::cell::Cell;
4use std::ptr;
5use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::thread::{self, Thread, ThreadId};
8use std::time::Instant;
9
10use crossbeam_utils::Backoff;
11
12use crate::select::Selected;
13
14/// Thread-local context used in select.
15// This is a private API that is used by the select macro.
16#[derive(Debug, Clone)]
17pub struct Context {
18    inner: Arc<Inner>,
19}
20
/// Shared state behind a `Context` handle.
#[derive(Debug)]
struct Inner {
    /// The currently selected operation, stored as the `usize` form of `Selected`.
    select: AtomicUsize,

    /// Slot in which another thread may leave a pointer to its `Packet`;
    /// null while no packet has been provided.
    packet: AtomicPtr<()>,

    /// Handle of the thread that created this context, used to unpark it.
    thread: Thread,

    /// Id of the thread that created this context.
    thread_id: ThreadId,
}
36
37impl Context {
38    /// Creates a new context for the duration of the closure.
39    #[inline]
40    pub fn with<F, R>(f: F) -> R
41    where
42        F: FnOnce(&Context) -> R,
43    {
44        std::thread_local! {
45            /// Cached thread-local context.
46            static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
47        }
48
49        let mut f = Some(f);
50        let mut f = |cx: &Context| -> R {
51            let f = f.take().unwrap();
52            f(cx)
53        };
54
55        CONTEXT
56            .try_with(|cell| match cell.take() {
57                None => f(&Context::new()),
58                Some(cx) => {
59                    cx.reset();
60                    let res = f(&cx);
61                    cell.set(Some(cx));
62                    res
63                }
64            })
65            .unwrap_or_else(|_| f(&Context::new()))
66    }
67
68    /// Creates a new `Context`.
69    #[cold]
70    fn new() -> Context {
71        Context {
72            inner: Arc::new(Inner {
73                select: AtomicUsize::new(Selected::Waiting.into()),
74                packet: AtomicPtr::new(ptr::null_mut()),
75                thread: thread::current(),
76                thread_id: thread::current().id(),
77            }),
78        }
79    }
80
81    /// Resets `select` and `packet`.
82    #[inline]
83    fn reset(&self) {
84        self.inner
85            .select
86            .store(Selected::Waiting.into(), Ordering::Release);
87        self.inner.packet.store(ptr::null_mut(), Ordering::Release);
88    }
89
90    /// Attempts to select an operation.
91    ///
92    /// On failure, the previously selected operation is returned.
93    #[inline]
94    pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
95        self.inner
96            .select
97            .compare_exchange(
98                Selected::Waiting.into(),
99                select.into(),
100                Ordering::AcqRel,
101                Ordering::Acquire,
102            )
103            .map(|_| ())
104            .map_err(|e| e.into())
105    }
106
107    /// Returns the selected operation.
108    #[inline]
109    pub fn selected(&self) -> Selected {
110        Selected::from(self.inner.select.load(Ordering::Acquire))
111    }
112
113    /// Stores a packet.
114    ///
115    /// This method must be called after `try_select` succeeds and there is a packet to provide.
116    #[inline]
117    pub fn store_packet(&self, packet: *mut ()) {
118        if !packet.is_null() {
119            self.inner.packet.store(packet, Ordering::Release);
120        }
121    }
122
123    /// Waits until a packet is provided and returns it.
124    #[inline]
125    pub fn wait_packet(&self) -> *mut () {
126        let backoff = Backoff::new();
127        loop {
128            let packet = self.inner.packet.load(Ordering::Acquire);
129            if !packet.is_null() {
130                return packet;
131            }
132            backoff.snooze();
133        }
134    }
135
136    /// Waits until an operation is selected and returns it.
137    ///
138    /// If the deadline is reached, `Selected::Aborted` will be selected.
139    #[inline]
140    pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
141        loop {
142            // Check whether an operation has been selected.
143            let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
144            if sel != Selected::Waiting {
145                return sel;
146            }
147
148            // If there's a deadline, park the current thread until the deadline is reached.
149            if let Some(end) = deadline {
150                let now = Instant::now();
151
152                if now < end {
153                    thread::park_timeout(end - now);
154                } else {
155                    // The deadline has been reached. Try aborting select.
156                    return match self.try_select(Selected::Aborted) {
157                        Ok(()) => Selected::Aborted,
158                        Err(s) => s,
159                    };
160                }
161            } else {
162                thread::park();
163            }
164        }
165    }
166
167    /// Unparks the thread this context belongs to.
168    #[inline]
169    pub fn unpark(&self) {
170        self.inner.thread.unpark();
171    }
172
173    /// Returns the id of the thread this context belongs to.
174    #[inline]
175    pub fn thread_id(&self) -> ThreadId {
176        self.inner.thread_id
177    }
178}