state: service_fn start

another-service-failure
Avril 4 years ago
parent 815feaa4a7
commit a17b2539d0
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -90,6 +90,19 @@ pub enum ExitStatus<T=()>
Error(eyre::Report), Error(eyre::Report),
} }
impl<T,E: Into<eyre::Report>> From<Result<T, E>> for ExitStatus<T>
{
fn from(from: Result<T, E>) -> Self
{
match from
{
Ok(v) => Self::Graceful(v),
Err(e) => Self::Error(e.into())
}
}
}
#[derive(Debug)] #[derive(Debug)]
/// The error `ExitStatus::Abnormal` converts to. /// The error `ExitStatus::Abnormal` converts to.
pub struct AbnormalExitError; pub struct AbnormalExitError;
@ -102,14 +115,14 @@ impl fmt::Display for AbnormalExitError
write!(f, "service terminated in an abnormal way") write!(f, "service terminated in an abnormal way")
} }
} }
/*
impl<U, T: error::Error + Send+Sync+'static> From<T> for ExitStatus<U> impl<U, T: error::Error + Send+Sync+'static> From<T> for ExitStatus<U>
{ {
fn from(from: T) -> Self fn from(from: T) -> Self
{ {
Self::Error(from.into()) Self::Error(from.into())
} }
} }*/
impl<T: Default> Default for ExitStatus<T> impl<T: Default> Default for ExitStatus<T>
{ {
#[inline] #[inline]

@ -1,331 +0,0 @@
//! Global state service
use super::*;
use tokio::{
sync::{
watch,
mpsc,
oneshot,
broadcast,
},
task::JoinHandle,
};
use crate::service::{
ExitStatus,
};
use std::{error, fmt};
use std::sync::Weak;
use std::any::Any;
use std::collections::{BTreeMap};
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
id_type!(BroadcastID; "Each broadcast message has a unique ID.");
/// Signal the shutdown method to the supervisor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
pub enum SupervisorShutdownStatus
{
/// Signal the subtask(s) to shut down, then wait for them and exit.
Signal,
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Keep working
Normal,
/// Restart any and all subtask(s)
Restart,
}
impl Default for SupervisorShutdownStatus
{
#[inline]
fn default() -> Self
{
SupervisorShutdownStatus::Normal
}
}
/// The kind of command to send to the the service
#[derive(Debug)]
pub enum ServiceCommandKind
{
}
#[derive(Debug)]
pub enum ServiceResponseKind
{
}
#[derive(Debug)]
pub struct ServiceResponse(Option<ServiceCommandKind>);
#[derive(Debug)]
pub struct ServiceCommand
{
kind: ServiceCommandKind,
output: oneshot::Sender<ServiceResponse>,
}
/// Supervisor responsible for spawning the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
handle: JoinHandle<ExitStatus>,
shutdown: watch::Sender<SupervisorShutdownStatus>,
pipe: mpsc::Sender<ServiceCommand>,
sub: BTreeMap<ServiceEventKind, ServiceEvent>
}
/// Object sent through the broadcast channel.
///
/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObject(Arc<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObject);
/// A weak reference to a `ServiceEventObject`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObjectRef(Weak<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObjectRef);
impl ServiceEventObjectRef
{
/// Try to upgrade to a concrete reference, and then clone the inner object.
pub fn try_clone(&self) -> Option<ServiceEventObject>
{
match self.0.upgrade()
{
Some(arc) => Some(ServiceEventObject(arc).clone()),
None => None
}
}
/// Try to upgrade to a concrete reference.
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
{
match self.0.upgrade()
{
Some(arc) => Ok(ServiceEventObject(arc)),
None => Err(self),
}
}
/// Check if the object has not been destroyed yet.
pub fn is_alive(&self) -> bool
{
self.0.strong_count() > 0
}
}
impl ServiceEventObject
{
pub fn clone_inner(&self) -> Self
{
Self(Arc::from(self.0.clone_dyn_any_sync()))
}
/// Get a weak reference counted handle to the object, without cloning the object itself.
pub fn clone_weak(&self) -> ServiceEventObjectRef
{
ServiceEventObjectRef(Arc::downgrade(&self.0))
}
/// Try to downcast the inner object to a concrete type and then clone it.
///
/// This will fail if:
/// * The downcasted type is invalid
#[inline] pub fn downcast_clone<T: Any + Clone + Send + Sync + 'static>(&self) -> Option<T>
{
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
}
/// Try to consume this instance into downcast.
///
/// This will fail if:
/// * The downcasted type is invalid
/// * There are other references to this object (created through `clone_ref()`.).
pub fn try_into_downcast<T: Any + Send + Sync + 'static>(self) -> Result<T, Self>
{
match Arc::downcast(self.0)
{
Ok(v) => match Arc::try_unwrap(v) {
Ok(v) => Ok(v),
Err(s) => Err(Self(s)),
},
Err(e) => Err(Self(e)),
}
}
/// Check if there are any other references to this object
#[inline] pub fn is_unique(&self) -> bool
{
Arc::strong_count(&self.0) == 1
}
/// Try to downcast the object into a concrete type
#[inline] pub fn is<T: Any + Send + Sync + 'static>(&self) -> bool
{
self.0.as_ref().is::<T>()
}
/// Try to downcast the object into a concrete type
#[inline] pub fn downcast_ref<T: Any + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.as_ref().downcast_ref::<T>()
}
}
/// The kind of event outputted from a state service's broadcast stream
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub enum ServiceEventKind
{
/// Does nothing.
///
/// # Associated object
/// `()`.
Ping,
/// Does nothing.
///
/// # Associated object
/// None.
KeepAlive,
}
cfg_if!{
if #[cfg(debug_assertions)] {
/// Type used for directed array.
/// Currently testing `smolset` over eagerly allocating.
type SESet<T> = smolset::SmolSet<[T; 1]>;
} else {
type SESet<T> = std::collections::HashSet<T>;
}
}
/// An event outputted from a state service's broadcast stream
#[derive(Debug, Clone)]
pub struct ServiceEvent
{
bc_id: BroadcastID,
kind: ServiceEventKind,
directed: Option<SESet<ServiceSubID>>,
obj: Option<ServiceEventObject>,
}
impl ServiceEvent
{
/// Create a new event to be broadcast
fn new<T>(kind: ServiceEventKind, directed: Option<impl IntoIterator<Item=ServiceSubID>>, obj: Option<T>) -> Self
where T: Any + Send + Sync + 'static
{
Self {
bc_id: BroadcastID::id_new(),
kind,
directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 {
None
} else {
Some(n)
}),
obj: obj.map(|x| ServiceEventObject(Arc::new(x))),
}
}
#[inline] pub fn id(&self) -> &BroadcastID
{
&self.bc_id
}
/// The kind of this event.
#[inline] pub fn kind(&self) -> ServiceEventKind
{
self.kind
}
/// Check if this event is for you
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
{
if let Some(yes) = self.directed.as_ref() {
yes.contains(whom)
} else {
false
}
}
/// Check if this event is directed to anyone
pub fn is_directed(&self) -> bool
{
self.directed.is_some()
}
/// Check who this event is directed to.
///
/// If it is not directed, an empty slice will be returned.
pub fn directed_to(&self) -> impl Iterator<Item = &'_ ServiceSubID> + '_
{
match self.directed.as_ref()
{
Some(yes) => MaybeIter::many(yes.iter()),
None => MaybeIter::none(),
}
}
/// Get a reference to the object, if there is one.
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
{
self.obj.as_ref()
}
/// Get a mutable reference to the object, if there is one.
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
{
self.obj.as_mut()
}
/// Try to consume into the inner object. If there is no object, return self.
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
{
match self.obj
{
Some(obj) => Ok(obj),
None => Err(self),
}
}
}
impl From<ServiceEvent> for Option<ServiceEventObject>
{
#[inline] fn from(from: ServiceEvent) -> Self
{
from.obj
}
}
impl TryFrom<ServiceEvent> for ServiceEventObject
{
type Error = NoObjectError;
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
{
match from.obj
{
Some(obj) => Ok(obj),
None => Err(NoObjectError),
}
}
}
#[derive(Debug)]
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
pub struct NoObjectError;
impl error::Error for NoObjectError{}
impl fmt::Display for NoObjectError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "there was no object broadcasted along with this ")
}
}

