crossbeam_channel/
waker.rs

1//! Waking mechanism for threads blocked on channel operations.
2
3use std::ptr;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Mutex;
6use std::thread::{self, ThreadId};
7use std::vec::Vec;
8
9use crate::context::Context;
10use crate::select::{Operation, Selected};
11
12/// Represents a thread blocked on a specific channel operation.
13pub(crate) struct Entry {
14    /// The operation.
15    pub(crate) oper: Operation,
16
17    /// Optional packet.
18    pub(crate) packet: *mut (),
19
20    /// Context associated with the thread owning this operation.
21    pub(crate) cx: Context,
22}
23
24/// A queue of threads blocked on channel operations.
25///
26/// This data structure is used by threads to register blocking operations and get woken up once
27/// an operation becomes ready.
28pub(crate) struct Waker {
29    /// A list of select operations.
30    selectors: Vec<Entry>,
31
32    /// A list of operations waiting to be ready.
33    observers: Vec<Entry>,
34}
35
36impl Waker {
37    /// Creates a new `Waker`.
38    #[inline]
39    pub(crate) fn new() -> Self {
40        Waker {
41            selectors: Vec::new(),
42            observers: Vec::new(),
43        }
44    }
45
46    /// Registers a select operation.
47    #[inline]
48    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
49        self.register_with_packet(oper, ptr::null_mut(), cx);
50    }
51
52    /// Registers a select operation and a packet.
53    #[inline]
54    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
55        self.selectors.push(Entry {
56            oper,
57            packet,
58            cx: cx.clone(),
59        });
60    }
61
62    /// Unregisters a select operation.
63    #[inline]
64    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
65        if let Some((i, _)) = self
66            .selectors
67            .iter()
68            .enumerate()
69            .find(|&(_, entry)| entry.oper == oper)
70        {
71            let entry = self.selectors.remove(i);
72            Some(entry)
73        } else {
74            None
75        }
76    }
77
78    /// Attempts to find another thread's entry, select the operation, and wake it up.
79    #[inline]
80    pub(crate) fn try_select(&mut self) -> Option<Entry> {
81        if self.selectors.is_empty() {
82            None
83        } else {
84            let thread_id = current_thread_id();
85
86            self.selectors
87                .iter()
88                .position(|selector| {
89                    // Does the entry belong to a different thread?
90                    selector.cx.thread_id() != thread_id
91                        && selector // Try selecting this operation.
92                            .cx
93                            .try_select(Selected::Operation(selector.oper))
94                            .is_ok()
95                        && {
96                            // Provide the packet.
97                            selector.cx.store_packet(selector.packet);
98                            // Wake the thread up.
99                            selector.cx.unpark();
100                            true
101                        }
102                })
103                // Remove the entry from the queue to keep it clean and improve
104                // performance.
105                .map(|pos| self.selectors.remove(pos))
106        }
107    }
108
109    /// Returns `true` if there is an entry which can be selected by the current thread.
110    #[inline]
111    pub(crate) fn can_select(&self) -> bool {
112        if self.selectors.is_empty() {
113            false
114        } else {
115            let thread_id = current_thread_id();
116
117            self.selectors.iter().any(|entry| {
118                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119            })
120        }
121    }
122
123    /// Registers an operation waiting to be ready.
124    #[inline]
125    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126        self.observers.push(Entry {
127            oper,
128            packet: ptr::null_mut(),
129            cx: cx.clone(),
130        });
131    }
132
133    /// Unregisters an operation waiting to be ready.
134    #[inline]
135    pub(crate) fn unwatch(&mut self, oper: Operation) {
136        self.observers.retain(|e| e.oper != oper);
137    }
138
139    /// Notifies all operations waiting to be ready.
140    #[inline]
141    pub(crate) fn notify(&mut self) {
142        for entry in self.observers.drain(..) {
143            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144                entry.cx.unpark();
145            }
146        }
147    }
148
149    /// Notifies all registered operations that the channel is disconnected.
150    #[inline]
151    pub(crate) fn disconnect(&mut self) {
152        for entry in self.selectors.iter() {
153            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154                // Wake the thread up.
155                //
156                // Here we don't remove the entry from the queue. Registered threads must
157                // unregister from the waker by themselves. They might also want to recover the
158                // packet value and destroy it, if necessary.
159                entry.cx.unpark();
160            }
161        }
162
163        self.notify();
164    }
165}
166
167impl Drop for Waker {
168    #[inline]
169    fn drop(&mut self) {
170        debug_assert_eq!(self.selectors.len(), 0);
171        debug_assert_eq!(self.observers.len(), 0);
172    }
173}
174
175/// A waker that can be shared among threads without locking.
176///
177/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
178pub(crate) struct SyncWaker {
179    /// The inner `Waker`.
180    inner: Mutex<Waker>,
181
182    /// `true` if the waker is empty.
183    is_empty: AtomicBool,
184}
185
186impl SyncWaker {
187    /// Creates a new `SyncWaker`.
188    #[inline]
189    pub(crate) fn new() -> Self {
190        SyncWaker {
191            inner: Mutex::new(Waker::new()),
192            is_empty: AtomicBool::new(true),
193        }
194    }
195
196    /// Registers the current thread with an operation.
197    #[inline]
198    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199        let mut inner = self.inner.lock().unwrap();
200        inner.register(oper, cx);
201        self.is_empty.store(
202            inner.selectors.is_empty() && inner.observers.is_empty(),
203            Ordering::SeqCst,
204        );
205    }
206
207    /// Unregisters an operation previously registered by the current thread.
208    #[inline]
209    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210        let mut inner = self.inner.lock().unwrap();
211        let entry = inner.unregister(oper);
212        self.is_empty.store(
213            inner.selectors.is_empty() && inner.observers.is_empty(),
214            Ordering::SeqCst,
215        );
216        entry
217    }
218
219    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
220    #[inline]
221    pub(crate) fn notify(&self) {
222        if !self.is_empty.load(Ordering::SeqCst) {
223            let mut inner = self.inner.lock().unwrap();
224            if !self.is_empty.load(Ordering::SeqCst) {
225                inner.try_select();
226                inner.notify();
227                self.is_empty.store(
228                    inner.selectors.is_empty() && inner.observers.is_empty(),
229                    Ordering::SeqCst,
230                );
231            }
232        }
233    }
234
235    /// Registers an operation waiting to be ready.
236    #[inline]
237    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238        let mut inner = self.inner.lock().unwrap();
239        inner.watch(oper, cx);
240        self.is_empty.store(
241            inner.selectors.is_empty() && inner.observers.is_empty(),
242            Ordering::SeqCst,
243        );
244    }
245
246    /// Unregisters an operation waiting to be ready.
247    #[inline]
248    pub(crate) fn unwatch(&self, oper: Operation) {
249        let mut inner = self.inner.lock().unwrap();
250        inner.unwatch(oper);
251        self.is_empty.store(
252            inner.selectors.is_empty() && inner.observers.is_empty(),
253            Ordering::SeqCst,
254        );
255    }
256
257    /// Notifies all threads that the channel is disconnected.
258    #[inline]
259    pub(crate) fn disconnect(&self) {
260        let mut inner = self.inner.lock().unwrap();
261        inner.disconnect();
262        self.is_empty.store(
263            inner.selectors.is_empty() && inner.observers.is_empty(),
264            Ordering::SeqCst,
265        );
266    }
267}
268
269impl Drop for SyncWaker {
270    #[inline]
271    fn drop(&mut self) {
272        debug_assert!(self.is_empty.load(Ordering::SeqCst));
273    }
274}
275
276/// Returns the id of the current thread.
277#[inline]
278fn current_thread_id() -> ThreadId {
279    std::thread_local! {
280        /// Cached thread-local id.
281        static THREAD_ID: ThreadId = thread::current().id();
282    }
283
284    THREAD_ID
285        .try_with(|id| *id)
286        .unwrap_or_else(|_| thread::current().id())
287}