cs431_homework/
boc.rs

1//! Concurrent Owner (Cown) type.
2
3use core::cell::UnsafeCell;
4use core::sync::atomic::Ordering::SeqCst;
5use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
6use core::{fmt, hint, ptr};
7use std::sync::Arc;
8
9/// A trait representing a `Cown`.
10///
11/// Instead of directly using a `Cown<T>`, which fixes _a single_ `T` we use a trait object to allow
12/// multiple requests with different `T`s to be used with the same cown.
13///
14/// # Safety
15///
16/// `last` should actually return the last request for the corresponding cown.
17unsafe trait CownBase: Send {
18    /// Return a pointer to the tail of this cown's request queue.
19    fn last(&self) -> &AtomicPtr<Request>;
20}
21
22/// A request for a cown.
23pub struct Request {
24    /// Pointer to the next scheduled behavior.
25    next: AtomicPtr<Behavior>,
26    /// Is this request scheduled?
27    scheduled: AtomicBool,
28    /// The cown that this request wants to access.
29    ///
30    /// This is an `Arc` as the all exposed `CownPtr`s may have been dropped while the behavior is
31    /// still scheduled.
32    target: Arc<dyn CownBase>,
33}
34
35// SAFETY: In the basic version of BoC, user cannot get shared reference through the [`CownBase`],
36// so `Sync` bound on it is not necessary.
37unsafe impl Send for Request {}
38
39impl Request {
40    /// Creates a new Request.
41    fn new(target: Arc<dyn CownBase>) -> Request {
42        Request {
43            next: AtomicPtr::new(ptr::null_mut()),
44            scheduled: AtomicBool::new(false),
45            target,
46        }
47    }
48
49    /// Start the first phase of the 2PL enqueue operation.
50    ///
51    /// Enqueues `self` onto the `target` cown. Returns once all previous behaviors on this cown has
52    /// finished enqueueing on all of its required cowns. This ensures the 2PL protocol.
53    ///
54    /// # SAFETY
55    ///
56    /// `behavior` must be a valid raw pointer to the behavior for `self`, and this should be the
57    /// only enqueueing of this request and behavior.
58    unsafe fn start_enqueue(&self, behavior: *const Behavior) {
59        todo!()
60    }
61
62    /// Finish the second phase of the 2PL enqueue operation.
63    ///
64    /// Sets the scheduled flag so that subsequent behaviors can continue the 2PL enqueue.
65    ///
66    /// # Safety
67    ///
68    /// All enqueues for smaller requests on this cown must have been completed.
69    unsafe fn finish_enqueue(&self) {
70        todo!()
71    }
72
73    /// Release the cown to the next behavior.
74    ///
75    /// Called when `self` has been completed, and thus can allow the next waiting behavior to run.
76    /// If there is no next behavior, then the cown's tail pointer is set to null.
77    ///
78    /// # Safety
79    ///
80    /// `self` must have been actually completed.
81    unsafe fn release(&self) {
82        todo!()
83    }
84}
85
86impl Ord for Request {
87    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
88        #[allow(warnings)]
89        Arc::as_ptr(&self.target).cmp(&Arc::as_ptr(&other.target))
90    }
91}
92impl PartialOrd for Request {
93    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
94        Some(self.cmp(other))
95    }
96}
97impl PartialEq for Request {
98    fn eq(&self, other: &Self) -> bool {
99        matches!(self.cmp(other), core::cmp::Ordering::Equal)
100    }
101}
102impl Eq for Request {}
103
104impl fmt::Debug for Request {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("Request")
107            .field("next", &self.next)
108            .field("scheduled", &self.scheduled)
109            .finish()
110    }
111}
112
113/// The value should only be accessed inside a `when!` block.
114#[derive(Debug)]
115struct Cown<T: Send> {
116    /// MCS lock tail.
117    ///
118    /// When a new node is enqueued, the enqueuer of the previous tail node will wait until the
119    /// current enqueuer sets that node's `.next`.
120    last: AtomicPtr<Request>,
121    /// The value of this cown.
122    value: UnsafeCell<T>,
123}
124
125// SAFETY: `self.tail` is indeed the actual tail.
126unsafe impl<T: Send> CownBase for Cown<T> {
127    fn last(&self) -> &AtomicPtr<Request> {
128        &self.last
129    }
130}
131
132/// Public interface to Cown.
133#[derive(Debug)]
134pub struct CownPtr<T: Send> {
135    inner: Arc<Cown<T>>,
136}
137
138// SAFETY: In the basic version of BoC, user cannot get `&T`, so `Sync` is not necessary.
139unsafe impl<T: Send> Send for CownPtr<T> {}
140
141impl<T: Send> Clone for CownPtr<T> {
142    fn clone(&self) -> Self {
143        CownPtr {
144            inner: self.inner.clone(),
145        }
146    }
147}
148
149impl<T: Send> CownPtr<T> {
150    /// Creates a new Cown.
151    pub fn new(value: T) -> CownPtr<T> {
152        CownPtr {
153            inner: Arc::new(Cown {
154                last: AtomicPtr::new(ptr::null_mut()),
155                value: UnsafeCell::new(value),
156            }),
157        }
158    }
159}
160
161type BehaviorThunk = Box<dyn FnOnce() + Send>;
162
163/// Behavior that captures the content of a when body.
164struct Behavior {
165    /// The body of the Behavior.
166    thunk: BehaviorThunk,
167    /// Number of not-yet enqueued requests.
168    count: AtomicUsize,
169    /// The requests for this behavior.
170    requests: Vec<Request>,
171}
172
173impl Behavior {
174    /// Schedules the Behavior.
175    ///
176    /// Performs two phase locking (2PL) over the enqueuing of the requests.
177    /// This ensures that the overall effect of the enqueue is atomic.
178    fn schedule(self) {
179        todo!()
180    }
181
182    /// Resolves a single outstanding request for `this`.
183    ///
184    /// Called when a request for `this` is at the head of the queue for a particular cown. If it is
185    /// the last request, then the thunk is scheduled.
186    ///
187    /// # Safety
188    ///
189    /// `this` must be a valid behavior.
190    unsafe fn resolve_one(this: *const Self) {
191        todo!()
192    }
193}
194
195impl fmt::Debug for Behavior {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        f.debug_struct("Behavior")
198            .field("thunk", &"BehaviorThunk")
199            .field("count", &self.count)
200            .field("requests", &self.requests)
201            .finish()
202    }
203}
204
205// TODO: terminator?
206impl Behavior {
207    fn new<C, F>(cowns: C, f: F) -> Behavior
208    where
209        C: CownPtrs + Send + 'static,
210        F: for<'l> Fn(C::CownRefs<'l>) + Send + 'static,
211    {
212        todo!()
213    }
214}
215
216/// Trait for a collection of `CownPtr`s.
217///
218/// Users pass `CownPtrs` to `when!` clause to specify a collection of shared resources, and such
219/// resources can be accessed via `CownRefs` inside the thunk.
220///
221/// # Safety
222///
223/// `requests` should actually return the requests for the corresponding cowns.
224pub unsafe trait CownPtrs {
225    /// Types for references corresponding to `CownPtrs`.
226    type CownRefs<'l>
227    where
228        Self: 'l;
229
230    /// Returns a collection of `Request`.
231    // This could return a `Box<[Request]>`, but we use a `Vec` to avoid possible reallocation in
232    // the implementation.
233    fn requests(&self) -> Vec<Request>;
234
235    /// Returns mutable references of type `CownRefs`.
236    ///
237    /// # Safety
238    ///
239    /// Must be called only if it is safe to access the shared resources.
240    unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l>;
241}
242
243unsafe impl CownPtrs for () {
244    type CownRefs<'l>
245        = ()
246    where
247        Self: 'l;
248
249    fn requests(&self) -> Vec<Request> {
250        Vec::new()
251    }
252
253    unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l> {}
254}
255
256unsafe impl<T: Send + 'static, Ts: CownPtrs> CownPtrs for (CownPtr<T>, Ts) {
257    type CownRefs<'l>
258        = (&'l mut T, Ts::CownRefs<'l>)
259    where
260        Self: 'l;
261
262    fn requests(&self) -> Vec<Request> {
263        let mut rs = self.1.requests();
264        let cown_base: Arc<dyn CownBase> = self.0.inner.clone();
265        rs.push(Request::new(cown_base));
266        rs
267    }
268
269    unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l> {
270        unsafe { (&mut *self.0.inner.value.get(), self.1.get_mut()) }
271    }
272}
273
274unsafe impl<T: Send + 'static> CownPtrs for Vec<CownPtr<T>> {
275    type CownRefs<'l>
276        = Vec<&'l mut T>
277    where
278        Self: 'l;
279
280    fn requests(&self) -> Vec<Request> {
281        self.iter().map(|x| Request::new(x.inner.clone())).collect()
282    }
283
284    unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l> {
285        self.iter()
286            .map(|x| unsafe { &mut *x.inner.value.get() })
287            .collect()
288    }
289}
290
291/// Creates a `Behavior` and schedules it. Used by "When" block.
292pub fn run_when<C, F>(cowns: C, f: F)
293where
294    C: CownPtrs + Send + 'static,
295    F: for<'l> Fn(C::CownRefs<'l>) + Send + 'static,
296{
297    Behavior::new(cowns, f).schedule();
298}
299
300/// from <https://docs.rs/tuple_list/latest/tuple_list/>
301#[macro_export]
302macro_rules! tuple_list {
303    () => ( () );
304
305    // handling simple identifiers, for limited types and patterns support
306    ($i:ident)  => ( ($i, ()) );
307    ($i:ident,) => ( ($i, ()) );
308    ($i:ident, $($e:ident),*)  => ( ($i, $crate::tuple_list!($($e),*)) );
309    ($i:ident, $($e:ident),*,) => ( ($i, $crate::tuple_list!($($e),*)) );
310
311    // handling complex expressions
312    ($i:expr_2021)  => ( ($i, ()) );
313    ($i:expr_2021,) => ( ($i, ()) );
314    ($i:expr_2021, $($e:expr_2021),*)  => ( ($i, $crate::tuple_list!($($e),*)) );
315    ($i:expr_2021, $($e:expr_2021),*,) => ( ($i, $crate::tuple_list!($($e),*)) );
316}
317
318/// "When" block.
319#[macro_export]
320macro_rules! when {
321    ( $( $cs:ident ),* ; $( $gs:ident ),* ; $thunk:expr_2021 ) => {{
322        run_when(tuple_list!($($cs.clone()),*), move |tuple_list!($($gs),*)| $thunk);
323    }};
324}
325
326#[test]
327fn boc() {
328    let c1 = CownPtr::new(0);
329    let c2 = CownPtr::new(0);
330    let c3 = CownPtr::new(false);
331    let c2_ = c2.clone();
332    let c3_ = c3.clone();
333
334    let (finish_sender, finish_receiver) = crossbeam_channel::bounded(0);
335
336    when!(c1, c2; g1, g2; {
337        // c3, c2 are moved into this thunk. There's no such thing as auto-cloning move closure.
338        *g1 += 1;
339        *g2 += 1;
340        when!(c3, c2; g3, g2; {
341            *g2 += 1;
342            *g3 = true;
343        });
344    });
345
346    when!(c1, c2_, c3_; g1, g2, g3; {
347        assert_eq!(*g1, 1);
348        assert_eq!(*g2, if *g3 { 2 } else { 1 });
349        finish_sender.send(()).unwrap();
350    });
351
352    // wait for termination
353    finish_receiver.recv().unwrap();
354}
355
356#[test]
357fn boc_vec() {
358    let c1 = CownPtr::new(0);
359    let c2 = CownPtr::new(0);
360    let c3 = CownPtr::new(false);
361    let c2_ = c2.clone();
362    let c3_ = c3.clone();
363
364    let (finish_sender, finish_receiver) = crossbeam_channel::bounded(0);
365
366    run_when(vec![c1.clone(), c2.clone()], move |mut x| {
367        // c3, c2 are moved into this thunk. There's no such thing as auto-cloning move closure.
368        *x[0] += 1;
369        *x[1] += 1;
370        when!(c3, c2; g3, g2; {
371            *g2 += 1;
372            *g3 = true;
373        });
374    });
375
376    when!(c1, c2_, c3_; g1, g2, g3; {
377        assert_eq!(*g1, 1);
378        assert_eq!(*g2, if *g3 { 2 } else { 1 });
379        finish_sender.send(()).unwrap();
380    });
381
382    // wait for termination
383    finish_receiver.recv().unwrap();
384}