@ -0,0 +1,141 @@
//! Controls the broadcasting of events sent from the state service task
use super::*;
/// The kind of event outputted from a state service's broadcast stream
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub enum ServiceEventKind
{
/// Does nothing.
///
/// # Associated object
/// `()`.
Ping,
/// Does nothing.
///
/// # Associated object
/// None.
KeepAlive,
}
cfg_if!{
if #[cfg(debug_assertions)] {
/// Type used for directed array.
/// Currently testing `smolset` over eagerly allocating.
type SESet<T> = smolset::SmolSet<[T; 1]>;
} else {
type SESet<T> = std::collections::HashSet<T>;
}
}
/// An event outputted from a state service's broadcast stream
#[derive(Debug, Clone)]
pub struct ServiceEvent
{
bc_id: BroadcastID,
kind: ServiceEventKind,
directed: Option<SESet<ServiceSubID>>,
obj: Option<ServiceEventObject>,
}
impl ServiceEvent
{
/// Create a new event to be broadcast
fn new<T>(kind: ServiceEventKind, directed: Option<impl IntoIterator<Item=ServiceSubID>>, obj: Option<T>) -> Self
where T: Any + Send + Sync + 'static
{
Self {
bc_id: BroadcastID::id_new(),
kind,
directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 {
None
} else {
Some(n)
}),
obj: obj.map(|x| ServiceEventObject(Arc::new(x))),
}
}
#[inline] pub fn id(&self) -> &BroadcastID
{
&self.bc_id
}
/// The kind of this event.
#[inline] pub fn kind(&self) -> ServiceEventKind
{
self.kind
}
/// Check if this event is for you
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
{
if let Some(yes) = self.directed.as_ref() {
yes.contains(whom)
} else {
false
}
}
/// Check if this event is directed to anyone
pub fn is_directed(&self) -> bool
{
self.directed.is_some()
}
/// Check who this event is directed to.
///
/// If it is not directed, an empty slice will be returned.
pub fn directed_to(&self) -> impl Iterator<Item = &'_ ServiceSubID> + '_
{
match self.directed.as_ref()
{
Some(yes) => MaybeIter::many(yes.iter()),
None => MaybeIter::none(),
}
}
/// Get a reference to the object, if there is one.
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
{
self.obj.as_ref()
}
/// Get a mutable reference to the object, if there is one.
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
{
self.obj.as_mut()
}
/// Try to consume into the inner object. If there is no object, return self.
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
{
match self.obj
{
Some(obj) => Ok(obj),
None => Err(self),
}
}
}
impl From<ServiceEvent> for Option<ServiceEventObject>
{
#[inline] fn from(from: ServiceEvent) -> Self
{
from.obj
}
}
impl TryFrom<ServiceEvent> for ServiceEventObject
{
type Error = NoObjectError;
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
{
match from.obj
{
Some(obj) => Ok(obj),
None => Err(NoObjectError),
}
}
}

