Update the libclamav library to version 1.0.0

commit 45fe15f472
parent b879ee0b2e
committed 2023-01-14 18:28:39 +08:00
8531 changed files with 1222046 additions and 177272 deletions

File diff suppressed because it is too large

View File

@@ -0,0 +1,463 @@
/// Epoch-based garbage collector.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::Collector;
///
/// let collector = Collector::new();
///
/// let handle = collector.register();
/// drop(collector); // `handle` still works after dropping `collector`
///
/// handle.pin().flush();
/// ```
use core::fmt;
use crate::guard::Guard;
use crate::internal::{Global, Local};
use crate::primitive::sync::Arc;
/// An epoch-based garbage collector.
pub struct Collector {
pub(crate) global: Arc<Global>,
}
unsafe impl Send for Collector {}
unsafe impl Sync for Collector {}
impl Default for Collector {
fn default() -> Self {
Self {
global: Arc::new(Global::new()),
}
}
}
impl Collector {
/// Creates a new collector.
pub fn new() -> Self {
Self::default()
}
/// Registers a new handle for the collector.
pub fn register(&self) -> LocalHandle {
Local::register(self)
}
}
impl Clone for Collector {
/// Creates another reference to the same garbage collector.
fn clone(&self) -> Self {
Collector {
global: self.global.clone(),
}
}
}
impl fmt::Debug for Collector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Collector { .. }")
}
}
impl PartialEq for Collector {
/// Checks if both handles point to the same collector.
fn eq(&self, rhs: &Collector) -> bool {
Arc::ptr_eq(&self.global, &rhs.global)
}
}
impl Eq for Collector {}
/// A handle to a garbage collector.
pub struct LocalHandle {
pub(crate) local: *const Local,
}
impl LocalHandle {
/// Pins the handle.
#[inline]
pub fn pin(&self) -> Guard {
unsafe { (*self.local).pin() }
}
/// Returns `true` if the handle is pinned.
#[inline]
pub fn is_pinned(&self) -> bool {
unsafe { (*self.local).is_pinned() }
}
/// Returns the `Collector` associated with this handle.
#[inline]
pub fn collector(&self) -> &Collector {
unsafe { (*self.local).collector() }
}
}
impl Drop for LocalHandle {
#[inline]
fn drop(&mut self) {
unsafe {
Local::release_handle(&*self.local);
}
}
}
impl fmt::Debug for LocalHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("LocalHandle { .. }")
}
}
#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_utils::thread;
use crate::{Collector, Owned};
const NUM_THREADS: usize = 8;
#[test]
fn pin_reentrant() {
let collector = Collector::new();
let handle = collector.register();
drop(collector);
assert!(!handle.is_pinned());
{
let _guard = &handle.pin();
assert!(handle.is_pinned());
{
let _guard = &handle.pin();
assert!(handle.is_pinned());
}
assert!(handle.is_pinned());
}
assert!(!handle.is_pinned());
}
#[test]
fn flush_local_bag() {
let collector = Collector::new();
let handle = collector.register();
drop(collector);
for _ in 0..100 {
let guard = &handle.pin();
unsafe {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
while !(*guard.local).bag.with(|b| (*b).is_empty()) {
guard.flush();
}
}
}
}
#[test]
fn garbage_buffering() {
let collector = Collector::new();
let handle = collector.register();
drop(collector);
let guard = &handle.pin();
unsafe {
for _ in 0..10 {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);
}
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
}
}
#[test]
fn pin_holds_advance() {
#[cfg(miri)]
const N: usize = 500;
#[cfg(not(miri))]
const N: usize = 500_000;
let collector = Collector::new();
thread::scope(|scope| {
for _ in 0..NUM_THREADS {
scope.spawn(|_| {
let handle = collector.register();
for _ in 0..N {
let guard = &handle.pin();
let before = collector.global.epoch.load(Ordering::Relaxed);
collector.global.collect(guard);
let after = collector.global.epoch.load(Ordering::Relaxed);
assert!(after.wrapping_sub(before) <= 2);
}
});
}
})
.unwrap();
}
#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
#[test]
fn incremental() {
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
static DESTROYS: AtomicUsize = AtomicUsize::new(0);
let collector = Collector::new();
let handle = collector.register();
unsafe {
let guard = &handle.pin();
for _ in 0..COUNT {
let a = Owned::new(7i32).into_shared(guard);
guard.defer_unchecked(move || {
drop(a.into_owned());
DESTROYS.fetch_add(1, Ordering::Relaxed);
});
}
guard.flush();
}
let mut last = 0;
while last < COUNT {
let curr = DESTROYS.load(Ordering::Relaxed);
assert!(curr - last <= 1024);
last = curr;
let guard = &handle.pin();
collector.global.collect(guard);
}
assert!(DESTROYS.load(Ordering::Relaxed) == COUNT);
}
#[test]
fn buffering() {
const COUNT: usize = 10;
#[cfg(miri)]
const N: usize = 500;
#[cfg(not(miri))]
const N: usize = 100_000;
static DESTROYS: AtomicUsize = AtomicUsize::new(0);
let collector = Collector::new();
let handle = collector.register();
unsafe {
let guard = &handle.pin();
for _ in 0..COUNT {
let a = Owned::new(7i32).into_shared(guard);
guard.defer_unchecked(move || {
drop(a.into_owned());
DESTROYS.fetch_add(1, Ordering::Relaxed);
});
}
}
for _ in 0..N {
collector.global.collect(&handle.pin());
}
assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
handle.pin().flush();
while DESTROYS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
}
assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
}
#[test]
fn count_drops() {
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
struct Elem(i32);
impl Drop for Elem {
fn drop(&mut self) {
DROPS.fetch_add(1, Ordering::Relaxed);
}
}
let collector = Collector::new();
let handle = collector.register();
unsafe {
let guard = &handle.pin();
for _ in 0..COUNT {
let a = Owned::new(Elem(7i32)).into_shared(guard);
guard.defer_destroy(a);
}
guard.flush();
}
while DROPS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
}
#[test]
fn count_destroy() {
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
static DESTROYS: AtomicUsize = AtomicUsize::new(0);
let collector = Collector::new();
let handle = collector.register();
unsafe {
let guard = &handle.pin();
for _ in 0..COUNT {
let a = Owned::new(7i32).into_shared(guard);
guard.defer_unchecked(move || {
drop(a.into_owned());
DESTROYS.fetch_add(1, Ordering::Relaxed);
});
}
guard.flush();
}
while DESTROYS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
}
assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
}
#[test]
fn drop_array() {
const COUNT: usize = 700;
static DROPS: AtomicUsize = AtomicUsize::new(0);
struct Elem(i32);
impl Drop for Elem {
fn drop(&mut self) {
DROPS.fetch_add(1, Ordering::Relaxed);
}
}
let collector = Collector::new();
let handle = collector.register();
let mut guard = handle.pin();
let mut v = Vec::with_capacity(COUNT);
for i in 0..COUNT {
v.push(Elem(i as i32));
}
{
let a = Owned::new(v).into_shared(&guard);
unsafe {
guard.defer_destroy(a);
}
guard.flush();
}
while DROPS.load(Ordering::Relaxed) < COUNT {
guard.repin();
collector.global.collect(&guard);
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
}
#[test]
fn destroy_array() {
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
static DESTROYS: AtomicUsize = AtomicUsize::new(0);
let collector = Collector::new();
let handle = collector.register();
unsafe {
let guard = &handle.pin();
let mut v = Vec::with_capacity(COUNT);
for i in 0..COUNT {
v.push(i as i32);
}
let len = v.len();
let ptr = ManuallyDrop::new(v).as_mut_ptr() as usize;
guard.defer_unchecked(move || {
drop(Vec::from_raw_parts(ptr as *const i32 as *mut i32, len, len));
DESTROYS.fetch_add(len, Ordering::Relaxed);
});
guard.flush();
}
while DESTROYS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
}
assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
}
#[test]
fn stress() {
const THREADS: usize = 8;
#[cfg(miri)]
const COUNT: usize = 500;
#[cfg(not(miri))]
const COUNT: usize = 100_000;
static DROPS: AtomicUsize = AtomicUsize::new(0);
struct Elem(i32);
impl Drop for Elem {
fn drop(&mut self) {
DROPS.fetch_add(1, Ordering::Relaxed);
}
}
let collector = Collector::new();
thread::scope(|scope| {
for _ in 0..THREADS {
scope.spawn(|_| {
let handle = collector.register();
for _ in 0..COUNT {
let guard = &handle.pin();
unsafe {
let a = Owned::new(Elem(7i32)).into_shared(guard);
guard.defer_destroy(a);
}
}
});
}
})
.unwrap();
let handle = collector.register();
while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
let guard = &handle.pin();
collector.global.collect(guard);
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
}
}
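The `Collector`/`LocalHandle` API above is not limited to the thread-local default collector; a collector can be owned by a data structure and shared across threads. The following is a minimal sketch of that pattern, using only the public API shown in this file (the `crossbeam_epoch` crate itself is the only assumed dependency):

```rust
use std::sync::Arc;
use std::thread;

use crossbeam_epoch::Collector;

fn main() {
    // One collector shared by several threads; each thread registers its own handle.
    let collector = Arc::new(Collector::new());

    let threads: Vec<_> = (0..4)
        .map(|_| {
            let collector = Arc::clone(&collector);
            thread::spawn(move || {
                let handle = collector.register();

                // Pinning through the handle yields a `Guard`, just like `epoch::pin()`
                // does for the default collector.
                let guard = handle.pin();
                assert!(handle.is_pinned());

                // Handles created from the same collector compare equal to it.
                assert!(handle.collector() == &*collector);
                drop(guard);
            })
        })
        .collect();

    for t in threads {
        t.join().unwrap();
    }
}
```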

View File

@@ -0,0 +1,93 @@
//! The default garbage collector.
//!
//! For each thread, a participant is lazily initialized on its first use, when the current thread
//! is registered in the default collector. If initialized, the thread's participant will get
//! destructed on thread exit, which in turn unregisters the thread.
use crate::collector::{Collector, LocalHandle};
use crate::guard::Guard;
use crate::primitive::thread_local;
#[cfg(not(crossbeam_loom))]
use crate::sync::once_lock::OnceLock;
fn collector() -> &'static Collector {
#[cfg(not(crossbeam_loom))]
{
/// The global data for the default garbage collector.
static COLLECTOR: OnceLock<Collector> = OnceLock::new();
COLLECTOR.get_or_init(Collector::new)
}
// FIXME: loom does not currently provide the equivalent of Lazy:
// https://github.com/tokio-rs/loom/issues/263
#[cfg(crossbeam_loom)]
{
loom::lazy_static! {
/// The global data for the default garbage collector.
static ref COLLECTOR: Collector = Collector::new();
}
&COLLECTOR
}
}
thread_local! {
/// The per-thread participant for the default garbage collector.
static HANDLE: LocalHandle = collector().register();
}
/// Pins the current thread.
#[inline]
pub fn pin() -> Guard {
with_handle(|handle| handle.pin())
}
/// Returns `true` if the current thread is pinned.
#[inline]
pub fn is_pinned() -> bool {
with_handle(|handle| handle.is_pinned())
}
/// Returns the default global collector.
pub fn default_collector() -> &'static Collector {
collector()
}
#[inline]
fn with_handle<F, R>(mut f: F) -> R
where
F: FnMut(&LocalHandle) -> R,
{
HANDLE
.try_with(|h| f(h))
.unwrap_or_else(|_| f(&collector().register()))
}
#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
use crossbeam_utils::thread;
#[test]
fn pin_while_exiting() {
struct Foo;
impl Drop for Foo {
fn drop(&mut self) {
// Pin after `HANDLE` has been dropped. This must not panic.
super::pin();
}
}
thread_local! {
static FOO: Foo = Foo;
}
thread::scope(|scope| {
scope.spawn(|_| {
// Initialize `FOO` and then `HANDLE`.
FOO.with(|_| ());
super::pin();
// At thread exit, `HANDLE` gets dropped first and `FOO` second.
});
})
.unwrap();
}
}
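For reference, the functions above (`pin`, `is_pinned`, `default_collector`) are the entry points most user code goes through. A small illustrative example of the intended call pattern, assuming the crate is used under its usual name `crossbeam_epoch`:

```rust
use crossbeam_epoch as epoch;

fn main() {
    // `pin()` lazily registers the current thread with the default collector.
    assert!(!epoch::is_pinned());
    {
        let guard = epoch::pin();
        assert!(epoch::is_pinned());

        // Defer some work until all currently pinned threads have unpinned,
        // then flush it towards the global cache so any thread can run it.
        guard.defer(|| println!("deferred work ran"));
        guard.flush();
    }
    // Dropping the last guard unpins the thread again.
    assert!(!epoch::is_pinned());

    // The default collector is also reachable directly, e.g. to check that
    // guards used with a data structure come from the same collector.
    let _collector = epoch::default_collector();
}
```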

View File

@@ -0,0 +1,147 @@
use alloc::boxed::Box;
use core::fmt;
use core::marker::PhantomData;
use core::mem::{self, MaybeUninit};
use core::ptr;
/// Number of words a piece of `Data` can hold.
///
/// Three words should be enough for the majority of cases. For example, you can fit inside it the
/// function pointer together with a fat pointer representing an object that needs to be destroyed.
const DATA_WORDS: usize = 3;
/// Some space to keep a `FnOnce()` object on the stack.
type Data = [usize; DATA_WORDS];
/// A `FnOnce()` that is stored inline if small, or otherwise boxed on the heap.
///
/// This is a handy way of keeping an unsized `FnOnce()` within a sized structure.
pub(crate) struct Deferred {
call: unsafe fn(*mut u8),
data: MaybeUninit<Data>,
_marker: PhantomData<*mut ()>, // !Send + !Sync
}
impl fmt::Debug for Deferred {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.pad("Deferred { .. }")
}
}
impl Deferred {
pub(crate) const NO_OP: Self = {
fn no_op_call(_raw: *mut u8) {}
Self {
call: no_op_call,
data: MaybeUninit::uninit(),
_marker: PhantomData,
}
};
/// Constructs a new `Deferred` from a `FnOnce()`.
pub(crate) fn new<F: FnOnce()>(f: F) -> Self {
let size = mem::size_of::<F>();
let align = mem::align_of::<F>();
unsafe {
if size <= mem::size_of::<Data>() && align <= mem::align_of::<Data>() {
let mut data = MaybeUninit::<Data>::uninit();
ptr::write(data.as_mut_ptr().cast::<F>(), f);
unsafe fn call<F: FnOnce()>(raw: *mut u8) {
let f: F = ptr::read(raw.cast::<F>());
f();
}
Deferred {
call: call::<F>,
data,
_marker: PhantomData,
}
} else {
let b: Box<F> = Box::new(f);
let mut data = MaybeUninit::<Data>::uninit();
ptr::write(data.as_mut_ptr().cast::<Box<F>>(), b);
unsafe fn call<F: FnOnce()>(raw: *mut u8) {
// It's safe to cast `raw` from `*mut u8` to `*mut Box<F>`, because `raw` is
// originally derived from `*mut Box<F>`.
let b: Box<F> = ptr::read(raw.cast::<Box<F>>());
(*b)();
}
Deferred {
call: call::<F>,
data,
_marker: PhantomData,
}
}
}
}
/// Calls the function.
#[inline]
pub(crate) fn call(mut self) {
let call = self.call;
unsafe { call(self.data.as_mut_ptr().cast::<u8>()) };
}
}
#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
#![allow(clippy::drop_copy)]
use super::Deferred;
use std::cell::Cell;
#[test]
fn on_stack() {
let fired = &Cell::new(false);
let a = [0usize; 1];
let d = Deferred::new(move || {
drop(a);
fired.set(true);
});
assert!(!fired.get());
d.call();
assert!(fired.get());
}
#[test]
fn on_heap() {
let fired = &Cell::new(false);
let a = [0usize; 10];
let d = Deferred::new(move || {
drop(a);
fired.set(true);
});
assert!(!fired.get());
d.call();
assert!(fired.get());
}
#[test]
fn string() {
let a = "hello".to_string();
let d = Deferred::new(move || assert_eq!(a, "hello"));
d.call();
}
#[test]
fn boxed_slice_i32() {
let a: Box<[i32]> = vec![2, 3, 5, 7].into_boxed_slice();
let d = Deferred::new(move || assert_eq!(*a, [2, 3, 5, 7]));
d.call();
}
#[test]
fn long_slice_usize() {
let a: [usize; 5] = [2, 3, 5, 7, 11];
let d = Deferred::new(move || assert_eq!(a, [2, 3, 5, 7, 11]));
d.call();
}
}
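`Deferred` is crate-private, so it cannot be exercised from outside the crate; the sketch below only mirrors the size/alignment test that `Deferred::new` uses to decide between inline storage and boxing (the 3-word buffer matches `DATA_WORDS` above). The `fits_inline` helper is purely illustrative and not part of the crate:

```rust
use std::mem;

/// Mirrors `DATA_WORDS`/`Data` above: three words of inline storage.
const DATA_WORDS: usize = 3;
type Data = [usize; DATA_WORDS];

/// Illustrative helper: would a closure of type `F` be stored inline rather than boxed?
/// This is the same check `Deferred::new` performs.
fn fits_inline<F: FnOnce()>(_f: &F) -> bool {
    mem::size_of::<F>() <= mem::size_of::<Data>() && mem::align_of::<F>() <= mem::align_of::<Data>()
}

fn main() {
    let small = [0usize; 1];
    let large = [0usize; 10];

    // A closure capturing one word fits in the three-word inline buffer...
    assert!(fits_inline(&move || {
        let _ = small;
    }));

    // ...while one capturing ten words exceeds it and would be boxed on the heap,
    // exactly the split exercised by the `on_stack` and `on_heap` tests above.
    assert!(!fits_inline(&move || {
        let _ = large;
    }));
}
```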

View File

@@ -0,0 +1,133 @@
//! The global epoch
//!
//! The last bit in this number is unused and is always zero. Every so often the global epoch is
//! incremented, i.e. we say it "advances". A pinned participant may advance the global epoch only
//! if all currently pinned participants have been pinned in the current epoch.
//!
//! If an object became garbage in some epoch, then we can be sure that after two advancements no
//! participant will hold a reference to it. That is the crux of safe memory reclamation.
use crate::primitive::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;
/// An epoch that can be marked as pinned or unpinned.
///
/// Internally, the epoch is represented as an integer that wraps around at some unspecified point
/// and a flag that represents whether it is pinned or unpinned.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
pub(crate) struct Epoch {
/// The least significant bit is set if pinned. The rest of the bits hold the epoch.
data: usize,
}
impl Epoch {
/// Returns the starting epoch in unpinned state.
#[inline]
pub(crate) fn starting() -> Self {
Self::default()
}
/// Returns the number of epochs `self` is ahead of `rhs`.
///
/// Internally, epochs are represented as numbers in the range `(isize::MIN / 2) .. (isize::MAX
/// / 2)`, so the returned distance will be in the same interval.
pub(crate) fn wrapping_sub(self, rhs: Self) -> isize {
// The result is the same with `(self.data & !1).wrapping_sub(rhs.data & !1) as isize >> 1`,
// because the possible difference of LSB in `(self.data & !1).wrapping_sub(rhs.data & !1)`
// will be ignored in the shift operation.
self.data.wrapping_sub(rhs.data & !1) as isize >> 1
}
/// Returns `true` if the epoch is marked as pinned.
#[inline]
pub(crate) fn is_pinned(self) -> bool {
(self.data & 1) == 1
}
/// Returns the same epoch, but marked as pinned.
#[inline]
pub(crate) fn pinned(self) -> Epoch {
Epoch {
data: self.data | 1,
}
}
/// Returns the same epoch, but marked as unpinned.
#[inline]
pub(crate) fn unpinned(self) -> Epoch {
Epoch {
data: self.data & !1,
}
}
/// Returns the successor epoch.
///
/// The returned epoch will be marked as pinned only if the previous one was as well.
#[inline]
pub(crate) fn successor(self) -> Epoch {
Epoch {
data: self.data.wrapping_add(2),
}
}
}
/// An atomic value that holds an `Epoch`.
#[derive(Default, Debug)]
pub(crate) struct AtomicEpoch {
/// Since `Epoch` is just a wrapper around `usize`, an `AtomicEpoch` is similarly represented
/// using an `AtomicUsize`.
data: AtomicUsize,
}
impl AtomicEpoch {
/// Creates a new atomic epoch.
#[inline]
pub(crate) fn new(epoch: Epoch) -> Self {
let data = AtomicUsize::new(epoch.data);
AtomicEpoch { data }
}
/// Loads a value from the atomic epoch.
#[inline]
pub(crate) fn load(&self, ord: Ordering) -> Epoch {
Epoch {
data: self.data.load(ord),
}
}
/// Stores a value into the atomic epoch.
#[inline]
pub(crate) fn store(&self, epoch: Epoch, ord: Ordering) {
self.data.store(epoch.data, ord);
}
/// Stores a value into the atomic epoch if the current value is the same as `current`.
///
/// The return value is a result indicating whether the new value was written and containing
/// the previous value. On success this value is guaranteed to be equal to `current`.
///
/// This method takes two `Ordering` arguments to describe the memory
/// ordering of this operation. `success` describes the required ordering for the
/// read-modify-write operation that takes place if the comparison with `current` succeeds.
/// `failure` describes the required ordering for the load operation that takes place when
/// the comparison fails. Using `Acquire` as success ordering makes the store part
/// of this operation `Relaxed`, and using `Release` makes the successful load
/// `Relaxed`. The failure ordering can only be `SeqCst`, `Acquire` or `Relaxed`
/// and must be equivalent to or weaker than the success ordering.
#[inline]
pub(crate) fn compare_exchange(
&self,
current: Epoch,
new: Epoch,
success: Ordering,
failure: Ordering,
) -> Result<Epoch, Epoch> {
match self
.data
.compare_exchange(current.data, new.data, success, failure)
{
Ok(data) => Ok(Epoch { data }),
Err(data) => Err(Epoch { data }),
}
}
}
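Since `Epoch` is crate-private, the following standalone sketch just reproduces the bit layout documented above (least significant bit = pinned flag, remaining bits = counter) to show concretely why a sealed bag in the global queue is only dropped once the global epoch is at least two steps ahead (the `is_expired` check further below):

```rust
/// Standalone mirror of the `Epoch` representation above; not part of the crate.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
struct Epoch {
    data: usize,
}

impl Epoch {
    fn starting() -> Self {
        Epoch { data: 0 }
    }
    fn pinned(self) -> Self {
        Epoch { data: self.data | 1 }
    }
    fn unpinned(self) -> Self {
        Epoch { data: self.data & !1 }
    }
    /// The counter lives above the flag bit, so advancing by one epoch adds 2.
    fn successor(self) -> Self {
        Epoch { data: self.data.wrapping_add(2) }
    }
    /// Number of epochs `self` is ahead of `rhs` (same arithmetic as above).
    fn wrapping_sub(self, rhs: Self) -> isize {
        self.data.wrapping_sub(rhs.data & !1) as isize >> 1
    }
}

fn main() {
    let e0 = Epoch::starting();
    let e1 = e0.successor();
    let e2 = e1.successor();

    // Pinning only toggles the flag bit; the epoch number is unchanged.
    assert_eq!(e1.pinned().unpinned(), e1);

    // A bag sealed at `e0` is one epoch old at `e1` and two epochs old at `e2`;
    // only the latter satisfies the expiry rule (distance >= 2).
    assert_eq!(e1.wrapping_sub(e0), 1);
    assert_eq!(e2.wrapping_sub(e0), 2);
}
```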

View File

@@ -0,0 +1,520 @@
use core::fmt;
use core::mem;
use scopeguard::defer;
use crate::atomic::Shared;
use crate::collector::Collector;
use crate::deferred::Deferred;
use crate::internal::Local;
/// A guard that keeps the current thread pinned.
///
/// # Pinning
///
/// The current thread is pinned by calling [`pin`], which returns a new guard:
///
/// ```
/// use crossbeam_epoch as epoch;
///
/// // It is often convenient to prefix a call to `pin` with a `&` in order to create a reference.
/// // This is not really necessary, but makes passing references to the guard a bit easier.
/// let guard = &epoch::pin();
/// ```
///
/// When a guard gets dropped, the current thread is automatically unpinned.
///
/// # Pointers on the stack
///
/// Having a guard allows us to create pointers on the stack to heap-allocated objects.
/// For example:
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic};
/// use std::sync::atomic::Ordering::SeqCst;
///
/// // Create a heap-allocated number.
/// let a = Atomic::new(777);
///
/// // Pin the current thread.
/// let guard = &epoch::pin();
///
/// // Load the heap-allocated object and create pointer `p` on the stack.
/// let p = a.load(SeqCst, guard);
///
/// // Dereference the pointer and print the value:
/// if let Some(num) = unsafe { p.as_ref() } {
/// println!("The number is {}.", num);
/// }
/// # unsafe { drop(a.into_owned()); } // avoid leak
/// ```
///
/// # Multiple guards
///
/// Pinning is reentrant and it is perfectly legal to create multiple guards. In that case, the
/// thread will actually be pinned only when the first guard is created and unpinned when the last
/// one is dropped:
///
/// ```
/// use crossbeam_epoch as epoch;
///
/// let guard1 = epoch::pin();
/// let guard2 = epoch::pin();
/// assert!(epoch::is_pinned());
/// drop(guard1);
/// assert!(epoch::is_pinned());
/// drop(guard2);
/// assert!(!epoch::is_pinned());
/// ```
///
/// [`pin`]: super::pin
pub struct Guard {
pub(crate) local: *const Local,
}
impl Guard {
/// Stores a function so that it can be executed at some point after all currently pinned
/// threads get unpinned.
///
/// This method first stores `f` into the thread-local (or handle-local) cache. If this cache
/// becomes full, some functions are moved into the global cache. At the same time, some
/// functions from both local and global caches may get executed in order to incrementally
/// clean up the caches as they fill up.
///
/// There is no guarantee when exactly `f` will be executed. The only guarantee is that it
/// won't be executed until all currently pinned threads get unpinned. In theory, `f` might
/// never run, but the epoch-based garbage collection will make an effort to execute it
/// reasonably soon.
///
/// If this method is called from an [`unprotected`] guard, the function will simply be
/// executed immediately.
pub fn defer<F, R>(&self, f: F)
where
F: FnOnce() -> R,
F: Send + 'static,
{
unsafe {
self.defer_unchecked(f);
}
}
/// Stores a function so that it can be executed at some point after all currently pinned
/// threads get unpinned.
///
/// This method first stores `f` into the thread-local (or handle-local) cache. If this cache
/// becomes full, some functions are moved into the global cache. At the same time, some
/// functions from both local and global caches may get executed in order to incrementally
/// clean up the caches as they fill up.
///
/// There is no guarantee when exactly `f` will be executed. The only guarantee is that it
/// won't be executed until all currently pinned threads get unpinned. In theory, `f` might
/// never run, but the epoch-based garbage collection will make an effort to execute it
/// reasonably soon.
///
/// If this method is called from an [`unprotected`] guard, the function will simply be
/// executed immediately.
///
/// # Safety
///
/// The given function must not hold any references onto the stack. It is highly recommended that
/// the passed function is **always** marked with `move` in order to prevent accidental
/// borrows.
///
/// ```
/// use crossbeam_epoch as epoch;
///
/// let guard = &epoch::pin();
/// let message = "Hello!";
/// unsafe {
/// // ALWAYS use `move` when sending a closure into `defer_unchecked`.
/// guard.defer_unchecked(move || {
/// println!("{}", message);
/// });
/// }
/// ```
///
/// Apart from that, keep in mind that another thread may execute `f`, so anything accessed by
/// the closure must be `Send`.
///
/// We intentionally didn't require `F: Send`, because Rust's type system usually cannot prove
/// `F: Send` for typical use cases. For example, consider the following code snippet, which
/// exemplifies the typical use case of deferring the deallocation of a shared reference:
///
/// ```ignore
/// let shared = Owned::new(7i32).into_shared(guard);
/// guard.defer_unchecked(move || shared.into_owned()); // `Shared` is not `Send`!
/// ```
///
/// While `Shared` is not `Send`, it's safe for another thread to call the deferred function,
/// because it's called only after the grace period and `shared` is no longer shared with other
/// threads. But we don't expect type systems to prove this.
///
/// # Examples
///
/// When a heap-allocated object in a data structure becomes unreachable, it has to be
/// deallocated. However, the current thread and other threads may still be holding references
/// on the stack to that same object. Therefore it cannot be deallocated before those references
/// get dropped. This method can defer deallocation until all those threads get unpinned and
/// consequently drop all their references on the stack.
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic, Owned};
/// use std::sync::atomic::Ordering::SeqCst;
///
/// let a = Atomic::new("foo");
///
/// // Now suppose that `a` is shared among multiple threads and concurrently
/// // accessed and modified...
///
/// // Pin the current thread.
/// let guard = &epoch::pin();
///
/// // Steal the object currently stored in `a` and swap it with another one.
/// let p = a.swap(Owned::new("bar").into_shared(guard), SeqCst, guard);
///
/// if !p.is_null() {
/// // The object `p` is pointing to is now unreachable.
/// // Defer its deallocation until all currently pinned threads get unpinned.
/// unsafe {
/// // ALWAYS use `move` when sending a closure into `defer_unchecked`.
/// guard.defer_unchecked(move || {
/// println!("{} is now being deallocated.", p.deref());
/// // Now we have unique access to the object pointed to by `p` and can turn it
/// // into an `Owned`. Dropping the `Owned` will deallocate the object.
/// drop(p.into_owned());
/// });
/// }
/// }
/// # unsafe { drop(a.into_owned()); } // avoid leak
/// ```
pub unsafe fn defer_unchecked<F, R>(&self, f: F)
where
F: FnOnce() -> R,
{
if let Some(local) = self.local.as_ref() {
local.defer(Deferred::new(move || drop(f())), self);
} else {
drop(f());
}
}
/// Stores a destructor for an object so that it can be deallocated and dropped at some point
/// after all currently pinned threads get unpinned.
///
/// This method first stores the destructor into the thread-local (or handle-local) cache. If
/// this cache becomes full, some destructors are moved into the global cache. At the same
/// time, some destructors from both local and global caches may get executed in order to
/// incrementally clean up the caches as they fill up.
///
/// There is no guarantee when exactly the destructor will be executed. The only guarantee is
/// that it won't be executed until all currently pinned threads get unpinned. In theory, the
/// destructor might never run, but the epoch-based garbage collection will make an effort to
/// execute it reasonably soon.
///
/// If this method is called from an [`unprotected`] guard, the destructor will simply be
/// executed immediately.
///
/// # Safety
///
/// The object must not be reachable by other threads anymore, otherwise it might be still in
/// use when the destructor runs.
///
/// Apart from that, keep in mind that another thread may execute the destructor, so the object
/// must be sendable to other threads.
///
/// We intentionally didn't require `T: Send`, because Rust's type system usually cannot prove
/// `T: Send` for typical use cases. For example, consider the following code snippet, which
/// exemplifies the typical use case of deferring the deallocation of a shared reference:
///
/// ```ignore
/// let shared = Owned::new(7i32).into_shared(guard);
/// guard.defer_destroy(shared); // `Shared` is not `Send`!
/// ```
///
/// While `Shared` is not `Send`, it's safe for another thread to call the destructor, because
/// it's called only after the grace period and `shared` is no longer shared with other
/// threads. But we don't expect type systems to prove this.
///
/// # Examples
///
/// When a heap-allocated object in a data structure becomes unreachable, it has to be
/// deallocated. However, the current thread and other threads may still be holding references
/// on the stack to that same object. Therefore it cannot be deallocated before those references
/// get dropped. This method can defer deallocation until all those threads get unpinned and
/// consequently drop all their references on the stack.
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic, Owned};
/// use std::sync::atomic::Ordering::SeqCst;
///
/// let a = Atomic::new("foo");
///
/// // Now suppose that `a` is shared among multiple threads and concurrently
/// // accessed and modified...
///
/// // Pin the current thread.
/// let guard = &epoch::pin();
///
/// // Steal the object currently stored in `a` and swap it with another one.
/// let p = a.swap(Owned::new("bar").into_shared(guard), SeqCst, guard);
///
/// if !p.is_null() {
/// // The object `p` is pointing to is now unreachable.
/// // Defer its deallocation until all currently pinned threads get unpinned.
/// unsafe {
/// guard.defer_destroy(p);
/// }
/// }
/// # unsafe { drop(a.into_owned()); } // avoid leak
/// ```
pub unsafe fn defer_destroy<T>(&self, ptr: Shared<'_, T>) {
self.defer_unchecked(move || ptr.into_owned());
}
/// Clears up the thread-local cache of deferred functions by executing them or moving into the
/// global cache.
///
/// Call this method after deferring execution of a function if you want to get it executed as
/// soon as possible. Flushing will make sure it is residing in the global cache, so that
/// any thread has a chance of taking the function and executing it.
///
/// If this method is called from an [`unprotected`] guard, it is a no-op (nothing happens).
///
/// # Examples
///
/// ```
/// use crossbeam_epoch as epoch;
///
/// let guard = &epoch::pin();
/// guard.defer(move || {
/// println!("This better be printed as soon as possible!");
/// });
/// guard.flush();
/// ```
pub fn flush(&self) {
if let Some(local) = unsafe { self.local.as_ref() } {
local.flush(self);
}
}
/// Unpins and then immediately re-pins the thread.
///
/// This method is useful when you don't want to delay the advancement of the global epoch by
/// holding an old epoch. For safety, you should not maintain any guard-based reference across
/// the call (the latter is enforced by `&mut self`). The thread will only be repinned if this
/// is the only active guard for the current thread.
///
/// If this method is called from an [`unprotected`] guard, then the call is simply a no-op.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic};
/// use std::sync::atomic::Ordering::SeqCst;
///
/// let a = Atomic::new(777);
/// let mut guard = epoch::pin();
/// {
/// let p = a.load(SeqCst, &guard);
/// assert_eq!(unsafe { p.as_ref() }, Some(&777));
/// }
/// guard.repin();
/// {
/// let p = a.load(SeqCst, &guard);
/// assert_eq!(unsafe { p.as_ref() }, Some(&777));
/// }
/// # unsafe { drop(a.into_owned()); } // avoid leak
/// ```
pub fn repin(&mut self) {
if let Some(local) = unsafe { self.local.as_ref() } {
local.repin();
}
}
/// Temporarily unpins the thread, executes the given function and then re-pins the thread.
///
/// This method is useful when you need to perform a long-running operation (e.g. sleeping)
/// and don't need to maintain any guard-based reference across the call (the latter is enforced
/// by `&mut self`). The thread will only be unpinned if this is the only active guard for the
/// current thread.
///
/// If this method is called from an [`unprotected`] guard, then the passed function is called
/// directly without unpinning the thread.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic};
/// use std::sync::atomic::Ordering::SeqCst;
/// use std::thread;
/// use std::time::Duration;
///
/// let a = Atomic::new(777);
/// let mut guard = epoch::pin();
/// {
/// let p = a.load(SeqCst, &guard);
/// assert_eq!(unsafe { p.as_ref() }, Some(&777));
/// }
/// guard.repin_after(|| thread::sleep(Duration::from_millis(50)));
/// {
/// let p = a.load(SeqCst, &guard);
/// assert_eq!(unsafe { p.as_ref() }, Some(&777));
/// }
/// # unsafe { drop(a.into_owned()); } // avoid leak
/// ```
pub fn repin_after<F, R>(&mut self, f: F) -> R
where
F: FnOnce() -> R,
{
if let Some(local) = unsafe { self.local.as_ref() } {
// We need to acquire a handle here to ensure the Local doesn't
// disappear from under us.
local.acquire_handle();
local.unpin();
}
// Ensure the Guard is re-pinned even if the function panics
defer! {
if let Some(local) = unsafe { self.local.as_ref() } {
mem::forget(local.pin());
local.release_handle();
}
}
f()
}
/// Returns the `Collector` associated with this guard.
///
/// This method is useful when you need to ensure that all guards used with
/// a data structure come from the same collector.
///
/// If this method is called from an [`unprotected`] guard, then `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch as epoch;
///
/// let guard1 = epoch::pin();
/// let guard2 = epoch::pin();
/// assert!(guard1.collector() == guard2.collector());
/// ```
pub fn collector(&self) -> Option<&Collector> {
unsafe { self.local.as_ref().map(|local| local.collector()) }
}
}
impl Drop for Guard {
#[inline]
fn drop(&mut self) {
if let Some(local) = unsafe { self.local.as_ref() } {
local.unpin();
}
}
}
impl fmt::Debug for Guard {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Guard { .. }")
}
}
/// Returns a reference to a dummy guard that allows unprotected access to [`Atomic`]s.
///
/// This guard should be used only in special cases. Note that it doesn't actually keep any
/// thread pinned - it's just a fake guard that allows loading from [`Atomic`]s unsafely.
///
/// Note that calling [`defer`] with a dummy guard will not defer the function - it will just
/// execute the function immediately.
///
/// If necessary, it's possible to create more dummy guards by cloning: `unprotected().clone()`.
///
/// # Safety
///
/// Loading and dereferencing data from an [`Atomic`] using this guard is safe only if the
/// [`Atomic`] is not being concurrently modified by other threads.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic};
/// use std::sync::atomic::Ordering::Relaxed;
///
/// let a = Atomic::new(7);
///
/// unsafe {
/// // Load `a` without pinning the current thread.
/// a.load(Relaxed, epoch::unprotected());
///
/// // It's possible to create more dummy guards by calling `clone()`.
/// let dummy = &epoch::unprotected().clone();
///
/// dummy.defer(move || {
/// println!("This gets executed immediately.");
/// });
///
/// // Dropping `dummy` doesn't affect the current thread - it's just a noop.
/// }
/// # unsafe { drop(a.into_owned()); } // avoid leak
/// ```
///
/// The most common use of this function is when constructing or destructing a data structure.
///
/// For example, we can use a dummy guard in the destructor of a Treiber stack because at that
/// point no other thread could concurrently modify the [`Atomic`]s we are accessing.
///
/// If we were to actually pin the current thread during destruction, that would just unnecessarily
/// delay garbage collection and incur some performance cost, so in cases like these `unprotected`
/// is very helpful.
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic};
/// use std::mem::ManuallyDrop;
/// use std::sync::atomic::Ordering::Relaxed;
///
/// struct Stack<T> {
/// head: Atomic<Node<T>>,
/// }
///
/// struct Node<T> {
/// data: ManuallyDrop<T>,
/// next: Atomic<Node<T>>,
/// }
///
/// impl<T> Drop for Stack<T> {
/// fn drop(&mut self) {
/// unsafe {
/// // Unprotected load.
/// let mut node = self.head.load(Relaxed, epoch::unprotected());
///
/// while let Some(n) = node.as_ref() {
/// // Unprotected load.
/// let next = n.next.load(Relaxed, epoch::unprotected());
///
/// // Take ownership of the node, then drop its data and deallocate it.
/// let mut o = node.into_owned();
/// ManuallyDrop::drop(&mut o.data);
/// drop(o);
///
/// node = next;
/// }
/// }
/// }
/// }
/// ```
///
/// [`Atomic`]: super::Atomic
/// [`defer`]: Guard::defer
#[inline]
pub unsafe fn unprotected() -> &'static Guard {
// An unprotected guard is just a `Guard` with its field `local` set to null.
// We make a newtype over `Guard` because `Guard` isn't `Sync`, so can't be directly stored in
// a `static`
struct GuardWrapper(Guard);
unsafe impl Sync for GuardWrapper {}
static UNPROTECTED: GuardWrapper = GuardWrapper(Guard {
local: core::ptr::null(),
});
&UNPROTECTED.0
}
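Among the methods above, `defer` is the only deferral API without its own doc example; a minimal illustrative sketch of its intended use, relying only on the public API documented in this file:

```rust
use crossbeam_epoch as epoch;

fn main() {
    let guard = &epoch::pin();

    // Unlike `defer_unchecked`, `defer` requires the closure to be `Send + 'static`,
    // which is why it can be called without an `unsafe` block.
    guard.defer(move || {
        println!("runs once all currently pinned threads have unpinned");
    });

    // Push the deferred closure towards the global cache so that any thread
    // has a chance of executing it soon.
    guard.flush();
}
```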

View File

@@ -0,0 +1,599 @@
//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered to a global lock-free
//! singly-linked list of registries; and when a participant is leaving, it is unregistered from the
//! list.
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and if so,
//! what was the global epoch at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! to amortize the synchronization cost of pushing the garbage to a global queue.
//!
//! # Global queue
//!
//! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and
//! destroyed along the way. This design reduces contention on data structures. The global queue
//! cannot be explicitly accessed: the only way to interact with it is by calling functions
//! `defer()` that adds an object to the thread-local bag, or `collect()` that manually triggers
//! garbage collection.
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
//! destroyed as soon as the data structure gets dropped.
use crate::primitive::cell::UnsafeCell;
use crate::primitive::sync::atomic;
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};
use crossbeam_utils::CachePadded;
use memoffset::offset_of;
use crate::atomic::{Owned, Shared};
use crate::collector::{Collector, LocalHandle};
use crate::deferred::Deferred;
use crate::epoch::{AtomicEpoch, Epoch};
use crate::guard::{unprotected, Guard};
use crate::sync::list::{Entry, IsElement, IterError, List};
use crate::sync::queue::Queue;
/// Maximum number of objects a bag can contain.
#[cfg(not(any(crossbeam_sanitize, miri)))]
const MAX_OBJECTS: usize = 64;
// Makes it more likely to trigger any potential data races.
#[cfg(any(crossbeam_sanitize, miri))]
const MAX_OBJECTS: usize = 4;
/// A bag of deferred functions.
pub(crate) struct Bag {
/// Stashed objects.
deferreds: [Deferred; MAX_OBJECTS],
len: usize,
}
/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}
impl Bag {
/// Returns a new, empty bag.
pub(crate) fn new() -> Self {
Self::default()
}
/// Returns `true` if the bag is empty.
pub(crate) fn is_empty(&self) -> bool {
self.len == 0
}
/// Attempts to insert a deferred function into the bag.
///
/// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
/// full.
///
/// # Safety
///
/// It should be safe for another thread to execute the given function.
pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
if self.len < MAX_OBJECTS {
self.deferreds[self.len] = deferred;
self.len += 1;
Ok(())
} else {
Err(deferred)
}
}
/// Seals the bag with the given epoch.
fn seal(self, epoch: Epoch) -> SealedBag {
SealedBag { epoch, _bag: self }
}
}
impl Default for Bag {
fn default() -> Self {
Bag {
len: 0,
deferreds: [Deferred::NO_OP; MAX_OBJECTS],
}
}
}
impl Drop for Bag {
fn drop(&mut self) {
// Call all deferred functions.
for deferred in &mut self.deferreds[..self.len] {
let no_op = Deferred::NO_OP;
let owned_deferred = mem::replace(deferred, no_op);
owned_deferred.call();
}
}
}
// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Bag")
.field("deferreds", &&self.deferreds[..self.len])
.finish()
}
}
/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
epoch: Epoch,
_bag: Bag,
}
/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}
impl SealedBag {
/// Checks if it is safe to drop the bag w.r.t. the given global epoch.
fn is_expired(&self, global_epoch: Epoch) -> bool {
// A pinned participant can witness at most one epoch advancement. Therefore, any bag that
// is within one epoch of the current one cannot be destroyed yet.
global_epoch.wrapping_sub(self.epoch) >= 2
}
}
/// The global data for a garbage collector.
pub(crate) struct Global {
/// The intrusive linked list of `Local`s.
locals: List<Local>,
/// The global queue of bags of deferred functions.
queue: Queue<SealedBag>,
/// The global epoch.
pub(crate) epoch: CachePadded<AtomicEpoch>,
}
impl Global {
/// Number of bags to destroy.
const COLLECT_STEPS: usize = 8;
/// Creates a new global data for garbage collection.
#[inline]
pub(crate) fn new() -> Self {
Self {
locals: List::new(),
queue: Queue::new(),
epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
}
}
/// Pushes the bag into the global queue and replaces the bag with a new empty bag.
pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
let bag = mem::replace(bag, Bag::new());
atomic::fence(Ordering::SeqCst);
let epoch = self.epoch.load(Ordering::Relaxed);
self.queue.push(bag.seal(epoch), guard);
}
/// Collects several bags from the global queue and executes deferred functions in them.
///
/// Note: This may itself produce garbage and in turn allocate new bags.
///
/// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
/// path. In other words, we want the compiler to optimize branching for the case when
/// `collect()` is not called.
#[cold]
pub(crate) fn collect(&self, guard: &Guard) {
let global_epoch = self.try_advance(guard);
let steps = if cfg!(crossbeam_sanitize) {
usize::max_value()
} else {
Self::COLLECT_STEPS
};
for _ in 0..steps {
match self.queue.try_pop_if(
&|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
guard,
) {
None => break,
Some(sealed_bag) => drop(sealed_bag),
}
}
}
/// Attempts to advance the global epoch.
///
/// The global epoch can advance only if all currently pinned participants have been pinned in
/// the current epoch.
///
/// Returns the current global epoch.
///
/// `try_advance()` is annotated `#[cold]` because it is rarely called.
#[cold]
pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
let global_epoch = self.epoch.load(Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
// TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
// easy to implement in a lock-free manner. However, traversal can be slow due to cache
// misses and data dependencies. We should experiment with other data structures as well.
for local in self.locals.iter(guard) {
match local {
Err(IterError::Stalled) => {
// A concurrent thread stalled this iteration. That thread might also try to
// advance the epoch, in which case we leave the job to it. Otherwise, the
// epoch will not be advanced.
return global_epoch;
}
Ok(local) => {
let local_epoch = local.epoch.load(Ordering::Relaxed);
// If the participant was pinned in a different epoch, we cannot advance the
// global epoch just yet.
if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
return global_epoch;
}
}
}
}
atomic::fence(Ordering::Acquire);
// All pinned participants were pinned in the current global epoch.
// Now let's advance the global epoch...
//
// Note that if another thread already advanced it before us, this store will simply
// overwrite the global epoch with the same value. This is true because `try_advance` was
// called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
// advanced two steps ahead of it.
let new_epoch = global_epoch.successor();
self.epoch.store(new_epoch, Ordering::Release);
new_epoch
}
}
/// Participant for garbage collection.
pub(crate) struct Local {
/// A node in the intrusive linked list of `Local`s.
entry: Entry,
/// The local epoch.
epoch: AtomicEpoch,
/// A reference to the global data.
///
/// When all guards and handles get dropped, this reference is destroyed.
collector: UnsafeCell<ManuallyDrop<Collector>>,
/// The local bag of deferred functions.
pub(crate) bag: UnsafeCell<Bag>,
/// The number of guards keeping this participant pinned.
guard_count: Cell<usize>,
/// The number of active handles.
handle_count: Cell<usize>,
/// Total number of pinnings performed.
///
/// This is just an auxiliary counter that sometimes kicks off collection.
pin_count: Cell<Wrapping<usize>>,
}
// Make sure `Local` is less than or equal to 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
#[test]
fn local_size() {
// TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
// assert!(
// core::mem::size_of::<Local>() <= 2048,
// "An allocation of `Local` should be <= 2048 bytes."
// );
}
impl Local {
/// Number of pinnings after which a participant will execute some deferred functions from the
/// global queue.
const PINNINGS_BETWEEN_COLLECT: usize = 128;
/// Registers a new `Local` in the provided `Global`.
pub(crate) fn register(collector: &Collector) -> LocalHandle {
unsafe {
// Since we dereference no pointers in this block, it is safe to use `unprotected`.
let local = Owned::new(Local {
entry: Entry::default(),
epoch: AtomicEpoch::new(Epoch::starting()),
collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
bag: UnsafeCell::new(Bag::new()),
guard_count: Cell::new(0),
handle_count: Cell::new(1),
pin_count: Cell::new(Wrapping(0)),
})
.into_shared(unprotected());
collector.global.locals.insert(local, unprotected());
LocalHandle {
local: local.as_raw(),
}
}
}
/// Returns a reference to the `Global` in which this `Local` resides.
#[inline]
pub(crate) fn global(&self) -> &Global {
&self.collector().global
}
/// Returns a reference to the `Collector` in which this `Local` resides.
#[inline]
pub(crate) fn collector(&self) -> &Collector {
self.collector.with(|c| unsafe { &**c })
}
/// Returns `true` if the current participant is pinned.
#[inline]
pub(crate) fn is_pinned(&self) -> bool {
self.guard_count.get() > 0
}
/// Adds `deferred` to the thread-local bag.
///
/// # Safety
///
/// It should be safe for another thread to execute the given function.
pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
let bag = self.bag.with_mut(|b| &mut *b);
while let Err(d) = bag.try_push(deferred) {
self.global().push_bag(bag, guard);
deferred = d;
}
}
pub(crate) fn flush(&self, guard: &Guard) {
let bag = self.bag.with_mut(|b| unsafe { &mut *b });
if !bag.is_empty() {
self.global().push_bag(bag, guard);
}
self.global().collect(guard);
}
/// Pins the `Local`.
#[inline]
pub(crate) fn pin(&self) -> Guard {
let guard = Guard { local: self };
let guard_count = self.guard_count.get();
self.guard_count.set(guard_count.checked_add(1).unwrap());
if guard_count == 0 {
let global_epoch = self.global().epoch.load(Ordering::Relaxed);
let new_epoch = global_epoch.pinned();
// Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
// The fence makes sure that any future loads from `Atomic`s will not happen before
// this store.
if cfg!(all(
any(target_arch = "x86", target_arch = "x86_64"),
not(miri)
)) {
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
//
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
// 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
// instruction.
//
// Both instructions have the effect of a full barrier, but benchmarks have shown
// that the second one makes pinning faster in this particular case. It is not
// clear that this is permitted by the C++ memory model (SC fences work very
// differently from SC accesses), but experimental evidence suggests that this
// works fine. Using inline assembly would be a viable (and correct) alternative,
// but alas, that is not possible on stable Rust.
let current = Epoch::starting();
let res = self.epoch.compare_exchange(
current,
new_epoch,
Ordering::SeqCst,
Ordering::SeqCst,
);
debug_assert!(res.is_ok(), "participant was expected to be unpinned");
// We add a compiler fence to make it less likely for LLVM to do something wrong
// here. Formally, this is not enough to get rid of data races; practically,
// it should go a long way.
atomic::compiler_fence(Ordering::SeqCst);
} else {
self.epoch.store(new_epoch, Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
}
// Increment the pin counter.
let count = self.pin_count.get();
self.pin_count.set(count + Wrapping(1));
// After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
// some garbage.
if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
self.global().collect(&guard);
}
}
guard
}
/// Unpins the `Local`.
#[inline]
pub(crate) fn unpin(&self) {
let guard_count = self.guard_count.get();
self.guard_count.set(guard_count - 1);
if guard_count == 1 {
self.epoch.store(Epoch::starting(), Ordering::Release);
if self.handle_count.get() == 0 {
self.finalize();
}
}
}
/// Unpins and then pins the `Local`.
#[inline]
pub(crate) fn repin(&self) {
let guard_count = self.guard_count.get();
// Update the local epoch only if there's only one guard.
if guard_count == 1 {
let epoch = self.epoch.load(Ordering::Relaxed);
let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
// Update the local epoch only if the global epoch is greater than the local epoch.
if epoch != global_epoch {
// We store the new epoch with `Release` because we need to ensure any memory
// accesses from the previous epoch do not leak into the new one.
self.epoch.store(global_epoch, Ordering::Release);
// However, we don't need a following `SeqCst` fence, because it is safe for memory
// accesses from the new epoch to be executed before updating the local epoch. At
// worst, other threads will see the new epoch late and delay GC slightly.
}
}
}
/// Increments the handle count.
#[inline]
pub(crate) fn acquire_handle(&self) {
let handle_count = self.handle_count.get();
debug_assert!(handle_count >= 1);
self.handle_count.set(handle_count + 1);
}
/// Decrements the handle count.
#[inline]
pub(crate) fn release_handle(&self) {
let guard_count = self.guard_count.get();
let handle_count = self.handle_count.get();
debug_assert!(handle_count >= 1);
self.handle_count.set(handle_count - 1);
if guard_count == 0 && handle_count == 1 {
self.finalize();
}
}
/// Removes the `Local` from the global linked list.
#[cold]
fn finalize(&self) {
debug_assert_eq!(self.guard_count.get(), 0);
debug_assert_eq!(self.handle_count.get(), 0);
// Temporarily increment handle count. This is required so that the following call to `pin`
// doesn't call `finalize` again.
self.handle_count.set(1);
unsafe {
// Pin and move the local bag into the global queue. It's important that `push_bag`
// doesn't defer destruction on any new garbage.
let guard = &self.pin();
self.global()
.push_bag(self.bag.with_mut(|b| &mut *b), guard);
}
// Revert the handle count back to zero.
self.handle_count.set(0);
unsafe {
// Take the reference to the `Global` out of this `Local`. Since we're not protected
// by a guard at this time, it's crucial that the reference is read before marking the
// `Local` as deleted.
let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
// Mark this node in the linked list as deleted.
self.entry.delete(unprotected());
// Finally, drop the reference to the global. Note that this might be the last reference
// to the `Global`. If so, the global data will be destroyed and all deferred functions
// in its queue will be executed.
drop(collector);
}
}
}
impl IsElement<Local> for Local {
fn entry_of(local: &Local) -> &Entry {
let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
unsafe { &*entry_ptr }
}
unsafe fn element_of(entry: &Entry) -> &Local {
// offset_of! macro uses unsafe, but it's unnecessary in this context.
#[allow(unused_unsafe)]
let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
&*local_ptr
}
unsafe fn finalize(entry: &Entry, guard: &Guard) {
guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
}
}
#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use super::*;
#[test]
fn check_defer() {
static FLAG: AtomicUsize = AtomicUsize::new(0);
fn set() {
FLAG.store(42, Ordering::Relaxed);
}
let d = Deferred::new(set);
assert_eq!(FLAG.load(Ordering::Relaxed), 0);
d.call();
assert_eq!(FLAG.load(Ordering::Relaxed), 42);
}
#[test]
fn check_bag() {
static FLAG: AtomicUsize = AtomicUsize::new(0);
fn incr() {
FLAG.fetch_add(1, Ordering::Relaxed);
}
let mut bag = Bag::new();
assert!(bag.is_empty());
for _ in 0..MAX_OBJECTS {
assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
assert!(!bag.is_empty());
assert_eq!(FLAG.load(Ordering::Relaxed), 0);
}
let result = unsafe { bag.try_push(Deferred::new(incr)) };
assert!(result.is_err());
assert!(!bag.is_empty());
assert_eq!(FLAG.load(Ordering::Relaxed), 0);
drop(bag);
assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
}
}
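`Bag`, `Local`, and `Global` are all crate-private, so the thread-local-bag behaviour described in the module comment is only observable through the tests above. The sketch below merely mirrors the bag idea (a bounded buffer of deferred closures whose `try_push` fails when full, at which point the real collector seals the bag and pushes it to the global queue); the types here are illustrative stand-ins, not the crate's own:

```rust
/// Illustrative stand-in for `Bag`: a bounded buffer of deferred closures.
struct ToyBag {
    deferreds: Vec<Box<dyn FnOnce() + Send>>,
    capacity: usize,
}

impl ToyBag {
    fn new(capacity: usize) -> Self {
        ToyBag {
            deferreds: Vec::with_capacity(capacity),
            capacity,
        }
    }

    /// Fails when the bag is full; the real `Local::defer` reacts to that by
    /// pushing the sealed bag into the global queue and retrying.
    fn try_push(
        &mut self,
        f: Box<dyn FnOnce() + Send>,
    ) -> Result<(), Box<dyn FnOnce() + Send>> {
        if self.deferreds.len() < self.capacity {
            self.deferreds.push(f);
            Ok(())
        } else {
            Err(f)
        }
    }
}

fn main() {
    let mut bag = ToyBag::new(2);
    assert!(bag.try_push(Box::new(|| println!("a"))).is_ok());
    assert!(bag.try_push(Box::new(|| println!("b"))).is_ok());

    // The third push fails: this is the point where the real collector would
    // seal the bag with the current global epoch and hand it to the global queue.
    assert!(bag.try_push(Box::new(|| println!("c"))).is_err());
}
```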

View File

@@ -0,0 +1,169 @@
//! Epoch-based memory reclamation.
//!
//! An interesting problem concurrent collections deal with comes from the remove operation.
//! Suppose that a thread removes an element from a lock-free map, while another thread is reading
//! that same element at the same time. The first thread must wait until the second thread stops
//! reading the element. Only then it is safe to destruct it.
//!
//! Programming languages that come with garbage collectors solve this problem trivially. The
//! garbage collector will destruct the removed element when no thread can hold a reference to it
//! anymore.
//!
//! This crate implements a basic memory reclamation mechanism, which is based on epochs. When an
//! element gets removed from a concurrent collection, it is inserted into a pile of garbage and
//! marked with the current epoch. Every time a thread accesses a collection, it checks the current
//! epoch, attempts to increment it, and destructs some garbage that became so old that no thread
//! can be referencing it anymore.
//!
//! That is the general mechanism behind epoch-based memory reclamation, but the details are a bit
//! more complicated. Anyhow, memory reclamation is designed to be fully automatic and something
//! users of concurrent collections don't have to worry much about.
//!
//! # Pointers
//!
//! Concurrent collections are built using atomic pointers. This module provides [`Atomic`], which
//! is just a shared atomic pointer to a heap-allocated object. Loading an [`Atomic`] yields a
//! [`Shared`], which is an epoch-protected pointer through which the loaded object can be safely
//! read.
//!
//! # Pinning
//!
//! Before an [`Atomic`] can be loaded, a participant must be [`pin`]ned. By pinning a participant
//! we declare that any object that gets removed from now on must not be destructed just
//! yet. Garbage collection of newly removed objects is suspended until the participant gets
//! unpinned.
//!
//! # Garbage
//!
//! Objects that get removed from concurrent collections must be stashed away until all currently
//! pinned participants get unpinned. Such objects can be stored into a thread-local or global
//! storage, where they are kept until the right time for their destruction comes.
//!
//! There is a global shared instance of garbage queue. You can [`defer`](Guard::defer) the execution of an
//! arbitrary function until the global epoch is advanced enough. Most notably, concurrent data
//! structures may defer the deallocation of an object.
//!
//! # APIs
//!
//! For the majority of use cases, just use the default garbage collector by invoking [`pin`]. If you
//! want to create your own garbage collector, use the [`Collector`] API.
#![doc(test(
no_crate_inject,
attr(
deny(warnings, rust_2018_idioms),
allow(dead_code, unused_assignments, unused_variables)
)
))]
#![warn(
missing_docs,
missing_debug_implementations,
rust_2018_idioms,
unreachable_pub
)]
#![cfg_attr(not(feature = "std"), no_std)]
#[cfg(crossbeam_loom)]
extern crate loom_crate as loom;
use cfg_if::cfg_if;
#[cfg(crossbeam_loom)]
#[allow(unused_imports, dead_code)]
mod primitive {
pub(crate) mod cell {
pub(crate) use loom::cell::UnsafeCell;
}
pub(crate) mod sync {
pub(crate) mod atomic {
use core::sync::atomic::Ordering;
pub(crate) use loom::sync::atomic::{fence, AtomicUsize};
// FIXME: loom does not support compiler_fence at the moment.
// https://github.com/tokio-rs/loom/issues/117
// we use fence as a stand-in for compiler_fence for the time being.
// this may miss some races since fence is stronger than compiler_fence,
// but it's the best we can do for the time being.
pub(crate) use self::fence as compiler_fence;
}
pub(crate) use loom::sync::Arc;
}
pub(crate) use loom::thread_local;
}
#[cfg(not(crossbeam_no_atomic_cas))]
#[cfg(not(crossbeam_loom))]
#[allow(unused_imports, dead_code)]
mod primitive {
#[cfg(feature = "alloc")]
pub(crate) mod cell {
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct UnsafeCell<T>(::core::cell::UnsafeCell<T>);
// loom's UnsafeCell has a slightly different API than the standard library UnsafeCell.
// Since we want the rest of the code to be agnostic to whether it's running under loom or
// not, we write this small wrapper that provides the loom-supported API for the standard
// library UnsafeCell. This is also what the loom documentation recommends:
// https://github.com/tokio-rs/loom#handling-loom-api-differences
impl<T> UnsafeCell<T> {
#[inline]
pub(crate) const fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(::core::cell::UnsafeCell::new(data))
}
#[inline]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}
#[inline]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
}
}
#[cfg(feature = "alloc")]
pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use core::sync::atomic::compiler_fence;
pub(crate) use core::sync::atomic::fence;
pub(crate) use core::sync::atomic::AtomicUsize;
}
pub(crate) use alloc::sync::Arc;
}
#[cfg(feature = "std")]
pub(crate) use std::thread_local;
}
#[cfg(not(crossbeam_no_atomic_cas))]
cfg_if! {
if #[cfg(feature = "alloc")] {
extern crate alloc;
mod atomic;
mod collector;
mod deferred;
mod epoch;
mod guard;
mod internal;
mod sync;
pub use self::atomic::{
Pointable, Atomic, CompareExchangeError,
Owned, Pointer, Shared,
};
pub use self::collector::{Collector, LocalHandle};
pub use self::guard::{unprotected, Guard};
#[allow(deprecated)]
pub use self::atomic::{CompareAndSetError, CompareAndSetOrdering};
}
}
cfg_if! {
if #[cfg(feature = "std")] {
mod default;
pub use self::default::{default_collector, is_pinned, pin};
}
}

View File

@@ -0,0 +1,487 @@
//! Lock-free intrusive linked list.
//!
//! Ideas from Michael. High Performance Dynamic Lock-Free Hash Tables and List-Based Sets. SPAA
//! 2002. <http://dl.acm.org/citation.cfm?id=564870.564881>
use core::marker::PhantomData;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::{unprotected, Atomic, Guard, Shared};
/// An entry in a linked list.
///
/// An `Entry` is accessed from multiple threads, so for performance it is beneficial to place it
/// on a different cache line than thread-local data.
#[derive(Debug)]
pub(crate) struct Entry {
/// The next entry in the linked list.
/// If the tag is 1, this entry is marked as deleted.
next: Atomic<Entry>,
}
/// Implementing this trait asserts that the type `T` can be used as an element in the intrusive
/// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance
/// of `Entry`.
///
/// # Example
///
/// ```ignore
/// struct A {
/// entry: Entry,
/// data: usize,
/// }
///
/// impl IsElement<A> for A {
/// fn entry_of(a: &A) -> &Entry {
/// let entry_ptr = ((a as usize) + offset_of!(A, entry)) as *const Entry;
/// unsafe { &*entry_ptr }
/// }
///
/// unsafe fn element_of(entry: &Entry) -> &T {
/// let elem_ptr = ((entry as usize) - offset_of!(A, entry)) as *const T;
/// &*elem_ptr
/// }
///
/// unsafe fn finalize(entry: &Entry, guard: &Guard) {
/// guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
/// }
/// }
/// ```
///
/// This trait is implemented on a type separate from `T` (although it can be just `T`), because
/// one type might be placeable into multiple lists, in which case it would require multiple
/// implementations of `IsElement`. In such cases, each struct implementing `IsElement<T>`
/// represents a distinct `Entry` in `T`.
///
/// For example, we can insert the following struct into two lists using `entry1` for one
/// and `entry2` for the other:
///
/// ```ignore
/// struct B {
/// entry1: Entry,
/// entry2: Entry,
/// data: usize,
/// }
/// ```
///
pub(crate) trait IsElement<T> {
/// Returns a reference to this element's `Entry`.
fn entry_of(_: &T) -> &Entry;
/// Given a reference to an element's entry, returns that element.
///
/// ```ignore
/// let elem = ListElement::new();
/// assert_eq!(elem.entry_of(),
/// unsafe { ListElement::element_of(elem.entry_of()) } );
/// ```
///
/// # Safety
///
/// The caller has to guarantee that the `Entry` it is called with was retrieved from an instance
/// of the element type (`T`).
unsafe fn element_of(_: &Entry) -> &T;
/// The function that is called when an entry is unlinked from the list.
///
/// # Safety
///
/// The caller has to guarantee that the `Entry` it is called with was retrieved from an instance
/// of the element type (`T`).
unsafe fn finalize(_: &Entry, _: &Guard);
}
/// A lock-free, intrusive linked list of type `T`.
#[derive(Debug)]
pub(crate) struct List<T, C: IsElement<T> = T> {
/// The head of the linked list.
head: Atomic<Entry>,
/// The phantom data for using `T` and `C`.
_marker: PhantomData<(T, C)>,
}
/// An iterator used for retrieving values from the list.
pub(crate) struct Iter<'g, T, C: IsElement<T>> {
/// The guard that protects the iteration.
guard: &'g Guard,
/// Pointer from the predecessor to the current entry.
pred: &'g Atomic<Entry>,
/// The current entry.
curr: Shared<'g, Entry>,
/// The list head, needed for restarting iteration.
head: &'g Atomic<Entry>,
/// Logically, we store a borrow of an instance of `T` and
/// use the type information from `C`.
_marker: PhantomData<(&'g T, C)>,
}
/// An error that occurs during iteration over the list.
#[derive(PartialEq, Debug)]
pub(crate) enum IterError {
/// A concurrent thread modified the state of the list at the same place that this iterator
/// was inspecting. Subsequent iteration will restart from the beginning of the list.
Stalled,
}
impl Default for Entry {
/// Returns the empty entry.
fn default() -> Self {
Self {
next: Atomic::null(),
}
}
}
impl Entry {
/// Marks this entry as deleted, deferring the actual deallocation to a later iteration.
///
/// # Safety
///
/// The entry should be a member of a linked list, and it should not have been deleted.
/// It should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C`
/// is the associated helper for the linked list.
pub(crate) unsafe fn delete(&self, guard: &Guard) {
self.next.fetch_or(1, Release, guard);
}
}
impl<T, C: IsElement<T>> List<T, C> {
/// Returns a new, empty linked list.
pub(crate) fn new() -> Self {
Self {
head: Atomic::null(),
_marker: PhantomData,
}
}
/// Inserts `entry` into the head of the list.
///
/// # Safety
///
/// You should guarantee that:
///
/// - `container` is not null
/// - `container` is immovable, e.g. inside an `Owned`
/// - the same `Entry` is not inserted more than once
/// - the inserted object will be removed before the list is dropped
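///
/// A sketch of a typical insert/delete pairing, reusing the hypothetical `A` type from the
/// `IsElement` example above (not compiled as a doctest, since these types are crate-private):
///
/// ```ignore
/// let list = List::<A>::new();
/// let a = Owned::new(A { entry: Entry::default(), data: 7 }).into_shared(guard);
/// unsafe {
///     list.insert(a, guard);
///     // ... and before the list is dropped, mark the entry as deleted:
///     A::entry_of(a.deref()).delete(guard);
/// }
/// ```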
pub(crate) unsafe fn insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard) {
// Insert right after head, i.e. at the beginning of the list.
let to = &self.head;
// Get the intrusively stored Entry of the new element to insert.
let entry: &Entry = C::entry_of(container.deref());
// Make a Shared ptr to that Entry.
let entry_ptr = Shared::from(entry as *const _);
// Read the current successor of where we want to insert.
let mut next = to.load(Relaxed, guard);
loop {
// Set the Entry of the to-be-inserted element to point to the previous successor of
// `to`.
entry.next.store(next, Relaxed);
match to.compare_exchange_weak(next, entry_ptr, Release, Relaxed, guard) {
Ok(_) => break,
// We lost the race or weak CAS failed spuriously. Update the successor and try
// again.
Err(err) => next = err.current,
}
}
}
/// Returns an iterator over all objects.
///
/// # Caveat
///
/// Every object that is inserted at the moment this function is called and persists at least
/// until the end of iteration will be returned. Since this iterator traverses a lock-free
/// linked list that may be concurrently modified, some additional caveats apply:
///
/// 1. If a new object is inserted during iteration, it may or may not be returned.
/// 2. If an object is deleted during iteration, it may or may not be returned.
/// 3. The iteration may be aborted when it loses a race with a concurrent thread. In this case,
///    the winning thread will continue to iterate over the same list (see the sketch below).
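///
/// A sketch of the intended retry pattern around `IterError::Stalled` (not compiled as a
/// doctest, since the list is crate-private):
///
/// ```ignore
/// let mut iter = list.iter(guard);
/// loop {
///     match iter.next() {
///         Some(Ok(elem)) => { /* inspect `elem` */ }
///         // Lost a race with a concurrent thread; the iterator has already restarted
///         // from the head, so just keep going.
///         Some(Err(IterError::Stalled)) => continue,
///         None => break,
///     }
/// }
/// ```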
pub(crate) fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> {
Iter {
guard,
pred: &self.head,
curr: self.head.load(Acquire, guard),
head: &self.head,
_marker: PhantomData,
}
}
}
impl<T, C: IsElement<T>> Drop for List<T, C> {
fn drop(&mut self) {
unsafe {
let guard = unprotected();
let mut curr = self.head.load(Relaxed, guard);
while let Some(c) = curr.as_ref() {
let succ = c.next.load(Relaxed, guard);
// Verify that all elements have been removed from the list.
assert_eq!(succ.tag(), 1);
C::finalize(curr.deref(), guard);
curr = succ;
}
}
}
}
impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> {
type Item = Result<&'g T, IterError>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(c) = unsafe { self.curr.as_ref() } {
let succ = c.next.load(Acquire, self.guard);
if succ.tag() == 1 {
// This entry was removed. Try unlinking it from the list.
let succ = succ.with_tag(0);
// The tag should always be zero, because removing a node after a logically deleted
// node leaves the list in an invalid state.
debug_assert!(self.curr.tag() == 0);
// Try to unlink `curr` from the list, and get the new value of `self.pred`.
let succ = match self
.pred
.compare_exchange(self.curr, succ, Acquire, Acquire, self.guard)
{
Ok(_) => {
// We succeeded in unlinking `curr`, so we have to schedule
// deallocation. Deferred drop is okay, because `list.delete()` can only be
// called if `T: 'static`.
unsafe {
C::finalize(self.curr.deref(), self.guard);
}
// `succ` is the new value of `self.pred`.
succ
}
Err(e) => {
// `e.current` is the current value of `self.pred`.
e.current
}
};
// If the predecessor node is already marked as deleted, we need to restart from
// `head`.
if succ.tag() != 0 {
self.pred = self.head;
self.curr = self.head.load(Acquire, self.guard);
return Some(Err(IterError::Stalled));
}
// Move over the removed node by only advancing `curr`, not `pred`.
self.curr = succ;
continue;
}
// Move one step forward.
self.pred = &c.next;
self.curr = succ;
return Some(Ok(unsafe { C::element_of(c) }));
}
// We reached the end of the list.
None
}
}
#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
use super::*;
use crate::{Collector, Owned};
use crossbeam_utils::thread;
use std::sync::Barrier;
impl IsElement<Entry> for Entry {
fn entry_of(entry: &Entry) -> &Entry {
entry
}
unsafe fn element_of(entry: &Entry) -> &Entry {
entry
}
unsafe fn finalize(entry: &Entry, guard: &Guard) {
guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
}
}
/// Checks whether the list retains inserted elements
/// and returns them in the correct order.
#[test]
fn insert() {
let collector = Collector::new();
let handle = collector.register();
let guard = handle.pin();
let l: List<Entry> = List::new();
let e1 = Owned::new(Entry::default()).into_shared(&guard);
let e2 = Owned::new(Entry::default()).into_shared(&guard);
let e3 = Owned::new(Entry::default()).into_shared(&guard);
unsafe {
l.insert(e1, &guard);
l.insert(e2, &guard);
l.insert(e3, &guard);
}
let mut iter = l.iter(&guard);
let maybe_e3 = iter.next();
assert!(maybe_e3.is_some());
assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw());
let maybe_e2 = iter.next();
assert!(maybe_e2.is_some());
assert!(maybe_e2.unwrap().unwrap() as *const Entry == e2.as_raw());
let maybe_e1 = iter.next();
assert!(maybe_e1.is_some());
assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw());
assert!(iter.next().is_none());
unsafe {
e1.as_ref().unwrap().delete(&guard);
e2.as_ref().unwrap().delete(&guard);
e3.as_ref().unwrap().delete(&guard);
}
}
/// Checks whether elements can be removed from the list and whether
/// the correct elements are removed.
#[test]
fn delete() {
let collector = Collector::new();
let handle = collector.register();
let guard = handle.pin();
let l: List<Entry> = List::new();
let e1 = Owned::new(Entry::default()).into_shared(&guard);
let e2 = Owned::new(Entry::default()).into_shared(&guard);
let e3 = Owned::new(Entry::default()).into_shared(&guard);
unsafe {
l.insert(e1, &guard);
l.insert(e2, &guard);
l.insert(e3, &guard);
e2.as_ref().unwrap().delete(&guard);
}
let mut iter = l.iter(&guard);
let maybe_e3 = iter.next();
assert!(maybe_e3.is_some());
assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw());
let maybe_e1 = iter.next();
assert!(maybe_e1.is_some());
assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw());
assert!(iter.next().is_none());
unsafe {
e1.as_ref().unwrap().delete(&guard);
e3.as_ref().unwrap().delete(&guard);
}
let mut iter = l.iter(&guard);
assert!(iter.next().is_none());
}
const THREADS: usize = 8;
const ITERS: usize = 512;
/// Contends the list on insert and delete operations to make sure they can run concurrently.
#[test]
fn insert_delete_multi() {
let collector = Collector::new();
let l: List<Entry> = List::new();
let b = Barrier::new(THREADS);
thread::scope(|s| {
for _ in 0..THREADS {
s.spawn(|_| {
b.wait();
let handle = collector.register();
let guard: Guard = handle.pin();
let mut v = Vec::with_capacity(ITERS);
for _ in 0..ITERS {
let e = Owned::new(Entry::default()).into_shared(&guard);
v.push(e);
unsafe {
l.insert(e, &guard);
}
}
for e in v {
unsafe {
e.as_ref().unwrap().delete(&guard);
}
}
});
}
})
.unwrap();
let handle = collector.register();
let guard = handle.pin();
let mut iter = l.iter(&guard);
assert!(iter.next().is_none());
}
/// Contends the list on iteration to make sure that it can be iterated over concurrently.
#[test]
fn iter_multi() {
let collector = Collector::new();
let l: List<Entry> = List::new();
let b = Barrier::new(THREADS);
thread::scope(|s| {
for _ in 0..THREADS {
s.spawn(|_| {
b.wait();
let handle = collector.register();
let guard: Guard = handle.pin();
let mut v = Vec::with_capacity(ITERS);
for _ in 0..ITERS {
let e = Owned::new(Entry::default()).into_shared(&guard);
v.push(e);
unsafe {
l.insert(e, &guard);
}
}
let mut iter = l.iter(&guard);
for _ in 0..ITERS {
assert!(iter.next().is_some());
}
for e in v {
unsafe {
e.as_ref().unwrap().delete(&guard);
}
}
});
}
})
.unwrap();
let handle = collector.register();
let guard = handle.pin();
let mut iter = l.iter(&guard);
assert!(iter.next().is_none());
}
}

View File

@@ -0,0 +1,7 @@
//! Synchronization primitives.
pub(crate) mod list;
#[cfg(feature = "std")]
#[cfg(not(crossbeam_loom))]
pub(crate) mod once_lock;
pub(crate) mod queue;

View File

@@ -0,0 +1,103 @@
// Based on unstable std::sync::OnceLock.
//
// Source: https://github.com/rust-lang/rust/blob/8e9c93df464b7ada3fc7a1c8ccddd9dcb24ee0a0/library/std/src/sync/once_lock.rs
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
pub(crate) struct OnceLock<T> {
once: Once,
// Once::is_completed requires Rust 1.43, so use this to keep track of whether the value has been initialized.
is_initialized: AtomicBool,
value: UnsafeCell<MaybeUninit<T>>,
// Unlike std::sync::OnceLock, we don't need PhantomData here because
// we don't use #[may_dangle].
}
unsafe impl<T: Sync + Send> Sync for OnceLock<T> {}
unsafe impl<T: Send> Send for OnceLock<T> {}
impl<T> OnceLock<T> {
/// Creates a new empty cell.
#[must_use]
pub(crate) const fn new() -> Self {
Self {
once: Once::new(),
is_initialized: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
}
/// Gets the contents of the cell, initializing it with `f` if the cell
/// was empty.
///
/// Many threads may call `get_or_init` concurrently with different
/// initializing functions, but it is guaranteed that only one function
/// will be executed.
///
/// # Panics
///
/// If `f` panics, the panic is propagated to the caller, and the cell
/// remains uninitialized.
///
/// It is an error to reentrantly initialize the cell from `f`. The
/// exact outcome is unspecified. The current implementation deadlocks, but
/// this may be changed to a panic in the future.
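///
/// A sketch of the intended usage (not compiled as a doctest, since this type is crate-private):
///
/// ```ignore
/// static CELL: OnceLock<usize> = OnceLock::new();
///
/// // The first call runs the closure and stores the result.
/// assert_eq!(*CELL.get_or_init(|| 92), 92);
/// // Later calls return the stored value; the closure is not run again.
/// assert_eq!(*CELL.get_or_init(|| 0), 92);
/// ```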
pub(crate) fn get_or_init<F>(&self, f: F) -> &T
where
F: FnOnce() -> T,
{
// Fast path check
if self.is_initialized() {
// SAFETY: The inner value has been initialized
return unsafe { self.get_unchecked() };
}
self.initialize(f);
debug_assert!(self.is_initialized());
// SAFETY: The inner value has been initialized
unsafe { self.get_unchecked() }
}
#[inline]
fn is_initialized(&self) -> bool {
self.is_initialized.load(Ordering::Acquire)
}
#[cold]
fn initialize<F>(&self, f: F)
where
F: FnOnce() -> T,
{
let slot = self.value.get().cast::<T>();
let is_initialized = &self.is_initialized;
self.once.call_once(|| {
let value = f();
unsafe {
slot.write(value);
}
is_initialized.store(true, Ordering::Release);
});
}
/// # Safety
///
/// The value must be initialized
unsafe fn get_unchecked(&self) -> &T {
debug_assert!(self.is_initialized());
&*self.value.get().cast::<T>()
}
}
impl<T> Drop for OnceLock<T> {
fn drop(&mut self) {
if self.is_initialized() {
// SAFETY: The inner value has been initialized
unsafe { self.value.get().cast::<T>().drop_in_place() };
}
}
}

View File

@@ -0,0 +1,469 @@
//! Michael-Scott lock-free queue.
//!
//! Usable with any number of producers and consumers.
//!
//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106>
//!
//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
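//!
//! A sketch of basic usage through a pinned guard (not compiled as a doctest, since `Queue` is
//! crate-private):
//!
//! ```ignore
//! let q = Queue::new();
//! let guard = &crate::pin();
//! q.push(1, guard);
//! q.push(2, guard);
//! assert_eq!(q.try_pop(guard), Some(1));
//! assert_eq!(q.try_pop(guard), Some(2));
//! assert_eq!(q.try_pop(guard), None);
//! ```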
use core::mem::MaybeUninit;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crossbeam_utils::CachePadded;
use crate::{unprotected, Atomic, Guard, Owned, Shared};
// The representation here is a singly-linked list, with a sentinel node at the front. In general
// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes hold the queued data.
#[derive(Debug)]
pub(crate) struct Queue<T> {
head: CachePadded<Atomic<Node<T>>>,
tail: CachePadded<Atomic<Node<T>>>,
}
struct Node<T> {
/// The slot in which a value of type `T` can be stored.
///
/// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
/// For example, the sentinel node in a queue never contains a value: its slot is always empty.
/// Other nodes start their life with a push operation and contain a value until it gets popped
/// out. After that such empty nodes get added to the collector for destruction.
data: MaybeUninit<T>,
next: Atomic<Node<T>>,
}
// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
unsafe impl<T: Send> Sync for Queue<T> {}
unsafe impl<T: Send> Send for Queue<T> {}
impl<T> Queue<T> {
/// Create a new, empty queue.
pub(crate) fn new() -> Queue<T> {
let q = Queue {
head: CachePadded::new(Atomic::null()),
tail: CachePadded::new(Atomic::null()),
};
let sentinel = Owned::new(Node {
data: MaybeUninit::uninit(),
next: Atomic::null(),
});
unsafe {
let guard = unprotected();
let sentinel = sentinel.into_shared(guard);
q.head.store(sentinel, Relaxed);
q.tail.store(sentinel, Relaxed);
q
}
}
/// Attempts to atomically place `new` into the `next` pointer of `onto`, and returns `true` on
/// success. The queue's `tail` pointer may be updated.
#[inline(always)]
fn push_internal(
&self,
onto: Shared<'_, Node<T>>,
new: Shared<'_, Node<T>>,
guard: &Guard,
) -> bool {
// is `onto` the actual tail?
let o = unsafe { onto.deref() };
let next = o.next.load(Acquire, guard);
if unsafe { next.as_ref().is_some() } {
// if not, try to "help" by moving the tail pointer forward
let _ = self
.tail
.compare_exchange(onto, next, Release, Relaxed, guard);
false
} else {
// looks like the actual tail; attempt to link in `new`
let result = o
.next
.compare_exchange(Shared::null(), new, Release, Relaxed, guard)
.is_ok();
if result {
// try to move the tail pointer forward
let _ = self
.tail
.compare_exchange(onto, new, Release, Relaxed, guard);
}
result
}
}
/// Adds `t` to the back of the queue.
pub(crate) fn push(&self, t: T, guard: &Guard) {
let new = Owned::new(Node {
data: MaybeUninit::new(t),
next: Atomic::null(),
});
let new = Owned::into_shared(new, guard);
loop {
// We push onto the tail, so we'll start optimistically by looking there first.
let tail = self.tail.load(Acquire, guard);
// Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
if self.push_internal(tail, new, guard) {
break;
}
}
}
/// Attempts to pop a data node. `Ok(None)` if the queue is empty; `Err(())` if it lost a race to pop.
#[inline(always)]
fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
let head = self.head.load(Acquire, guard);
let h = unsafe { head.deref() };
let next = h.next.load(Acquire, guard);
match unsafe { next.as_ref() } {
Some(n) => unsafe {
self.head
.compare_exchange(head, next, Release, Relaxed, guard)
.map(|_| {
let tail = self.tail.load(Relaxed, guard);
// Advance the tail so that we don't retire a pointer to a reachable node.
if head == tail {
let _ = self
.tail
.compare_exchange(tail, next, Release, Relaxed, guard);
}
guard.defer_destroy(head);
// TODO: Replace with MaybeUninit::read when api is stable
Some(n.data.as_ptr().read())
})
.map_err(|_| ())
},
None => Ok(None),
}
}
/// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if the
/// queue is empty or the data does not satisfy the condition; `Err(())` if it lost a race to pop.
#[inline(always)]
fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
where
T: Sync,
F: Fn(&T) -> bool,
{
let head = self.head.load(Acquire, guard);
let h = unsafe { head.deref() };
let next = h.next.load(Acquire, guard);
match unsafe { next.as_ref() } {
Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
self.head
.compare_exchange(head, next, Release, Relaxed, guard)
.map(|_| {
let tail = self.tail.load(Relaxed, guard);
// Advance the tail so that we don't retire a pointer to a reachable node.
if head == tail {
let _ = self
.tail
.compare_exchange(tail, next, Release, Relaxed, guard);
}
guard.defer_destroy(head);
Some(n.data.as_ptr().read())
})
.map_err(|_| ())
},
None | Some(_) => Ok(None),
}
}
/// Attempts to dequeue from the front.
///
/// Returns `None` if the queue is observed to be empty.
pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
loop {
if let Ok(head) = self.pop_internal(guard) {
return head;
}
}
}
/// Attempts to dequeue from the front, if the item satisfies the given condition.
///
/// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
/// condition.
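///
/// For example (a sketch, not a compiled doctest): popping only when the head satisfies a
/// predicate leaves non-matching items in place.
///
/// ```ignore
/// q.push(1, guard);
/// // `1` is odd, so the conditional pop leaves it in the queue.
/// assert_eq!(q.try_pop_if(|&x| x % 2 == 0, guard), None);
/// assert_eq!(q.try_pop(guard), Some(1));
/// ```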
pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
where
T: Sync,
F: Fn(&T) -> bool,
{
loop {
if let Ok(head) = self.pop_if_internal(&condition, guard) {
return head;
}
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let guard = unprotected();
while self.try_pop(guard).is_some() {}
// Destroy the remaining sentinel node.
let sentinel = self.head.load(Relaxed, guard);
drop(sentinel.into_owned());
}
}
}
#[cfg(all(test, not(crossbeam_loom)))]
mod test {
use super::*;
use crate::pin;
use crossbeam_utils::thread;
struct Queue<T> {
queue: super::Queue<T>,
}
impl<T> Queue<T> {
pub(crate) fn new() -> Queue<T> {
Queue {
queue: super::Queue::new(),
}
}
pub(crate) fn push(&self, t: T) {
let guard = &pin();
self.queue.push(t, guard);
}
pub(crate) fn is_empty(&self) -> bool {
let guard = &pin();
let head = self.queue.head.load(Acquire, guard);
let h = unsafe { head.deref() };
h.next.load(Acquire, guard).is_null()
}
pub(crate) fn try_pop(&self) -> Option<T> {
let guard = &pin();
self.queue.try_pop(guard)
}
pub(crate) fn pop(&self) -> T {
loop {
match self.try_pop() {
None => continue,
Some(t) => return t,
}
}
}
}
#[cfg(miri)]
const CONC_COUNT: i64 = 1000;
#[cfg(not(miri))]
const CONC_COUNT: i64 = 1000000;
#[test]
fn push_try_pop_1() {
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
q.push(37);
assert!(!q.is_empty());
assert_eq!(q.try_pop(), Some(37));
assert!(q.is_empty());
}
#[test]
fn push_try_pop_2() {
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
q.push(37);
q.push(48);
assert_eq!(q.try_pop(), Some(37));
assert!(!q.is_empty());
assert_eq!(q.try_pop(), Some(48));
assert!(q.is_empty());
}
#[test]
fn push_try_pop_many_seq() {
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
for i in 0..200 {
q.push(i)
}
assert!(!q.is_empty());
for i in 0..200 {
assert_eq!(q.try_pop(), Some(i));
}
assert!(q.is_empty());
}
#[test]
fn push_pop_1() {
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
q.push(37);
assert!(!q.is_empty());
assert_eq!(q.pop(), 37);
assert!(q.is_empty());
}
#[test]
fn push_pop_2() {
let q: Queue<i64> = Queue::new();
q.push(37);
q.push(48);
assert_eq!(q.pop(), 37);
assert_eq!(q.pop(), 48);
}
#[test]
fn push_pop_many_seq() {
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
for i in 0..200 {
q.push(i)
}
assert!(!q.is_empty());
for i in 0..200 {
assert_eq!(q.pop(), i);
}
assert!(q.is_empty());
}
#[test]
fn push_try_pop_many_spsc() {
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
thread::scope(|scope| {
scope.spawn(|_| {
let mut next = 0;
while next < CONC_COUNT {
if let Some(elem) = q.try_pop() {
assert_eq!(elem, next);
next += 1;
}
}
});
for i in 0..CONC_COUNT {
q.push(i)
}
})
.unwrap();
}
#[test]
fn push_try_pop_many_spmc() {
fn recv(_t: i32, q: &Queue<i64>) {
let mut cur = -1;
for _i in 0..CONC_COUNT {
if let Some(elem) = q.try_pop() {
assert!(elem > cur);
cur = elem;
if cur == CONC_COUNT - 1 {
break;
}
}
}
}
let q: Queue<i64> = Queue::new();
assert!(q.is_empty());
thread::scope(|scope| {
for i in 0..3 {
let q = &q;
scope.spawn(move |_| recv(i, q));
}
scope.spawn(|_| {
for i in 0..CONC_COUNT {
q.push(i);
}
});
})
.unwrap();
}
#[test]
fn push_try_pop_many_mpmc() {
enum LR {
Left(i64),
Right(i64),
}
let q: Queue<LR> = Queue::new();
assert!(q.is_empty());
thread::scope(|scope| {
for _t in 0..2 {
scope.spawn(|_| {
for i in CONC_COUNT - 1..CONC_COUNT {
q.push(LR::Left(i))
}
});
scope.spawn(|_| {
for i in CONC_COUNT - 1..CONC_COUNT {
q.push(LR::Right(i))
}
});
scope.spawn(|_| {
let mut vl = vec![];
let mut vr = vec![];
for _i in 0..CONC_COUNT {
match q.try_pop() {
Some(LR::Left(x)) => vl.push(x),
Some(LR::Right(x)) => vr.push(x),
_ => {}
}
}
let mut vl2 = vl.clone();
let mut vr2 = vr.clone();
vl2.sort_unstable();
vr2.sort_unstable();
assert_eq!(vl, vl2);
assert_eq!(vr, vr2);
});
}
})
.unwrap();
}
#[test]
fn push_pop_many_spsc() {
let q: Queue<i64> = Queue::new();
thread::scope(|scope| {
scope.spawn(|_| {
let mut next = 0;
while next < CONC_COUNT {
assert_eq!(q.pop(), next);
next += 1;
}
});
for i in 0..CONC_COUNT {
q.push(i)
}
})
.unwrap();
assert!(q.is_empty());
}
#[test]
fn is_empty_dont_pop() {
let q: Queue<i64> = Queue::new();
q.push(20);
q.push(20);
assert!(!q.is_empty());
assert!(!q.is_empty());
assert!(q.try_pop().is_some());
}
}