Compare commits

..

7 Commits

11
Cargo.lock generated

@ -1368,6 +1368,15 @@ dependencies = [
"serde",
]
[[package]]
name = "smolset"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5a73542b3021a40b49f29e6ce18b65ff38dbafbf7df8be0fa9f9305305f62ee"
dependencies = [
"smallvec",
]
[[package]]
name = "socket2"
version = "0.3.19"
@ -1779,6 +1788,7 @@ dependencies = [
name = "yuurei"
version = "0.1.0"
dependencies = [
"cfg-if 1.0.0",
"color-eyre",
"cryptohelpers",
"difference",
@ -1793,6 +1803,7 @@ dependencies = [
"serde",
"sha2",
"smallvec",
"smolset",
"tokio",
"uuid",
"warp",

@ -11,6 +11,7 @@ default = ["nightly"]
nightly = ["smallvec/const_generics"]
[dependencies]
cfg-if = "1.0.0"
color-eyre = {version = "0.5.10", default-features=false}
cryptohelpers = {version = "1.7.1", features=["full"]}
difference = "2.0.0"
@ -25,6 +26,7 @@ pretty_env_logger = "0.4.0"
serde = {version = "1.0.118", features=["derive"]}
sha2 = "0.9.2"
smallvec = {version = "1.6.0", features= ["union", "serde", "write"]}
smolset = "1.1"
tokio = {version = "0.2", features=["full"] }
uuid = {version = "0.8.1", features=["v4","serde"]}
warp = "0.2.5"

@ -540,6 +540,7 @@ pub trait AnyCloneable: mopa::Any
{
fn clone_dyn(&self) -> Box<dyn AnyCloneable + Send + Sync + 'static>;
fn clone_dyn_any(&self) -> Box<dyn Any + Send + 'static>;
fn clone_dyn_any_sync(&self) -> Box<dyn Any + Send + Sync + 'static>;
}
mopafy!(AnyCloneable);
@ -551,6 +552,9 @@ impl<T: ?Sized + Clone + Any + Send + Sync + 'static> AnyCloneable for T
#[inline] fn clone_dyn_any(&self) -> Box<dyn Any + Send + 'static> {
Box::new(self.clone())
}
fn clone_dyn_any_sync(&self) -> Box<dyn Any + Send + Sync + 'static> {
Box::new(self.clone())
}
}
/// A dynamically clonable heap allocated polymorphic `Any` object.
@ -578,3 +582,112 @@ impl Clone for DynCloneable
}
mod maybe_iter
{
use std::iter::{once, Once, Chain};
#[derive(Debug, Clone)]
enum Inner<I, T>
{
Some(I),
One(std::iter::Once<T>),
None,
}
/// An iterator that may yield 0, 1, or more values.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct MaybeIter<I, T>(Inner<I, T>);
impl<I, T> MaybeIter<I, T>
where I: Iterator<Item=T>
{
/// Create a single element iterator
pub fn one<U: Into<T>>(from: U) -> Self
{
Self(Inner::One(once(from.into())))
}
/// Create a new instance from an iterator
pub fn many<U: IntoIterator<IntoIter = I, Item=T>>(from: U) -> Self
{
Self(Inner::Some(from.into_iter()))
}
/// Create a new instance that yields 0 items
#[inline(always)] pub fn none() -> Self
{
Self(Inner::None)
}
/// Create a new instance from an iterator.
///
/// # Not using `FromIterator`.
/// 0, 1, many.. will be respected, with no unneeded heap allocations. This is not currently possible with the `FromIterator` trait.
pub fn from_iter<I2: IntoIterator<Item = T>>(from: I2) -> MaybeIter<Chain<Chain<Once<T>, Once<T>>, I2::IntoIter>, T>
{
let mut iter = from.into_iter();
MaybeIter(match (iter.next(), iter.next()) {
(Some(first), None) => Inner::One(once(first)),
(Some(first), Some(second)) => {
Inner::Some(once(first)
.chain(once(second))
.chain(iter))
},
_ => Inner::None,
})
}
}
impl<I, T, U> From<Option<U>> for MaybeIter<I, T>
where I: Iterator<Item=T>,
U: IntoIterator<IntoIter=I, Item=T>
{
fn from(from: Option<U>) -> Self
{
match from {
Some(many) => Self::many(many),
_ => Self::none()
}
}
}
impl<I, T> Iterator for MaybeIter<I, T>
where I: Iterator<Item=T>
{
type Item = T;
fn next(&mut self) -> Option<Self::Item>
{
match &mut self.0 {
Inner::Some(x) => x.next(),
Inner::One(x) => x.next(),
Inner::None => None,
}
}
#[inline] fn size_hint(&self) -> (usize, Option<usize>) {
match &self.0
{
Inner::One(c) => c.size_hint(),
Inner::Some(x) => x.size_hint(),
Inner::None => (0, Some(0))
}
}
}
impl<T, I: Iterator<Item=T>> std::iter::DoubleEndedIterator for MaybeIter<I, T>
where I: std::iter::DoubleEndedIterator
{
fn next_back(&mut self) -> Option<Self::Item> {
match &mut self.0 {
Inner::Some(x) => x.next_back(),
Inner::One(x) => x.next_back(),
Inner::None => None,
}
}
}
impl<T, I: Iterator<Item=T>> std::iter::FusedIterator for MaybeIter<I, T>
where I: std::iter::FusedIterator{}
impl<T, I: Iterator<Item=T>> ExactSizeIterator for MaybeIter<I, T>
where I: ExactSizeIterator{}
}
pub use maybe_iter::MaybeIter;

