//! Message passing things use super::*; use tokio::{ sync::{ watch, }, }; use std::{ task::{Poll, Context}, pin::Pin, fmt, error, }; use futures::{ future::Future, }; #[derive(Debug)] pub struct InitError; #[derive(Debug)] pub struct InitWaitError; impl error::Error for InitError{} impl fmt::Display for InitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to set init value") } } impl error::Error for InitWaitError{} impl fmt::Display for InitWaitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to receive init value") } } #[derive(Clone, Debug)] pub struct Initialiser { tx: Arc>, rx: watch::Receiver } impl Initialiser { pub fn new() -> Self { let (tx, rx) = watch::channel(false); Self { tx: Arc::new(tx), rx, } } pub fn new_set() -> Self { let (tx, rx) = watch::channel(true); Self { tx: Arc::new(tx), rx, } } pub fn into_wait(self) -> impl Future> + 'static { let mut rx = self.rx; async move { if !*rx.borrow() { while !rx.recv().await.ok_or_else(|| InitWaitError)? { //tokio::task::yield_now().await; } Ok(()) } else { Ok(()) } } } pub fn clone_into_wait(&self) -> impl Future> + 'static { let mut rx = self.rx.clone(); async move { if !*rx.borrow() { while !rx.recv().await.ok_or_else(|| InitWaitError)? { //tokio::task::yield_now().await; } Ok(()) } else { Ok(()) } } } pub async fn wait(&mut self) -> Result<(), InitWaitError> { if !*self.rx.borrow() { while !self.rx.recv().await.ok_or_else(|| InitWaitError)? { //tokio::task::yield_now().await; } Ok(()) } else { Ok(()) } } pub fn is_set(&self) -> bool { *self.rx.borrow() } pub fn set(self) -> Result<(), InitError> { if !*self.rx.borrow() { self.tx.broadcast(true).map_err(|_| InitError) } else { Ok(()) } } } impl Future for Initialiser { type Output = Result<(), InitWaitError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let uhh = self.wait(); tokio::pin!(uhh); uhh.poll(cx) } }