crossbeam_channel/
waker.rs1use std::ptr;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Mutex;
6use std::thread::{self, ThreadId};
7use std::vec::Vec;
8
9use crate::context::Context;
10use crate::select::{Operation, Selected};
11
12pub(crate) struct Entry {
14 pub(crate) oper: Operation,
16
17 pub(crate) packet: *mut (),
19
20 pub(crate) cx: Context,
22}
23
24pub(crate) struct Waker {
29 selectors: Vec<Entry>,
31
32 observers: Vec<Entry>,
34}
35
36impl Waker {
37 #[inline]
39 pub(crate) fn new() -> Self {
40 Waker {
41 selectors: Vec::new(),
42 observers: Vec::new(),
43 }
44 }
45
46 #[inline]
48 pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
49 self.register_with_packet(oper, ptr::null_mut(), cx);
50 }
51
52 #[inline]
54 pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
55 self.selectors.push(Entry {
56 oper,
57 packet,
58 cx: cx.clone(),
59 });
60 }
61
62 #[inline]
64 pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
65 if let Some((i, _)) = self
66 .selectors
67 .iter()
68 .enumerate()
69 .find(|&(_, entry)| entry.oper == oper)
70 {
71 let entry = self.selectors.remove(i);
72 Some(entry)
73 } else {
74 None
75 }
76 }
77
78 #[inline]
80 pub(crate) fn try_select(&mut self) -> Option<Entry> {
81 if self.selectors.is_empty() {
82 None
83 } else {
84 let thread_id = current_thread_id();
85
86 self.selectors
87 .iter()
88 .position(|selector| {
89 selector.cx.thread_id() != thread_id
91 && selector .cx
93 .try_select(Selected::Operation(selector.oper))
94 .is_ok()
95 && {
96 selector.cx.store_packet(selector.packet);
98 selector.cx.unpark();
100 true
101 }
102 })
103 .map(|pos| self.selectors.remove(pos))
106 }
107 }
108
109 #[inline]
111 pub(crate) fn can_select(&self) -> bool {
112 if self.selectors.is_empty() {
113 false
114 } else {
115 let thread_id = current_thread_id();
116
117 self.selectors.iter().any(|entry| {
118 entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119 })
120 }
121 }
122
123 #[inline]
125 pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126 self.observers.push(Entry {
127 oper,
128 packet: ptr::null_mut(),
129 cx: cx.clone(),
130 });
131 }
132
133 #[inline]
135 pub(crate) fn unwatch(&mut self, oper: Operation) {
136 self.observers.retain(|e| e.oper != oper);
137 }
138
139 #[inline]
141 pub(crate) fn notify(&mut self) {
142 for entry in self.observers.drain(..) {
143 if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144 entry.cx.unpark();
145 }
146 }
147 }
148
149 #[inline]
151 pub(crate) fn disconnect(&mut self) {
152 for entry in self.selectors.iter() {
153 if entry.cx.try_select(Selected::Disconnected).is_ok() {
154 entry.cx.unpark();
160 }
161 }
162
163 self.notify();
164 }
165}
166
167impl Drop for Waker {
168 #[inline]
169 fn drop(&mut self) {
170 debug_assert_eq!(self.selectors.len(), 0);
171 debug_assert_eq!(self.observers.len(), 0);
172 }
173}
174
175pub(crate) struct SyncWaker {
179 inner: Mutex<Waker>,
181
182 is_empty: AtomicBool,
184}
185
186impl SyncWaker {
187 #[inline]
189 pub(crate) fn new() -> Self {
190 SyncWaker {
191 inner: Mutex::new(Waker::new()),
192 is_empty: AtomicBool::new(true),
193 }
194 }
195
196 #[inline]
198 pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199 let mut inner = self.inner.lock().unwrap();
200 inner.register(oper, cx);
201 self.is_empty.store(
202 inner.selectors.is_empty() && inner.observers.is_empty(),
203 Ordering::SeqCst,
204 );
205 }
206
207 #[inline]
209 pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210 let mut inner = self.inner.lock().unwrap();
211 let entry = inner.unregister(oper);
212 self.is_empty.store(
213 inner.selectors.is_empty() && inner.observers.is_empty(),
214 Ordering::SeqCst,
215 );
216 entry
217 }
218
219 #[inline]
221 pub(crate) fn notify(&self) {
222 if !self.is_empty.load(Ordering::SeqCst) {
223 let mut inner = self.inner.lock().unwrap();
224 if !self.is_empty.load(Ordering::SeqCst) {
225 inner.try_select();
226 inner.notify();
227 self.is_empty.store(
228 inner.selectors.is_empty() && inner.observers.is_empty(),
229 Ordering::SeqCst,
230 );
231 }
232 }
233 }
234
235 #[inline]
237 pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238 let mut inner = self.inner.lock().unwrap();
239 inner.watch(oper, cx);
240 self.is_empty.store(
241 inner.selectors.is_empty() && inner.observers.is_empty(),
242 Ordering::SeqCst,
243 );
244 }
245
246 #[inline]
248 pub(crate) fn unwatch(&self, oper: Operation) {
249 let mut inner = self.inner.lock().unwrap();
250 inner.unwatch(oper);
251 self.is_empty.store(
252 inner.selectors.is_empty() && inner.observers.is_empty(),
253 Ordering::SeqCst,
254 );
255 }
256
257 #[inline]
259 pub(crate) fn disconnect(&self) {
260 let mut inner = self.inner.lock().unwrap();
261 inner.disconnect();
262 self.is_empty.store(
263 inner.selectors.is_empty() && inner.observers.is_empty(),
264 Ordering::SeqCst,
265 );
266 }
267}
268
269impl Drop for SyncWaker {
270 #[inline]
271 fn drop(&mut self) {
272 debug_assert!(self.is_empty.load(Ordering::SeqCst));
273 }
274}
275
276#[inline]
278fn current_thread_id() -> ThreadId {
279 std::thread_local! {
280 static THREAD_ID: ThreadId = thread::current().id();
282 }
283
284 THREAD_ID
285 .try_with(|id| *id)
286 .unwrap_or_else(|_| thread::current().id())
287}