@ -8,8 +8,11 @@
#[macro_use] extern crate lazy_static;
#[macro_use] extern crate log;
#[macro_use] extern crate mopa;
#[macro_use] extern crate cfg_if;
#[allow(unused_imports)]
use std::convert::{TryFrom, TryInto};
#[allow(unused_imports)]
use color_eyre::{
eyre::{
WrapErr as _,

@ -90,6 +90,19 @@ pub enum ExitStatus<T=()>
Error(eyre::Report),
}
impl<T,E: Into<eyre::Report>> From<Result<T, E>> for ExitStatus<T>
{
fn from(from: Result<T, E>) -> Self
{
match from
{
Ok(v) => Self::Graceful(v),
Err(e) => Self::Error(e.into())
}
}
}
#[derive(Debug)]
/// The error `ExitStatus::Abnormal` converts to.
pub struct AbnormalExitError;
@ -102,14 +115,14 @@ impl fmt::Display for AbnormalExitError
write!(f, "service terminated in an abnormal way")
}
}
/*
impl<U, T: error::Error + Send+Sync+'static> From<T> for ExitStatus<U>
{
fn from(from: T) -> Self
{
Self::Error(from.into())
}
}
}*/
impl<T: Default> Default for ExitStatus<T>
{
#[inline]

@ -1,303 +0,0 @@
//! Global state service
use super::*;
use tokio::{
sync::{
watch,
mpsc,
oneshot,
broadcast,
},
task::JoinHandle,
};
use crate::service::{
ExitStatus,
};
use std::{error, fmt};
use std::sync::Weak;
use std::any::Any;
use std::collections::BTreeMap;
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
/// Signal the shutdown method to the supervisor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
pub enum SupervisorShutdownStatus
{
/// Signal the subtask(s) to shut down, then wait for them and exit.
Signal,
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Keep working
Normal,
/// Restart any and all subtask(s)
Restart,
}
impl Default for SupervisorShutdownStatus
{
#[inline]
fn default() -> Self
{
SupervisorShutdownStatus::Normal
}
}
/// The kind of command to send to the the service
#[derive(Debug)]
pub enum ServiceCommandKind
{
}
#[derive(Debug)]
pub enum ServiceResponseKind
{
}
#[derive(Debug)]
pub struct ServiceResponse(Option<ServiceCommandKind>);
#[derive(Debug)]
pub struct ServiceCommand
{
kind: ServiceCommandKind,
output: oneshot::Sender<ServiceResponse>,
}
/// Supervisor responsible for spawning the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
handle: JoinHandle<ExitStatus>,
shutdown: watch::Sender<SupervisorShutdownStatus>,
pipe: mpsc::Sender<ServiceCommand>,
sub: BTreeMap<ServiceEventKind, ServiceEvent>
}
/// Object sent through the broadcast channel.
///
/// These objects can be cloned and downcasted, but can also be atomically refcounted if that is more desireable.
///
/// # Cloning
/// The implementation of `Clone` for this instance clones the inner object, not the refcount. Use `clone_ref()` to *just* clone the refcount.
/// To downcast and clone the inner object without an extra `Arc` allocation, use `downcast_clone()` or `downcast().map(Clone::clone)`.
pub struct ServiceEventObject(Arc<dyn AnyCloneable + Send + Sync + 'static>);
shim_debug!(ServiceEventObject);
/// A weak reference to a `ServiceEventObject`.
#[derive(Clone)]
pub struct ServiceEventObjectRef(Weak<dyn AnyCloneable + Send + Sync + 'static>);
shim_debug!(ServiceEventObjectRef);
/// Wrapper used when broadcasting to prevent useless inner object clones.
#[derive(Debug)]
struct EventObjectRefCloneWrap(ServiceEventObject);
impl Clone for EventObjectRefCloneWrap
{
fn clone(&self) -> Self {
Self(self.0.clone_ref())
}
}
impl Clone for ServiceEventObject
{
fn clone(&self) -> Self {
Self(Arc::from(self.0.as_ref().clone_dyn()))
}
}
impl ServiceEventObjectRef
{
/// Try to upgrade to a concrete reference, and then clone the inner object.
pub fn try_clone(&self) -> Option<ServiceEventObject>
{
match self.0.upgrade()
{
Some(arc) => Some(ServiceEventObject(arc).clone()),
None => None
}
}
/// Try to upgrade to a concrete reference.
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
{
match self.0.upgrade()
{
Some(arc) => Ok(ServiceEventObject(arc)),
None => Err(self),
}
}
}
impl ServiceEventObject
{
/// Get an owned reference counted handle to the object, without cloning the object itself.
pub fn clone_ref(&self) -> Self
{
Self(Arc::clone(&self.0))
}
/// Get a weak reference counted handle to the object, without cloning the object itself.
pub fn clone_weak(&self) -> ServiceEventObjectRef
{
ServiceEventObjectRef(Arc::downgrade(&self.0))
}
/// Try to downcast the inner object to a concrete type and then clone it.
///
/// This will fail if:
/// * The downcasted type is invalid
#[inline] pub fn downcast_clone<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<T>
{
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
}
/// Try to consume this instance into downcast.
///
/// This will fail if:
/// * The downcasted type is invalid
/// * There are other references to this object (created through `clone_ref()`.).
pub fn try_into_downcast<T: AnyCloneable + Send + Sync + 'static>(self) -> Result<T, Self>
{
match Arc::downcast(self.0)
{
Ok(v) => v,
Err(e) => Self(e),
}
}
/// Check if there are any other references to this object
#[inline] pub fn is_unique(&self) -> bool
{
Arc::strong_count(&self.0) == 1
}
/// Try to downcast the object into a concrete type
#[inline] pub fn is<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.is()
}
/// Try to downcast the object into a concrete type
#[inline] pub fn downcast_ref<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.downcast_ref()
}
}
/// The kind of event outputted from a state service's broadcast stream
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub enum ServiceEventKind
{
/// Does nothing.
///
/// # Associated object
/// None.
KeepAlive,
}
/// An event outputted from a state service's broadcast stream
#[derive(Debug, Clone)]
pub struct ServiceEvent
{
kind: ServiceEventKind,
directed: Option<MaybeVec<ServiceSubID>>,
obj: Option<EventObjectRefCloneWrap>,
}
impl ServiceEvent
{
/// The kind of this event.
pub fn kind(&self) -> ServiceEventKind
{
self.kind
}
/// Check if this event is for you
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
{
if let Some(yes) = self.directed.as_ref() {
yes == whom
} else {
false
}
}
/// Check if this event is directed to anyone
pub fn is_directed(&self) -> bool
{
self.directed.is_some()
}
/// Check who this event is directed to.
///
/// If it is not directed, an empty slice will be returned.
pub fn directed_to(&self) -> &[&ServiceSubID]
{
match self.directed.as_ref()
{
Some(yes) => &yes[..],
None => &[],
}
}
/// Get a reference to the object, if there is one.
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
{
self.obj.as_ref().map(|x| x.0)
}
/// Get a mutable reference to the object, if there is one.
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
{
self.obj.as_mut().map(|x| x.0)
}
/// Try to consume into the inner object. If there is no object, return self.
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
{
match self.obj
{
Some(obj) => Ok(obj),
None => Err(self),
}
}
}
impl From<ServiceEvent> for Option<ServiceEventObject>
{
#[inline] fn from(from: ServiceEvent) -> Self
{
from.obj
}
}
impl TryFrom<ServiceEvent> for ServiceEventObject
{
type Error = NoObjectError;
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
{
match from.obj
{
Some(obj) => Ok(obj.0),
None => Err(NoObjectError),
}
}
}
#[derive(Debug)]
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
pub struct NoObjectError;
impl error::Error for NoObjectError{}
impl fmt::Display for NoObjectError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "there was no object broadcasted along with this ")
}
}

