//! Supports sending many messages from many producers to many consumers, first come first served. use std::{ collections::LinkedList, sync::{Arc, Weak}, iter::FromIterator, }; use tokio::{ sync::{ RwLock, }, task::yield_now, stream::Stream, }; #[derive(Debug, Clone)] pub struct Stage { internal: Arc>>, } #[derive(Debug, Clone)] pub struct StageSender { internal: Weak>>, } impl StageSender { /// Send a message to the queue, returns `Ok(())` if possible, `Err(value)` if failed. pub async fn send(&self, value: T) -> Result<(), T> { loop { if let Some(internal) = self.internal.upgrade() { let mut write = internal.write().await; break Ok(write.push_back(value)); } else { break Err(value); } } } } impl Stage { /// Create a new stage from an input #[inline] fn from_iter(from: I) -> Self where I: IntoIterator { Self { internal: Arc::new(RwLock::new(from.into_iter().collect())) } } /// Create a new empty stage pub fn new() -> Self { Self{ internal: Arc::new(RwLock::new(LinkedList::new())), } } /// Create a new sender pub fn sender(&self) -> StageSender { StageSender { internal: Arc::downgrade(&self.internal), } } /// Receive one. /// /// If there are no senders, this will return `None` pub async fn take(&self) -> Option { loop { { let mut write = self.internal.write().await; if let Some(value) = write.pop_front() { return Some(value); } else if Arc::weak_count(&self.internal) < 1 { return None; } } yield_now().await } } /// Current backlog length pub async fn len(&self) -> usize { self.internal.read().await.len() } /// Number of active senders pub fn senders(&self) -> usize { Arc::weak_count(&self.internal) } /// Number of active receivers pub fn receivers(&self) -> usize { Arc::strong_count(&self.internal) } } impl FromIterator for Stage { fn from_iter>(iter: I) -> Self { Self::from_iter(iter) } } use std::{ task::{Context, Poll}, pin::Pin, future::Future, }; impl Stream for Stage { type Item=T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let future= async { while let Some(value) = self.take().await { return Some(value); } None }; tokio::pin!(future); future.poll(cx) } }