1512 lines
47 KiB
Rust
1512 lines
47 KiB
Rust
//! The channel interface.
|
|
|
|
use std::fmt;
|
|
use std::iter::FusedIterator;
|
|
use std::mem;
|
|
use std::panic::{RefUnwindSafe, UnwindSafe};
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use crate::context::Context;
|
|
use crate::counter;
|
|
use crate::err::{
|
|
RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
|
|
};
|
|
use crate::flavors;
|
|
use crate::select::{Operation, SelectHandle, Token};
|
|
use crate::utils;
|
|
|
|
/// Creates a channel of unbounded capacity.
|
|
///
|
|
/// This channel has a growable buffer that can hold any number of messages at a time.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// // Computes the n-th Fibonacci number.
|
|
/// fn fib(n: i32) -> i32 {
|
|
/// if n <= 1 {
|
|
/// n
|
|
/// } else {
|
|
/// fib(n - 1) + fib(n - 2)
|
|
/// }
|
|
/// }
|
|
///
|
|
/// // Spawn an asynchronous computation.
|
|
/// thread::spawn(move || s.send(fib(20)).unwrap());
|
|
///
|
|
/// // Print the result of the computation.
|
|
/// println!("{}", r.recv().unwrap());
|
|
/// ```
|
|
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
|
|
let (s, r) = counter::new(flavors::list::Channel::new());
|
|
let s = Sender {
|
|
flavor: SenderFlavor::List(s),
|
|
};
|
|
let r = Receiver {
|
|
flavor: ReceiverFlavor::List(r),
|
|
};
|
|
(s, r)
|
|
}
|
|
|
|
/// Creates a channel of bounded capacity.
|
|
///
|
|
/// This channel has a buffer that can hold at most `cap` messages at a time.
|
|
///
|
|
/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
|
|
/// receive operations must appear at the same time in order to pair up and pass the message over.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// A channel of capacity 1:
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::bounded;
|
|
///
|
|
/// let (s, r) = bounded(1);
|
|
///
|
|
/// // This call returns immediately because there is enough space in the channel.
|
|
/// s.send(1).unwrap();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// // This call blocks the current thread because the channel is full.
|
|
/// // It will be able to complete only after the first message is received.
|
|
/// s.send(2).unwrap();
|
|
/// });
|
|
///
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// assert_eq!(r.recv(), Ok(1));
|
|
/// assert_eq!(r.recv(), Ok(2));
|
|
/// ```
|
|
///
|
|
/// A zero-capacity channel:
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::bounded;
|
|
///
|
|
/// let (s, r) = bounded(0);
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// // This call blocks the current thread until a receive operation appears
|
|
/// // on the other side of the channel.
|
|
/// s.send(1).unwrap();
|
|
/// });
|
|
///
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// assert_eq!(r.recv(), Ok(1));
|
|
/// ```
|
|
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
|
|
if cap == 0 {
|
|
let (s, r) = counter::new(flavors::zero::Channel::new());
|
|
let s = Sender {
|
|
flavor: SenderFlavor::Zero(s),
|
|
};
|
|
let r = Receiver {
|
|
flavor: ReceiverFlavor::Zero(r),
|
|
};
|
|
(s, r)
|
|
} else {
|
|
let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
|
|
let s = Sender {
|
|
flavor: SenderFlavor::Array(s),
|
|
};
|
|
let r = Receiver {
|
|
flavor: ReceiverFlavor::Array(r),
|
|
};
|
|
(s, r)
|
|
}
|
|
}
|
|
|
|
/// Creates a receiver that delivers a message after a certain duration of time.
|
|
///
|
|
/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
|
|
/// be sent into the channel after `duration` elapses. The message is the instant at which it is
|
|
/// sent.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Using an `after` channel for timeouts:
|
|
///
|
|
/// ```
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::{after, select, unbounded};
|
|
///
|
|
/// let (s, r) = unbounded::<i32>();
|
|
/// let timeout = Duration::from_millis(100);
|
|
///
|
|
/// select! {
|
|
/// recv(r) -> msg => println!("received {:?}", msg),
|
|
/// recv(after(timeout)) -> _ => println!("timed out"),
|
|
/// }
|
|
/// ```
|
|
///
|
|
/// When the message gets sent:
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::{Duration, Instant};
|
|
/// use crossbeam_channel::after;
|
|
///
|
|
/// // Converts a number of milliseconds into a `Duration`.
|
|
/// let ms = |ms| Duration::from_millis(ms);
|
|
///
|
|
/// // Returns `true` if `a` and `b` are very close `Instant`s.
|
|
/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
|
|
///
|
|
/// let start = Instant::now();
|
|
/// let r = after(ms(100));
|
|
///
|
|
/// thread::sleep(ms(500));
|
|
///
|
|
/// // This message was sent 100 ms from the start and received 500 ms from the start.
|
|
/// assert!(eq(r.recv().unwrap(), start + ms(100)));
|
|
/// assert!(eq(Instant::now(), start + ms(500)));
|
|
/// ```
|
|
pub fn after(duration: Duration) -> Receiver<Instant> {
|
|
Receiver {
|
|
flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
|
|
}
|
|
}
|
|
|
|
/// Creates a receiver that delivers a message at a certain instant in time.
|
|
///
|
|
/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
|
|
/// be sent into the channel at the moment in time `when`. The message is the instant at which it
|
|
/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
|
|
/// instantly to the receiver.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Using an `at` channel for timeouts:
|
|
///
|
|
/// ```
|
|
/// use std::time::{Instant, Duration};
|
|
/// use crossbeam_channel::{at, select, unbounded};
|
|
///
|
|
/// let (s, r) = unbounded::<i32>();
|
|
/// let deadline = Instant::now() + Duration::from_millis(500);
|
|
///
|
|
/// select! {
|
|
/// recv(r) -> msg => println!("received {:?}", msg),
|
|
/// recv(at(deadline)) -> _ => println!("timed out"),
|
|
/// }
|
|
/// ```
|
|
///
|
|
/// When the message gets sent:
|
|
///
|
|
/// ```
|
|
/// use std::time::{Duration, Instant};
|
|
/// use crossbeam_channel::at;
|
|
///
|
|
/// // Converts a number of milliseconds into a `Duration`.
|
|
/// let ms = |ms| Duration::from_millis(ms);
|
|
///
|
|
/// let start = Instant::now();
|
|
/// let end = start + ms(100);
|
|
///
|
|
/// let r = at(end);
|
|
///
|
|
/// // This message was sent 100 ms from the start
|
|
/// assert_eq!(r.recv().unwrap(), end);
|
|
/// assert!(Instant::now() > start + ms(100));
|
|
/// ```
|
|
pub fn at(when: Instant) -> Receiver<Instant> {
|
|
Receiver {
|
|
flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
|
|
}
|
|
}
|
|
|
|
/// Creates a receiver that never delivers messages.
|
|
///
|
|
/// The channel is bounded with capacity of 0 and never gets disconnected.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Using a `never` channel to optionally add a timeout to [`select!`]:
|
|
///
|
|
/// [`select!`]: crate::select!
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::{after, select, never, unbounded};
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// s.send(1).unwrap();
|
|
/// });
|
|
///
|
|
/// // Suppose this duration can be a `Some` or a `None`.
|
|
/// let duration = Some(Duration::from_millis(100));
|
|
///
|
|
/// // Create a channel that times out after the specified duration.
|
|
/// let timeout = duration
|
|
/// .map(|d| after(d))
|
|
/// .unwrap_or(never());
|
|
///
|
|
/// select! {
|
|
/// recv(r) -> msg => assert_eq!(msg, Ok(1)),
|
|
/// recv(timeout) -> _ => println!("timed out"),
|
|
/// }
|
|
/// ```
|
|
pub fn never<T>() -> Receiver<T> {
|
|
Receiver {
|
|
flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
|
|
}
|
|
}
|
|
|
|
/// Creates a receiver that delivers messages periodically.
|
|
///
|
|
/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
|
|
/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
|
|
/// sent.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Using a `tick` channel to periodically print elapsed time:
|
|
///
|
|
/// ```
|
|
/// use std::time::{Duration, Instant};
|
|
/// use crossbeam_channel::tick;
|
|
///
|
|
/// let start = Instant::now();
|
|
/// let ticker = tick(Duration::from_millis(100));
|
|
///
|
|
/// for _ in 0..5 {
|
|
/// ticker.recv().unwrap();
|
|
/// println!("elapsed: {:?}", start.elapsed());
|
|
/// }
|
|
/// ```
|
|
///
|
|
/// When messages get sent:
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::{Duration, Instant};
|
|
/// use crossbeam_channel::tick;
|
|
///
|
|
/// // Converts a number of milliseconds into a `Duration`.
|
|
/// let ms = |ms| Duration::from_millis(ms);
|
|
///
|
|
/// // Returns `true` if `a` and `b` are very close `Instant`s.
|
|
/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
|
|
///
|
|
/// let start = Instant::now();
|
|
/// let r = tick(ms(100));
|
|
///
|
|
/// // This message was sent 100 ms from the start and received 100 ms from the start.
|
|
/// assert!(eq(r.recv().unwrap(), start + ms(100)));
|
|
/// assert!(eq(Instant::now(), start + ms(100)));
|
|
///
|
|
/// thread::sleep(ms(500));
|
|
///
|
|
/// // This message was sent 200 ms from the start and received 600 ms from the start.
|
|
/// assert!(eq(r.recv().unwrap(), start + ms(200)));
|
|
/// assert!(eq(Instant::now(), start + ms(600)));
|
|
///
|
|
/// // This message was sent 700 ms from the start and received 700 ms from the start.
|
|
/// assert!(eq(r.recv().unwrap(), start + ms(700)));
|
|
/// assert!(eq(Instant::now(), start + ms(700)));
|
|
/// ```
|
|
pub fn tick(duration: Duration) -> Receiver<Instant> {
|
|
Receiver {
|
|
flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
|
|
}
|
|
}
|
|
|
|
/// The sending side of a channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s1, r) = unbounded();
|
|
/// let s2 = s1.clone();
|
|
///
|
|
/// thread::spawn(move || s1.send(1).unwrap());
|
|
/// thread::spawn(move || s2.send(2).unwrap());
|
|
///
|
|
/// let msg1 = r.recv().unwrap();
|
|
/// let msg2 = r.recv().unwrap();
|
|
///
|
|
/// assert_eq!(msg1 + msg2, 3);
|
|
/// ```
|
|
pub struct Sender<T> {
|
|
flavor: SenderFlavor<T>,
|
|
}
|
|
|
|
/// Sender flavors.
|
|
enum SenderFlavor<T> {
|
|
/// Bounded channel based on a preallocated array.
|
|
Array(counter::Sender<flavors::array::Channel<T>>),
|
|
|
|
/// Unbounded channel implemented as a linked list.
|
|
List(counter::Sender<flavors::list::Channel<T>>),
|
|
|
|
/// Zero-capacity channel.
|
|
Zero(counter::Sender<flavors::zero::Channel<T>>),
|
|
}
|
|
|
|
unsafe impl<T: Send> Send for Sender<T> {}
|
|
unsafe impl<T: Send> Sync for Sender<T> {}
|
|
|
|
impl<T> UnwindSafe for Sender<T> {}
|
|
impl<T> RefUnwindSafe for Sender<T> {}
|
|
|
|
impl<T> Sender<T> {
|
|
/// Attempts to send a message into the channel without blocking.
|
|
///
|
|
/// This method will either send a message into the channel immediately or return an error if
|
|
/// the channel is full or disconnected. The returned error contains the original message.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will send the message only if there
|
|
/// happens to be a receive operation on the other side of the channel at the same time.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::{bounded, TrySendError};
|
|
///
|
|
/// let (s, r) = bounded(1);
|
|
///
|
|
/// assert_eq!(s.try_send(1), Ok(()));
|
|
/// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
|
|
///
|
|
/// drop(r);
|
|
/// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
|
|
/// ```
|
|
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.try_send(msg),
|
|
SenderFlavor::List(chan) => chan.try_send(msg),
|
|
SenderFlavor::Zero(chan) => chan.try_send(msg),
|
|
}
|
|
}
|
|
|
|
/// Blocks the current thread until a message is sent or the channel is disconnected.
|
|
///
|
|
/// If the channel is full and not disconnected, this call will block until the send operation
|
|
/// can proceed. If the channel becomes disconnected, this call will wake up and return an
|
|
/// error. The returned error contains the original message.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will wait for a receive operation to
|
|
/// appear on the other side of the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::{bounded, SendError};
|
|
///
|
|
/// let (s, r) = bounded(1);
|
|
/// assert_eq!(s.send(1), Ok(()));
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// assert_eq!(r.recv(), Ok(1));
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// drop(r);
|
|
/// });
|
|
///
|
|
/// assert_eq!(s.send(2), Ok(()));
|
|
/// assert_eq!(s.send(3), Err(SendError(3)));
|
|
/// ```
|
|
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.send(msg, None),
|
|
SenderFlavor::List(chan) => chan.send(msg, None),
|
|
SenderFlavor::Zero(chan) => chan.send(msg, None),
|
|
}
|
|
.map_err(|err| match err {
|
|
SendTimeoutError::Disconnected(msg) => SendError(msg),
|
|
SendTimeoutError::Timeout(_) => unreachable!(),
|
|
})
|
|
}
|
|
|
|
/// Waits for a message to be sent into the channel, but only for a limited time.
|
|
///
|
|
/// If the channel is full and not disconnected, this call will block until the send operation
|
|
/// can proceed or the operation times out. If the channel becomes disconnected, this call will
|
|
/// wake up and return an error. The returned error contains the original message.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will wait for a receive operation to
|
|
/// appear on the other side of the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::{bounded, SendTimeoutError};
|
|
///
|
|
/// let (s, r) = bounded(0);
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// assert_eq!(r.recv(), Ok(2));
|
|
/// drop(r);
|
|
/// });
|
|
///
|
|
/// assert_eq!(
|
|
/// s.send_timeout(1, Duration::from_millis(500)),
|
|
/// Err(SendTimeoutError::Timeout(1)),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// s.send_timeout(2, Duration::from_secs(1)),
|
|
/// Ok(()),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// s.send_timeout(3, Duration::from_millis(500)),
|
|
/// Err(SendTimeoutError::Disconnected(3)),
|
|
/// );
|
|
/// ```
|
|
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
|
|
self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout))
|
|
}
|
|
|
|
/// Waits for a message to be sent into the channel, but only until a given deadline.
|
|
///
|
|
/// If the channel is full and not disconnected, this call will block until the send operation
|
|
/// can proceed or the operation times out. If the channel becomes disconnected, this call will
|
|
/// wake up and return an error. The returned error contains the original message.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will wait for a receive operation to
|
|
/// appear on the other side of the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::{Duration, Instant};
|
|
/// use crossbeam_channel::{bounded, SendTimeoutError};
|
|
///
|
|
/// let (s, r) = bounded(0);
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// assert_eq!(r.recv(), Ok(2));
|
|
/// drop(r);
|
|
/// });
|
|
///
|
|
/// let now = Instant::now();
|
|
///
|
|
/// assert_eq!(
|
|
/// s.send_deadline(1, now + Duration::from_millis(500)),
|
|
/// Err(SendTimeoutError::Timeout(1)),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// s.send_deadline(2, now + Duration::from_millis(1500)),
|
|
/// Ok(()),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// s.send_deadline(3, now + Duration::from_millis(2000)),
|
|
/// Err(SendTimeoutError::Disconnected(3)),
|
|
/// );
|
|
/// ```
|
|
pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
|
|
SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
|
|
SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if the channel is empty.
|
|
///
|
|
/// Note: Zero-capacity channels are always empty.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
/// assert!(s.is_empty());
|
|
///
|
|
/// s.send(0).unwrap();
|
|
/// assert!(!s.is_empty());
|
|
/// ```
|
|
pub fn is_empty(&self) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.is_empty(),
|
|
SenderFlavor::List(chan) => chan.is_empty(),
|
|
SenderFlavor::Zero(chan) => chan.is_empty(),
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if the channel is full.
|
|
///
|
|
/// Note: Zero-capacity channels are always full.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::bounded;
|
|
///
|
|
/// let (s, r) = bounded(1);
|
|
///
|
|
/// assert!(!s.is_full());
|
|
/// s.send(0).unwrap();
|
|
/// assert!(s.is_full());
|
|
/// ```
|
|
pub fn is_full(&self) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.is_full(),
|
|
SenderFlavor::List(chan) => chan.is_full(),
|
|
SenderFlavor::Zero(chan) => chan.is_full(),
|
|
}
|
|
}
|
|
|
|
/// Returns the number of messages in the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
/// assert_eq!(s.len(), 0);
|
|
///
|
|
/// s.send(1).unwrap();
|
|
/// s.send(2).unwrap();
|
|
/// assert_eq!(s.len(), 2);
|
|
/// ```
|
|
pub fn len(&self) -> usize {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.len(),
|
|
SenderFlavor::List(chan) => chan.len(),
|
|
SenderFlavor::Zero(chan) => chan.len(),
|
|
}
|
|
}
|
|
|
|
/// If the channel is bounded, returns its capacity.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::{bounded, unbounded};
|
|
///
|
|
/// let (s, _) = unbounded::<i32>();
|
|
/// assert_eq!(s.capacity(), None);
|
|
///
|
|
/// let (s, _) = bounded::<i32>(5);
|
|
/// assert_eq!(s.capacity(), Some(5));
|
|
///
|
|
/// let (s, _) = bounded::<i32>(0);
|
|
/// assert_eq!(s.capacity(), Some(0));
|
|
/// ```
|
|
pub fn capacity(&self) -> Option<usize> {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.capacity(),
|
|
SenderFlavor::List(chan) => chan.capacity(),
|
|
SenderFlavor::Zero(chan) => chan.capacity(),
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if senders belong to the same channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```rust
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, _) = unbounded::<usize>();
|
|
///
|
|
/// let s2 = s.clone();
|
|
/// assert!(s.same_channel(&s2));
|
|
///
|
|
/// let (s3, _) = unbounded();
|
|
/// assert!(!s.same_channel(&s3));
|
|
/// ```
|
|
pub fn same_channel(&self, other: &Sender<T>) -> bool {
|
|
match (&self.flavor, &other.flavor) {
|
|
(SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
|
|
(SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
|
|
(SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
|
|
_ => false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for Sender<T> {
|
|
fn drop(&mut self) {
|
|
unsafe {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
|
|
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
|
|
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Clone for Sender<T> {
|
|
fn clone(&self) -> Self {
|
|
let flavor = match &self.flavor {
|
|
SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
|
|
SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
|
|
SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
|
|
};
|
|
|
|
Sender { flavor }
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for Sender<T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("Sender { .. }")
|
|
}
|
|
}
|
|
|
|
/// The receiving side of a channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// let _ = s.send(1);
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// let _ = s.send(2);
|
|
/// });
|
|
///
|
|
/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
|
|
/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
|
|
/// ```
|
|
pub struct Receiver<T> {
|
|
flavor: ReceiverFlavor<T>,
|
|
}
|
|
|
|
/// Receiver flavors.
|
|
enum ReceiverFlavor<T> {
|
|
/// Bounded channel based on a preallocated array.
|
|
Array(counter::Receiver<flavors::array::Channel<T>>),
|
|
|
|
/// Unbounded channel implemented as a linked list.
|
|
List(counter::Receiver<flavors::list::Channel<T>>),
|
|
|
|
/// Zero-capacity channel.
|
|
Zero(counter::Receiver<flavors::zero::Channel<T>>),
|
|
|
|
/// The after flavor.
|
|
At(Arc<flavors::at::Channel>),
|
|
|
|
/// The tick flavor.
|
|
Tick(Arc<flavors::tick::Channel>),
|
|
|
|
/// The never flavor.
|
|
Never(flavors::never::Channel<T>),
|
|
}
|
|
|
|
unsafe impl<T: Send> Send for Receiver<T> {}
|
|
unsafe impl<T: Send> Sync for Receiver<T> {}
|
|
|
|
impl<T> UnwindSafe for Receiver<T> {}
|
|
impl<T> RefUnwindSafe for Receiver<T> {}
|
|
|
|
impl<T> Receiver<T> {
|
|
/// Attempts to receive a message from the channel without blocking.
|
|
///
|
|
/// This method will either receive a message from the channel immediately or return an error
|
|
/// if the channel is empty.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will receive a message only if there
|
|
/// happens to be a send operation on the other side of the channel at the same time.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::{unbounded, TryRecvError};
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
|
|
///
|
|
/// s.send(5).unwrap();
|
|
/// drop(s);
|
|
///
|
|
/// assert_eq!(r.try_recv(), Ok(5));
|
|
/// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
|
|
/// ```
|
|
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.try_recv(),
|
|
ReceiverFlavor::List(chan) => chan.try_recv(),
|
|
ReceiverFlavor::Zero(chan) => chan.try_recv(),
|
|
ReceiverFlavor::At(chan) => {
|
|
let msg = chan.try_recv();
|
|
unsafe {
|
|
mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
|
|
&msg,
|
|
)
|
|
}
|
|
}
|
|
ReceiverFlavor::Tick(chan) => {
|
|
let msg = chan.try_recv();
|
|
unsafe {
|
|
mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
|
|
&msg,
|
|
)
|
|
}
|
|
}
|
|
ReceiverFlavor::Never(chan) => chan.try_recv(),
|
|
}
|
|
}
|
|
|
|
/// Blocks the current thread until a message is received or the channel is empty and
|
|
/// disconnected.
|
|
///
|
|
/// If the channel is empty and not disconnected, this call will block until the receive
|
|
/// operation can proceed. If the channel is empty and becomes disconnected, this call will
|
|
/// wake up and return an error.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will wait for a send operation to appear
|
|
/// on the other side of the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::{unbounded, RecvError};
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// s.send(5).unwrap();
|
|
/// drop(s);
|
|
/// });
|
|
///
|
|
/// assert_eq!(r.recv(), Ok(5));
|
|
/// assert_eq!(r.recv(), Err(RecvError));
|
|
/// ```
|
|
pub fn recv(&self) -> Result<T, RecvError> {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.recv(None),
|
|
ReceiverFlavor::List(chan) => chan.recv(None),
|
|
ReceiverFlavor::Zero(chan) => chan.recv(None),
|
|
ReceiverFlavor::At(chan) => {
|
|
let msg = chan.recv(None);
|
|
unsafe {
|
|
mem::transmute_copy::<
|
|
Result<Instant, RecvTimeoutError>,
|
|
Result<T, RecvTimeoutError>,
|
|
>(&msg)
|
|
}
|
|
}
|
|
ReceiverFlavor::Tick(chan) => {
|
|
let msg = chan.recv(None);
|
|
unsafe {
|
|
mem::transmute_copy::<
|
|
Result<Instant, RecvTimeoutError>,
|
|
Result<T, RecvTimeoutError>,
|
|
>(&msg)
|
|
}
|
|
}
|
|
ReceiverFlavor::Never(chan) => chan.recv(None),
|
|
}
|
|
.map_err(|_| RecvError)
|
|
}
|
|
|
|
/// Waits for a message to be received from the channel, but only for a limited time.
|
|
///
|
|
/// If the channel is empty and not disconnected, this call will block until the receive
|
|
/// operation can proceed or the operation times out. If the channel is empty and becomes
|
|
/// disconnected, this call will wake up and return an error.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will wait for a send operation to appear
|
|
/// on the other side of the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::{unbounded, RecvTimeoutError};
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// s.send(5).unwrap();
|
|
/// drop(s);
|
|
/// });
|
|
///
|
|
/// assert_eq!(
|
|
/// r.recv_timeout(Duration::from_millis(500)),
|
|
/// Err(RecvTimeoutError::Timeout),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// r.recv_timeout(Duration::from_secs(1)),
|
|
/// Ok(5),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// r.recv_timeout(Duration::from_secs(1)),
|
|
/// Err(RecvTimeoutError::Disconnected),
|
|
/// );
|
|
/// ```
|
|
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
|
|
self.recv_deadline(utils::convert_timeout_to_deadline(timeout))
|
|
}
|
|
|
|
/// Waits for a message to be received from the channel, but only before a given deadline.
|
|
///
|
|
/// If the channel is empty and not disconnected, this call will block until the receive
|
|
/// operation can proceed or the operation times out. If the channel is empty and becomes
|
|
/// disconnected, this call will wake up and return an error.
|
|
///
|
|
/// If called on a zero-capacity channel, this method will wait for a send operation to appear
|
|
/// on the other side of the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::{Instant, Duration};
|
|
/// use crossbeam_channel::{unbounded, RecvTimeoutError};
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// s.send(5).unwrap();
|
|
/// drop(s);
|
|
/// });
|
|
///
|
|
/// let now = Instant::now();
|
|
///
|
|
/// assert_eq!(
|
|
/// r.recv_deadline(now + Duration::from_millis(500)),
|
|
/// Err(RecvTimeoutError::Timeout),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// r.recv_deadline(now + Duration::from_millis(1500)),
|
|
/// Ok(5),
|
|
/// );
|
|
/// assert_eq!(
|
|
/// r.recv_deadline(now + Duration::from_secs(5)),
|
|
/// Err(RecvTimeoutError::Disconnected),
|
|
/// );
|
|
/// ```
|
|
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
|
|
ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
|
|
ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
|
|
ReceiverFlavor::At(chan) => {
|
|
let msg = chan.recv(Some(deadline));
|
|
unsafe {
|
|
mem::transmute_copy::<
|
|
Result<Instant, RecvTimeoutError>,
|
|
Result<T, RecvTimeoutError>,
|
|
>(&msg)
|
|
}
|
|
}
|
|
ReceiverFlavor::Tick(chan) => {
|
|
let msg = chan.recv(Some(deadline));
|
|
unsafe {
|
|
mem::transmute_copy::<
|
|
Result<Instant, RecvTimeoutError>,
|
|
Result<T, RecvTimeoutError>,
|
|
>(&msg)
|
|
}
|
|
}
|
|
ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if the channel is empty.
|
|
///
|
|
/// Note: Zero-capacity channels are always empty.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// assert!(r.is_empty());
|
|
/// s.send(0).unwrap();
|
|
/// assert!(!r.is_empty());
|
|
/// ```
|
|
pub fn is_empty(&self) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.is_empty(),
|
|
ReceiverFlavor::List(chan) => chan.is_empty(),
|
|
ReceiverFlavor::Zero(chan) => chan.is_empty(),
|
|
ReceiverFlavor::At(chan) => chan.is_empty(),
|
|
ReceiverFlavor::Tick(chan) => chan.is_empty(),
|
|
ReceiverFlavor::Never(chan) => chan.is_empty(),
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if the channel is full.
|
|
///
|
|
/// Note: Zero-capacity channels are always full.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::bounded;
|
|
///
|
|
/// let (s, r) = bounded(1);
|
|
///
|
|
/// assert!(!r.is_full());
|
|
/// s.send(0).unwrap();
|
|
/// assert!(r.is_full());
|
|
/// ```
|
|
pub fn is_full(&self) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.is_full(),
|
|
ReceiverFlavor::List(chan) => chan.is_full(),
|
|
ReceiverFlavor::Zero(chan) => chan.is_full(),
|
|
ReceiverFlavor::At(chan) => chan.is_full(),
|
|
ReceiverFlavor::Tick(chan) => chan.is_full(),
|
|
ReceiverFlavor::Never(chan) => chan.is_full(),
|
|
}
|
|
}
|
|
|
|
/// Returns the number of messages in the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
/// assert_eq!(r.len(), 0);
|
|
///
|
|
/// s.send(1).unwrap();
|
|
/// s.send(2).unwrap();
|
|
/// assert_eq!(r.len(), 2);
|
|
/// ```
|
|
pub fn len(&self) -> usize {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.len(),
|
|
ReceiverFlavor::List(chan) => chan.len(),
|
|
ReceiverFlavor::Zero(chan) => chan.len(),
|
|
ReceiverFlavor::At(chan) => chan.len(),
|
|
ReceiverFlavor::Tick(chan) => chan.len(),
|
|
ReceiverFlavor::Never(chan) => chan.len(),
|
|
}
|
|
}
|
|
|
|
/// If the channel is bounded, returns its capacity.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_channel::{bounded, unbounded};
|
|
///
|
|
/// let (_, r) = unbounded::<i32>();
|
|
/// assert_eq!(r.capacity(), None);
|
|
///
|
|
/// let (_, r) = bounded::<i32>(5);
|
|
/// assert_eq!(r.capacity(), Some(5));
|
|
///
|
|
/// let (_, r) = bounded::<i32>(0);
|
|
/// assert_eq!(r.capacity(), Some(0));
|
|
/// ```
|
|
pub fn capacity(&self) -> Option<usize> {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.capacity(),
|
|
ReceiverFlavor::List(chan) => chan.capacity(),
|
|
ReceiverFlavor::Zero(chan) => chan.capacity(),
|
|
ReceiverFlavor::At(chan) => chan.capacity(),
|
|
ReceiverFlavor::Tick(chan) => chan.capacity(),
|
|
ReceiverFlavor::Never(chan) => chan.capacity(),
|
|
}
|
|
}
|
|
|
|
/// A blocking iterator over messages in the channel.
|
|
///
|
|
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
|
|
/// the channel becomes empty and disconnected, it returns [`None`] without blocking.
|
|
///
|
|
/// [`next`]: Iterator::next
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// s.send(1).unwrap();
|
|
/// s.send(2).unwrap();
|
|
/// s.send(3).unwrap();
|
|
/// drop(s); // Disconnect the channel.
|
|
/// });
|
|
///
|
|
/// // Collect all messages from the channel.
|
|
/// // Note that the call to `collect` blocks until the sender is dropped.
|
|
/// let v: Vec<_> = r.iter().collect();
|
|
///
|
|
/// assert_eq!(v, [1, 2, 3]);
|
|
/// ```
|
|
pub fn iter(&self) -> Iter<'_, T> {
|
|
Iter { receiver: self }
|
|
}
|
|
|
|
/// A non-blocking iterator over messages in the channel.
|
|
///
|
|
/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
|
|
/// never blocks waiting for the next message.
|
|
///
|
|
/// [`next`]: Iterator::next
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded::<i32>();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// s.send(1).unwrap();
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// s.send(2).unwrap();
|
|
/// thread::sleep(Duration::from_secs(2));
|
|
/// s.send(3).unwrap();
|
|
/// });
|
|
///
|
|
/// thread::sleep(Duration::from_secs(2));
|
|
///
|
|
/// // Collect all messages from the channel without blocking.
|
|
/// // The third message hasn't been sent yet so we'll collect only the first two.
|
|
/// let v: Vec<_> = r.try_iter().collect();
|
|
///
|
|
/// assert_eq!(v, [1, 2]);
|
|
/// ```
|
|
pub fn try_iter(&self) -> TryIter<'_, T> {
|
|
TryIter { receiver: self }
|
|
}
|
|
|
|
/// Returns `true` if receivers belong to the same channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```rust
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (_, r) = unbounded::<usize>();
|
|
///
|
|
/// let r2 = r.clone();
|
|
/// assert!(r.same_channel(&r2));
|
|
///
|
|
/// let (_, r3) = unbounded();
|
|
/// assert!(!r.same_channel(&r3));
|
|
/// ```
|
|
pub fn same_channel(&self, other: &Receiver<T>) -> bool {
|
|
match (&self.flavor, &other.flavor) {
|
|
(ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
|
|
(ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
|
|
(ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
|
|
(ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
|
|
(ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
|
|
(ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for Receiver<T> {
|
|
fn drop(&mut self) {
|
|
unsafe {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
|
|
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
|
|
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
|
|
ReceiverFlavor::At(_) => {}
|
|
ReceiverFlavor::Tick(_) => {}
|
|
ReceiverFlavor::Never(_) => {}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Clone for Receiver<T> {
|
|
fn clone(&self) -> Self {
|
|
let flavor = match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
|
|
ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
|
|
ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
|
|
ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
|
|
ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
|
|
ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
|
|
};
|
|
|
|
Receiver { flavor }
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for Receiver<T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("Receiver { .. }")
|
|
}
|
|
}
|
|
|
|
impl<'a, T> IntoIterator for &'a Receiver<T> {
|
|
type Item = T;
|
|
type IntoIter = Iter<'a, T>;
|
|
|
|
fn into_iter(self) -> Self::IntoIter {
|
|
self.iter()
|
|
}
|
|
}
|
|
|
|
impl<T> IntoIterator for Receiver<T> {
|
|
type Item = T;
|
|
type IntoIter = IntoIter<T>;
|
|
|
|
fn into_iter(self) -> Self::IntoIter {
|
|
IntoIter { receiver: self }
|
|
}
|
|
}
|
|
|
|
/// A blocking iterator over messages in a channel.
|
|
///
|
|
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
|
|
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
|
|
///
|
|
/// [`next`]: Iterator::next
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// s.send(1).unwrap();
|
|
/// s.send(2).unwrap();
|
|
/// s.send(3).unwrap();
|
|
/// drop(s); // Disconnect the channel.
|
|
/// });
|
|
///
|
|
/// // Collect all messages from the channel.
|
|
/// // Note that the call to `collect` blocks until the sender is dropped.
|
|
/// let v: Vec<_> = r.iter().collect();
|
|
///
|
|
/// assert_eq!(v, [1, 2, 3]);
|
|
/// ```
|
|
pub struct Iter<'a, T> {
|
|
receiver: &'a Receiver<T>,
|
|
}
|
|
|
|
impl<T> FusedIterator for Iter<'_, T> {}
|
|
|
|
impl<T> Iterator for Iter<'_, T> {
|
|
type Item = T;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
self.receiver.recv().ok()
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for Iter<'_, T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("Iter { .. }")
|
|
}
|
|
}
|
|
|
|
/// A non-blocking iterator over messages in a channel.
|
|
///
|
|
/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
|
|
/// never blocks waiting for the next message.
|
|
///
|
|
/// [`next`]: Iterator::next
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded::<i32>();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// s.send(1).unwrap();
|
|
/// thread::sleep(Duration::from_secs(1));
|
|
/// s.send(2).unwrap();
|
|
/// thread::sleep(Duration::from_secs(2));
|
|
/// s.send(3).unwrap();
|
|
/// });
|
|
///
|
|
/// thread::sleep(Duration::from_secs(2));
|
|
///
|
|
/// // Collect all messages from the channel without blocking.
|
|
/// // The third message hasn't been sent yet so we'll collect only the first two.
|
|
/// let v: Vec<_> = r.try_iter().collect();
|
|
///
|
|
/// assert_eq!(v, [1, 2]);
|
|
/// ```
|
|
pub struct TryIter<'a, T> {
|
|
receiver: &'a Receiver<T>,
|
|
}
|
|
|
|
impl<T> Iterator for TryIter<'_, T> {
|
|
type Item = T;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
self.receiver.try_recv().ok()
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for TryIter<'_, T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("TryIter { .. }")
|
|
}
|
|
}
|
|
|
|
/// A blocking iterator over messages in a channel.
|
|
///
|
|
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
|
|
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
|
|
///
|
|
/// [`next`]: Iterator::next
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use std::thread;
|
|
/// use crossbeam_channel::unbounded;
|
|
///
|
|
/// let (s, r) = unbounded();
|
|
///
|
|
/// thread::spawn(move || {
|
|
/// s.send(1).unwrap();
|
|
/// s.send(2).unwrap();
|
|
/// s.send(3).unwrap();
|
|
/// drop(s); // Disconnect the channel.
|
|
/// });
|
|
///
|
|
/// // Collect all messages from the channel.
|
|
/// // Note that the call to `collect` blocks until the sender is dropped.
|
|
/// let v: Vec<_> = r.into_iter().collect();
|
|
///
|
|
/// assert_eq!(v, [1, 2, 3]);
|
|
/// ```
|
|
pub struct IntoIter<T> {
|
|
receiver: Receiver<T>,
|
|
}
|
|
|
|
impl<T> FusedIterator for IntoIter<T> {}
|
|
|
|
impl<T> Iterator for IntoIter<T> {
|
|
type Item = T;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
self.receiver.recv().ok()
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for IntoIter<T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("IntoIter { .. }")
|
|
}
|
|
}
|
|
|
|
impl<T> SelectHandle for Sender<T> {
|
|
fn try_select(&self, token: &mut Token) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().try_select(token),
|
|
SenderFlavor::List(chan) => chan.sender().try_select(token),
|
|
SenderFlavor::Zero(chan) => chan.sender().try_select(token),
|
|
}
|
|
}
|
|
|
|
fn deadline(&self) -> Option<Instant> {
|
|
None
|
|
}
|
|
|
|
fn register(&self, oper: Operation, cx: &Context) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
|
|
SenderFlavor::List(chan) => chan.sender().register(oper, cx),
|
|
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
|
|
}
|
|
}
|
|
|
|
fn unregister(&self, oper: Operation) {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().unregister(oper),
|
|
SenderFlavor::List(chan) => chan.sender().unregister(oper),
|
|
SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
|
|
}
|
|
}
|
|
|
|
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
|
|
SenderFlavor::List(chan) => chan.sender().accept(token, cx),
|
|
SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
|
|
}
|
|
}
|
|
|
|
fn is_ready(&self) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().is_ready(),
|
|
SenderFlavor::List(chan) => chan.sender().is_ready(),
|
|
SenderFlavor::Zero(chan) => chan.sender().is_ready(),
|
|
}
|
|
}
|
|
|
|
fn watch(&self, oper: Operation, cx: &Context) -> bool {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
|
|
SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
|
|
SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
|
|
}
|
|
}
|
|
|
|
fn unwatch(&self, oper: Operation) {
|
|
match &self.flavor {
|
|
SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
|
|
SenderFlavor::List(chan) => chan.sender().unwatch(oper),
|
|
SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> SelectHandle for Receiver<T> {
|
|
fn try_select(&self, token: &mut Token) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
|
|
ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
|
|
ReceiverFlavor::At(chan) => chan.try_select(token),
|
|
ReceiverFlavor::Tick(chan) => chan.try_select(token),
|
|
ReceiverFlavor::Never(chan) => chan.try_select(token),
|
|
}
|
|
}
|
|
|
|
fn deadline(&self) -> Option<Instant> {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(_) => None,
|
|
ReceiverFlavor::List(_) => None,
|
|
ReceiverFlavor::Zero(_) => None,
|
|
ReceiverFlavor::At(chan) => chan.deadline(),
|
|
ReceiverFlavor::Tick(chan) => chan.deadline(),
|
|
ReceiverFlavor::Never(chan) => chan.deadline(),
|
|
}
|
|
}
|
|
|
|
fn register(&self, oper: Operation, cx: &Context) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
|
|
ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
|
|
ReceiverFlavor::At(chan) => chan.register(oper, cx),
|
|
ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
|
|
ReceiverFlavor::Never(chan) => chan.register(oper, cx),
|
|
}
|
|
}
|
|
|
|
fn unregister(&self, oper: Operation) {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
|
|
ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
|
|
ReceiverFlavor::At(chan) => chan.unregister(oper),
|
|
ReceiverFlavor::Tick(chan) => chan.unregister(oper),
|
|
ReceiverFlavor::Never(chan) => chan.unregister(oper),
|
|
}
|
|
}
|
|
|
|
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
|
|
ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
|
|
ReceiverFlavor::At(chan) => chan.accept(token, cx),
|
|
ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
|
|
ReceiverFlavor::Never(chan) => chan.accept(token, cx),
|
|
}
|
|
}
|
|
|
|
fn is_ready(&self) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
|
|
ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
|
|
ReceiverFlavor::At(chan) => chan.is_ready(),
|
|
ReceiverFlavor::Tick(chan) => chan.is_ready(),
|
|
ReceiverFlavor::Never(chan) => chan.is_ready(),
|
|
}
|
|
}
|
|
|
|
fn watch(&self, oper: Operation, cx: &Context) -> bool {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
|
|
ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
|
|
ReceiverFlavor::At(chan) => chan.watch(oper, cx),
|
|
ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
|
|
ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
|
|
}
|
|
}
|
|
|
|
fn unwatch(&self, oper: Operation) {
|
|
match &self.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
|
|
ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
|
|
ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
|
|
ReceiverFlavor::At(chan) => chan.unwatch(oper),
|
|
ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
|
|
ReceiverFlavor::Never(chan) => chan.unwatch(oper),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Writes a message into the channel.
|
|
pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
|
|
match &s.flavor {
|
|
SenderFlavor::Array(chan) => chan.write(token, msg),
|
|
SenderFlavor::List(chan) => chan.write(token, msg),
|
|
SenderFlavor::Zero(chan) => chan.write(token, msg),
|
|
}
|
|
}
|
|
|
|
/// Reads a message from the channel.
|
|
pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
|
|
match &r.flavor {
|
|
ReceiverFlavor::Array(chan) => chan.read(token),
|
|
ReceiverFlavor::List(chan) => chan.read(token),
|
|
ReceiverFlavor::Zero(chan) => chan.read(token),
|
|
ReceiverFlavor::At(chan) => {
|
|
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
|
|
}
|
|
ReceiverFlavor::Tick(chan) => {
|
|
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
|
|
}
|
|
ReceiverFlavor::Never(chan) => chan.read(token),
|
|
}
|
|
}
|