added instant-init; signal handler also waits for server

feed
Avril 4 years ago
parent 4e1e38a0fd
commit 59dcecded3
Signed by: flanchan
GPG Key ID: 284488987C31F630

2
Cargo.lock generated

@ -616,7 +616,7 @@ dependencies = [
[[package]] [[package]]
name = "markov" name = "markov"
version = "0.7.2" version = "0.7.3"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"bzip2-sys", "bzip2-sys",

@ -1,6 +1,6 @@
[package] [package]
name = "markov" name = "markov"
version = "0.7.2" version = "0.7.3"
description = "Generate string of text from Markov chain fed by stdin" description = "Generate string of text from Markov chain fed by stdin"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"

@ -269,7 +269,7 @@ async fn main() {
}) { }) {
Ok((addr, server)) => { Ok((addr, server)) => {
info!("Server bound on {:?}", addr); info!("Server bound on {:?}", addr);
(server, s2.into_inner().into_save_initialiser()) (server, s2.into_inner().into_initialiser())
}, },
Err(err) => { Err(err) => {
error!("Failed to bind server: {}", err); error!("Failed to bind server: {}", err);

@ -69,7 +69,7 @@ impl Initialiser
{ {
if !*self.rx.borrow() { if !*self.rx.borrow() {
while !self.rx.recv().await.ok_or_else(|| InitWaitError)? { while !self.rx.recv().await.ok_or_else(|| InitWaitError)? {
tokio::task::yield_now().await; //tokio::task::yield_now().await;
} }
Ok(()) Ok(())
} else { } else {

@ -77,8 +77,9 @@ pub async fn host(mut state: Box<State>)
{ {
let to = state.config().file.to_owned(); let to = state.config().file.to_owned();
let interval = state.config().save_interval(); let interval = state.config().save_interval();
if state.on_init_save().await.is_ok() { trace!("Setup oke. Waiting on init");
trace!("Init get"); if state.on_init().await.is_ok() {
debug!("Begin save handler");
while Arc::strong_count(state.when()) > 1 { while Arc::strong_count(state.when()) > 1 {
{ {
let chain = state.chain().read().await; let chain = state.chain().read().await;
@ -102,7 +103,7 @@ pub async fn host(mut state: Box<State>)
} }
} }
} else { } else {
trace!("Shutdown called before init completed"); debug!("Shutdown called before init");
} }
trace!("Saver exiting"); trace!("Saver exiting");
} }

@ -15,45 +15,51 @@ pub async fn handle(mut state: State)
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2"); let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2");
let mut quit = unix::signal(SignalKind::quit()).expect("Failed to hook SIGQUIT"); let mut quit = unix::signal(SignalKind::quit()).expect("Failed to hook SIGQUIT");
loop { trace!("Setup oke. Waiting on init");
tokio::select! { if state.on_init().await.is_ok() {
_ = state.on_shutdown() => { debug!("Begin signal handler");
break; loop {
} tokio::select! {
_ = usr1.recv() => { _ = state.on_shutdown() => {
info!("Got SIGUSR1. Saving chain immediately."); break;
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay");
}
},
_ = usr2.recv() => {
info!("Got SIGUSR1. Loading chain immediately.");
match save::load(&state.config().file).await {
Ok(new) => {
{
let mut chain = state.chain().write().await;
*chain = new;
}
trace!("Replaced with read chain");
},
Err(e) => {
error!("Failed to load chain from file, keeping current: {}", e);
},
} }
}, _ = usr1.recv() => {
_ = quit.recv() => { info!("Got SIGUSR1. Saving chain immediately.");
warn!("Got SIGQUIT. Saving chain then aborting."); if let Err(e) = save::save_now(&state).await {
if let Err(e) = save::save_now(&state).await { error!("Failed to save chain: {}", e);
error!("Failed to save chain: {}", e); } else{
} else{ trace!("Saved chain okay");
trace!("Saved chain okay."); }
} },
error!("Aborting"); _ = usr2.recv() => {
std::process::abort() info!("Got SIGUSR1. Loading chain immediately.");
}, match save::load(&state.config().file).await {
Ok(new) => {
{
let mut chain = state.chain().write().await;
*chain = new;
}
trace!("Replaced with read chain");
},
Err(e) => {
error!("Failed to load chain from file, keeping current: {}", e);
},
}
},
_ = quit.recv() => {
warn!("Got SIGQUIT. Saving chain then aborting.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay.");
}
error!("Aborting");
std::process::abort()
},
}
} }
} else {
debug!("Shutdown called before init()");
} }
trace!("Graceful shutdown"); trace!("Exiting");
} }

@ -28,7 +28,7 @@ pub struct State
exclude: Arc<(sanitise::filter::Filter, sanitise::filter::Filter)>, exclude: Arc<(sanitise::filter::Filter, sanitise::filter::Filter)>,
chain: Arc<RwLock<Chain<String>>>, chain: Arc<RwLock<Chain<String>>>,
save: Arc<Notify>, save: Arc<Notify>,
save_begin: Initialiser, begin: Initialiser,
shutdown: Arc<watch::Sender<bool>>, shutdown: Arc<watch::Sender<bool>>,
shutdown_recv: watch::Receiver<bool>, shutdown_recv: watch::Receiver<bool>,
@ -37,30 +37,34 @@ pub struct State
impl State impl State
{ {
/// Consume this `state` into its initialiser /// Consume this `state` into its initialiser
pub fn into_save_initialiser(self) -> Initialiser pub fn into_initialiser(self) -> Initialiser
{ {
self.save_begin self.begin
} }
/// Allow the saver task to start work /// Allow the saver task to start work
pub fn init_save(self) -> Result<(), msg::InitError> pub fn init(self) -> Result<(), msg::InitError>
{ {
self.save_begin.set() self.begin.set()
} }
/// Has `init_save` been called? /// Has `init` been called?
pub fn is_init_save(&self) -> bool pub fn is_init(&self) -> bool
{ {
self.save_begin.is_set() self.begin.is_set()
} }
/// A future that completes either when `init_save` is called, or `shutdown`. /// A future that completes either when `init` is called, or `shutdown`.
pub async fn on_init_save(&mut self) -> Result<(), ShutdownError> pub async fn on_init(&mut self) -> Result<(), ShutdownError>
{ {
tokio::select!{ if self.has_shutdown() {
Ok(()) = self.save_begin.wait() => Ok(()), return Err(ShutdownError);
Some(true) = self.shutdown_recv.recv() => { }
debug!("on_init_save(): shutdown received"); let mut begin = self.begin.clone();
tokio::select!{ //fuck
Ok(()) = begin.wait() => Ok(()),
_ = self.on_shutdown() => {
debug!("on_init(): shutdown received");
Err(ShutdownError) Err(ShutdownError)
} }
else => Err(ShutdownError) else => Err(ShutdownError)
@ -85,7 +89,7 @@ impl State
config: Arc::new(config), config: Arc::new(config),
chain, chain,
save, save,
save_begin: Initialiser::new(), begin: Initialiser::new(),
shutdown: Arc::new(shutdown), shutdown: Arc::new(shutdown),
shutdown_recv, shutdown_recv,
} }

Loading…
Cancel
Save