crossbeam_epoch/
collector.rs

1/// Epoch-based garbage collector.
2///
3/// # Examples
4///
5/// ```
6/// use crossbeam_epoch::Collector;
7///
8/// let collector = Collector::new();
9///
10/// let handle = collector.register();
11/// drop(collector); // `handle` still works after dropping `collector`
12///
13/// handle.pin().flush();
14/// ```
15use core::fmt;
16
17use crate::guard::Guard;
18use crate::internal::{Global, Local};
19use crate::primitive::sync::Arc;
20
21/// An epoch-based garbage collector.
22pub struct Collector {
23    pub(crate) global: Arc<Global>,
24}
25
26unsafe impl Send for Collector {}
27unsafe impl Sync for Collector {}
28
29impl Default for Collector {
30    fn default() -> Self {
31        Self {
32            global: Arc::new(Global::new()),
33        }
34    }
35}
36
37impl Collector {
38    /// Creates a new collector.
39    pub fn new() -> Self {
40        Self::default()
41    }
42
43    /// Registers a new handle for the collector.
44    pub fn register(&self) -> LocalHandle {
45        Local::register(self)
46    }
47}
48
49impl Clone for Collector {
50    /// Creates another reference to the same garbage collector.
51    fn clone(&self) -> Self {
52        Collector {
53            global: self.global.clone(),
54        }
55    }
56}
57
58impl fmt::Debug for Collector {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.pad("Collector { .. }")
61    }
62}
63
64impl PartialEq for Collector {
65    /// Checks if both handles point to the same collector.
66    fn eq(&self, rhs: &Collector) -> bool {
67        Arc::ptr_eq(&self.global, &rhs.global)
68    }
69}
70impl Eq for Collector {}
71
72/// A handle to a garbage collector.
73pub struct LocalHandle {
74    pub(crate) local: *const Local,
75}
76
77impl LocalHandle {
78    /// Pins the handle.
79    #[inline]
80    pub fn pin(&self) -> Guard {
81        unsafe { (*self.local).pin() }
82    }
83
84    /// Returns `true` if the handle is pinned.
85    #[inline]
86    pub fn is_pinned(&self) -> bool {
87        unsafe { (*self.local).is_pinned() }
88    }
89
90    /// Returns the `Collector` associated with this handle.
91    #[inline]
92    pub fn collector(&self) -> &Collector {
93        unsafe { (*self.local).collector() }
94    }
95}
96
97impl Drop for LocalHandle {
98    #[inline]
99    fn drop(&mut self) {
100        unsafe {
101            Local::release_handle(&*self.local);
102        }
103    }
104}
105
106impl fmt::Debug for LocalHandle {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        f.pad("LocalHandle { .. }")
109    }
110}
111
112#[cfg(all(test, not(crossbeam_loom)))]
113mod tests {
114    use std::mem::ManuallyDrop;
115    use std::sync::atomic::{AtomicUsize, Ordering};
116
117    use crossbeam_utils::thread;
118
119    use crate::{Collector, Owned};
120
121    const NUM_THREADS: usize = 8;
122
123    #[test]
124    fn pin_reentrant() {
125        let collector = Collector::new();
126        let handle = collector.register();
127        drop(collector);
128
129        assert!(!handle.is_pinned());
130        {
131            let _guard = &handle.pin();
132            assert!(handle.is_pinned());
133            {
134                let _guard = &handle.pin();
135                assert!(handle.is_pinned());
136            }
137            assert!(handle.is_pinned());
138        }
139        assert!(!handle.is_pinned());
140    }
141
142    #[test]
143    fn flush_local_bag() {
144        let collector = Collector::new();
145        let handle = collector.register();
146        drop(collector);
147
148        for _ in 0..100 {
149            let guard = &handle.pin();
150            unsafe {
151                let a = Owned::new(7).into_shared(guard);
152                guard.defer_destroy(a);
153
154                assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
155
156                while !(*guard.local).bag.with(|b| (*b).is_empty()) {
157                    guard.flush();
158                }
159            }
160        }
161    }
162
163    #[test]
164    fn garbage_buffering() {
165        let collector = Collector::new();
166        let handle = collector.register();
167        drop(collector);
168
169        let guard = &handle.pin();
170        unsafe {
171            for _ in 0..10 {
172                let a = Owned::new(7).into_shared(guard);
173                guard.defer_destroy(a);
174            }
175            assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
176        }
177    }
178
179    #[test]
180    fn pin_holds_advance() {
181        #[cfg(miri)]
182        const N: usize = 500;
183        #[cfg(not(miri))]
184        const N: usize = 500_000;
185
186        let collector = Collector::new();
187
188        thread::scope(|scope| {
189            for _ in 0..NUM_THREADS {
190                scope.spawn(|_| {
191                    let handle = collector.register();
192                    for _ in 0..N {
193                        let guard = &handle.pin();
194
195                        let before = collector.global.epoch.load(Ordering::Relaxed);
196                        collector.global.collect(guard);
197                        let after = collector.global.epoch.load(Ordering::Relaxed);
198
199                        assert!(after.wrapping_sub(before) <= 2);
200                    }
201                });
202            }
203        })
204        .unwrap();
205    }
206
207    #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
208    #[test]
209    fn incremental() {
210        #[cfg(miri)]
211        const COUNT: usize = 500;
212        #[cfg(not(miri))]
213        const COUNT: usize = 100_000;
214        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
215
216        let collector = Collector::new();
217        let handle = collector.register();
218
219        unsafe {
220            let guard = &handle.pin();
221            for _ in 0..COUNT {
222                let a = Owned::new(7i32).into_shared(guard);
223                guard.defer_unchecked(move || {
224                    drop(a.into_owned());
225                    DESTROYS.fetch_add(1, Ordering::Relaxed);
226                });
227            }
228            guard.flush();
229        }
230
231        let mut last = 0;
232
233        while last < COUNT {
234            let curr = DESTROYS.load(Ordering::Relaxed);
235            assert!(curr - last <= 1024);
236            last = curr;
237
238            let guard = &handle.pin();
239            collector.global.collect(guard);
240        }
241        assert!(DESTROYS.load(Ordering::Relaxed) == COUNT);
242    }
243
244    #[test]
245    fn buffering() {
246        const COUNT: usize = 10;
247        #[cfg(miri)]
248        const N: usize = 500;
249        #[cfg(not(miri))]
250        const N: usize = 100_000;
251        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
252
253        let collector = Collector::new();
254        let handle = collector.register();
255
256        unsafe {
257            let guard = &handle.pin();
258            for _ in 0..COUNT {
259                let a = Owned::new(7i32).into_shared(guard);
260                guard.defer_unchecked(move || {
261                    drop(a.into_owned());
262                    DESTROYS.fetch_add(1, Ordering::Relaxed);
263                });
264            }
265        }
266
267        for _ in 0..N {
268            collector.global.collect(&handle.pin());
269        }
270        assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
271
272        handle.pin().flush();
273
274        while DESTROYS.load(Ordering::Relaxed) < COUNT {
275            let guard = &handle.pin();
276            collector.global.collect(guard);
277        }
278        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
279    }
280
281    #[test]
282    fn count_drops() {
283        #[cfg(miri)]
284        const COUNT: usize = 500;
285        #[cfg(not(miri))]
286        const COUNT: usize = 100_000;
287        static DROPS: AtomicUsize = AtomicUsize::new(0);
288
289        struct Elem(#[allow(dead_code)] i32);
290
291        impl Drop for Elem {
292            fn drop(&mut self) {
293                DROPS.fetch_add(1, Ordering::Relaxed);
294            }
295        }
296
297        let collector = Collector::new();
298        let handle = collector.register();
299
300        unsafe {
301            let guard = &handle.pin();
302
303            for _ in 0..COUNT {
304                let a = Owned::new(Elem(7i32)).into_shared(guard);
305                guard.defer_destroy(a);
306            }
307            guard.flush();
308        }
309
310        while DROPS.load(Ordering::Relaxed) < COUNT {
311            let guard = &handle.pin();
312            collector.global.collect(guard);
313        }
314        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
315    }
316
317    #[test]
318    fn count_destroy() {
319        #[cfg(miri)]
320        const COUNT: usize = 500;
321        #[cfg(not(miri))]
322        const COUNT: usize = 100_000;
323        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
324
325        let collector = Collector::new();
326        let handle = collector.register();
327
328        unsafe {
329            let guard = &handle.pin();
330
331            for _ in 0..COUNT {
332                let a = Owned::new(7i32).into_shared(guard);
333                guard.defer_unchecked(move || {
334                    drop(a.into_owned());
335                    DESTROYS.fetch_add(1, Ordering::Relaxed);
336                });
337            }
338            guard.flush();
339        }
340
341        while DESTROYS.load(Ordering::Relaxed) < COUNT {
342            let guard = &handle.pin();
343            collector.global.collect(guard);
344        }
345        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
346    }
347
348    #[test]
349    fn drop_array() {
350        const COUNT: usize = 700;
351        static DROPS: AtomicUsize = AtomicUsize::new(0);
352
353        struct Elem(#[allow(dead_code)] i32);
354
355        impl Drop for Elem {
356            fn drop(&mut self) {
357                DROPS.fetch_add(1, Ordering::Relaxed);
358            }
359        }
360
361        let collector = Collector::new();
362        let handle = collector.register();
363
364        let mut guard = handle.pin();
365
366        let mut v = Vec::with_capacity(COUNT);
367        for i in 0..COUNT {
368            v.push(Elem(i as i32));
369        }
370
371        {
372            let a = Owned::new(v).into_shared(&guard);
373            unsafe {
374                guard.defer_destroy(a);
375            }
376            guard.flush();
377        }
378
379        while DROPS.load(Ordering::Relaxed) < COUNT {
380            guard.repin();
381            collector.global.collect(&guard);
382        }
383        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
384    }
385
386    #[test]
387    fn destroy_array() {
388        #[cfg(miri)]
389        const COUNT: usize = 500;
390        #[cfg(not(miri))]
391        const COUNT: usize = 100_000;
392        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
393
394        let collector = Collector::new();
395        let handle = collector.register();
396
397        unsafe {
398            let guard = &handle.pin();
399
400            let mut v = Vec::with_capacity(COUNT);
401            for i in 0..COUNT {
402                v.push(i as i32);
403            }
404
405            let len = v.len();
406            let cap = v.capacity();
407            let ptr = ManuallyDrop::new(v).as_mut_ptr();
408            guard.defer_unchecked(move || {
409                drop(Vec::from_raw_parts(ptr, len, cap));
410                DESTROYS.fetch_add(len, Ordering::Relaxed);
411            });
412            guard.flush();
413        }
414
415        while DESTROYS.load(Ordering::Relaxed) < COUNT {
416            let guard = &handle.pin();
417            collector.global.collect(guard);
418        }
419        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
420    }
421
422    #[test]
423    fn stress() {
424        const THREADS: usize = 8;
425        #[cfg(miri)]
426        const COUNT: usize = 500;
427        #[cfg(not(miri))]
428        const COUNT: usize = 100_000;
429        static DROPS: AtomicUsize = AtomicUsize::new(0);
430
431        struct Elem(#[allow(dead_code)] i32);
432
433        impl Drop for Elem {
434            fn drop(&mut self) {
435                DROPS.fetch_add(1, Ordering::Relaxed);
436            }
437        }
438
439        let collector = Collector::new();
440
441        thread::scope(|scope| {
442            for _ in 0..THREADS {
443                scope.spawn(|_| {
444                    let handle = collector.register();
445                    for _ in 0..COUNT {
446                        let guard = &handle.pin();
447                        unsafe {
448                            let a = Owned::new(Elem(7i32)).into_shared(guard);
449                            guard.defer_destroy(a);
450                        }
451                    }
452                });
453            }
454        })
455        .unwrap();
456
457        let handle = collector.register();
458        while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
459            let guard = &handle.pin();
460            collector.global.collect(guard);
461        }
462        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
463    }
464}