You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
yuurei/src/state/service/supervisor/mod.rs

310 lines
8.0 KiB

//! Handles spawning and restarting service task(s)
use super::*;
use tokio::sync::RwLock;
use std::mem::MaybeUninit;
use std::ops;
use futures::prelude::*;
use tokio::time;
use std::{fmt, error};
/// Capacity of both the supervisor's request `mpsc` channel and its event `broadcast` channel (both are created in `Supervisor::new`).
const SUPERVISOR_BACKLOG: usize = 32;
mod dispatch; pub use dispatch::*;
// TODO: This all needs redoing when I'm actually lucid. This part seems okay, but the rest of `service` needs to go and be replaced by something like this.
/// Control messages delivered to the supervisor task through its `watch` channel.
///
/// Besides shutdown requests this also carries restart and task-limit commands
/// (the latter two are not implemented by the supervisor loop yet).
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum SupervisorControl
{
    /// Normal operation; this is the value the control channel starts with.
    Initialise,
    /// Ask the subtask(s) to shut down, wait for them, then exit, with an optional timeout.
    ///
    /// # Notes
    /// Intended behaviour: if the timeout expires while waiting, the mode switches to `Drop`.
    /// NOTE(review): the current supervisor loop does not implement the wait/timeout yet.
    Signal(Option<time::Duration>),
    /// Drop every handle and pipe to the subtask(s) and exit immediately.
    Drop,
    /// Restart all subtask(s) (not implemented yet).
    Restart,
    /// Set the maximum number of subtasks (not implemented yet; the documented default is 0).
    TaskLimit(usize),
}

impl Default for SupervisorControl
{
    /// The control channel starts out holding `SupervisorControl::Initialise`.
    #[inline]
    fn default() -> Self
    {
        SupervisorControl::Initialise
    }
}
/// Controller-side handle for the supervisor task that runs the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
    /// Join handle of the spawned supervisor task.
    handle: JoinHandle<ExitStatus>,
    /// Sending half of the control channel the supervisor task listens on.
    shutdown: watch::Sender<SupervisorControl>,
    /// Sending half of the request channel drained by the supervisor task.
    pipe: mpsc::Sender<ServiceRequest>,
    /// The receiver returned when `broadcast_root` was created.
    broadcast_receiver: broadcast::Receiver<ServiceEvent>,
    /// State shared between this controller and the supervisor task.
    shared: Arc<SupervisorShared>,
}

/// State shared between a `Supervisor` controller and its background task.
#[derive(Debug)]
struct SupervisorShared
{
    /// Subscriber sets keyed by event kind, used to direct specific events to specific subscribers.
    sub: RwLock<BTreeMap<ServiceEventKind, SESet<ServiceSubID>>>,
    /// Root sender of the broadcast event channel.
    broadcast_root: broadcast::Sender<ServiceEvent>,
    /// The state the supervisor was created for.
    state: state::State,
}

