|
|
|
//! Supports sending many messages from many producers to many consumers, first come first served.
|
|
|
|
|
|
|
|
use std::{
|
|
|
|
collections::LinkedList,
|
|
|
|
sync::{Arc, Weak},
|
|
|
|
iter::FromIterator,
|
|
|
|
};
|
|
|
|
use tokio::{
|
|
|
|
sync::{
|
|
|
|
RwLock,
|
|
|
|
},
|
|
|
|
task::yield_now,
|
|
|
|
stream::Stream,
|
|
|
|
};
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct Stage<T>
|
|
|
|
{
|
|
|
|
internal: Arc<RwLock<LinkedList<T>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct StageSender<T> {
|
|
|
|
internal: Weak<RwLock<LinkedList<T>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> StageSender<T>
|
|
|
|
{
|
|
|
|
/// Send a message to the queue, returns `Ok(())` if possible, `Err(value)` if failed.
|
|
|
|
pub async fn send(&self, value: T) -> Result<(), T>
|
|
|
|
{
|
|
|
|
loop {
|
|
|
|
if let Some(internal) = self.internal.upgrade() {
|
|
|
|
let mut write = internal.write().await;
|
|
|
|
break Ok(write.push_back(value));
|
|
|
|
} else {
|
|
|
|
break Err(value);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Send many values at once
|
|
|
|
pub async fn send_many<I: IntoIterator<Item=T>>(&self, values: I) -> Result<(), I>
|
|
|
|
{
|
|
|
|
loop {
|
|
|
|
if let Some(internal) = self.internal.upgrade() {
|
|
|
|
let mut write = internal.write().await;
|
|
|
|
for item in values.into_iter() {
|
|
|
|
write.push_back(item);
|
|
|
|
}
|
|
|
|
break Ok(());
|
|
|
|
} else {
|
|
|
|
break Err(values);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> Stage<T>
|
|
|
|
{
|
|
|
|
/// Create a new stage from an input
|
|
|
|
#[inline] fn from_iter<I>(from: I) -> Self
|
|
|
|
where I: IntoIterator<Item=T>
|
|
|
|
{
|
|
|
|
Self {
|
|
|
|
internal: Arc::new(RwLock::new(from.into_iter().collect()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a new empty stage
|
|
|
|
pub fn new() -> Self
|
|
|
|
{
|
|
|
|
Self{
|
|
|
|
internal: Arc::new(RwLock::new(LinkedList::new())),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a new sender
|
|
|
|
pub fn sender(&self) -> StageSender<T>
|
|
|
|
{
|
|
|
|
StageSender {
|
|
|
|
internal: Arc::downgrade(&self.internal),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Receive one.
|
|
|
|
///
|
|
|
|
/// If there are no senders, this will return `None`
|
|
|
|
pub async fn take(&self) -> Option<T>
|
|
|
|
{
|
|
|
|
loop {
|
|
|
|
{
|
|
|
|
let mut write = self.internal.write().await;
|
|
|
|
if let Some(value) = write.pop_front() {
|
|
|
|
return Some(value);
|
|
|
|
} else if Arc::weak_count(&self.internal) < 1 {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
yield_now().await
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Current backlog length
|
|
|
|
pub async fn len(&self) -> usize {
|
|
|
|
self.internal.read().await.len()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Number of active senders
|
|
|
|
pub fn senders(&self) -> usize {
|
|
|
|
Arc::weak_count(&self.internal)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Number of active receivers
|
|
|
|
pub fn receivers(&self) -> usize {
|
|
|
|
Arc::strong_count(&self.internal)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> FromIterator<T> for Stage<T>
|
|
|
|
{
|
|
|
|
fn from_iter<I: IntoIterator<Item=T>>(iter: I) -> Self {
|
|
|
|
Self::from_iter(iter)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
use std::{
|
|
|
|
task::{Context, Poll},
|
|
|
|
pin::Pin,
|
|
|
|
future::Future,
|
|
|
|
};
|
|
|
|
|
|
|
|
impl<T> Stream for Stage<T>
|
|
|
|
{
|
|
|
|
type Item=T;
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>
|
|
|
|
{
|
|
|
|
let future= async {
|
|
|
|
while let Some(value) = self.take().await {
|
|
|
|
return Some(value);
|
|
|
|
}
|
|
|
|
None
|
|
|
|
};
|
|
|
|
tokio::pin!(future);
|
|
|
|
future.poll(cx)
|
|
|
|
}
|
|
|
|
}
|