diff --git a/src/state/service/supervisor/mod.rs b/src/state/service/supervisor/mod.rs index f569b9f..3010cc4 100644 --- a/src/state/service/supervisor/mod.rs +++ b/src/state/service/supervisor/mod.rs @@ -3,7 +3,9 @@ use super::*; use tokio::sync::RwLock; use std::mem::MaybeUninit; use std::ops; -use futures::future::Future; +use futures::prelude::*; +use tokio::time; +use std::{fmt, error}; const SUPERVISOR_BACKLOG: usize = 32; @@ -15,8 +17,11 @@ pub enum SupervisorControl { /// Normal working Initialise, - /// Signal the subtask(s) to shut down, then wait for them and exit. - Signal, + /// Signal the subtask(s) to shut down, then wait for them and exit, with an optional timeout. + /// + /// # Notes + /// If the timeout expires while waiting, then the mode is switched to `Drop`. + Signal(Option), /// Drop all handles and pipes to subtask(s) then immediately exit. Drop, /// Restart any and all subtask(s) @@ -66,14 +71,6 @@ struct SupervisorShared state: state::State, } -/// The state held by the running superviser service -#[derive(Debug)] -struct SupervisorTaskState -{ - shutdown: watch::Receiver, - recv: mpsc::Receiver, - shared: Arc, -} impl Supervisor { @@ -131,16 +128,162 @@ impl Supervisor shared, recv: pipe_1, shutdown: shutdown_1, - }).await) + }).await.or_else(|err| err.into_own_result())) }), } } } -async fn service_fn(mut state: SupervisorTaskState) -> eyre::Result<()> +/// The state held by the running superviser service +#[derive(Debug)] +struct SupervisorTaskState { - while let Some(req) = state.recv.recv().await { - + shutdown: watch::Receiver, + recv: mpsc::Receiver, + shared: Arc, +} + +/// Detached supervisor server +async fn service_fn(SupervisorTaskState {shared, mut recv, mut shutdown}: SupervisorTaskState) -> Result<(), ServiceTerminationError> +{ + impl Default for TerminationKind + { + #[inline] + fn default() -> Self + { + Self::Graceful + } } + + // The command stream to dispatch to the worker tasks + let command_dispatch = async { + while let Some(req) = recv.recv().await { + //TODO: Dispatch + } + TerminationKind::Graceful + }; + tokio::pin!(command_dispatch); + + // The signal stream to be handled here + let signal_stream = async { + while let Some(value) = shutdown.recv().await + { + use SupervisorControl::*; + match value { + Initialise => (), + Signal(None) => return TerminationKind::SignalHup, + Signal(Some(to)) => return TerminationKind::SignalTimeout(to), + Drop => return TerminationKind::Immediate, + Restart => (), + TaskLimit(_limit) => (), + } + } + TerminationKind::Graceful + }; + tokio::pin!(signal_stream); + + //loop { + tokio::select! { + sd_kind = &mut signal_stream => { + // We received a signal + + return Err(ServiceTerminationError::Signal(sd_kind)); + } + disp_end = &mut command_dispatch => { + // The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so... + + //TODO: Anything here? + } + } + // } + + Ok(()) } +/// The mannor in which the supervisor exited. +#[derive(Debug)] +pub enum TerminationKind +{ + /// The child task(s) were signalled to stop and they were waited on. + SignalHup, + /// If there was a timeout specified, and that timeout expired, the message will be `SignalTimeout` instead of `SignalHup`. + SignalTimeout(time::Duration), + /// Immediately drop everything and exit + Immediate, + /// A non-signalled shutdown. There were no more watchers for the shutdown channel. + Graceful, +} + +impl TerminationKind +{ + /// Convert `TerminationKind::Graceful` into a non-error + fn strip_grace(self) -> Result<(), Self> + { + match self { + Self::Graceful => Ok(()), + e => Err(e), + } + } +} + +impl error::Error for TerminationKind{} +impl fmt::Display for TerminationKind +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::SignalHup => write!(f, "children were signalled to shut down and compiled"), + Self::SignalTimeout(to) => write!(f, "children were signalled to shut but did not do so within the {:?} timeout", to), + Self::Immediate => write!(f, "children were dropped and an immediate exit was made"), + Self::Graceful => write!(f, "a graceful shutdown order was issued and compiled with"), + } + } +} + + +/// The error returned on a failed service termination. +#[derive(Debug)] +#[non_exhaustive] +pub enum ServiceTerminationError +{ + /// Was terminated by a signal. + Signal(TerminationKind), + /// Was terminated by a panic. + Panic, + /// There were no more commands being sent through, and the worker gracefully shut down. + Interest, +} + +impl ServiceTerminationError +{ + fn into_own_result(self) -> Result<(), Self> + { + match self { + Self::Signal(term) => term.strip_grace().map_err(Self::Signal), + x => Err(x), + } + } +} + +impl error::Error for ServiceTerminationError +{ + fn source(&self) -> Option<&(dyn error::Error + 'static)> + { + Some(match &self { + Self::Signal(ts) => ts, + _ => return None, + }) + } +} +impl fmt::Display for ServiceTerminationError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::Signal(_) => write!(f, "shut down by signal"), + Self::Panic => write!(f, "shut down by panic. this is usually fatal"), + Self::Interest => write!(f, "all communications with this service stopped"), + } + } +} +