@ -0,0 +1,28 @@
//! Global state service
use super::*;
use tokio::{
sync::{
watch,
mpsc,
oneshot,
broadcast,
},
task::JoinHandle,
};
use crate::service::{
ExitStatus,
};
use std::{error, fmt};
use std::sync::Weak;
use std::any::Any;
use std::collections::{BTreeMap};
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
id_type!(BroadcastID; "Each broadcast message has a unique ID.");
mod supervisor; pub use supervisor::*;
mod resreq; pub use resreq::*;
mod obj; pub use obj::*;
mod events; pub use events::*;

@ -0,0 +1,114 @@
//! broadcast object definitions
use super::*;
/// Object sent through the broadcast channel.
///
/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObject(pub(super) Arc<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObject);
/// A weak reference to a `ServiceEventObject`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObjectRef(pub(super) Weak<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObjectRef);
impl ServiceEventObjectRef
{
/// Try to upgrade to a concrete reference, and then clone the inner object.
pub fn try_clone(&self) -> Option<ServiceEventObject>
{
match self.0.upgrade()
{
Some(arc) => Some(ServiceEventObject(arc).clone()),
None => None
}
}
/// Try to upgrade to a concrete reference.
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
{
match self.0.upgrade()
{
Some(arc) => Ok(ServiceEventObject(arc)),
None => Err(self),
}
}
/// Check if the object has not been destroyed yet.
pub fn is_alive(&self) -> bool
{
self.0.strong_count() > 0
}
}
impl ServiceEventObject
{
pub fn clone_inner(&self) -> Self
{
Self(Arc::from(self.0.clone_dyn_any_sync()))
}
/// Get a weak reference counted handle to the object, without cloning the object itself.
pub fn clone_weak(&self) -> ServiceEventObjectRef
{
ServiceEventObjectRef(Arc::downgrade(&self.0))
}
/// Try to downcast the inner object to a concrete type and then clone it.
///
/// This will fail if:
/// * The downcasted type is invalid
#[inline] pub fn downcast_clone<T: Any + Clone + Send + Sync + 'static>(&self) -> Option<T>
{
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
}
/// Try to consume this instance into downcast.
///
/// This will fail if:
/// * The downcasted type is invalid
/// * There are other references to this object (created through `clone_ref()`.).
pub fn try_into_downcast<T: Any + Send + Sync + 'static>(self) -> Result<T, Self>
{
match Arc::downcast(self.0)
{
Ok(v) => match Arc::try_unwrap(v) {
Ok(v) => Ok(v),
Err(s) => Err(Self(s)),
},
Err(e) => Err(Self(e)),
}
}
/// Check if there are any other references to this object
#[inline] pub fn is_unique(&self) -> bool
{
Arc::strong_count(&self.0) == 1
}
/// Try to downcast the object into a concrete type
#[inline] pub fn is<T: Any + Send + Sync + 'static>(&self) -> bool
{
self.0.as_ref().is::<T>()
}
/// Try to downcast the object into a concrete type
#[inline] pub fn downcast_ref<T: Any + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.as_ref().downcast_ref::<T>()
}
}
#[derive(Debug)]
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
pub struct NoObjectError;
impl error::Error for NoObjectError{}
impl fmt::Display for NoObjectError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "there was no object broadcasted along with this ")
}
}

