From 59dcecded30bb40006e5acb1ebfa9eb467172aff Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 12 Oct 2020 19:55:28 +0100 Subject: [PATCH] added instant-init; signal handler also waits for server --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 2 +- src/msg.rs | 2 +- src/save.rs | 7 +++-- src/signals.rs | 82 +++++++++++++++++++++++++++----------------------- src/state.rs | 34 ++++++++++++--------- 7 files changed, 71 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0cc9ee9..bcf467e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -616,7 +616,7 @@ dependencies = [ [[package]] name = "markov" -version = "0.7.2" +version = "0.7.3" dependencies = [ "async-compression", "bzip2-sys", diff --git a/Cargo.toml b/Cargo.toml index f748059..dc5b8b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "markov" -version = "0.7.2" +version = "0.7.3" description = "Generate string of text from Markov chain fed by stdin" authors = ["Avril "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index caf058a..82758b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -269,7 +269,7 @@ async fn main() { }) { Ok((addr, server)) => { info!("Server bound on {:?}", addr); - (server, s2.into_inner().into_save_initialiser()) + (server, s2.into_inner().into_initialiser()) }, Err(err) => { error!("Failed to bind server: {}", err); diff --git a/src/msg.rs b/src/msg.rs index 70eced5..1c474a3 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -69,7 +69,7 @@ impl Initialiser { if !*self.rx.borrow() { while !self.rx.recv().await.ok_or_else(|| InitWaitError)? { - tokio::task::yield_now().await; + //tokio::task::yield_now().await; } Ok(()) } else { diff --git a/src/save.rs b/src/save.rs index 2ad51d0..f083b0c 100644 --- a/src/save.rs +++ b/src/save.rs @@ -77,8 +77,9 @@ pub async fn host(mut state: Box) { let to = state.config().file.to_owned(); let interval = state.config().save_interval(); - if state.on_init_save().await.is_ok() { - trace!("Init get"); + trace!("Setup oke. Waiting on init"); + if state.on_init().await.is_ok() { + debug!("Begin save handler"); while Arc::strong_count(state.when()) > 1 { { let chain = state.chain().read().await; @@ -102,7 +103,7 @@ pub async fn host(mut state: Box) } } } else { - trace!("Shutdown called before init completed"); + debug!("Shutdown called before init"); } trace!("Saver exiting"); } diff --git a/src/signals.rs b/src/signals.rs index 8a8a4d0..672ec3f 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -15,45 +15,51 @@ pub async fn handle(mut state: State) let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2"); let mut quit = unix::signal(SignalKind::quit()).expect("Failed to hook SIGQUIT"); - loop { - tokio::select! { - _ = state.on_shutdown() => { - break; - } - _ = usr1.recv() => { - info!("Got SIGUSR1. Saving chain immediately."); - if let Err(e) = save::save_now(&state).await { - error!("Failed to save chain: {}", e); - } else{ - trace!("Saved chain okay"); - } - }, - _ = usr2.recv() => { - info!("Got SIGUSR1. Loading chain immediately."); - match save::load(&state.config().file).await { - Ok(new) => { - { - let mut chain = state.chain().write().await; - *chain = new; - } - trace!("Replaced with read chain"); - }, - Err(e) => { - error!("Failed to load chain from file, keeping current: {}", e); - }, + trace!("Setup oke. Waiting on init"); + if state.on_init().await.is_ok() { + debug!("Begin signal handler"); + loop { + tokio::select! { + _ = state.on_shutdown() => { + break; } - }, - _ = quit.recv() => { - warn!("Got SIGQUIT. Saving chain then aborting."); - if let Err(e) = save::save_now(&state).await { - error!("Failed to save chain: {}", e); - } else{ - trace!("Saved chain okay."); - } - error!("Aborting"); - std::process::abort() - }, + _ = usr1.recv() => { + info!("Got SIGUSR1. Saving chain immediately."); + if let Err(e) = save::save_now(&state).await { + error!("Failed to save chain: {}", e); + } else{ + trace!("Saved chain okay"); + } + }, + _ = usr2.recv() => { + info!("Got SIGUSR1. Loading chain immediately."); + match save::load(&state.config().file).await { + Ok(new) => { + { + let mut chain = state.chain().write().await; + *chain = new; + } + trace!("Replaced with read chain"); + }, + Err(e) => { + error!("Failed to load chain from file, keeping current: {}", e); + }, + } + }, + _ = quit.recv() => { + warn!("Got SIGQUIT. Saving chain then aborting."); + if let Err(e) = save::save_now(&state).await { + error!("Failed to save chain: {}", e); + } else{ + trace!("Saved chain okay."); + } + error!("Aborting"); + std::process::abort() + }, + } } + } else { + debug!("Shutdown called before init()"); } - trace!("Graceful shutdown"); + trace!("Exiting"); } diff --git a/src/state.rs b/src/state.rs index 2125ef3..a8f5cee 100644 --- a/src/state.rs +++ b/src/state.rs @@ -28,7 +28,7 @@ pub struct State exclude: Arc<(sanitise::filter::Filter, sanitise::filter::Filter)>, chain: Arc>>, save: Arc, - save_begin: Initialiser, + begin: Initialiser, shutdown: Arc>, shutdown_recv: watch::Receiver, @@ -37,30 +37,34 @@ pub struct State impl State { /// Consume this `state` into its initialiser - pub fn into_save_initialiser(self) -> Initialiser + pub fn into_initialiser(self) -> Initialiser { - self.save_begin + self.begin } /// Allow the saver task to start work - pub fn init_save(self) -> Result<(), msg::InitError> + pub fn init(self) -> Result<(), msg::InitError> { - self.save_begin.set() + self.begin.set() } - /// Has `init_save` been called? - pub fn is_init_save(&self) -> bool + /// Has `init` been called? + pub fn is_init(&self) -> bool { - self.save_begin.is_set() + self.begin.is_set() } - /// A future that completes either when `init_save` is called, or `shutdown`. - pub async fn on_init_save(&mut self) -> Result<(), ShutdownError> + /// A future that completes either when `init` is called, or `shutdown`. + pub async fn on_init(&mut self) -> Result<(), ShutdownError> { - tokio::select!{ - Ok(()) = self.save_begin.wait() => Ok(()), - Some(true) = self.shutdown_recv.recv() => { - debug!("on_init_save(): shutdown received"); + if self.has_shutdown() { + return Err(ShutdownError); + } + let mut begin = self.begin.clone(); + tokio::select!{ //fuck + Ok(()) = begin.wait() => Ok(()), + _ = self.on_shutdown() => { + debug!("on_init(): shutdown received"); Err(ShutdownError) } else => Err(ShutdownError) @@ -85,7 +89,7 @@ impl State config: Arc::new(config), chain, save, - save_begin: Initialiser::new(), + begin: Initialiser::new(), shutdown: Arc::new(shutdown), shutdown_recv, }