You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
genmarkov/src/state.rs

162 lines
3.2 KiB

//! State
use super::*;
use tokio::{
sync::{
watch,
mpsc::error::SendError,
},
};
use config::Config;
use msg::Initialiser;
/// Error value reported when a wait ends because of shutdown.
///
/// Carries no data; its `Display` text is a fixed message.
#[derive(Debug)]
pub struct ShutdownError;

impl fmt::Display for ShutdownError {
    /// Writes the fixed shutdown message to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("shutdown signal caught")
    }
}

impl error::Error for ShutdownError {}
/// Bundles the configuration, the chain handle, the initialiser and both ends
/// of the shutdown watch channel that the methods of this type operate on.
#[derive(Debug, Clone)]
pub struct State
{
/// Configuration paired with its cache, kept behind an `Arc` to avoid cloning config.
// NOTE(review): `Arc<Box<_>>` is a double indirection; `Arc<(Config, config::Cache)>`
// looks sufficient — confirm before changing.
config: Arc<Box<(Config, config::Cache)>>,
/// Handle through which the `String` chain is read, written and notified.
chain: handle::ChainHandle<String>,
//save: Arc<Notify>,
/// Initialiser that `init` sets and `is_init` / `on_init` observe.
begin: Initialiser,
/// Sending half of the shutdown flag; `shutdown` broadcasts `true` on it.
shutdown: Arc<watch::Sender<bool>>,
/// Receiving half of the shutdown flag, read by `has_shutdown` and `on_shutdown`.
shutdown_recv: watch::Receiver<bool>,
}
impl State
{
/// Consume this `state` into its initialiser
pub fn into_initialiser(self) -> Initialiser
{
self.begin
}
/// Allow the saver task to start work
///
/// Consumes this `State` and calls `set()` on its initialiser.
///
/// # Errors
/// Propagates the `msg::InitError` returned by `set()`.
pub fn init(self) -> Result<(), msg::InitError>
{
self.begin.set()
}
/// Has `init` been called?
pub fn is_init(&self) -> bool
{
self.begin.is_set()
}
/// A future that completes either when `init` is called, or `shutdown`.
///
/// # Errors
/// Returns `ShutdownError` when the shutdown flag is already set on entry, when
/// `on_shutdown()` completes first, or when every `select!` branch is disabled.
pub async fn on_init(&mut self) -> Result<(), ShutdownError>
{
// Fast path: do not start waiting if shutdown was already flagged.
if self.has_shutdown() {
return Err(ShutdownError);
}
tokio::select! {
// Only an `Ok(())` result matches this branch's pattern.
Ok(()) = self.begin.clone_into_wait() => Ok(()),
_ = self.on_shutdown() => {
debug!("on_init(): shutdown received");
Err(ShutdownError)
}
// Runs only if no branch pattern matched.
else => Err(ShutdownError)
}
}
/// Borrow the inbound filter stored in the config cache.
pub fn inbound_filter(&self) -> &sanitise::filter::Filter {
    let cache = self.config_cache();
    &cache.inbound_filter
}
/// Borrow the outbound filter stored in the config cache.
pub fn outbound_filter(&self) -> &sanitise::filter::Filter {
    let cache = self.config_cache();
    &cache.outbound_filter
}
/// Build a `State` from a configuration, its cache and a chain handle.
///
/// A fresh `Initialiser` is created, and the shutdown watch channel starts
/// out holding `false`.
pub fn new(config: Config, cache: config::Cache, chain: handle::ChainHandle<String>) -> Self {
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    // Config and cache share one allocation so clones of `State` stay cheap.
    let config = Arc::new(Box::new((config, cache)));
    Self {
        config,
        chain,
        begin: Initialiser::new(),
        shutdown: Arc::new(shutdown_tx),
        shutdown_recv: shutdown_rx,
    }
}
/// Borrow the `Config` half of the shared (config, cache) pair.
pub fn config(&self) -> &Config {
    let (config, _) = &**self.config;
    config
}
/// Borrow the `config::Cache` half of the shared (config, cache) pair.
pub fn config_cache(&self) -> &config::Cache {
    let (_, cache) = &**self.config;
    cache
}
/*pub fn notify_save(&self)
{
self.save.notify();
}*/
/*pub fn chain(&self) -> &RwLock<Chain<String>>
{
&self.chain.as_ref()
}*/
/// Borrow the `RwLock` around the chain, as exposed by the chain handle.
pub fn chain_ref(&self) -> &RwLock<Chain<String>> {
    self.chain.chain_ref()
}
/// Obtain a `ChainStream` by calling `read()` on the chain handle.
pub fn chain_read(&self) -> handle::ChainStream<String>
{
self.chain.read()
}
/// Write to this chain
///
/// Passes `buffer` to the chain handle's `write_stream` and awaits it.
///
/// # Errors
/// Propagates the `SendError` returned by `write_stream`.
pub async fn chain_write<'a, T: Stream<Item = String>>(&'a self, buffer: T) -> Result<(), SendError<Vec<String>>>
{
self.chain.write_stream(buffer).await
}
/// Borrow the `Notify` handle returned by the chain handle's `notify_when`.
pub fn when_ref(&self) -> &Arc<Notify> {
    self.chain.notify_when()
}
/// Force the chain to push through now
///
/// Delegates to the chain handle's `push_now()`.
pub fn push_now(&self)
{
self.chain.push_now()
}
/// Signal shutdown: broadcast `true` on the shutdown channel, tell the chain
/// handle to hang, then notify on the chain's `Notify` handle.
///
/// # Panics
/// Panics with "Failed to communicate shutdown" if the broadcast fails.
pub fn shutdown(self) {
    let broadcast = self.shutdown.broadcast(true);
    broadcast.expect("Failed to communicate shutdown");
    self.chain.hang();
    self.chain.notify_when().notify();
}
/// Read the current value of the shutdown flag from the watch receiver.
pub fn has_shutdown(&self) -> bool
{
*self.shutdown_recv.borrow()
}
/// Wait until the shutdown flag is observed.
///
/// Returns immediately if the flag is already set. Otherwise keeps receiving
/// from the watch channel for as long as it yields `Some(false)`, and stops
/// on any other result.
pub async fn on_shutdown(&mut self) {
    if self.has_shutdown() {
        return;
    }
    loop {
        match self.shutdown_recv.recv().await {
            Some(false) => continue,
            _ => break,
        }
    }
}
}