//! State use super::*; use tokio::{ sync::{ watch, mpsc::error::SendError, }, }; use config::Config; use msg::Initialiser; #[derive(Debug)] pub struct ShutdownError; impl error::Error for ShutdownError{} impl fmt::Display for ShutdownError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "shutdown signal caught") } } #[derive(Debug, Clone)] pub struct State { config: Arc>, //to avoid cloning config chain: handle::ChainHandle, //save: Arc, begin: Initialiser, shutdown: Arc>, shutdown_recv: watch::Receiver, } impl State { /// Consume this `state` into its initialiser pub fn into_initialiser(self) -> Initialiser { self.begin } /// Allow the saver task to start work pub fn init(self) -> Result<(), msg::InitError> { self.begin.set() } /// Has `init` been called? pub fn is_init(&self) -> bool { self.begin.is_set() } /// A future that completes either when `init` is called, or `shutdown`. pub async fn on_init(&mut self) -> Result<(), ShutdownError> { if self.has_shutdown() { return Err(ShutdownError); } tokio::select! { Ok(()) = self.begin.clone_into_wait() => Ok(()), _ = self.on_shutdown() => { debug!("on_init(): shutdown received"); Err(ShutdownError) } else => Err(ShutdownError) } } pub fn inbound_filter(&self) -> &sanitise::filter::Filter { &self.config_cache().inbound_filter } pub fn outbound_filter(&self) -> &sanitise::filter::Filter { &self.config_cache().outbound_filter } pub fn new(config: Config, cache: config::Cache, chain: handle::ChainHandle) -> Self { let (shutdown, shutdown_recv) = watch::channel(false); Self { config: Arc::new(Box::new((config, cache))), chain, begin: Initialiser::new(), shutdown: Arc::new(shutdown), shutdown_recv, } } pub fn config(&self) -> &Config { &self.config.as_ref().0 } pub fn config_cache(&self) -> &config::Cache { &self.config.as_ref().1 } /*pub fn notify_save(&self) { self.save.notify(); }*/ /*pub fn chain(&self) -> &RwLock> { &self.chain.as_ref() }*/ pub fn chain_ref(&self) -> &RwLock> { &self.chain.chain_ref() } pub fn chain_read(&self) -> handle::ChainStream { self.chain.read() } /// Write to this chain pub async fn chain_write<'a, T: Stream>(&'a self, buffer: T) -> Result<(), SendError>> { self.chain.write_stream(buffer).await } pub fn when_ref(&self) -> &Arc { &self.chain.notify_when() } /// Force the chain to push through now pub fn push_now(&self) { self.chain.push_now() } pub fn shutdown(self) { self.shutdown.broadcast(true).expect("Failed to communicate shutdown"); self.chain.hang(); self.when_ref().notify(); } pub fn has_shutdown(&self) -> bool { *self.shutdown_recv.borrow() } pub async fn on_shutdown(&mut self) { if !self.has_shutdown() { while let Some(false) = self.shutdown_recv.recv().await { } } } }