//! Handles spawning and restarting service task(s)
use super ::* ;
use tokio ::sync ::RwLock ;
use std ::mem ::MaybeUninit ;
use std ::ops ;
use futures ::prelude ::* ;
use tokio ::time ;
use std ::{ fmt , error } ;
const SUPERVISOR_BACKLOG : usize = 32 ;
mod dispatch ; pub use dispatch ::* ;
//TODO: This all needs redoing when i'm actually lucid. This part seems okay but the rest of `service` needs to go and be replaced by something like this
/// Signal the shutdown method to the supervisor.
#[ derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy) ]
pub enum SupervisorControl
{
/// Normal working
Initialise ,
/// Signal the subtask(s) to shut down, then wait for them and exit, with an optional timeout.
///
/// # Notes
/// If the timeout expires while waiting, then the mode is switched to `Drop`.
Signal ( Option < time ::Duration > ) ,
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop ,
/// Restart any and all subtask(s)
Restart ,
/// Set the max task limit. Default is 0.
TaskLimit ( usize ) ,
}
impl Default for SupervisorControl
{
#[ inline ]
fn default ( ) -> Self
{
Self ::Initialise
}
}
/// Supervisor responsible for spawning the state handler service.
#[ derive(Debug) ]
pub ( super ) struct Supervisor
{
/// Handle for the supervisor task itself
handle : JoinHandle < ExitStatus > ,
/// Watch sender for signalling shutdowns the supervisor task itself
shutdown : watch ::Sender < SupervisorControl > ,
/// The pipe to send requests to the supervisor's subtasks
pipe : mpsc ::Sender < ServiceRequest > ,
/// The initial receiver created from `broadcast_root`.
broadcast_receiver : broadcast ::Receiver < ServiceEvent > ,
/// Data shared between the supervisor's task and its controller instance here.
shared : Arc < SupervisorShared > ,
}
/// Object shared btweeen the Supervisor control instance and its supervisor task.
#[ derive(Debug) ]
struct SupervisorShared
{
/// this is for filtering specific messages to specific subscribers
sub : RwLock < BTreeMap < ServiceEventKind , SESet < ServiceSubID > > > ,
broadcast_root : broadcast ::Sender < ServiceEvent > ,
state : state ::State ,
}
/// A subscriber to supervisor task(s) event pump
#[ derive(Debug) ]
pub struct Subscriber
{
id : ServiceSubID ,
/// For directed messages
spec : mpsc ::Receiver < ServiceEvent > ,
/// For broadcast messages
broad : broadcast ::Receiver < ServiceEvent > ,
}
impl Supervisor
{
/// Attempt to send a control signal to the supervisor itself
pub fn signal_control ( & self , sig : SupervisorControl ) -> Result < ( ) , watch ::error ::SendError < SupervisorControl > >
{
self . shutdown . broadcast ( sig ) ? ;
Ok ( ( ) )
}
/// Drop all communications with background worker and wait for it to complete
pub async fn join_now ( self ) -> eyre ::Result < ( ) >
{
let handle = {
let Self { handle , .. } = self ; // drop everything else
handle
} ;
handle . await ? ;
Ok ( ( ) )
}
/// Check if the background worker has not been dropped.
///
/// If this returns false it usually indicates a fatal error.
pub fn is_alive ( & self ) -> bool
{
Arc ::strong_count ( & self . shared ) > 1
}
/// Create a new supervisor for this state.
pub fn new ( state : state ::State ) -> Self
{
let shutdown = watch ::channel ( Default ::default ( ) ) ;
let pipe = mpsc ::channel ( SUPERVISOR_BACKLOG ) ;
let ( broadcast_root , broadcast_receiver ) = broadcast ::channel ( SUPERVISOR_BACKLOG ) ;
let shared = Arc ::new ( SupervisorShared {
broadcast_root ,
state ,
} ) ;
let ( shutdown_0 , shutdown_1 ) = shutdown ;
let ( pipe_0 , pipe_1 ) = pipe ;
Self {
shutdown : shutdown_0 ,
pipe : pipe_0 ,
broadcast_receiver ,
shared : Arc ::clone ( & shared ) ,
handle : tokio ::spawn ( async move {
let shared = shared ;
ExitStatus ::from ( service_fn ( SupervisorTaskState {
shared ,
recv : pipe_1 ,
shutdown : shutdown_1 ,
} ) . await . or_else ( | err | err . into_own_result ( ) ) )
} ) ,
}
}
}
/// The state held by the running superviser service
#[ derive(Debug) ]
struct SupervisorTaskState
{
shutdown : watch ::Receiver < SupervisorControl > ,
recv : mpsc ::Receiver < ServiceRequest > ,
shared : Arc < SupervisorShared > ,
}
/// Detached supervisor server
async fn service_fn ( SupervisorTaskState { shared , mut recv , mut shutdown } : SupervisorTaskState ) -> Result < ( ) , ServiceTerminationError >
{
impl Default for TerminationKind
{
#[ inline ]
fn default ( ) -> Self
{
Self ::Graceful
}
}
// The command stream to dispatch to the worker tasks
let command_dispatch = async {
while let Some ( req ) = recv . recv ( ) . await {
todo! ( "Dispatch to child(s)" ) ;
}
TerminationKind ::Graceful
} ;
tokio ::pin ! ( command_dispatch ) ;
// The signal stream to be handled here
let signal_stream = async {
while let Some ( value ) = shutdown . recv ( ) . await
{
use SupervisorControl ::* ;
match value {
Initialise = > info ! ( "Initialised" ) ,
Signal ( None ) = > return TerminationKind ::SignalHup ,
Signal ( Some ( to ) ) = > return TerminationKind ::SignalTimeout ( to ) ,
Drop = > return TerminationKind ::Immediate ,
Restart = > todo! ( "not implemented" ) ,
TaskLimit ( _limit ) = > todo! ( "not implemented" ) ,
}
}
TerminationKind ::Graceful
} ;
tokio ::pin ! ( signal_stream ) ;
//loop {
tokio ::select ! {
sd_kind = & mut signal_stream = > {
// We received a signal
Err ( ServiceTerminationError ::Signal ( sd_kind ) )
}
disp_end = & mut command_dispatch = > {
// The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so...
disp_end . into ( )
}
}
// }
}
/// The mannor in which the supervisor exited.
#[ derive(Debug) ]
pub enum TerminationKind
{
/// The child task(s) were signalled to stop and they were waited on.
SignalHup ,
/// If there was a timeout specified, and that timeout expired, the message will be `SignalTimeout` instead of `SignalHup`.
SignalTimeout ( time ::Duration ) ,
/// Immediately drop everything and exit
Immediate ,
/// A non-signalled shutdown. There were no more watchers for the shutdown channel.
Graceful ,
}
impl TerminationKind
{
/// Convert `TerminationKind::Graceful` into a non-error
fn strip_grace ( self ) -> Result < ( ) , Self >
{
match self {
Self ::Graceful = > Ok ( ( ) ) ,
e = > Err ( e ) ,
}
}
}
impl error ::Error for TerminationKind { }
impl fmt ::Display for TerminationKind
{
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result
{
match self {
Self ::SignalHup = > write! ( f , "children were signalled to shut down and compiled" ) ,
Self ::SignalTimeout ( to ) = > write! ( f , "children were signalled to shut but did not do so within the {:?} timeout" , to ) ,
Self ::Immediate = > write! ( f , "children were dropped and an immediate exit was made" ) ,
Self ::Graceful = > write! ( f , "a graceful shutdown order was issued and compiled with" ) ,
}
}
}
/// The error returned on a failed service termination.
#[ derive(Debug) ]
#[ non_exhaustive ]
pub enum ServiceTerminationError
{
/// Was terminated by a signal.
Signal ( TerminationKind ) ,
/// Was terminated by a panic.
Panic ,
/// There were no more commands being sent through, and the worker gracefully shut down.
Interest ,
}
impl From < TerminationKind > for Result < ( ) , ServiceTerminationError >
{
fn from ( from : TerminationKind ) -> Self
{
ServiceTerminationError ::Signal ( from ) . into_own_result ( )
}
}
impl ServiceTerminationError
{
fn into_own_result ( self ) -> Result < ( ) , Self >
{
match self {
Self ::Signal ( term ) = > term . strip_grace ( ) . map_err ( Self ::Signal ) ,
x = > Err ( x ) ,
}
}
}
impl error ::Error for ServiceTerminationError
{
fn source ( & self ) -> Option < & ( dyn error ::Error + ' static ) >
{
Some ( match & self {
Self ::Signal ( ts ) = > ts ,
_ = > return None ,
} )
}
}
impl fmt ::Display for ServiceTerminationError
{
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result
{
match self {
Self ::Signal ( _ ) = > write! ( f , "shut down by signal" ) ,
Self ::Panic = > write! ( f , "shut down by panic. this is usually fatal" ) ,
Self ::Interest = > write! ( f , "all communications with this service stopped" ) ,
}
}
}