@ -0,0 +1,141 @@
//! Controls the broadcasting of events sent from the state service task
use super::*;
/// The kind of event outputted from a state service's broadcast stream
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub enum ServiceEventKind
{
/// Does nothing.
///
/// # Associated object
/// `()`.
Ping,
/// Does nothing.
///
/// # Associated object
/// None.
KeepAlive,
}
cfg_if!{
if #[cfg(debug_assertions)] {
/// Type used for directed array.
/// Currently testing `smolset` over eagerly allocating.
pub(super) type SESet<T> = smolset::SmolSet<[T; 1]>;
} else {
pub(super) type SESet<T> = std::collections::HashSet<T>;
}
}
/// An event outputted from a state service's broadcast stream
#[derive(Debug, Clone)]
pub struct ServiceEvent
{
bc_id: BroadcastID,
kind: ServiceEventKind,
directed: Option<SESet<ServiceSubID>>,
obj: Option<ServiceEventObject>,
}
impl ServiceEvent
{
/// Create a new event to be broadcast
fn new<T>(kind: ServiceEventKind, directed: Option<impl IntoIterator<Item=ServiceSubID>>, obj: Option<T>) -> Self
where T: Any + Send + Sync + 'static
{
Self {
bc_id: BroadcastID::id_new(),
kind,
directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 {
None
} else {
Some(n)
}),
obj: obj.map(|x| ServiceEventObject(Arc::new(x))),
}
}
#[inline] pub fn id(&self) -> &BroadcastID
{
&self.bc_id
}
/// The kind of this event.
#[inline] pub fn kind(&self) -> ServiceEventKind
{
self.kind
}
/// Check if this event is for you
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
{
if let Some(yes) = self.directed.as_ref() {
yes.contains(whom)
} else {
false
}
}
/// Check if this event is directed to anyone
pub fn is_directed(&self) -> bool
{
self.directed.is_some()
}
/// Check who this event is directed to.
///
/// If it is not directed, an empty slice will be returned.
pub fn directed_to(&self) -> impl Iterator<Item = &'_ ServiceSubID> + '_
{
match self.directed.as_ref()
{
Some(yes) => MaybeIter::many(yes.iter()),
None => MaybeIter::none(),
}
}
/// Get a reference to the object, if there is one.
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
{
self.obj.as_ref()
}
/// Get a mutable reference to the object, if there is one.
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
{
self.obj.as_mut()
}
/// Try to consume into the inner object. If there is no object, return self.
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
{
match self.obj
{
Some(obj) => Ok(obj),
None => Err(self),
}
}
}
impl From<ServiceEvent> for Option<ServiceEventObject>
{
#[inline] fn from(from: ServiceEvent) -> Self
{
from.obj
}
}
impl TryFrom<ServiceEvent> for ServiceEventObject
{
type Error = NoObjectError;
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
{
match from.obj
{
Some(obj) => Ok(obj),
None => Err(NoObjectError),
}
}
}

