From a17b2539d07919f31c6f63b1fb82bb64f80c9be5 Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 17 Jan 2021 09:11:10 +0000 Subject: [PATCH] state: service_fn start --- src/service.rs | 17 +- src/state/service.rs | 331 ----------------------- src/state/service/events.rs | 141 ++++++++++ src/state/service/mod.rs | 28 ++ src/state/service/obj.rs | 114 ++++++++ src/state/service/resreq.rs | 65 +++++ src/state/service/supervisor/dispatch.rs | 92 +++++++ src/state/service/supervisor/mod.rs | 146 ++++++++++ 8 files changed, 601 insertions(+), 333 deletions(-) delete mode 100644 src/state/service.rs create mode 100644 src/state/service/events.rs create mode 100644 src/state/service/mod.rs create mode 100644 src/state/service/obj.rs create mode 100644 src/state/service/resreq.rs create mode 100644 src/state/service/supervisor/dispatch.rs create mode 100644 src/state/service/supervisor/mod.rs diff --git a/src/service.rs b/src/service.rs index c3f4381..0008103 100644 --- a/src/service.rs +++ b/src/service.rs @@ -90,6 +90,19 @@ pub enum ExitStatus Error(eyre::Report), } +impl> From> for ExitStatus +{ + fn from(from: Result) -> Self + { + match from + { + Ok(v) => Self::Graceful(v), + Err(e) => Self::Error(e.into()) + } + } +} + + #[derive(Debug)] /// The error `ExitStatus::Abnormal` converts to. pub struct AbnormalExitError; @@ -102,14 +115,14 @@ impl fmt::Display for AbnormalExitError write!(f, "service terminated in an abnormal way") } } - +/* impl From for ExitStatus { fn from(from: T) -> Self { Self::Error(from.into()) } -} +}*/ impl Default for ExitStatus { #[inline] diff --git a/src/state/service.rs b/src/state/service.rs deleted file mode 100644 index 90fb0dc..0000000 --- a/src/state/service.rs +++ /dev/null @@ -1,331 +0,0 @@ -//! Global state service -use super::*; -use tokio::{ - sync::{ - watch, - mpsc, - oneshot, - broadcast, - }, - task::JoinHandle, -}; -use crate::service::{ - ExitStatus, -}; -use std::{error, fmt}; -use std::sync::Weak; -use std::any::Any; -use std::collections::{BTreeMap}; - - -id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages"); -id_type!(BroadcastID; "Each broadcast message has a unique ID."); - -/// Signal the shutdown method to the supervisor. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)] -pub enum SupervisorShutdownStatus -{ - /// Signal the subtask(s) to shut down, then wait for them and exit. - Signal, - /// Drop all handles and pipes to subtask(s) then immediately exit. - Drop, - /// Keep working - Normal, - /// Restart any and all subtask(s) - Restart, -} - -impl Default for SupervisorShutdownStatus -{ - #[inline] - fn default() -> Self - { - SupervisorShutdownStatus::Normal - } -} - -/// The kind of command to send to the the service -#[derive(Debug)] -pub enum ServiceCommandKind -{ - -} - -#[derive(Debug)] -pub enum ServiceResponseKind -{ - -} - -#[derive(Debug)] -pub struct ServiceResponse(Option); - -#[derive(Debug)] -pub struct ServiceCommand -{ - kind: ServiceCommandKind, - output: oneshot::Sender, -} - -/// Supervisor responsible for spawning the state handler service. -#[derive(Debug)] -pub(super) struct Supervisor -{ - handle: JoinHandle, - - shutdown: watch::Sender, - - pipe: mpsc::Sender, - sub: BTreeMap -} - -/// Object sent through the broadcast channel. -/// -/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable. -#[derive(Clone)] -#[repr(transparent)] -pub struct ServiceEventObject(Arc); -shim_debug!(ServiceEventObject); - -/// A weak reference to a `ServiceEventObject`. -#[derive(Clone)] -#[repr(transparent)] -pub struct ServiceEventObjectRef(Weak); -shim_debug!(ServiceEventObjectRef); - -impl ServiceEventObjectRef -{ - /// Try to upgrade to a concrete reference, and then clone the inner object. - pub fn try_clone(&self) -> Option - { - match self.0.upgrade() - { - Some(arc) => Some(ServiceEventObject(arc).clone()), - None => None - } - } - /// Try to upgrade to a concrete reference. - pub fn upgrade(self) -> Result - { - match self.0.upgrade() - { - Some(arc) => Ok(ServiceEventObject(arc)), - None => Err(self), - } - } - - /// Check if the object has not been destroyed yet. - pub fn is_alive(&self) -> bool - { - self.0.strong_count() > 0 - } -} - - -impl ServiceEventObject -{ - pub fn clone_inner(&self) -> Self - { - Self(Arc::from(self.0.clone_dyn_any_sync())) - } - /// Get a weak reference counted handle to the object, without cloning the object itself. - pub fn clone_weak(&self) -> ServiceEventObjectRef - { - ServiceEventObjectRef(Arc::downgrade(&self.0)) - } - - /// Try to downcast the inner object to a concrete type and then clone it. - /// - /// This will fail if: - /// * The downcasted type is invalid - #[inline] pub fn downcast_clone(&self) -> Option - { - self.downcast_ref::().map(|x| *x.clone_dyn_any().downcast().unwrap()) - } - /// Try to consume this instance into downcast. - /// - /// This will fail if: - /// * The downcasted type is invalid - /// * There are other references to this object (created through `clone_ref()`.). - pub fn try_into_downcast(self) -> Result - { - match Arc::downcast(self.0) - { - Ok(v) => match Arc::try_unwrap(v) { - Ok(v) => Ok(v), - Err(s) => Err(Self(s)), - }, - Err(e) => Err(Self(e)), - } - } - - /// Check if there are any other references to this object - #[inline] pub fn is_unique(&self) -> bool - { - Arc::strong_count(&self.0) == 1 - } - - /// Try to downcast the object into a concrete type - #[inline] pub fn is(&self) -> bool - { - self.0.as_ref().is::() - } - /// Try to downcast the object into a concrete type - #[inline] pub fn downcast_ref(&self) -> Option<&T> - { - self.0.as_ref().downcast_ref::() - } -} - -/// The kind of event outputted from a state service's broadcast stream -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] -pub enum ServiceEventKind -{ - /// Does nothing. - /// - /// # Associated object - /// `()`. - Ping, - /// Does nothing. - /// - /// # Associated object - /// None. - KeepAlive, -} - -cfg_if!{ - if #[cfg(debug_assertions)] { - /// Type used for directed array. - /// Currently testing `smolset` over eagerly allocating. - type SESet = smolset::SmolSet<[T; 1]>; - } else { - type SESet = std::collections::HashSet; - } -} - -/// An event outputted from a state service's broadcast stream -#[derive(Debug, Clone)] -pub struct ServiceEvent -{ - bc_id: BroadcastID, - - kind: ServiceEventKind, - directed: Option>, - obj: Option, -} - -impl ServiceEvent -{ - /// Create a new event to be broadcast - fn new(kind: ServiceEventKind, directed: Option>, obj: Option) -> Self - where T: Any + Send + Sync + 'static - { - Self { - bc_id: BroadcastID::id_new(), - kind, - directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 { - None - } else { - Some(n) - }), - obj: obj.map(|x| ServiceEventObject(Arc::new(x))), - } - } - - #[inline] pub fn id(&self) -> &BroadcastID - { - &self.bc_id - } - - /// The kind of this event. - #[inline] pub fn kind(&self) -> ServiceEventKind - { - self.kind - } - - /// Check if this event is for you - pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool - { - if let Some(yes) = self.directed.as_ref() { - yes.contains(whom) - } else { - false - } - } - /// Check if this event is directed to anyone - pub fn is_directed(&self) -> bool - { - self.directed.is_some() - } - - /// Check who this event is directed to. - /// - /// If it is not directed, an empty slice will be returned. - pub fn directed_to(&self) -> impl Iterator + '_ - { - match self.directed.as_ref() - { - Some(yes) => MaybeIter::many(yes.iter()), - None => MaybeIter::none(), - } - } - - /// Get a reference to the object, if there is one. - pub fn obj_ref(&self) -> Option<&ServiceEventObject> - { - self.obj.as_ref() - } - - /// Get a mutable reference to the object, if there is one. - pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject> - { - self.obj.as_mut() - } - - /// Try to consume into the inner object. If there is no object, return self. - pub fn try_into_object(self) -> Result - { - match self.obj - { - Some(obj) => Ok(obj), - None => Err(self), - } - } -} - - -impl From for Option -{ - #[inline] fn from(from: ServiceEvent) -> Self - { - from.obj - } -} - -impl TryFrom for ServiceEventObject -{ - type Error = NoObjectError; - - #[inline] fn try_from(from: ServiceEvent) -> Result - { - match from.obj - { - Some(obj) => Ok(obj), - None => Err(NoObjectError), - } - } -} - - -#[derive(Debug)] -/// Error returned from trying to extract an object from a `ServiceEvent` which has none. -pub struct NoObjectError; - -impl error::Error for NoObjectError{} -impl fmt::Display for NoObjectError -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result - { - write!(f, "there was no object broadcasted along with this ") - } -} diff --git a/src/state/service/events.rs b/src/state/service/events.rs new file mode 100644 index 0000000..33a0dff --- /dev/null +++ b/src/state/service/events.rs @@ -0,0 +1,141 @@ +//! Controls the broadcasting of events sent from the state service task +use super::*; + +/// The kind of event outputted from a state service's broadcast stream +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] +pub enum ServiceEventKind +{ + /// Does nothing. + /// + /// # Associated object + /// `()`. + Ping, + /// Does nothing. + /// + /// # Associated object + /// None. + KeepAlive, +} + +cfg_if!{ + if #[cfg(debug_assertions)] { + /// Type used for directed array. + /// Currently testing `smolset` over eagerly allocating. + type SESet = smolset::SmolSet<[T; 1]>; + } else { + type SESet = std::collections::HashSet; + } +} + +/// An event outputted from a state service's broadcast stream +#[derive(Debug, Clone)] +pub struct ServiceEvent +{ + bc_id: BroadcastID, + + kind: ServiceEventKind, + directed: Option>, + obj: Option, +} + +impl ServiceEvent +{ + /// Create a new event to be broadcast + fn new(kind: ServiceEventKind, directed: Option>, obj: Option) -> Self + where T: Any + Send + Sync + 'static + { + Self { + bc_id: BroadcastID::id_new(), + kind, + directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 { + None + } else { + Some(n) + }), + obj: obj.map(|x| ServiceEventObject(Arc::new(x))), + } + } + + #[inline] pub fn id(&self) -> &BroadcastID + { + &self.bc_id + } + + /// The kind of this event. + #[inline] pub fn kind(&self) -> ServiceEventKind + { + self.kind + } + + /// Check if this event is for you + pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool + { + if let Some(yes) = self.directed.as_ref() { + yes.contains(whom) + } else { + false + } + } + /// Check if this event is directed to anyone + pub fn is_directed(&self) -> bool + { + self.directed.is_some() + } + + /// Check who this event is directed to. + /// + /// If it is not directed, an empty slice will be returned. + pub fn directed_to(&self) -> impl Iterator + '_ + { + match self.directed.as_ref() + { + Some(yes) => MaybeIter::many(yes.iter()), + None => MaybeIter::none(), + } + } + + /// Get a reference to the object, if there is one. + pub fn obj_ref(&self) -> Option<&ServiceEventObject> + { + self.obj.as_ref() + } + + /// Get a mutable reference to the object, if there is one. + pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject> + { + self.obj.as_mut() + } + + /// Try to consume into the inner object. If there is no object, return self. + pub fn try_into_object(self) -> Result + { + match self.obj + { + Some(obj) => Ok(obj), + None => Err(self), + } + } +} + + +impl From for Option +{ + #[inline] fn from(from: ServiceEvent) -> Self + { + from.obj + } +} + +impl TryFrom for ServiceEventObject +{ + type Error = NoObjectError; + + #[inline] fn try_from(from: ServiceEvent) -> Result + { + match from.obj + { + Some(obj) => Ok(obj), + None => Err(NoObjectError), + } + } +} diff --git a/src/state/service/mod.rs b/src/state/service/mod.rs new file mode 100644 index 0000000..bf88f36 --- /dev/null +++ b/src/state/service/mod.rs @@ -0,0 +1,28 @@ +//! Global state service +use super::*; +use tokio::{ + sync::{ + watch, + mpsc, + oneshot, + broadcast, + }, + task::JoinHandle, +}; +use crate::service::{ + ExitStatus, +}; +use std::{error, fmt}; +use std::sync::Weak; +use std::any::Any; +use std::collections::{BTreeMap}; + + +id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages"); +id_type!(BroadcastID; "Each broadcast message has a unique ID."); + +mod supervisor; pub use supervisor::*; +mod resreq; pub use resreq::*; +mod obj; pub use obj::*; + +mod events; pub use events::*; diff --git a/src/state/service/obj.rs b/src/state/service/obj.rs new file mode 100644 index 0000000..1ffd29b --- /dev/null +++ b/src/state/service/obj.rs @@ -0,0 +1,114 @@ +//! broadcast object definitions +use super::*; + + +/// Object sent through the broadcast channel. +/// +/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable. +#[derive(Clone)] +#[repr(transparent)] +pub struct ServiceEventObject(pub(super) Arc); +shim_debug!(ServiceEventObject); + +/// A weak reference to a `ServiceEventObject`. +#[derive(Clone)] +#[repr(transparent)] +pub struct ServiceEventObjectRef(pub(super) Weak); +shim_debug!(ServiceEventObjectRef); + +impl ServiceEventObjectRef +{ + /// Try to upgrade to a concrete reference, and then clone the inner object. + pub fn try_clone(&self) -> Option + { + match self.0.upgrade() + { + Some(arc) => Some(ServiceEventObject(arc).clone()), + None => None + } + } + /// Try to upgrade to a concrete reference. + pub fn upgrade(self) -> Result + { + match self.0.upgrade() + { + Some(arc) => Ok(ServiceEventObject(arc)), + None => Err(self), + } + } + + /// Check if the object has not been destroyed yet. + pub fn is_alive(&self) -> bool + { + self.0.strong_count() > 0 + } +} + + +impl ServiceEventObject +{ + pub fn clone_inner(&self) -> Self + { + Self(Arc::from(self.0.clone_dyn_any_sync())) + } + /// Get a weak reference counted handle to the object, without cloning the object itself. + pub fn clone_weak(&self) -> ServiceEventObjectRef + { + ServiceEventObjectRef(Arc::downgrade(&self.0)) + } + + /// Try to downcast the inner object to a concrete type and then clone it. + /// + /// This will fail if: + /// * The downcasted type is invalid + #[inline] pub fn downcast_clone(&self) -> Option + { + self.downcast_ref::().map(|x| *x.clone_dyn_any().downcast().unwrap()) + } + /// Try to consume this instance into downcast. + /// + /// This will fail if: + /// * The downcasted type is invalid + /// * There are other references to this object (created through `clone_ref()`.). + pub fn try_into_downcast(self) -> Result + { + match Arc::downcast(self.0) + { + Ok(v) => match Arc::try_unwrap(v) { + Ok(v) => Ok(v), + Err(s) => Err(Self(s)), + }, + Err(e) => Err(Self(e)), + } + } + + /// Check if there are any other references to this object + #[inline] pub fn is_unique(&self) -> bool + { + Arc::strong_count(&self.0) == 1 + } + + /// Try to downcast the object into a concrete type + #[inline] pub fn is(&self) -> bool + { + self.0.as_ref().is::() + } + /// Try to downcast the object into a concrete type + #[inline] pub fn downcast_ref(&self) -> Option<&T> + { + self.0.as_ref().downcast_ref::() + } +} + +#[derive(Debug)] +/// Error returned from trying to extract an object from a `ServiceEvent` which has none. +pub struct NoObjectError; + +impl error::Error for NoObjectError{} +impl fmt::Display for NoObjectError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "there was no object broadcasted along with this ") + } +} diff --git a/src/state/service/resreq.rs b/src/state/service/resreq.rs new file mode 100644 index 0000000..3ba069c --- /dev/null +++ b/src/state/service/resreq.rs @@ -0,0 +1,65 @@ +//! Responses and requests for the state service(s). +//! +//! These are sent to `Supervisor` which then dispatches them accordingly. +use super::*; + + +/// The kind of request to send to the the service +#[derive(Debug)] +#[non_exhaustive] +pub enum ServiceRequestKind +{ + /// A no-op request. + None, + + /// Test request. + #[cfg(debug_assertions)] EchoRequest(String), +} + +/// The kind of response to expect from a service query, if any. +#[derive(Debug)] +#[non_exhaustive] +pub enum ServiceResponseKind +{ + /// Test response. + #[cfg(debug_assertions)] EchoResponse(String), + + /// Empty response + None, +} + +/// A response from a service to a specific query. +/// +/// It is sent theough the `output` onehot channel in the `ServiceCommand` struct. +#[derive(Debug)] +pub struct ServiceResponse(ServiceRequestKind); + +impl ServiceResponse +{ + /// An empty (default) response + #[inline] pub const fn none() -> Self + { + Self(ServiceRequestKind::None) + } +} + +/// A formed service request. +#[derive(Debug)] +pub struct ServiceRequest +{ + kind: ServiceRequestKind, + output: oneshot::Sender, // If there is no response, this sender will just be dropped and the future impl can return `None` instead of `Some(response)`. +} + +impl ServiceRequest +{ + /// Create a new request + pub(in super) fn new(kind: ServiceRequestKind) -> (Self, oneshot::Receiver) + { + let (tx, rx) = oneshot::channel(); + (Self { + kind, + output: tx + }, rx) + } +} diff --git a/src/state/service/supervisor/dispatch.rs b/src/state/service/supervisor/dispatch.rs new file mode 100644 index 0000000..32f4f96 --- /dev/null +++ b/src/state/service/supervisor/dispatch.rs @@ -0,0 +1,92 @@ +//! Dispatching to state service task(s) through a supervisor +use super::*; +use tokio::time; +use std::{fmt, error}; +use futures::prelude::*; + +impl Supervisor +{ + /// Dispatch a request to the supervisor to be passed through to a subtask. + /// + /// # Returns + /// Returns a `Future` that can be awaited on to produce the value sent back by the task (if there is one). + /// + /// # Errors + /// * The first failure will be caused if sending to the supervisor fails. + /// * The 2nd failure will be caused if either the supervisor, or its delegated task panics before being able to respond, or if the task simply does not respond. + pub async fn dispatch_req(&mut self, kind: ServiceRequestKind) -> Result> + 'static, SupervisorDispatchError> + { + let (req, rx) = ServiceRequest::new(kind); + self.pipe.send(req).await.map_err(|_| SupervisorDispatchError::Send)?; + Ok(rx.map_err(|_| SupervisorDispatchError::Recv)) + } + + /// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it. + /// + /// # Returns + /// Returns the value sent back by the task, if there is one + pub async fn dispatch_and_wait(&mut self, kind: ServiceRequestKind) -> Result + { + Ok(self.dispatch_req(kind) + .await? + .await?) + } + + /// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it. + /// If the timeout expires before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`. + /// + /// # Returns + /// Returns the value sent back by the task, if there is one + pub async fn dispatch_and_wait_timeout(&mut self, kind: ServiceRequestKind, timeout: time::Duration) -> Result + { + let resp_wait = self.dispatch_req(kind) + .await?; + tokio::select! { + val = resp_wait => { + return Ok(val?); + } + _ = time::delay_for(timeout) => { + return Err(SupervisorDispatchError::Timeout("receiving response", Some(timeout))) + } + } + } + /// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it. + /// If the future `until` completes before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`. + /// + /// # Returns + /// Returns the value sent back by the task, if there is one + pub async fn dispatch_and_wait_until(&mut self, kind: ServiceRequestKind, until: impl Future) -> Result + { + let resp_wait = self.dispatch_req(kind) + .await?; + tokio::select! { + val = resp_wait => { + return Ok(val?); + } + _ = until => { + return Err(SupervisorDispatchError::Timeout("receiving response", None)) + } + } + } +} + +/// Error when dispatching a request to the supervisor +#[derive(Debug)] +pub enum SupervisorDispatchError +{ + Send, Recv, Timeout(&'static str, Option), +} + +impl error::Error for SupervisorDispatchError{} +impl fmt::Display for SupervisorDispatchError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::Send => write!(f, "dispatching the request failed"), + Self::Recv => write!(f, "receiving the response failed"), + Self::Timeout(msg, Some(duration)) => write!(f, "timeout on {} was reached ({:?})", msg, duration), + Self::Timeout(msg, _) => write!(f, "timeout on {} was reached", msg), + } + } +} diff --git a/src/state/service/supervisor/mod.rs b/src/state/service/supervisor/mod.rs new file mode 100644 index 0000000..f569b9f --- /dev/null +++ b/src/state/service/supervisor/mod.rs @@ -0,0 +1,146 @@ +//! Handles spawning and restarting service task(s) +use super::*; +use tokio::sync::RwLock; +use std::mem::MaybeUninit; +use std::ops; +use futures::future::Future; + +const SUPERVISOR_BACKLOG: usize = 32; + +mod dispatch; pub use dispatch::*; + +/// Signal the shutdown method to the supervisor. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)] +pub enum SupervisorControl +{ + /// Normal working + Initialise, + /// Signal the subtask(s) to shut down, then wait for them and exit. + Signal, + /// Drop all handles and pipes to subtask(s) then immediately exit. + Drop, + /// Restart any and all subtask(s) + Restart, + + /// Set the max task limit. Default is 0. + TaskLimit(usize), +} + +impl Default for SupervisorControl +{ + #[inline] + fn default() -> Self + { + Self::Initialise + } +} + +/// Supervisor responsible for spawning the state handler service. +#[derive(Debug)] +pub(super) struct Supervisor +{ + /// Handle for the supervisor task itself + handle: JoinHandle, + + /// Watch sender for signalling shutdowns the supervisor task itself + shutdown: watch::Sender, + + /// The pipe to send requests to the supervisor's subtasks + pipe: mpsc::Sender, + + /// The initial receiver created from `broadcast_root`. + broadcast_receiver: broadcast::Receiver, + + /// Data shared between the supervisor's task and its controller instance here. + shared: Arc, +} + +/// Object shared btweeen the Supervisor control instance and its supervisor task. +#[derive(Debug)] +struct SupervisorShared +{ + //// scratch that, idk what this was supposed to be for. I'm sure i'll remember if it's important. + //sub: RwLock>, + + broadcast_root: broadcast::Sender, + state: state::State, +} + +/// The state held by the running superviser service +#[derive(Debug)] +struct SupervisorTaskState +{ + shutdown: watch::Receiver, + recv: mpsc::Receiver, + shared: Arc, +} + +impl Supervisor +{ + /// Attempt to send a control signal to the supervisor itself + pub fn signal_control(&self, sig: SupervisorControl) -> Result<(), watch::error::SendError> + { + self.shutdown.broadcast(sig)?; + Ok(()) + } + + /// Drop all communications with background worker and wait for it to complete + pub async fn join_now(self) -> eyre::Result<()> + { + let handle = { + let Self { handle, ..} = self; // drop everything else + handle + }; + handle.await?; + + Ok(()) + } + + /// Check if the background worker has not been dropped. + /// + /// If this returns false it usually indicates a fatal error. + pub fn is_alive(&self) -> bool + { + Arc::strong_count(&self.shared) > 1 + } + + /// Create a new supervisor for this state. + pub fn new(state: state::State) -> Self + { + let shutdown = watch::channel(Default::default()); + let pipe = mpsc::channel(SUPERVISOR_BACKLOG); + let (broadcast_root, broadcast_receiver) = broadcast::channel(SUPERVISOR_BACKLOG); + + let shared = Arc::new(SupervisorShared{ + broadcast_root, + state, + }); + + let (shutdown_0, shutdown_1) = shutdown; + let (pipe_0, pipe_1) = pipe; + + Self { + shutdown: shutdown_0, + pipe: pipe_0, + broadcast_receiver, + shared: Arc::clone(&shared), + + handle: tokio::spawn(async move { + let shared = shared; + ExitStatus::from(service_fn(SupervisorTaskState { + shared, + recv: pipe_1, + shutdown: shutdown_1, + }).await) + }), + } + } +} + +async fn service_fn(mut state: SupervisorTaskState) -> eyre::Result<()> +{ + while let Some(req) = state.recv.recv().await { + + } + Ok(()) +}