//! State
//!
//! Shared application state handle: configuration, the chain behind a lock,
//! a save notifier, the start-up initialiser and a shutdown watch channel.
//!
//! NOTE(review): the generic arguments of several types in this file appear to
//! have been lost (e.g. `Arc>`, `Arc>>`, `watch::Receiver`, `&RwLock>`), so it
//! will not parse as written. Restore them from version control; the hints on
//! the individual items below are inferences, not confirmed facts.
use super::*;
use tokio::{
    sync::{
        watch,
    },
};
use config::Config;
use msg::Initialiser;

/// Error returned when a wait is abandoned because shutdown was signalled.
#[derive(Debug)]
pub struct ShutdownError;

impl error::Error for ShutdownError{}
impl fmt::Display for ShutdownError
{
    /// Writes the fixed message `shutdown signal caught`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "shutdown signal caught")
    }
}

/// Cloneable handle bundling everything the tasks share.
///
/// Every field except `begin` and `shutdown_recv` is wrapped in an `Arc`.
#[derive(Debug, Clone)]
pub struct State
{
    // The `Arc` is there to avoid cloning the config itself.
    // NOTE(review): `new` stores `Arc::new(Box::new((config, cache)))`, so this
    // was presumably `Arc<Box<(Config, config::Cache)>>` — confirm.
    config: Arc>, //to avoid cloning config
    // NOTE(review): `chain()` hands out a `&RwLock<..>`, so this is an
    // `Arc<RwLock<..>>`; the inner type is not recoverable from this file.
    chain: Arc>>,
    // Notifier poked by `notify_save` and `shutdown`.
    // NOTE(review): pointee type lost; it must provide a `notify()` method.
    save: Arc,
    // Start-up gate; set by `init`, awaited by `on_init`.
    begin: Initialiser,
    // Sending half of the shutdown watch channel created in `new`.
    // NOTE(review): created by `watch::channel(false)`, so presumably
    // `Arc<watch::Sender<bool>>` — confirm.
    shutdown: Arc>,
    // Receiving half of the shutdown channel; its current value is the
    // "has shut down" flag (presumably `watch::Receiver<bool>` — confirm).
    shutdown_recv: watch::Receiver,
}

impl State
{
    /// Consume this `state` into its initialiser
    pub fn into_initialiser(self) -> Initialiser
    {
        self.begin
    }

    /// Allow the saver task to start work
    ///
    /// Consumes this handle and sets the initialiser, returning whatever
    /// `Initialiser::set` returns.
    pub fn init(self) -> Result<(), msg::InitError>
    {
        self.begin.set()
    }

    /// Has `init` been called?
    ///
    /// Reports `Initialiser::is_set` on the `begin` field.
    pub fn is_init(&self) -> bool
    {
        self.begin.is_set()
    }

    /// A future that completes either when `init` is called, or `shutdown`.
    ///
    /// # Errors
    /// Returns `ShutdownError` if shutdown had already been signalled on
    /// entry, or if the shutdown wait finishes before the initialiser wait
    /// yields `Ok(())`.
    pub async fn on_init(&mut self) -> Result<(), ShutdownError>
    {
        // Fast path: do not wait at all once shutdown has been flagged.
        if self.has_shutdown() {
            return Err(ShutdownError);
        }
        tokio::select! {
            Ok(()) = self.begin.clone_into_wait() => Ok(()),
            _ = self.on_shutdown() => {
                debug!("on_init(): shutdown received");
                Err(ShutdownError)
            }
            // `Ok(())` is a refutable pattern; `else` is the `select!` fallback
            // for when no branch is left enabled.
            else => Err(ShutdownError)
        }
    }

    /// The `inbound_filter` stored in the config cache.
    pub fn inbound_filter(&self) -> &sanitise::filter::Filter
    {
        &self.config_cache().inbound_filter
    }

    /// The `outbound_filter` stored in the config cache.
    pub fn outbound_filter(&self) -> &sanitise::filter::Filter
    {
        &self.config_cache().outbound_filter
    }

    /// Build a new state from its parts.
    ///
    /// Pairs `config` with `cache` behind a single `Arc`, creates a fresh
    /// `Initialiser`, and opens a watch channel whose initial value is
    /// `false` (not shut down).
    pub fn new(config: Config, cache: config::Cache, chain: Arc>>, save: Arc) -> Self
    {
        let (shutdown, shutdown_recv) = watch::channel(false);
        Self {
            config: Arc::new(Box::new((config, cache))),
            chain,
            save,
            begin: Initialiser::new(),
            shutdown: Arc::new(shutdown),
            shutdown_recv,
        }
    }

    /// The configuration (first element of the stored config/cache pair).
    pub fn config(&self) -> &Config
    {
        &self.config.as_ref().0
    }

    /// The config cache (second element of the stored config/cache pair).
    pub fn config_cache(&self) -> &config::Cache
    {
        &self.config.as_ref().1
    }

    /// Call `notify()` on the save notifier.
    pub fn notify_save(&self)
    {
        self.save.notify();
    }

    /// Borrow the lock guarding the chain.
    pub fn chain(&self) -> &RwLock>
    {
        &self.chain.as_ref()
    }

    /// Borrow the shared save notifier.
    pub fn when(&self) -> &Arc
    {
        &self.save
    }

    /// Signal shutdown: broadcast `true` on the watch channel, then poke the
    /// save notifier.
    ///
    /// # Panics
    /// Panics with "Failed to communicate shutdown" if the broadcast returns
    /// an error.
    pub fn shutdown(self)
    {
        self.shutdown.broadcast(true).expect("Failed to communicate shutdown");
        self.save.notify();
    }

    /// Current value of the shutdown flag as seen by this handle's receiver.
    pub fn has_shutdown(&self) -> bool
    {
        *self.shutdown_recv.borrow()
    }

    /// Wait until shutdown is signalled.
    ///
    /// Returns immediately if the flag is already set. Otherwise keeps
    /// receiving while the channel yields `Some(false)` and returns on any
    /// other result (`Some(true)` or `None`).
    pub async fn on_shutdown(&mut self)
    {
        if !self.has_shutdown() {
            while let Some(false) = self.shutdown_recv.recv().await {
            }
        }
    }
}