@ -0,0 +1,28 @@
//! Global state service
use super::*;
use tokio::{
sync::{
watch,
mpsc,
oneshot,
broadcast,
},
task::JoinHandle,
};
use crate::service::{
ExitStatus,
};
use std::{error, fmt};
use std::sync::Weak;
use std::any::Any;
use std::collections::{BTreeMap};
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
id_type!(BroadcastID; "Each broadcast message has a unique ID.");
mod supervisor; pub use supervisor::*;
mod resreq; pub use resreq::*;
mod obj; pub use obj::*;
mod events; pub use events::*;

@ -0,0 +1,114 @@
//! broadcast object definitions
use super::*;
/// Object sent through the broadcast channel.
///
/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObject(pub(super) Arc<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObject);
/// A weak reference to a `ServiceEventObject`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObjectRef(pub(super) Weak<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObjectRef);
impl ServiceEventObjectRef
{
/// Try to upgrade to a concrete reference, and then clone the inner object.
pub fn try_clone(&self) -> Option<ServiceEventObject>
{
match self.0.upgrade()
{
Some(arc) => Some(ServiceEventObject(arc).clone()),
None => None
}
}
/// Try to upgrade to a concrete reference.
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
{
match self.0.upgrade()
{
Some(arc) => Ok(ServiceEventObject(arc)),
None => Err(self),
}
}
/// Check if the object has not been destroyed yet.
pub fn is_alive(&self) -> bool
{
self.0.strong_count() > 0
}
}
impl ServiceEventObject
{
pub fn clone_inner(&self) -> Self
{
Self(Arc::from(self.0.clone_dyn_any_sync()))
}
/// Get a weak reference counted handle to the object, without cloning the object itself.
pub fn clone_weak(&self) -> ServiceEventObjectRef
{
ServiceEventObjectRef(Arc::downgrade(&self.0))
}
/// Try to downcast the inner object to a concrete type and then clone it.
///
/// This will fail if:
/// * The downcasted type is invalid
#[inline] pub fn downcast_clone<T: Any + Clone + Send + Sync + 'static>(&self) -> Option<T>
{
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
}
/// Try to consume this instance into downcast.
///
/// This will fail if:
/// * The downcasted type is invalid
/// * There are other references to this object (created through `clone_ref()`.).
pub fn try_into_downcast<T: Any + Send + Sync + 'static>(self) -> Result<T, Self>
{
match Arc::downcast(self.0)
{
Ok(v) => match Arc::try_unwrap(v) {
Ok(v) => Ok(v),
Err(s) => Err(Self(s)),
},
Err(e) => Err(Self(e)),
}
}
/// Check if there are any other references to this object
#[inline] pub fn is_unique(&self) -> bool
{
Arc::strong_count(&self.0) == 1
}
/// Try to downcast the object into a concrete type
#[inline] pub fn is<T: Any + Send + Sync + 'static>(&self) -> bool
{
self.0.as_ref().is::<T>()
}
/// Try to downcast the object into a concrete type
#[inline] pub fn downcast_ref<T: Any + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.as_ref().downcast_ref::<T>()
}
}
#[derive(Debug)]
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
pub struct NoObjectError;
impl error::Error for NoObjectError{}
impl fmt::Display for NoObjectError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "there was no object broadcasted along with this ")
}
}