@ -0,0 +1,65 @@
//! Responses and requests for the state service(s).
//!
//! These are sent to `Supervisor` which then dispatches them accordingly.
use super::*;
/// The kind of request to send to the the service
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceRequestKind
{
/// A no-op request.
None,
/// Test request.
#[cfg(debug_assertions)] EchoRequest(String),
}
/// The kind of response to expect from a service query, if any.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceResponseKind
{
/// Test response.
#[cfg(debug_assertions)] EchoResponse(String),
/// Empty response
None,
}
/// A response from a service to a specific query.
///
/// It is sent theough the `output` onehot channel in the `ServiceCommand` struct.
#[derive(Debug)]
pub struct ServiceResponse(ServiceRequestKind);
impl ServiceResponse
{
/// An empty (default) response
#[inline] pub const fn none() -> Self
{
Self(ServiceRequestKind::None)
}
}
/// A formed service request.
#[derive(Debug)]
pub struct ServiceRequest
{
kind: ServiceRequestKind,
output: oneshot::Sender<ServiceResponse>, // If there is no response, this sender will just be dropped and the future impl can return `None` instead of `Some(response)`.
}
impl ServiceRequest
{
/// Create a new request
pub(in super) fn new(kind: ServiceRequestKind) -> (Self, oneshot::Receiver<ServiceResponse>)
{
let (tx, rx) = oneshot::channel();
(Self {
kind,
output: tx
}, rx)
}
}

