//! Message passing things use super::*; use tokio::{ sync::{ watch, Mutex, }, }; use std::{ task::{Poll, Context}, pin::Pin, fmt, error, }; use futures::{ future::{ Future, }, }; #[derive(Debug)] pub struct InitError; #[derive(Debug)] pub struct InitWaitError; impl error::Error for InitError{} impl fmt::Display for InitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to set init value") } } impl error::Error for InitWaitError{} impl fmt::Display for InitWaitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to receive init value") } } /// Provides a method of waiting on and setting a single initialisation. /// /// In general, it should only be set once, as multiple sets do nothing but hog `Arc`s. /// Dropping the `Initialiser` after waiting or setting should generally be done immediately. /// Choose the `into_wait()` and `set()` varients over the non-consuming ones. #[derive(Clone, Debug)] pub struct Initialiser { tx: Arc>, rx: watch::Receiver } impl Initialiser { /// Create a new, unset initialiser pub fn new() -> Self { let (tx, rx) = watch::channel(false); Self { tx: Arc::new(tx), rx, } } /// Create a pre-set initialiser. Calls to `wait()` will immediately resolve. pub fn new_set() -> Self { let (tx, rx) = watch::channel(true); Self { tx: Arc::new(tx), rx, } } /// Consume into a future that completes when init is set. pub fn into_wait(self) -> impl Future> + 'static { let mut rx = self.rx; async move { if !*rx.borrow() { while !rx.recv().await.ok_or_else(|| InitWaitError)? { //tokio::task::yield_now().await; } Ok(()) } else { Ok(()) } } } /// Clone into a future that completes when init is set. /// /// This method does not clone any `Arc`s and is prefered to `self.clone().into_wait()`. /// Use this when the `Initialiser` you want to wait on is behind a shared reference. pub fn clone_into_wait(&self) -> impl Future> + 'static { let mut rx = self.rx.clone(); async move { if !*rx.borrow() { while !rx.recv().await.ok_or_else(|| InitWaitError)? { //tokio::task::yield_now().await; } Ok(()) } else { Ok(()) } } } /// Completes when init is set pub async fn wait(&mut self) -> Result<(), InitWaitError> { if !*self.rx.borrow() { while !self.rx.recv().await.ok_or_else(|| InitWaitError)? { //tokio::task::yield_now().await; } Ok(()) } else { Ok(()) } } /// Is init set? pub fn is_set(&self) -> bool { *self.rx.borrow() } /// Consume and set init if it's not already set pub fn set(self) -> Result<(), InitError> { if !*self.rx.borrow() { self.tx.broadcast(true).map_err(|_| InitError) } else { Ok(()) } } /// Set init without consuming. /// /// # Note /// It is prefered to use `set()`, as this method may make `Arc`s hang around longer than they need to. /// Calling this multiple times is useless. pub fn set_in_place(&self) -> Result<(), InitError> { if !*self.rx.borrow() { self.tx.broadcast(true).map_err(|_| InitError) } else { Ok(()) } } } impl Future for Initialiser { type Output = Result<(), InitWaitError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let uhh = self.wait(); tokio::pin!(uhh); uhh.poll(cx) } } /// A value that can be consumed once. #[derive(Debug)] pub struct Once(Mutex>); impl Once { /// Create a new instance pub fn new(from: T) -> Self { Self(Mutex::new(Some(from))) } /// Consume into the instance from behind a potentially shared reference. pub async fn consume_shared(self: Arc) -> Option { match Arc::try_unwrap(self) { Ok(x) => x.0.into_inner(), Err(x) => x.0.lock().await.take(), } } /// Consume from a shared reference and panic if the value has already been consumed. pub async fn unwrap_shared(self: Arc) -> T { self.consume_shared().await.unwrap() } /// Consume into the instance. pub async fn consume(&self) -> Option { self.0.lock().await.take() } /// Consume and panic if the value has already been consumed. pub async fn unwrap(&self) -> T { self.consume().await.unwrap() } /// Consume into the inner value pub fn into_inner(self) -> Option { self.0.into_inner() } }