/// A subscriber to the event pump of the supervisor task(s).
#[derive(Debug)]
pub struct Subscriber
{
    /// Identifier of this subscriber.
    id: ServiceSubID,
    /// Receiver for events directed at this subscriber specifically.
    spec: mpsc::Receiver<ServiceEvent>,
    /// Receiver for events broadcast to all subscribers.
    broad: broadcast::Receiver<ServiceEvent>,
}
impl Supervisor
{
/// Attempt to send a control signal to the supervisor itself
pub fn signal_control(&self, sig: SupervisorControl) -> Result<(), watch::error::SendError<SupervisorControl>>
{
self.shutdown.broadcast(sig)?;
Ok(())
}
/// Drop all communications with background worker and wait for it to complete
pub async fn join_now(self) -> eyre::Result<()>
{
let handle = {
let Self { handle, ..} = self; // drop everything else
handle
};
handle.await?;
Ok(())
}
/// Check if the background worker has not been dropped.
///
/// If this returns false it usually indicates a fatal error.
pub fn is_alive(&self) -> bool
{
Arc::strong_count(&self.shared) > 1
}
/// Create a new supervisor for this state.
pub fn new(state: state::State) -> Self
{
let shutdown = watch::channel(Default::default());
let pipe = mpsc::channel(SUPERVISOR_BACKLOG);
let (broadcast_root, broadcast_receiver) = broadcast::channel(SUPERVISOR_BACKLOG);
let shared = Arc::new(SupervisorShared{
broadcast_root,
state,
});
let (shutdown_0, shutdown_1) = shutdown;
let (pipe_0, pipe_1) = pipe;
Self {
shutdown: shutdown_0,
pipe: pipe_0,
broadcast_receiver,
shared: Arc::clone(&shared),
handle: tokio::spawn(async move {
let shared = shared;
ExitStatus::from(service_fn(SupervisorTaskState {
shared,
recv: pipe_1,
shutdown: shutdown_1,
}).await.or_else(|err| err.into_own_result()))
}),
}
}
}
/// Everything owned by the running supervisor task.
#[derive(Debug)]
struct SupervisorTaskState
{
    /// Receiving half of the control channel.
    shutdown: watch::Receiver<SupervisorControl>,
    /// Receiving half of the request channel.
    recv: mpsc::Receiver<ServiceRequest>,
    /// State shared with the controlling `Supervisor`.
    shared: Arc<SupervisorShared>,
}
/// Detached supervisor server
async fn service_fn(SupervisorTaskState {shared, mut recv, mut shutdown}: SupervisorTaskState) -> Result<(), ServiceTerminationError>
{
impl Default for TerminationKind
{
#[inline]
fn default() -> Self
{
Self::Graceful
}
}
// The command stream to dispatch to the worker tasks
let command_dispatch = async {
while let Some(req) = recv.recv().await {
todo!("Dispatch to child(s)");
}
TerminationKind::Graceful
};
tokio::pin!(command_dispatch);
// The signal stream to be handled here
let signal_stream = async {
while let Some(value) = shutdown.recv().await
{
use SupervisorControl::*;
match value {
Initialise => info!("Initialised"),
Signal(None) => return TerminationKind::SignalHup,
Signal(Some(to)) => return TerminationKind::SignalTimeout(to),
Drop => return TerminationKind::Immediate,
Restart => todo!("not implemented"),
TaskLimit(_limit) => todo!("not implemented"),
}
}
TerminationKind::Graceful
};
tokio::pin!(signal_stream);
//loop {
tokio::select! {
sd_kind = &mut signal_stream => {
// We received a signal
Err(ServiceTerminationError::Signal(sd_kind))
}
disp_end = &mut command_dispatch => {
// The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so...
disp_end.into()
}
}
// }
}
/// The manner in which the supervisor exited.
#[derive(Debug)]
pub enum TerminationKind
{
    /// The child task(s) were signalled to stop and they were waited on.
    SignalHup,
    /// Like `SignalHup`, but a timeout was specified and it expired before the child task(s) stopped.
    SignalTimeout(time::Duration),
    /// Everything was dropped immediately and the supervisor exited.
    Immediate,
    /// A non-signalled shutdown: a channel feeding the supervisor was closed.
    Graceful,
}

impl TerminationKind
{
    /// Convert `TerminationKind::Graceful` into a non-error.
    ///
    /// Every other kind is returned unchanged as `Err`.
    fn strip_grace(self) -> Result<(), Self>
    {
        match self {
            Self::Graceful => Ok(()),
            e => Err(e),
        }
    }
}

impl error::Error for TerminationKind{}

impl fmt::Display for TerminationKind
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        match self {
            Self::SignalHup => write!(f, "children were signalled to shut down and complied"),
            Self::SignalTimeout(to) => write!(f, "children were signalled to shut down but did not do so within the {:?} timeout", to),
            Self::Immediate => write!(f, "children were dropped and an immediate exit was made"),
            Self::Graceful => write!(f, "a graceful shutdown order was issued and complied with"),
        }
    }
}
/// Why a supervisor service terminated unsuccessfully.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceTerminationError
{
    /// Terminated by a signal; carries the kind of termination.
    Signal(TerminationKind),
    /// Terminated by a panic.
    Panic,
    /// No more commands were being sent, and the worker shut down gracefully.
    Interest,
}

impl From<TerminationKind> for Result<(), ServiceTerminationError>
{
    /// Wrap `from` in `ServiceTerminationError::Signal`, mapping `Graceful` to `Ok(())`.
    fn from(from: TerminationKind) -> Self
    {
        let wrapped = ServiceTerminationError::Signal(from);
        wrapped.into_own_result()
    }
}

impl ServiceTerminationError
{
    /// Turn this error into a `Result`, treating a graceful signal as success.
    ///
    /// Only `Signal(TerminationKind::Graceful)` becomes `Ok(())`; every other value is
    /// returned unchanged as `Err`.
    fn into_own_result(self) -> Result<(), Self>
    {
        if let Self::Signal(kind) = self {
            kind.strip_grace().map_err(Self::Signal)
        } else {
            Err(self)
        }
    }
}

impl error::Error for ServiceTerminationError
{
    /// Only a signal termination has an underlying cause: the `TerminationKind` itself.
    fn source(&self) -> Option<&(dyn error::Error + 'static)>
    {
        match self {
            Self::Signal(kind) => Some(kind),
            _ => None,
        }
    }
}

impl fmt::Display for ServiceTerminationError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let message = match self {
            Self::Signal(_) => "shut down by signal",
            Self::Panic => "shut down by panic. this is usually fatal",
            Self::Interest => "all communications with this service stopped",
        };
        f.write_str(message)
    }
}