@ -0,0 +1,65 @@
//! Responses and requests for the state service(s).
//!
//! These are sent to `Supervisor` which then dispatches them accordingly.
use super::*;
/// The kind of request to send to the the service
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceRequestKind
{
/// A no-op request.
None,
/// Test request.
#[cfg(debug_assertions)] EchoRequest(String),
}
/// The kind of response to expect from a service query, if any.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceResponseKind
{
/// Test response.
#[cfg(debug_assertions)] EchoResponse(String),
/// Empty response
None,
}
/// A response from a service to a specific query.
///
/// It is sent theough the `output` onehot channel in the `ServiceCommand` struct.
#[derive(Debug)]
pub struct ServiceResponse(ServiceRequestKind);
impl ServiceResponse
{
/// An empty (default) response
#[inline] pub const fn none() -> Self
{
Self(ServiceRequestKind::None)
}
}
/// A formed service request.
#[derive(Debug)]
pub struct ServiceRequest
{
kind: ServiceRequestKind,
output: oneshot::Sender<ServiceResponse>, // If there is no response, this sender will just be dropped and the future impl can return `None` instead of `Some(response)`.
}
impl ServiceRequest
{
/// Create a new request
pub(in super) fn new(kind: ServiceRequestKind) -> (Self, oneshot::Receiver<ServiceResponse>)
{
let (tx, rx) = oneshot::channel();
(Self {
kind,
output: tx
}, rx)
}
}

