start state service supervisor pipeline

i have n fucking idea what i was doing
another-service-failure
Avril 4 years ago
parent a17b2539d0
commit 578a9fd788
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -3,7 +3,9 @@ use super::*;
use tokio::sync::RwLock;
use std::mem::MaybeUninit;
use std::ops;
use futures::future::Future;
use futures::prelude::*;
use tokio::time;
use std::{fmt, error};
const SUPERVISOR_BACKLOG: usize = 32;
@ -15,8 +17,11 @@ pub enum SupervisorControl
{
/// Normal working
Initialise,
/// Signal the subtask(s) to shut down, then wait for them and exit.
Signal,
/// Signal the subtask(s) to shut down, then wait for them and exit, with an optional timeout.
///
/// # Notes
/// If the timeout expires while waiting, then the mode is switched to `Drop`.
Signal(Option<time::Duration>),
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Restart any and all subtask(s)
@ -66,14 +71,6 @@ struct SupervisorShared
state: state::State,
}
/// The state held by the running superviser service
#[derive(Debug)]
struct SupervisorTaskState
{
shutdown: watch::Receiver<SupervisorControl>,
recv: mpsc::Receiver<ServiceRequest>,
shared: Arc<SupervisorShared>,
}
impl Supervisor
{
@ -131,16 +128,162 @@ impl Supervisor
shared,
recv: pipe_1,
shutdown: shutdown_1,
}).await)
}).await.or_else(|err| err.into_own_result()))
}),
}
}
}
async fn service_fn(mut state: SupervisorTaskState) -> eyre::Result<()>
/// The state held by the running superviser service
#[derive(Debug)]
struct SupervisorTaskState
{
shutdown: watch::Receiver<SupervisorControl>,
recv: mpsc::Receiver<ServiceRequest>,
shared: Arc<SupervisorShared>,
}
/// Detached supervisor server
async fn service_fn(SupervisorTaskState {shared, mut recv, mut shutdown}: SupervisorTaskState) -> Result<(), ServiceTerminationError>
{
while let Some(req) = state.recv.recv().await {
impl Default for TerminationKind
{
#[inline]
fn default() -> Self
{
Self::Graceful
}
}
// The command stream to dispatch to the worker tasks
let command_dispatch = async {
while let Some(req) = recv.recv().await {
//TODO: Dispatch
}
TerminationKind::Graceful
};
tokio::pin!(command_dispatch);
// The signal stream to be handled here
let signal_stream = async {
while let Some(value) = shutdown.recv().await
{
use SupervisorControl::*;
match value {
Initialise => (),
Signal(None) => return TerminationKind::SignalHup,
Signal(Some(to)) => return TerminationKind::SignalTimeout(to),
Drop => return TerminationKind::Immediate,
Restart => (),
TaskLimit(_limit) => (),
}
}
TerminationKind::Graceful
};
tokio::pin!(signal_stream);
//loop {
tokio::select! {
sd_kind = &mut signal_stream => {
// We received a signal
return Err(ServiceTerminationError::Signal(sd_kind));
}
disp_end = &mut command_dispatch => {
// The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so...
//TODO: Anything here?
}
}
// }
Ok(())
}
/// The mannor in which the supervisor exited.
#[derive(Debug)]
pub enum TerminationKind
{
/// The child task(s) were signalled to stop and they were waited on.
SignalHup,
/// If there was a timeout specified, and that timeout expired, the message will be `SignalTimeout` instead of `SignalHup`.
SignalTimeout(time::Duration),
/// Immediately drop everything and exit
Immediate,
/// A non-signalled shutdown. There were no more watchers for the shutdown channel.
Graceful,
}
impl TerminationKind
{
/// Convert `TerminationKind::Graceful` into a non-error
fn strip_grace(self) -> Result<(), Self>
{
match self {
Self::Graceful => Ok(()),
e => Err(e),
}
}
}
impl error::Error for TerminationKind{}
impl fmt::Display for TerminationKind
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::SignalHup => write!(f, "children were signalled to shut down and compiled"),
Self::SignalTimeout(to) => write!(f, "children were signalled to shut but did not do so within the {:?} timeout", to),
Self::Immediate => write!(f, "children were dropped and an immediate exit was made"),
Self::Graceful => write!(f, "a graceful shutdown order was issued and compiled with"),
}
}
}
/// The error returned on a failed service termination.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceTerminationError
{
/// Was terminated by a signal.
Signal(TerminationKind),
/// Was terminated by a panic.
Panic,
/// There were no more commands being sent through, and the worker gracefully shut down.
Interest,
}
impl ServiceTerminationError
{
fn into_own_result(self) -> Result<(), Self>
{
match self {
Self::Signal(term) => term.strip_grace().map_err(Self::Signal),
x => Err(x),
}
}
}
impl error::Error for ServiceTerminationError
{
fn source(&self) -> Option<&(dyn error::Error + 'static)>
{
Some(match &self {
Self::Signal(ts) => ts,
_ => return None,
})
}
}
impl fmt::Display for ServiceTerminationError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Signal(_) => write!(f, "shut down by signal"),
Self::Panic => write!(f, "shut down by panic. this is usually fatal"),
Self::Interest => write!(f, "all communications with this service stopped"),
}
}
}

Loading…
Cancel
Save