signal handling

serve
Avril 4 years ago
parent 66470d6be5
commit 45a93939de
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,6 +1,6 @@
[package] [package]
name = "markov" name = "markov"
version = "0.2.0" version = "0.3.0"
description = "Generate string of text from Markov chain fed by stdin" description = "Generate string of text from Markov chain fed by stdin"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"

@ -32,6 +32,14 @@ use serde::{
Serialize, Serialize,
Deserialize Deserialize
}; };
use futures::{
future::{
FutureExt,
BoxFuture,
join_all,
},
};
macro_rules! status { macro_rules! status {
($code:expr) => { ($code:expr) => {
@ -39,6 +47,8 @@ macro_rules! status {
}; };
} }
#[cfg(target_family="unix")]
mod signals;
mod config; mod config;
mod state; mod state;
use state::State; use state::State;
@ -80,7 +90,8 @@ async fn main() {
}, },
})); }));
{ {
let (state, chain, saver) = { let mut tasks = Vec::<BoxFuture<'static, ()>>::new();
let (state, chain) = {
let save_when = Arc::new(Notify::new()); let save_when = Arc::new(Notify::new());
let state = State::new(config, let state = State::new(config,
@ -89,7 +100,9 @@ async fn main() {
let state2 = state.clone(); let state2 = state.clone();
let saver = tokio::spawn(save::host(state.clone())); let saver = tokio::spawn(save::host(state.clone()));
let chain = warp::any().map(move || state.clone()); let chain = warp::any().map(move || state.clone());
(state2, chain, saver)
tasks.push(saver.map(|res| res.expect("Saver panicked")).boxed());
(state2, chain)
}; };
let client_ip = if state.config().trust_x_forwarded_for { let client_ip = if state.config().trust_x_forwarded_for {
@ -135,13 +148,16 @@ async fn main() {
}) })
.with(warp::log("markov::read")); .with(warp::log("markov::read"));
#[cfg(target_family="unix")]
tasks.push(tokio::spawn(signals::handle(state.clone())).map(|res| res.expect("Signal handler panicked")).boxed());
let (addr, server) = warp::serve(push let (addr, server) = warp::serve(push
.or(read)) .or(read))
.bind_with_graceful_shutdown(state.config().bindpoint, async move { .bind_with_graceful_shutdown(state.config().bindpoint, async move {
tokio::signal::ctrl_c().await.unwrap(); tokio::signal::ctrl_c().await.unwrap();
state.shutdown(); state.shutdown();
}); });
info!("Server bound on {:?}", addr); info!("Server bound on {:?}", addr);
server.await; server.await;
@ -149,7 +165,7 @@ async fn main() {
async move { async move {
trace!("Cleanup"); trace!("Cleanup");
saver.await.expect("Saver panicked"); join_all(tasks).await;
} }
}.await; }.await;
info!("Shut down gracefully") info!("Shut down gracefully")

@ -33,7 +33,16 @@ use lzzzz::{
const SAVE_INTERVAL: Option<Duration> = Some(Duration::from_secs(2)); const SAVE_INTERVAL: Option<Duration> = Some(Duration::from_secs(2));
pub async fn save_now(chain: &Chain<String>, to: impl AsRef<Path>) -> io::Result<()>
pub async fn save_now(state: &State) -> io::Result<()>
{
let chain = state.chain().read().await;
use std::ops::Deref;
let to = &state.config().file;
save_now_to(chain.deref(),to).await
}
async fn save_now_to(chain: &Chain<String>, to: impl AsRef<Path>) -> io::Result<()>
{ {
debug!("Saving chain to {:?}", to.as_ref()); debug!("Saving chain to {:?}", to.as_ref());
let file = OpenOptions::new() let file = OpenOptions::new()
@ -59,7 +68,7 @@ pub async fn host(state: State)
{ {
let chain = state.chain().read().await; let chain = state.chain().read().await;
use std::ops::Deref; use std::ops::Deref;
if let Err(e) = save_now(chain.deref(), &to).await { if let Err(e) = save_now_to(chain.deref(), &to).await {
error!("Failed to save chain: {}", e); error!("Failed to save chain: {}", e);
} else { } else {
info!("Saved chain to {:?}", to); info!("Saved chain to {:?}", to);

@ -0,0 +1,59 @@
//! Unix signals
use super::*;
use tokio::{
signal::unix::{
self,
SignalKind,
},
};
pub async fn handle(mut state: State)
{
let mut usr1 = unix::signal(SignalKind::user_defined1()).expect("Failed to hook SIGUSR1");
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2");
let mut quit = unix::signal(SignalKind::quit()).expect("Failed to hook SIGQUIT");
loop {
tokio::select! {
_ = state.on_shutdown() => {
break;
}
_ = usr1.recv() => {
info!("Got SIGUSR1. Saving chain immediately.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay");
}
},
_ = usr2.recv() => {
info!("Got SIGUSR1. Loading chain immediately.");
match save::load(&state.config().file).await {
Ok(new) => {
{
let mut chain = state.chain().write().await;
*chain = new;
}
trace!("Replaced with read chain");
},
Err(e) => {
error!("Failed to load chain from file, keeping current: {}", e);
},
}
},
_ = quit.recv() => {
warn!("Got SIGQUIT. Saving chain then aborting.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay.");
}
error!("Aborting");
std::process::abort()
},
}
}
trace!("Graceful shutdown");
}

@ -63,7 +63,7 @@ impl State
*self.shutdown_recv.borrow() *self.shutdown_recv.borrow()
} }
pub async fn on_shutdown(mut self) pub async fn on_shutdown(&mut self)
{ {
if !self.has_shutdown() { if !self.has_shutdown() {
while let Some(false) = self.shutdown_recv.recv().await { while let Some(false) = self.shutdown_recv.recv().await {

Loading…
Cancel
Save