@ -0,0 +1,92 @@
//! Dispatching to state service task(s) through a supervisor
use super::*;
use tokio::time;
use std::{fmt, error};
use futures::prelude::*;
impl Supervisor
{
/// Dispatch a request to the supervisor to be passed through to a subtask.
///
/// # Returns
/// Returns a `Future` that can be awaited on to produce the value sent back by the task (if there is one).
///
/// # Errors
/// * The first failure will be caused if sending to the supervisor fails.
/// * The 2nd failure will be caused if either the supervisor, or its delegated task panics before being able to respond, or if the task simply does not respond.
pub async fn dispatch_req(&mut self, kind: ServiceRequestKind) -> Result<impl Future<Output=Result<ServiceResponse, SupervisorDispatchError>> + 'static, SupervisorDispatchError>
{
let (req, rx) = ServiceRequest::new(kind);
self.pipe.send(req).await.map_err(|_| SupervisorDispatchError::Send)?;
Ok(rx.map_err(|_| SupervisorDispatchError::Recv))
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait(&mut self, kind: ServiceRequestKind) -> Result<ServiceResponse, SupervisorDispatchError>
{
Ok(self.dispatch_req(kind)
.await?
.await?)
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
/// If the timeout expires before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait_timeout(&mut self, kind: ServiceRequestKind, timeout: time::Duration) -> Result<ServiceResponse, SupervisorDispatchError>
{
let resp_wait = self.dispatch_req(kind)
.await?;
tokio::select! {
val = resp_wait => {
return Ok(val?);
}
_ = time::delay_for(timeout) => {
return Err(SupervisorDispatchError::Timeout("receiving response", Some(timeout)))
}
}
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
/// If the future `until` completes before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait_until(&mut self, kind: ServiceRequestKind, until: impl Future) -> Result<ServiceResponse, SupervisorDispatchError>
{
let resp_wait = self.dispatch_req(kind)
.await?;
tokio::select! {
val = resp_wait => {
return Ok(val?);
}
_ = until => {
return Err(SupervisorDispatchError::Timeout("receiving response", None))
}
}
}
}
/// Error when dispatching a request to the supervisor
#[derive(Debug)]
pub enum SupervisorDispatchError
{
Send, Recv, Timeout(&'static str, Option<tokio::time::Duration>),
}
impl error::Error for SupervisorDispatchError{}
impl fmt::Display for SupervisorDispatchError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Send => write!(f, "dispatching the request failed"),
Self::Recv => write!(f, "receiving the response failed"),
Self::Timeout(msg, Some(duration)) => write!(f, "timeout on {} was reached ({:?})", msg, duration),
Self::Timeout(msg, _) => write!(f, "timeout on {} was reached", msg),
}
}
}

@ -0,0 +1,146 @@
//! Handles spawning and restarting service task(s)
use super::*;
use tokio::sync::RwLock;
use std::mem::MaybeUninit;
use std::ops;
use futures::future::Future;
const SUPERVISOR_BACKLOG: usize = 32;
mod dispatch; pub use dispatch::*;
/// Signal the shutdown method to the supervisor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
pub enum SupervisorControl
{
/// Normal working
Initialise,
/// Signal the subtask(s) to shut down, then wait for them and exit.
Signal,
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Restart any and all subtask(s)
Restart,
/// Set the max task limit. Default is 0.
TaskLimit(usize),
}
impl Default for SupervisorControl
{
#[inline]
fn default() -> Self
{
Self::Initialise
}
}
/// Supervisor responsible for spawning the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
/// Handle for the supervisor task itself
handle: JoinHandle<ExitStatus>,
/// Watch sender for signalling shutdowns the supervisor task itself
shutdown: watch::Sender<SupervisorControl>,
/// The pipe to send requests to the supervisor's subtasks
pipe: mpsc::Sender<ServiceRequest>,
/// The initial receiver created from `broadcast_root`.
broadcast_receiver: broadcast::Receiver<ServiceEvent>,
/// Data shared between the supervisor's task and its controller instance here.
shared: Arc<SupervisorShared>,
}
/// Object shared btweeen the Supervisor control instance and its supervisor task.
#[derive(Debug)]
struct SupervisorShared
{
//// scratch that, idk what this was supposed to be for. I'm sure i'll remember if it's important.
//sub: RwLock<BTreeMap<ServiceEventKind, ServiceEvent>>,
broadcast_root: broadcast::Sender<ServiceEvent>,
state: state::State,
}
/// The state held by the running superviser service
#[derive(Debug)]
struct SupervisorTaskState
{
shutdown: watch::Receiver<SupervisorControl>,
recv: mpsc::Receiver<ServiceRequest>,
shared: Arc<SupervisorShared>,
}
impl Supervisor
{
/// Attempt to send a control signal to the supervisor itself
pub fn signal_control(&self, sig: SupervisorControl) -> Result<(), watch::error::SendError<SupervisorControl>>
{
self.shutdown.broadcast(sig)?;
Ok(())
}
/// Drop all communications with background worker and wait for it to complete
pub async fn join_now(self) -> eyre::Result<()>
{
let handle = {
let Self { handle, ..} = self; // drop everything else
handle
};
handle.await?;
Ok(())
}
/// Check if the background worker has not been dropped.
///
/// If this returns false it usually indicates a fatal error.
pub fn is_alive(&self) -> bool
{
Arc::strong_count(&self.shared) > 1
}
/// Create a new supervisor for this state.
pub fn new(state: state::State) -> Self
{
let shutdown = watch::channel(Default::default());
let pipe = mpsc::channel(SUPERVISOR_BACKLOG);
let (broadcast_root, broadcast_receiver) = broadcast::channel(SUPERVISOR_BACKLOG);
let shared = Arc::new(SupervisorShared{
broadcast_root,
state,
});
let (shutdown_0, shutdown_1) = shutdown;
let (pipe_0, pipe_1) = pipe;
Self {
shutdown: shutdown_0,
pipe: pipe_0,
broadcast_receiver,
shared: Arc::clone(&shared),
handle: tokio::spawn(async move {
let shared = shared;
ExitStatus::from(service_fn(SupervisorTaskState {
shared,
recv: pipe_1,
shutdown: shutdown_1,
}).await)
}),
}
}
}
async fn service_fn(mut state: SupervisorTaskState) -> eyre::Result<()>
{
while let Some(req) = state.recv.recv().await {
}
Ok(())
}
Loading…
Cancel
Save