|
@@ -0,0 +1,1272 @@
|
|
|
+//! Imported from crate `async-channel`.
|
|
|
+//!
|
|
|
+//! An async multi-producer multi-consumer channel, where each message can be
|
|
|
+//! received by only one of all existing consumers.
|
|
|
+//!
|
|
|
+//! There are two kinds of channels:
|
|
|
+//!
|
|
|
+//! 1. [Bounded][`bounded()`] channel with limited capacity.
|
|
|
+//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
|
|
|
+//!
|
|
|
+//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable
|
|
|
+//! and can be shared among multiple threads.
|
|
|
+//!
|
|
|
+//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes
|
|
|
+//! closed. When a channel is closed, no more messages can be sent, but
|
|
|
+//! remaining messages can still be received.
|
|
|
+//!
|
|
|
+//! The channel can also be closed manually by calling [`Sender::close()`] or
|
|
|
+//! [`Receiver::close()`].
|
|
|
+//!
|
|
|
+//! # Examples
|
|
|
+//!
|
|
|
+//! ```
|
|
|
+//! # futures_lite::future::block_on(async {
|
|
|
+//! let (s, r) = async_channel::unbounded();
|
|
|
+//!
|
|
|
+//! assert_eq!(s.send("Hello").await, Ok(()));
|
|
|
+//! assert_eq!(r.recv().await, Ok("Hello"));
|
|
|
+//! # });
|
|
|
+//! ```
|
|
|
+
|
|
|
+#![forbid(unsafe_code)]
|
|
|
+#![warn(missing_docs, missing_debug_implementations)]
|
|
|
+
|
|
|
+use alloc::boxed::Box;
|
|
|
+use core::{
|
|
|
+ error, fmt,
|
|
|
+ pin::Pin,
|
|
|
+ sync::atomic::{AtomicBool, AtomicUsize, Ordering},
|
|
|
+ task::{Context, Poll},
|
|
|
+};
|
|
|
+
|
|
|
+use crossbeam_queue::{ArrayQueue, SegQueue};
|
|
|
+use futures_lite::{ready, stream::Stream, Future};
|
|
|
+use solvent_core::sync::Arsc;
|
|
|
+
|
|
|
+use super::{Event, EventListener};
|
|
|
+
|
|
|
+enum Flavor<T> {
|
|
|
+ Bounded(Box<ArrayQueue<T>>),
|
|
|
+ Unbounded(SegQueue<T>),
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Flavor<T> {
|
|
|
+ fn push(&self, value: T) -> Option<T> {
|
|
|
+ match self {
|
|
|
+ Flavor::Bounded(q) => q.push(value).err(),
|
|
|
+ Flavor::Unbounded(q) => {
|
|
|
+ q.push(value);
|
|
|
+ None
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn pop(&self) -> Option<T> {
|
|
|
+ match self {
|
|
|
+ Flavor::Bounded(q) => q.pop(),
|
|
|
+ Flavor::Unbounded(q) => q.pop(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn is_empty(&self) -> bool {
|
|
|
+ match self {
|
|
|
+ Flavor::Bounded(q) => q.is_empty(),
|
|
|
+ Flavor::Unbounded(q) => q.is_empty(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn len(&self) -> usize {
|
|
|
+ match self {
|
|
|
+ Flavor::Bounded(q) => q.len(),
|
|
|
+ Flavor::Unbounded(q) => q.len(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn is_full(&self) -> bool {
|
|
|
+ match self {
|
|
|
+ Flavor::Bounded(q) => q.is_full(),
|
|
|
+ Flavor::Unbounded(_) => false,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn capacity(&self) -> Option<usize> {
|
|
|
+ match self {
|
|
|
+ Flavor::Bounded(q) => Some(q.capacity()),
|
|
|
+ Flavor::Unbounded(_) => None,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+struct Channel<T> {
|
|
|
+ /// Inner message queue.
|
|
|
+ queue: Flavor<T>,
|
|
|
+
|
|
|
+ /// Indicates whether the channel is closed.
|
|
|
+ closed: AtomicBool,
|
|
|
+
|
|
|
+ /// Send operations waiting while the channel is full.
|
|
|
+ send_ops: Event,
|
|
|
+
|
|
|
+ /// Receive operations waiting while the channel is empty and not closed.
|
|
|
+ recv_ops: Event,
|
|
|
+
|
|
|
+ /// Stream operations while the channel is empty and not closed.
|
|
|
+ stream_ops: Event,
|
|
|
+
|
|
|
+ /// The number of currently active `Sender`s.
|
|
|
+ sender_count: AtomicUsize,
|
|
|
+
|
|
|
+ /// The number of currently active `Receivers`s.
|
|
|
+ receiver_count: AtomicUsize,
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Channel<T> {
|
|
|
+ /// Closes the channel and notifies all blocked operations.
|
|
|
+ ///
|
|
|
+ /// Returns `true` if this call has closed the channel and it was not closed
|
|
|
+ /// already.
|
|
|
+ fn close(&self) -> bool {
|
|
|
+ if !self.closed.swap(true, Ordering::SeqCst) {
|
|
|
+ // Notify all send operations.
|
|
|
+ self.send_ops.notify(usize::MAX);
|
|
|
+
|
|
|
+ // Notify all receive and stream operations.
|
|
|
+ self.recv_ops.notify(usize::MAX);
|
|
|
+ self.stream_ops.notify(usize::MAX);
|
|
|
+
|
|
|
+ true
|
|
|
+ } else {
|
|
|
+ false
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// Creates a bounded channel.
|
|
|
+///
|
|
|
+/// The created channel has space to hold at most `cap` messages at a time.
|
|
|
+///
|
|
|
+/// # Panics
|
|
|
+///
|
|
|
+/// Capacity must be a positive number. If `cap` is zero, this function will
|
|
|
+/// panic.
|
|
|
+///
|
|
|
+/// # Examples
|
|
|
+///
|
|
|
+/// ```
|
|
|
+/// # futures_lite::future::block_on(async {
|
|
|
+/// use async_channel::{bounded, TryRecvError, TrySendError};
|
|
|
+///
|
|
|
+/// let (s, r) = bounded(1);
|
|
|
+///
|
|
|
+/// assert_eq!(s.send(10).await, Ok(()));
|
|
|
+/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
|
|
|
+///
|
|
|
+/// assert_eq!(r.recv().await, Ok(10));
|
|
|
+/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
|
|
|
+/// # });
|
|
|
+/// ```
|
|
|
+pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
|
|
|
+ assert!(cap > 0, "capacity cannot be zero");
|
|
|
+
|
|
|
+ let channel = Arsc::new(Channel {
|
|
|
+ queue: Flavor::Bounded(Box::new(ArrayQueue::new(cap))),
|
|
|
+ closed: AtomicBool::new(false),
|
|
|
+ send_ops: Event::new(),
|
|
|
+ recv_ops: Event::new(),
|
|
|
+ stream_ops: Event::new(),
|
|
|
+ sender_count: AtomicUsize::new(1),
|
|
|
+ receiver_count: AtomicUsize::new(1),
|
|
|
+ });
|
|
|
+
|
|
|
+ let s = Sender {
|
|
|
+ channel: channel.clone(),
|
|
|
+ };
|
|
|
+ let r = Receiver {
|
|
|
+ channel,
|
|
|
+ listener: None,
|
|
|
+ };
|
|
|
+ (s, r)
|
|
|
+}
|
|
|
+
|
|
|
+/// Creates an unbounded channel.
|
|
|
+///
|
|
|
+/// The created channel can hold an unlimited number of messages.
|
|
|
+///
|
|
|
+/// # Examples
|
|
|
+///
|
|
|
+/// ```
|
|
|
+/// # futures_lite::future::block_on(async {
|
|
|
+/// use async_channel::{unbounded, TryRecvError};
|
|
|
+///
|
|
|
+/// let (s, r) = unbounded();
|
|
|
+///
|
|
|
+/// assert_eq!(s.send(10).await, Ok(()));
|
|
|
+/// assert_eq!(s.send(20).await, Ok(()));
|
|
|
+///
|
|
|
+/// assert_eq!(r.recv().await, Ok(10));
|
|
|
+/// assert_eq!(r.recv().await, Ok(20));
|
|
|
+/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
|
|
|
+/// # });
|
|
|
+/// ```
|
|
|
+pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
|
|
|
+ let channel = Arsc::new(Channel {
|
|
|
+ queue: Flavor::Unbounded(SegQueue::new()),
|
|
|
+ closed: AtomicBool::new(false),
|
|
|
+ send_ops: Event::new(),
|
|
|
+ recv_ops: Event::new(),
|
|
|
+ stream_ops: Event::new(),
|
|
|
+ sender_count: AtomicUsize::new(1),
|
|
|
+ receiver_count: AtomicUsize::new(1),
|
|
|
+ });
|
|
|
+
|
|
|
+ let s = Sender {
|
|
|
+ channel: channel.clone(),
|
|
|
+ };
|
|
|
+ let r = Receiver {
|
|
|
+ channel,
|
|
|
+ listener: None,
|
|
|
+ };
|
|
|
+ (s, r)
|
|
|
+}
|
|
|
+
|
|
|
+/// The sending side of a channel.
|
|
|
+///
|
|
|
+/// Senders can be cloned and shared among threads. When all senders associated
|
|
|
+/// with a channel are dropped, the channel becomes closed.
|
|
|
+///
|
|
|
+/// The channel can also be closed manually by calling [`Sender::close()`].
|
|
|
+pub struct Sender<T> {
|
|
|
+ /// Inner channel state.
|
|
|
+ channel: Arsc<Channel<T>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Sender<T> {
|
|
|
+ /// Attempts to send a message into the channel.
|
|
|
+ ///
|
|
|
+ /// If the channel is full or closed, this method returns an error.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// use async_channel::{bounded, TrySendError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = bounded(1);
|
|
|
+ ///
|
|
|
+ /// assert_eq!(s.try_send(1), Ok(()));
|
|
|
+ /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
|
|
|
+ ///
|
|
|
+ /// drop(r);
|
|
|
+ /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
|
|
|
+ /// ```
|
|
|
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
|
|
|
+ if self.channel.closed.load(Ordering::SeqCst) {
|
|
|
+ return Err(TrySendError::Closed(msg));
|
|
|
+ }
|
|
|
+ match self.channel.queue.push(msg) {
|
|
|
+ None => {
|
|
|
+ // Notify a blocked receive operation. If the notified operation gets canceled,
|
|
|
+ // it will notify another blocked receive operation.
|
|
|
+ self.channel.recv_ops.notify_additional(1);
|
|
|
+
|
|
|
+ // Notify all blocked streams.
|
|
|
+ self.channel.stream_ops.notify(usize::MAX);
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+ Some(msg) => Err(TrySendError::Full(msg)),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Sends a message into the channel.
|
|
|
+ ///
|
|
|
+ /// If the channel is full, this method waits until there is space for a
|
|
|
+ /// message.
|
|
|
+ ///
|
|
|
+ /// If the channel is closed, this method returns an error.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, SendError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ ///
|
|
|
+ /// assert_eq!(s.send(1).await, Ok(()));
|
|
|
+ /// drop(r);
|
|
|
+ /// assert_eq!(s.send(2).await, Err(SendError(2)));
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn send(&self, msg: T) -> Send<'_, T> {
|
|
|
+ Send {
|
|
|
+ sender: self,
|
|
|
+ listener: None,
|
|
|
+ msg: Some(msg),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Sends a message into this channel using the blocking strategy.
|
|
|
+ ///
|
|
|
+ /// If the channel is full, this method will block until there is room.
|
|
|
+ /// If the channel is closed, this method returns an error.
|
|
|
+ ///
|
|
|
+ /// # Blocking
|
|
|
+ ///
|
|
|
+ /// Rather than using asynchronous waiting, like the [`send`](Self::send)
|
|
|
+ /// method, this method will block the current thread until the message
|
|
|
+ /// is sent.
|
|
|
+ ///
|
|
|
+ /// This method should not be used in an asynchronous context. It is
|
|
|
+ /// intended to be used such that a channel can be used in both
|
|
|
+ /// asynchronous and synchronous contexts. Calling this method in an
|
|
|
+ /// asynchronous context may result in deadlocks.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// use async_channel::{unbounded, SendError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ ///
|
|
|
+ /// assert_eq!(s.send_blocking(1), Ok(()));
|
|
|
+ /// drop(r);
|
|
|
+ /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
|
|
|
+ /// ```
|
|
|
+ pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
|
|
|
+ self.send(msg).wait()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Closes the channel.
|
|
|
+ ///
|
|
|
+ /// Returns `true` if this call has closed the channel and it was not closed
|
|
|
+ /// already.
|
|
|
+ ///
|
|
|
+ /// The remaining messages can still be received.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, RecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ /// assert_eq!(s.send(1).await, Ok(()));
|
|
|
+ /// assert!(s.close());
|
|
|
+ ///
|
|
|
+ /// assert_eq!(r.recv().await, Ok(1));
|
|
|
+ /// assert_eq!(r.recv().await, Err(RecvError));
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn close(&self) -> bool {
|
|
|
+ self.channel.close()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is closed.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, RecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<()>();
|
|
|
+ /// assert!(!s.is_closed());
|
|
|
+ ///
|
|
|
+ /// drop(r);
|
|
|
+ /// assert!(s.is_closed());
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn is_closed(&self) -> bool {
|
|
|
+ self.channel.closed.load(Ordering::SeqCst)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is empty.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ ///
|
|
|
+ /// assert!(s.is_empty());
|
|
|
+ /// s.send(1).await;
|
|
|
+ /// assert!(!s.is_empty());
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn is_empty(&self) -> bool {
|
|
|
+ self.channel.queue.is_empty()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is full.
|
|
|
+ ///
|
|
|
+ /// Unbounded channels are never full.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::bounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = bounded(1);
|
|
|
+ ///
|
|
|
+ /// assert!(!s.is_full());
|
|
|
+ /// s.send(1).await;
|
|
|
+ /// assert!(s.is_full());
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn is_full(&self) -> bool {
|
|
|
+ self.channel.queue.is_full()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the number of messages in the channel.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ /// assert_eq!(s.len(), 0);
|
|
|
+ ///
|
|
|
+ /// s.send(1).await;
|
|
|
+ /// s.send(2).await;
|
|
|
+ /// assert_eq!(s.len(), 2);
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn len(&self) -> usize {
|
|
|
+ self.channel.queue.len()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the channel capacity if it's bounded.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// use async_channel::{bounded, unbounded};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = bounded::<i32>(5);
|
|
|
+ /// assert_eq!(s.capacity(), Some(5));
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<i32>();
|
|
|
+ /// assert_eq!(s.capacity(), None);
|
|
|
+ /// ```
|
|
|
+ pub fn capacity(&self) -> Option<usize> {
|
|
|
+ self.channel.queue.capacity()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the number of receivers for the channel.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<()>();
|
|
|
+ /// assert_eq!(s.receiver_count(), 1);
|
|
|
+ ///
|
|
|
+ /// let r2 = r.clone();
|
|
|
+ /// assert_eq!(s.receiver_count(), 2);
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn receiver_count(&self) -> usize {
|
|
|
+ self.channel.receiver_count.load(Ordering::SeqCst)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the number of senders for the channel.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<()>();
|
|
|
+ /// assert_eq!(s.sender_count(), 1);
|
|
|
+ ///
|
|
|
+ /// let s2 = s.clone();
|
|
|
+ /// assert_eq!(s.sender_count(), 2);
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn sender_count(&self) -> usize {
|
|
|
+ self.channel.sender_count.load(Ordering::SeqCst)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Downgrade the sender to a weak reference.
|
|
|
+ pub fn downgrade(&self) -> WeakSender<T> {
|
|
|
+ WeakSender {
|
|
|
+ channel: self.channel.clone(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Drop for Sender<T> {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ // Decrement the sender count and close the channel if it drops down to zero.
|
|
|
+ if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
|
|
|
+ self.channel.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> fmt::Debug for Sender<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "Sender {{ .. }}")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Clone for Sender<T> {
|
|
|
+ fn clone(&self) -> Sender<T> {
|
|
|
+ let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
|
|
|
+
|
|
|
+ // Make sure the count never overflows, even if lots of sender clones are
|
|
|
+ // leaked.
|
|
|
+ assert!(count <= usize::MAX / 2);
|
|
|
+
|
|
|
+ Sender {
|
|
|
+ channel: self.channel.clone(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// The receiving side of a channel.
|
|
|
+///
|
|
|
+/// Receivers can be cloned and shared among threads. When all receivers
|
|
|
+/// associated with a channel are dropped, the channel becomes closed.
|
|
|
+///
|
|
|
+/// The channel can also be closed manually by calling [`Receiver::close()`].
|
|
|
+///
|
|
|
+/// Receivers implement the [`Stream`] trait.
|
|
|
+pub struct Receiver<T> {
|
|
|
+ /// Inner channel state.
|
|
|
+ channel: Arsc<Channel<T>>,
|
|
|
+
|
|
|
+ /// Listens for a send or close event to unblock this stream.
|
|
|
+ listener: Option<EventListener>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Receiver<T> {
|
|
|
+ /// Attempts to receive a message from the channel.
|
|
|
+ ///
|
|
|
+ /// If the channel is empty, or empty and closed, this method returns an
|
|
|
+ /// error.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, TryRecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ /// assert_eq!(s.send(1).await, Ok(()));
|
|
|
+ ///
|
|
|
+ /// assert_eq!(r.try_recv(), Ok(1));
|
|
|
+ /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
|
|
|
+ ///
|
|
|
+ /// drop(s);
|
|
|
+ /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
|
|
+ match self.channel.queue.pop() {
|
|
|
+ Some(msg) => {
|
|
|
+ // Notify a blocked send operation. If the notified operation gets canceled, it
|
|
|
+ // will notify another blocked send operation.
|
|
|
+ self.channel.send_ops.notify_additional(1);
|
|
|
+
|
|
|
+ Ok(msg)
|
|
|
+ }
|
|
|
+ None => Err(if self.channel.closed.load(Ordering::SeqCst) {
|
|
|
+ TryRecvError::Closed
|
|
|
+ } else {
|
|
|
+ TryRecvError::Empty
|
|
|
+ }),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Receives a message from the channel.
|
|
|
+ ///
|
|
|
+ /// If the channel is empty, this method waits until there is a message.
|
|
|
+ ///
|
|
|
+ /// If the channel is closed, this method receives a message or returns an
|
|
|
+ /// error if there are no more messages.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, RecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ ///
|
|
|
+ /// assert_eq!(s.send(1).await, Ok(()));
|
|
|
+ /// drop(s);
|
|
|
+ ///
|
|
|
+ /// assert_eq!(r.recv().await, Ok(1));
|
|
|
+ /// assert_eq!(r.recv().await, Err(RecvError));
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn recv(&self) -> Recv<'_, T> {
|
|
|
+ Recv {
|
|
|
+ receiver: self,
|
|
|
+ listener: None,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Receives a message from the channel using the blocking strategy.
|
|
|
+ ///
|
|
|
+ /// If the channel is empty, this method waits until there is a message.
|
|
|
+ /// If the channel is closed, this method receives a message or returns an
|
|
|
+ /// error if there are no more messages.
|
|
|
+ ///
|
|
|
+ /// # Blocking
|
|
|
+ ///
|
|
|
+ /// Rather than using asynchronous waiting, like the [`recv`](Self::recv)
|
|
|
+ /// method, this method will block the current thread until the message
|
|
|
+ /// is sent.
|
|
|
+ ///
|
|
|
+ /// This method should not be used in an asynchronous context. It is
|
|
|
+ /// intended to be used such that a channel can be used in both
|
|
|
+ /// asynchronous and synchronous contexts. Calling this method in an
|
|
|
+ /// asynchronous context may result in deadlocks.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// use async_channel::{unbounded, RecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ ///
|
|
|
+ /// assert_eq!(s.send_blocking(1), Ok(()));
|
|
|
+ /// drop(s);
|
|
|
+ ///
|
|
|
+ /// assert_eq!(r.recv_blocking(), Ok(1));
|
|
|
+ /// assert_eq!(r.recv_blocking(), Err(RecvError));
|
|
|
+ /// ```
|
|
|
+ pub fn recv_blocking(&self) -> Result<T, RecvError> {
|
|
|
+ self.recv().wait()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Closes the channel.
|
|
|
+ ///
|
|
|
+ /// Returns `true` if this call has closed the channel and it was not closed
|
|
|
+ /// already.
|
|
|
+ ///
|
|
|
+ /// The remaining messages can still be received.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, RecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ /// assert_eq!(s.send(1).await, Ok(()));
|
|
|
+ ///
|
|
|
+ /// assert!(r.close());
|
|
|
+ /// assert_eq!(r.recv().await, Ok(1));
|
|
|
+ /// assert_eq!(r.recv().await, Err(RecvError));
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn close(&self) -> bool {
|
|
|
+ self.channel.close()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is closed.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::{unbounded, RecvError};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<()>();
|
|
|
+ /// assert!(!r.is_closed());
|
|
|
+ ///
|
|
|
+ /// drop(s);
|
|
|
+ /// assert!(r.is_closed());
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn is_closed(&self) -> bool {
|
|
|
+ self.channel.closed.load(Ordering::SeqCst)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is empty.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ ///
|
|
|
+ /// assert!(s.is_empty());
|
|
|
+ /// s.send(1).await;
|
|
|
+ /// assert!(!s.is_empty());
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn is_empty(&self) -> bool {
|
|
|
+ self.channel.queue.is_empty()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is full.
|
|
|
+ ///
|
|
|
+ /// Unbounded channels are never full.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::bounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = bounded(1);
|
|
|
+ ///
|
|
|
+ /// assert!(!r.is_full());
|
|
|
+ /// s.send(1).await;
|
|
|
+ /// assert!(r.is_full());
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn is_full(&self) -> bool {
|
|
|
+ self.channel.queue.is_full()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the number of messages in the channel.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded();
|
|
|
+ /// assert_eq!(r.len(), 0);
|
|
|
+ ///
|
|
|
+ /// s.send(1).await;
|
|
|
+ /// s.send(2).await;
|
|
|
+ /// assert_eq!(r.len(), 2);
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn len(&self) -> usize {
|
|
|
+ self.channel.queue.len()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the channel capacity if it's bounded.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// use async_channel::{bounded, unbounded};
|
|
|
+ ///
|
|
|
+ /// let (s, r) = bounded::<i32>(5);
|
|
|
+ /// assert_eq!(r.capacity(), Some(5));
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<i32>();
|
|
|
+ /// assert_eq!(r.capacity(), None);
|
|
|
+ /// ```
|
|
|
+ pub fn capacity(&self) -> Option<usize> {
|
|
|
+ self.channel.queue.capacity()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the number of receivers for the channel.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<()>();
|
|
|
+ /// assert_eq!(r.receiver_count(), 1);
|
|
|
+ ///
|
|
|
+ /// let r2 = r.clone();
|
|
|
+ /// assert_eq!(r.receiver_count(), 2);
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn receiver_count(&self) -> usize {
|
|
|
+ self.channel.receiver_count.load(Ordering::SeqCst)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the number of senders for the channel.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # futures_lite::future::block_on(async {
|
|
|
+ /// use async_channel::unbounded;
|
|
|
+ ///
|
|
|
+ /// let (s, r) = unbounded::<()>();
|
|
|
+ /// assert_eq!(r.sender_count(), 1);
|
|
|
+ ///
|
|
|
+ /// let s2 = s.clone();
|
|
|
+ /// assert_eq!(r.sender_count(), 2);
|
|
|
+ /// # });
|
|
|
+ /// ```
|
|
|
+ pub fn sender_count(&self) -> usize {
|
|
|
+ self.channel.sender_count.load(Ordering::SeqCst)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Downgrade the receiver to a weak reference.
|
|
|
+ pub fn downgrade(&self) -> WeakReceiver<T> {
|
|
|
+ WeakReceiver {
|
|
|
+ channel: self.channel.clone(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Unpin for Receiver<T> {}
|
|
|
+
|
|
|
+impl<T> Drop for Receiver<T> {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ // Decrement the receiver count and close the channel if it drops down to zero.
|
|
|
+ if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
|
|
|
+ self.channel.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> fmt::Debug for Receiver<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "Receiver {{ .. }}")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Clone for Receiver<T> {
|
|
|
+ fn clone(&self) -> Receiver<T> {
|
|
|
+ let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
|
+
|
|
|
+ // Make sure the count never overflows, even if lots of receiver clones are
|
|
|
+ // leaked.
|
|
|
+ assert!(count <= usize::MAX / 2);
|
|
|
+
|
|
|
+ Receiver {
|
|
|
+ channel: self.channel.clone(),
|
|
|
+ listener: None,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Stream for Receiver<T> {
|
|
|
+ type Item = T;
|
|
|
+
|
|
|
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
+ loop {
|
|
|
+ // If this stream is listening for events, first wait for a notification.
|
|
|
+ if let Some(listener) = self.listener.as_mut() {
|
|
|
+ ready!(Pin::new(listener).poll(cx));
|
|
|
+ self.listener = None;
|
|
|
+ }
|
|
|
+
|
|
|
+ loop {
|
|
|
+ // Attempt to receive a message.
|
|
|
+ match self.try_recv() {
|
|
|
+ Ok(msg) => {
|
|
|
+ // The stream is not blocked on an event - drop the listener.
|
|
|
+ self.listener = None;
|
|
|
+ return Poll::Ready(Some(msg));
|
|
|
+ }
|
|
|
+ Err(TryRecvError::Closed) => {
|
|
|
+ // The stream is not blocked on an event - drop the listener.
|
|
|
+ self.listener = None;
|
|
|
+ return Poll::Ready(None);
|
|
|
+ }
|
|
|
+ Err(TryRecvError::Empty) => {}
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receiving failed - now start listening for notifications or wait for one.
|
|
|
+ match self.listener.as_mut() {
|
|
|
+ None => {
|
|
|
+ // Create a listener and try sending the message again.
|
|
|
+ self.listener = Some(self.channel.stream_ops.listen());
|
|
|
+ }
|
|
|
+ Some(_) => {
|
|
|
+ // Go back to the outer loop to poll the listener.
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// impl<T> FusedStream for Receiver<T> {
|
|
|
+// fn is_terminated(&self) -> bool {
|
|
|
+// self.channel.closed.load(Ordering::SeqCst) &&
|
|
|
+// self.channel.queue.is_empty() }
|
|
|
+// }
|
|
|
+
|
|
|
+/// A [`Sender`] that prevents the channel from not being closed.
|
|
|
+///
|
|
|
+/// This is created through the [`Sender::downgrade`] method. In order to use
|
|
|
+/// it, it needs to be upgraded into a [`Sender`] through the `upgrade` method.
|
|
|
+#[derive(Clone)]
|
|
|
+pub struct WeakSender<T> {
|
|
|
+ channel: Arsc<Channel<T>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> WeakSender<T> {
|
|
|
+ /// Upgrade the [`WeakSender`] into a [`Sender`].
|
|
|
+ pub fn upgrade(&self) -> Option<Sender<T>> {
|
|
|
+ if self.channel.closed.load(Ordering::SeqCst) {
|
|
|
+ None
|
|
|
+ } else {
|
|
|
+ let old_count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
|
|
|
+ if old_count == 0 {
|
|
|
+ // Channel was closed while we were incrementing the count.
|
|
|
+ self.channel.sender_count.store(0, Ordering::Release);
|
|
|
+ None
|
|
|
+ } else {
|
|
|
+ // Make sure the count never overflows, even if lots of sender clones are
|
|
|
+ // leaked.
|
|
|
+ assert!(old_count <= usize::MAX / 2);
|
|
|
+
|
|
|
+ Some(Sender {
|
|
|
+ channel: self.channel.clone(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> fmt::Debug for WeakSender<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "WeakSender {{ .. }}")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// A [`Receiver`] that prevents the channel from not being closed.
|
|
|
+///
|
|
|
+/// This is created through the [`Receiver::downgrade`] method. In order to use
|
|
|
+/// it, it needs to be upgraded into a [`Receiver`] through the `upgrade`
|
|
|
+/// method.
|
|
|
+#[derive(Clone)]
|
|
|
+pub struct WeakReceiver<T> {
|
|
|
+ channel: Arsc<Channel<T>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> WeakReceiver<T> {
|
|
|
+ /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
|
|
|
+ pub fn upgrade(&self) -> Option<Receiver<T>> {
|
|
|
+ if self.channel.closed.load(Ordering::SeqCst) {
|
|
|
+ None
|
|
|
+ } else {
|
|
|
+ let old_count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
|
+ if old_count == 0 {
|
|
|
+ // Channel was closed while we were incrementing the count.
|
|
|
+ self.channel.receiver_count.store(0, Ordering::Release);
|
|
|
+ None
|
|
|
+ } else {
|
|
|
+ // Make sure the count never overflows, even if lots of receiver clones are
|
|
|
+ // leaked.
|
|
|
+ assert!(old_count <= usize::MAX / 2);
|
|
|
+
|
|
|
+ Some(Receiver {
|
|
|
+ channel: self.channel.clone(),
|
|
|
+ listener: None,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> fmt::Debug for WeakReceiver<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "WeakReceiver {{ .. }}")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// An error returned from [`Sender::send()`].
|
|
|
+///
|
|
|
+/// Received because the channel is closed.
|
|
|
+#[derive(PartialEq, Eq, Clone, Copy)]
|
|
|
+pub struct SendError<T>(pub T);
|
|
|
+
|
|
|
+impl<T> SendError<T> {
|
|
|
+ /// Unwraps the message that couldn't be sent.
|
|
|
+ pub fn into_inner(self) -> T {
|
|
|
+ self.0
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> error::Error for SendError<T> {}
|
|
|
+
|
|
|
+impl<T> fmt::Debug for SendError<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "SendError(..)")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> fmt::Display for SendError<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "sending into a closed channel")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// An error returned from [`Sender::try_send()`].
|
|
|
+#[derive(PartialEq, Eq, Clone, Copy)]
|
|
|
+pub enum TrySendError<T> {
|
|
|
+ /// The channel is full but not closed.
|
|
|
+ Full(T),
|
|
|
+
|
|
|
+ /// The channel is closed.
|
|
|
+ Closed(T),
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> TrySendError<T> {
|
|
|
+ /// Unwraps the message that couldn't be sent.
|
|
|
+ pub fn into_inner(self) -> T {
|
|
|
+ match self {
|
|
|
+ TrySendError::Full(t) => t,
|
|
|
+ TrySendError::Closed(t) => t,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is full but not closed.
|
|
|
+ pub fn is_full(&self) -> bool {
|
|
|
+ match self {
|
|
|
+ TrySendError::Full(_) => true,
|
|
|
+ TrySendError::Closed(_) => false,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is closed.
|
|
|
+ pub fn is_closed(&self) -> bool {
|
|
|
+ match self {
|
|
|
+ TrySendError::Full(_) => false,
|
|
|
+ TrySendError::Closed(_) => true,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> error::Error for TrySendError<T> {}
|
|
|
+
|
|
|
+impl<T> fmt::Debug for TrySendError<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ match *self {
|
|
|
+ TrySendError::Full(..) => write!(f, "Full(..)"),
|
|
|
+ TrySendError::Closed(..) => write!(f, "Closed(..)"),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> fmt::Display for TrySendError<T> {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ match *self {
|
|
|
+ TrySendError::Full(..) => write!(f, "sending into a full channel"),
|
|
|
+ TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// An error returned from [`Receiver::recv()`].
|
|
|
+///
|
|
|
+/// Received because the channel is empty and closed.
|
|
|
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
|
|
+pub struct RecvError;
|
|
|
+
|
|
|
+impl error::Error for RecvError {}
|
|
|
+
|
|
|
+impl fmt::Display for RecvError {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ write!(f, "receiving from an empty and closed channel")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// An error returned from [`Receiver::try_recv()`].
|
|
|
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
|
|
+pub enum TryRecvError {
|
|
|
+ /// The channel is empty but not closed.
|
|
|
+ Empty,
|
|
|
+
|
|
|
+ /// The channel is empty and closed.
|
|
|
+ Closed,
|
|
|
+}
|
|
|
+
|
|
|
+impl TryRecvError {
|
|
|
+ /// Returns `true` if the channel is empty but not closed.
|
|
|
+ pub fn is_empty(&self) -> bool {
|
|
|
+ match self {
|
|
|
+ TryRecvError::Empty => true,
|
|
|
+ TryRecvError::Closed => false,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns `true` if the channel is empty and closed.
|
|
|
+ pub fn is_closed(&self) -> bool {
|
|
|
+ match self {
|
|
|
+ TryRecvError::Empty => false,
|
|
|
+ TryRecvError::Closed => true,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl error::Error for TryRecvError {}
|
|
|
+
|
|
|
+impl fmt::Display for TryRecvError {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ match *self {
|
|
|
+ TryRecvError::Empty => write!(f, "receiving from an empty channel"),
|
|
|
+ TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// A future returned by [`Sender::send()`].
|
|
|
+#[derive(Debug)]
|
|
|
+#[must_use = "futures do nothing unless you `.await` or poll them"]
|
|
|
+pub struct Send<'a, T> {
|
|
|
+ sender: &'a Sender<T>,
|
|
|
+ listener: Option<EventListener>,
|
|
|
+ msg: Option<T>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<'a, T> Send<'a, T> {
|
|
|
+ /// Run this future with the given `Strategy`.
|
|
|
+ fn run_with_strategy<S: Strategy>(
|
|
|
+ &mut self,
|
|
|
+ cx: &mut S::Context,
|
|
|
+ ) -> Poll<Result<(), SendError<T>>> {
|
|
|
+ loop {
|
|
|
+ let msg = self.msg.take().unwrap();
|
|
|
+ // Attempt to send a message.
|
|
|
+ match self.sender.try_send(msg) {
|
|
|
+ Ok(()) => return Poll::Ready(Ok(())),
|
|
|
+ Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
|
|
|
+ Err(TrySendError::Full(m)) => self.msg = Some(m),
|
|
|
+ }
|
|
|
+
|
|
|
+ // Sending failed - now start listening for notifications or wait for one.
|
|
|
+ match self.listener.take() {
|
|
|
+ None => {
|
|
|
+ // Start listening and then try sending again.
|
|
|
+ self.listener = Some(self.sender.channel.send_ops.listen());
|
|
|
+ }
|
|
|
+ Some(l) => {
|
|
|
+ // Poll using the given strategy
|
|
|
+ if let Err(l) = S::poll(l, cx) {
|
|
|
+ self.listener = Some(l);
|
|
|
+ return Poll::Pending;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Run using the blocking strategy.
|
|
|
+ fn wait(mut self) -> Result<(), SendError<T>> {
|
|
|
+ match self.run_with_strategy::<Blocking>(&mut ()) {
|
|
|
+ Poll::Ready(res) => res,
|
|
|
+ Poll::Pending => unreachable!(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<'a, T> Unpin for Send<'a, T> {}
|
|
|
+
|
|
|
+impl<'a, T> Future for Send<'a, T> {
|
|
|
+ type Output = Result<(), SendError<T>>;
|
|
|
+
|
|
|
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
+ self.run_with_strategy::<NonBlocking<'_>>(cx)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// A future returned by [`Receiver::recv()`].
|
|
|
+#[derive(Debug)]
|
|
|
+#[must_use = "futures do nothing unless you `.await` or poll them"]
|
|
|
+pub struct Recv<'a, T> {
|
|
|
+ receiver: &'a Receiver<T>,
|
|
|
+ listener: Option<EventListener>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<'a, T> Unpin for Recv<'a, T> {}
|
|
|
+
|
|
|
+impl<'a, T> Recv<'a, T> {
|
|
|
+ /// Run this future with the given `Strategy`.
|
|
|
+ fn run_with_strategy<S: Strategy>(
|
|
|
+ &mut self,
|
|
|
+ cx: &mut S::Context,
|
|
|
+ ) -> Poll<Result<T, RecvError>> {
|
|
|
+ loop {
|
|
|
+ // Attempt to receive a message.
|
|
|
+ match self.receiver.try_recv() {
|
|
|
+ Ok(msg) => return Poll::Ready(Ok(msg)),
|
|
|
+ Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
|
|
|
+ Err(TryRecvError::Empty) => {}
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receiving failed - now start listening for notifications or wait for one.
|
|
|
+ match self.listener.take() {
|
|
|
+ None => {
|
|
|
+ // Start listening and then try receiving again.
|
|
|
+ self.listener = Some(self.receiver.channel.recv_ops.listen());
|
|
|
+ }
|
|
|
+ Some(l) => {
|
|
|
+ // Poll using the given strategy.
|
|
|
+ if let Err(l) = S::poll(l, cx) {
|
|
|
+ self.listener = Some(l);
|
|
|
+ return Poll::Pending;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Run with the blocking strategy.
|
|
|
+ fn wait(mut self) -> Result<T, RecvError> {
|
|
|
+ match self.run_with_strategy::<Blocking>(&mut ()) {
|
|
|
+ Poll::Ready(res) => res,
|
|
|
+ Poll::Pending => unreachable!(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<'a, T> Future for Recv<'a, T> {
|
|
|
+ type Output = Result<T, RecvError>;
|
|
|
+
|
|
|
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
+ self.run_with_strategy::<NonBlocking<'_>>(cx)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// A strategy used to poll an `EventListener`.
|
|
|
+trait Strategy {
|
|
|
+ /// Context needed to be provided to the `poll` method.
|
|
|
+ type Context;
|
|
|
+
|
|
|
+ /// Polls the given `EventListener`.
|
|
|
+ ///
|
|
|
+ /// Returns the `EventListener` back if it was not completed; otherwise,
|
|
|
+ /// returns `Ok(())`.
|
|
|
+ fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
|
|
|
+}
|
|
|
+
|
|
|
+/// Non-blocking strategy for use in asynchronous code.
|
|
|
+struct NonBlocking<'a>(&'a mut ());
|
|
|
+
|
|
|
+impl<'a> Strategy for NonBlocking<'a> {
|
|
|
+ type Context = Context<'a>;
|
|
|
+
|
|
|
+ fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
|
|
|
+ match Pin::new(&mut evl).poll(cx) {
|
|
|
+ Poll::Ready(()) => Ok(()),
|
|
|
+ Poll::Pending => Err(evl),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// Blocking strategy for use in synchronous code.
|
|
|
+struct Blocking;
|
|
|
+
|
|
|
+impl Strategy for Blocking {
|
|
|
+ type Context = ();
|
|
|
+
|
|
|
+ fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
|
|
|
+ evl.wait();
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|