@ -0,0 +1,92 @@
//! Dispatching to state service task(s) through a supervisor
use super::*;
use tokio::time;
use std::{fmt, error};
use futures::prelude::*;
impl Supervisor
{
/// Dispatch a request to the supervisor to be passed through to a subtask.
///
/// # Returns
/// Returns a `Future` that can be awaited on to produce the value sent back by the task (if there is one).
///
/// # Errors
/// * The first failure will be caused if sending to the supervisor fails.
/// * The 2nd failure will be caused if either the supervisor, or its delegated task panics before being able to respond, or if the task simply does not respond.
pub async fn dispatch_req(&mut self, kind: ServiceRequestKind) -> Result<impl Future<Output=Result<ServiceResponse, SupervisorDispatchError>> + 'static, SupervisorDispatchError>
{
let (req, rx) = ServiceRequest::new(kind);
self.pipe.send(req).await.map_err(|_| SupervisorDispatchError::Send)?;
Ok(rx.map_err(|_| SupervisorDispatchError::Recv))
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait(&mut self, kind: ServiceRequestKind) -> Result<ServiceResponse, SupervisorDispatchError>
{
Ok(self.dispatch_req(kind)
.await?
.await?)
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
/// If the timeout expires before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait_timeout(&mut self, kind: ServiceRequestKind, timeout: time::Duration) -> Result<ServiceResponse, SupervisorDispatchError>
{
let resp_wait = self.dispatch_req(kind)
.await?;
tokio::select! {
val = resp_wait => {
return Ok(val?);
}
_ = time::delay_for(timeout) => {
return Err(SupervisorDispatchError::Timeout("receiving response", Some(timeout)))
}
}
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
/// If the future `until` completes before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait_until(&mut self, kind: ServiceRequestKind, until: impl Future) -> Result<ServiceResponse, SupervisorDispatchError>
{
let resp_wait = self.dispatch_req(kind)
.await?;
tokio::select! {
val = resp_wait => {
return Ok(val?);
}
_ = until => {
return Err(SupervisorDispatchError::Timeout("receiving response", None))
}
}
}
}
/// Error when dispatching a request to the supervisor
#[derive(Debug)]
pub enum SupervisorDispatchError
{
Send, Recv, Timeout(&'static str, Option<tokio::time::Duration>),
}
impl error::Error for SupervisorDispatchError{}
impl fmt::Display for SupervisorDispatchError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Send => write!(f, "dispatching the request failed"),
Self::Recv => write!(f, "receiving the response failed"),
Self::Timeout(msg, Some(duration)) => write!(f, "timeout on {} was reached ({:?})", msg, duration),
Self::Timeout(msg, _) => write!(f, "timeout on {} was reached", msg),
}
}
}

@ -0,0 +1,309 @@
//! Handles spawning and restarting service task(s)
use super::*;
use tokio::sync::RwLock;
use std::mem::MaybeUninit;
use std::ops;
use futures::prelude::*;
use tokio::time;
use std::{fmt, error};
const SUPERVISOR_BACKLOG: usize = 32;
mod dispatch; pub use dispatch::*;
TODO: This all needs redoing when i'm actually lucid. This part seems okay but the rest of `service` needs to go and be replaced by something like this
/// Signal the shutdown method to the supervisor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
pub enum SupervisorControl
{
/// Normal working
Initialise,
/// Signal the subtask(s) to shut down, then wait for them and exit, with an optional timeout.
///
/// # Notes
/// If the timeout expires while waiting, then the mode is switched to `Drop`.
Signal(Option<time::Duration>),
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Restart any and all subtask(s)
Restart,
/// Set the max task limit. Default is 0.
TaskLimit(usize),
}
impl Default for SupervisorControl
{
#[inline]
fn default() -> Self
{
Self::Initialise
}
}
/// Supervisor responsible for spawning the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
/// Handle for the supervisor task itself
handle: JoinHandle<ExitStatus>,
/// Watch sender for signalling shutdowns the supervisor task itself
shutdown: watch::Sender<SupervisorControl>,
/// The pipe to send requests to the supervisor's subtasks
pipe: mpsc::Sender<ServiceRequest>,
/// The initial receiver created from `broadcast_root`.
broadcast_receiver: broadcast::Receiver<ServiceEvent>,
/// Data shared between the supervisor's task and its controller instance here.
shared: Arc<SupervisorShared>,
}
/// Object shared btweeen the Supervisor control instance and its supervisor task.
#[derive(Debug)]
struct SupervisorShared
{
/// this is for filtering specific messages to specific subscribers
sub: RwLock<BTreeMap<ServiceEventKind, SESet<ServiceSubID>>>,
broadcast_root: broadcast::Sender<ServiceEvent>,
state: state::State,
}
/// A subscriber to supervisor task(s) event pump
#[derive(Debug)]
pub struct Subscriber
{
id: ServiceSubID,
/// For directed messages
spec: mpsc::Receiver<ServiceEvent>,
/// For broadcast messages
broad: broadcast::Receiver<ServiceEvent>,
}
impl Supervisor
{
/// Attempt to send a control signal to the supervisor itself
pub fn signal_control(&self, sig: SupervisorControl) -> Result<(), watch::error::SendError<SupervisorControl>>
{
self.shutdown.broadcast(sig)?;
Ok(())
}
/// Drop all communications with background worker and wait for it to complete
pub async fn join_now(self) -> eyre::Result<()>
{
let handle = {
let Self { handle, ..} = self; // drop everything else
handle
};
handle.await?;
Ok(())
}
/// Check if the background worker has not been dropped.
///
/// If this returns false it usually indicates a fatal error.
pub fn is_alive(&self) -> bool
{
Arc::strong_count(&self.shared) > 1
}
/// Create a new supervisor for this state.
pub fn new(state: state::State) -> Self
{
let shutdown = watch::channel(Default::default());
let pipe = mpsc::channel(SUPERVISOR_BACKLOG);
let (broadcast_root, broadcast_receiver) = broadcast::channel(SUPERVISOR_BACKLOG);
let shared = Arc::new(SupervisorShared{
broadcast_root,
state,
});
let (shutdown_0, shutdown_1) = shutdown;
let (pipe_0, pipe_1) = pipe;
Self {
shutdown: shutdown_0,
pipe: pipe_0,
broadcast_receiver,
shared: Arc::clone(&shared),
handle: tokio::spawn(async move {
let shared = shared;
ExitStatus::from(service_fn(SupervisorTaskState {
shared,
recv: pipe_1,
shutdown: shutdown_1,
}).await.or_else(|err| err.into_own_result()))
}),
}
}
}
/// The state held by the running superviser service
#[derive(Debug)]
struct SupervisorTaskState
{
shutdown: watch::Receiver<SupervisorControl>,
recv: mpsc::Receiver<ServiceRequest>,
shared: Arc<SupervisorShared>,
}
/// Detached supervisor server
async fn service_fn(SupervisorTaskState {shared, mut recv, mut shutdown}: SupervisorTaskState) -> Result<(), ServiceTerminationError>
{
impl Default for TerminationKind
{
#[inline]
fn default() -> Self
{
Self::Graceful
}
}
// The command stream to dispatch to the worker tasks
let command_dispatch = async {
while let Some(req) = recv.recv().await {
todo!("Dispatch to child(s)");
}
TerminationKind::Graceful
};
tokio::pin!(command_dispatch);
// The signal stream to be handled here
let signal_stream = async {
while let Some(value) = shutdown.recv().await
{
use SupervisorControl::*;
match value {
Initialise => info!("Initialised"),
Signal(None) => return TerminationKind::SignalHup,
Signal(Some(to)) => return TerminationKind::SignalTimeout(to),
Drop => return TerminationKind::Immediate,
Restart => todo!("not implemented"),
TaskLimit(_limit) => todo!("not implemented"),
}
}
TerminationKind::Graceful
};
tokio::pin!(signal_stream);
//loop {
tokio::select! {
sd_kind = &mut signal_stream => {
// We received a signal
Err(ServiceTerminationError::Signal(sd_kind))
}
disp_end = &mut command_dispatch => {
// The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so...
disp_end.into()
}
}
// }
}
/// The mannor in which the supervisor exited.
#[derive(Debug)]
pub enum TerminationKind
{
/// The child task(s) were signalled to stop and they were waited on.
SignalHup,
/// If there was a timeout specified, and that timeout expired, the message will be `SignalTimeout` instead of `SignalHup`.
SignalTimeout(time::Duration),
/// Immediately drop everything and exit
Immediate,
/// A non-signalled shutdown. There were no more watchers for the shutdown channel.
Graceful,
}
impl TerminationKind
{
/// Convert `TerminationKind::Graceful` into a non-error
fn strip_grace(self) -> Result<(), Self>
{
match self {
Self::Graceful => Ok(()),
e => Err(e),
}
}
}
impl error::Error for TerminationKind{}
impl fmt::Display for TerminationKind
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::SignalHup => write!(f, "children were signalled to shut down and compiled"),
Self::SignalTimeout(to) => write!(f, "children were signalled to shut but did not do so within the {:?} timeout", to),
Self::Immediate => write!(f, "children were dropped and an immediate exit was made"),
Self::Graceful => write!(f, "a graceful shutdown order was issued and compiled with"),
}
}
}
/// The error returned on a failed service termination.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceTerminationError
{
/// Was terminated by a signal.
Signal(TerminationKind),
/// Was terminated by a panic.
Panic,
/// There were no more commands being sent through, and the worker gracefully shut down.
Interest,
}
impl From<TerminationKind> for Result<(), ServiceTerminationError>
{
fn from(from: TerminationKind) -> Self
{
ServiceTerminationError::Signal(from).into_own_result()
}
}
impl ServiceTerminationError
{
fn into_own_result(self) -> Result<(), Self>
{
match self {
Self::Signal(term) => term.strip_grace().map_err(Self::Signal),
x => Err(x),
}
}
}
impl error::Error for ServiceTerminationError
{
fn source(&self) -> Option<&(dyn error::Error + 'static)>
{
Some(match &self {
Self::Signal(ts) => ts,
_ => return None,
})
}
}
impl fmt::Display for ServiceTerminationError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Signal(_) => write!(f, "shut down by signal"),
Self::Panic => write!(f, "shut down by panic. this is usually fatal"),
Self::Interest => write!(f, "all communications with this service stopped"),
}
}
}
Loading…
Cancel
Save