Compare commits
No commits in common. 'master' and 'clone-session-service-objects' have entirely different histories.
master
...
clone-sess
@ -0,0 +1,303 @@
|
|||||||
|
//! Global state service
|
||||||
|
use super::*;
|
||||||
|
use tokio::{
|
||||||
|
sync::{
|
||||||
|
watch,
|
||||||
|
mpsc,
|
||||||
|
oneshot,
|
||||||
|
broadcast,
|
||||||
|
},
|
||||||
|
task::JoinHandle,
|
||||||
|
};
|
||||||
|
use crate::service::{
|
||||||
|
ExitStatus,
|
||||||
|
};
|
||||||
|
use std::{error, fmt};
|
||||||
|
use std::sync::Weak;
|
||||||
|
use std::any::Any;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
|
||||||
|
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
|
||||||
|
|
||||||
|
/// Signal the shutdown method to the supervisor.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
|
||||||
|
pub enum SupervisorShutdownStatus
|
||||||
|
{
|
||||||
|
/// Signal the subtask(s) to shut down, then wait for them and exit.
|
||||||
|
Signal,
|
||||||
|
/// Drop all handles and pipes to subtask(s) then immediately exit.
|
||||||
|
Drop,
|
||||||
|
/// Keep working
|
||||||
|
Normal,
|
||||||
|
/// Restart any and all subtask(s)
|
||||||
|
Restart,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SupervisorShutdownStatus
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn default() -> Self
|
||||||
|
{
|
||||||
|
SupervisorShutdownStatus::Normal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The kind of command to send to the the service
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ServiceCommandKind
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ServiceResponseKind
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ServiceResponse(Option<ServiceCommandKind>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ServiceCommand
|
||||||
|
{
|
||||||
|
kind: ServiceCommandKind,
|
||||||
|
output: oneshot::Sender<ServiceResponse>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Supervisor responsible for spawning the state handler service.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct Supervisor
|
||||||
|
{
|
||||||
|
handle: JoinHandle<ExitStatus>,
|
||||||
|
|
||||||
|
shutdown: watch::Sender<SupervisorShutdownStatus>,
|
||||||
|
|
||||||
|
pipe: mpsc::Sender<ServiceCommand>,
|
||||||
|
sub: BTreeMap<ServiceEventKind, ServiceEvent>
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Object sent through the broadcast channel.
|
||||||
|
///
|
||||||
|
/// These objects can be cloned and downcasted, but can also be atomically refcounted if that is more desireable.
|
||||||
|
///
|
||||||
|
/// # Cloning
|
||||||
|
/// The implementation of `Clone` for this instance clones the inner object, not the refcount. Use `clone_ref()` to *just* clone the refcount.
|
||||||
|
/// To downcast and clone the inner object without an extra `Arc` allocation, use `downcast_clone()` or `downcast().map(Clone::clone)`.
|
||||||
|
pub struct ServiceEventObject(Arc<dyn AnyCloneable + Send + Sync + 'static>);
|
||||||
|
shim_debug!(ServiceEventObject);
|
||||||
|
/// A weak reference to a `ServiceEventObject`.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ServiceEventObjectRef(Weak<dyn AnyCloneable + Send + Sync + 'static>);
|
||||||
|
shim_debug!(ServiceEventObjectRef);
|
||||||
|
|
||||||
|
/// Wrapper used when broadcasting to prevent useless inner object clones.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct EventObjectRefCloneWrap(ServiceEventObject);
|
||||||
|
|
||||||
|
impl Clone for EventObjectRefCloneWrap
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(self.0.clone_ref())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for ServiceEventObject
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(Arc::from(self.0.as_ref().clone_dyn()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceEventObjectRef
|
||||||
|
{
|
||||||
|
/// Try to upgrade to a concrete reference, and then clone the inner object.
|
||||||
|
pub fn try_clone(&self) -> Option<ServiceEventObject>
|
||||||
|
{
|
||||||
|
match self.0.upgrade()
|
||||||
|
{
|
||||||
|
Some(arc) => Some(ServiceEventObject(arc).clone()),
|
||||||
|
None => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Try to upgrade to a concrete reference.
|
||||||
|
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
|
||||||
|
{
|
||||||
|
match self.0.upgrade()
|
||||||
|
{
|
||||||
|
Some(arc) => Ok(ServiceEventObject(arc)),
|
||||||
|
None => Err(self),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl ServiceEventObject
|
||||||
|
{
|
||||||
|
/// Get an owned reference counted handle to the object, without cloning the object itself.
|
||||||
|
pub fn clone_ref(&self) -> Self
|
||||||
|
{
|
||||||
|
Self(Arc::clone(&self.0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a weak reference counted handle to the object, without cloning the object itself.
|
||||||
|
pub fn clone_weak(&self) -> ServiceEventObjectRef
|
||||||
|
{
|
||||||
|
ServiceEventObjectRef(Arc::downgrade(&self.0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to downcast the inner object to a concrete type and then clone it.
|
||||||
|
///
|
||||||
|
/// This will fail if:
|
||||||
|
/// * The downcasted type is invalid
|
||||||
|
#[inline] pub fn downcast_clone<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<T>
|
||||||
|
{
|
||||||
|
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
|
||||||
|
}
|
||||||
|
/// Try to consume this instance into downcast.
|
||||||
|
///
|
||||||
|
/// This will fail if:
|
||||||
|
/// * The downcasted type is invalid
|
||||||
|
/// * There are other references to this object (created through `clone_ref()`.).
|
||||||
|
pub fn try_into_downcast<T: AnyCloneable + Send + Sync + 'static>(self) -> Result<T, Self>
|
||||||
|
{
|
||||||
|
match Arc::downcast(self.0)
|
||||||
|
{
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => Self(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if there are any other references to this object
|
||||||
|
#[inline] pub fn is_unique(&self) -> bool
|
||||||
|
{
|
||||||
|
Arc::strong_count(&self.0) == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to downcast the object into a concrete type
|
||||||
|
#[inline] pub fn is<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<&T>
|
||||||
|
{
|
||||||
|
self.0.is()
|
||||||
|
}
|
||||||
|
/// Try to downcast the object into a concrete type
|
||||||
|
#[inline] pub fn downcast_ref<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<&T>
|
||||||
|
{
|
||||||
|
self.0.downcast_ref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The kind of event outputted from a state service's broadcast stream
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
|
||||||
|
pub enum ServiceEventKind
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Does nothing.
|
||||||
|
///
|
||||||
|
/// # Associated object
|
||||||
|
/// None.
|
||||||
|
KeepAlive,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An event outputted from a state service's broadcast stream
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ServiceEvent
|
||||||
|
{
|
||||||
|
kind: ServiceEventKind,
|
||||||
|
directed: Option<MaybeVec<ServiceSubID>>,
|
||||||
|
obj: Option<EventObjectRefCloneWrap>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceEvent
|
||||||
|
{
|
||||||
|
/// The kind of this event.
|
||||||
|
pub fn kind(&self) -> ServiceEventKind
|
||||||
|
{
|
||||||
|
self.kind
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if this event is for you
|
||||||
|
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
|
||||||
|
{
|
||||||
|
if let Some(yes) = self.directed.as_ref() {
|
||||||
|
yes == whom
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Check if this event is directed to anyone
|
||||||
|
pub fn is_directed(&self) -> bool
|
||||||
|
{
|
||||||
|
self.directed.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check who this event is directed to.
|
||||||
|
///
|
||||||
|
/// If it is not directed, an empty slice will be returned.
|
||||||
|
pub fn directed_to(&self) -> &[&ServiceSubID]
|
||||||
|
{
|
||||||
|
match self.directed.as_ref()
|
||||||
|
{
|
||||||
|
Some(yes) => &yes[..],
|
||||||
|
None => &[],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the object, if there is one.
|
||||||
|
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
|
||||||
|
{
|
||||||
|
self.obj.as_ref().map(|x| x.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a mutable reference to the object, if there is one.
|
||||||
|
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
|
||||||
|
{
|
||||||
|
self.obj.as_mut().map(|x| x.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to consume into the inner object. If there is no object, return self.
|
||||||
|
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
|
||||||
|
{
|
||||||
|
match self.obj
|
||||||
|
{
|
||||||
|
Some(obj) => Ok(obj),
|
||||||
|
None => Err(self),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ServiceEvent> for Option<ServiceEventObject>
|
||||||
|
{
|
||||||
|
#[inline] fn from(from: ServiceEvent) -> Self
|
||||||
|
{
|
||||||
|
from.obj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<ServiceEvent> for ServiceEventObject
|
||||||
|
{
|
||||||
|
type Error = NoObjectError;
|
||||||
|
|
||||||
|
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
|
||||||
|
{
|
||||||
|
match from.obj
|
||||||
|
{
|
||||||
|
Some(obj) => Ok(obj.0),
|
||||||
|
None => Err(NoObjectError),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
|
||||||
|
pub struct NoObjectError;
|
||||||
|
|
||||||
|
impl error::Error for NoObjectError{}
|
||||||
|
impl fmt::Display for NoObjectError
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
write!(f, "there was no object broadcasted along with this ")
|
||||||
|
}
|
||||||
|
}
|
@ -1,141 +0,0 @@
|
|||||||
//! Controls the broadcasting of events sent from the state service task
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
/// The kind of event outputted from a state service's broadcast stream
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
|
|
||||||
pub enum ServiceEventKind
|
|
||||||
{
|
|
||||||
/// Does nothing.
|
|
||||||
///
|
|
||||||
/// # Associated object
|
|
||||||
/// `()`.
|
|
||||||
Ping,
|
|
||||||
/// Does nothing.
|
|
||||||
///
|
|
||||||
/// # Associated object
|
|
||||||
/// None.
|
|
||||||
KeepAlive,
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_if!{
|
|
||||||
if #[cfg(debug_assertions)] {
|
|
||||||
/// Type used for directed array.
|
|
||||||
/// Currently testing `smolset` over eagerly allocating.
|
|
||||||
pub(super) type SESet<T> = smolset::SmolSet<[T; 1]>;
|
|
||||||
} else {
|
|
||||||
pub(super) type SESet<T> = std::collections::HashSet<T>;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An event outputted from a state service's broadcast stream
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct ServiceEvent
|
|
||||||
{
|
|
||||||
bc_id: BroadcastID,
|
|
||||||
|
|
||||||
kind: ServiceEventKind,
|
|
||||||
directed: Option<SESet<ServiceSubID>>,
|
|
||||||
obj: Option<ServiceEventObject>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServiceEvent
|
|
||||||
{
|
|
||||||
/// Create a new event to be broadcast
|
|
||||||
fn new<T>(kind: ServiceEventKind, directed: Option<impl IntoIterator<Item=ServiceSubID>>, obj: Option<T>) -> Self
|
|
||||||
where T: Any + Send + Sync + 'static
|
|
||||||
{
|
|
||||||
Self {
|
|
||||||
bc_id: BroadcastID::id_new(),
|
|
||||||
kind,
|
|
||||||
directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(n)
|
|
||||||
}),
|
|
||||||
obj: obj.map(|x| ServiceEventObject(Arc::new(x))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline] pub fn id(&self) -> &BroadcastID
|
|
||||||
{
|
|
||||||
&self.bc_id
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The kind of this event.
|
|
||||||
#[inline] pub fn kind(&self) -> ServiceEventKind
|
|
||||||
{
|
|
||||||
self.kind
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if this event is for you
|
|
||||||
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
|
|
||||||
{
|
|
||||||
if let Some(yes) = self.directed.as_ref() {
|
|
||||||
yes.contains(whom)
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Check if this event is directed to anyone
|
|
||||||
pub fn is_directed(&self) -> bool
|
|
||||||
{
|
|
||||||
self.directed.is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check who this event is directed to.
|
|
||||||
///
|
|
||||||
/// If it is not directed, an empty slice will be returned.
|
|
||||||
pub fn directed_to(&self) -> impl Iterator<Item = &'_ ServiceSubID> + '_
|
|
||||||
{
|
|
||||||
match self.directed.as_ref()
|
|
||||||
{
|
|
||||||
Some(yes) => MaybeIter::many(yes.iter()),
|
|
||||||
None => MaybeIter::none(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get a reference to the object, if there is one.
|
|
||||||
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
|
|
||||||
{
|
|
||||||
self.obj.as_ref()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get a mutable reference to the object, if there is one.
|
|
||||||
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
|
|
||||||
{
|
|
||||||
self.obj.as_mut()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to consume into the inner object. If there is no object, return self.
|
|
||||||
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
|
|
||||||
{
|
|
||||||
match self.obj
|
|
||||||
{
|
|
||||||
Some(obj) => Ok(obj),
|
|
||||||
None => Err(self),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl From<ServiceEvent> for Option<ServiceEventObject>
|
|
||||||
{
|
|
||||||
#[inline] fn from(from: ServiceEvent) -> Self
|
|
||||||
{
|
|
||||||
from.obj
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TryFrom<ServiceEvent> for ServiceEventObject
|
|
||||||
{
|
|
||||||
type Error = NoObjectError;
|
|
||||||
|
|
||||||
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
|
|
||||||
{
|
|
||||||
match from.obj
|
|
||||||
{
|
|
||||||
Some(obj) => Ok(obj),
|
|
||||||
None => Err(NoObjectError),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,28 +0,0 @@
|
|||||||
//! Global state service
|
|
||||||
use super::*;
|
|
||||||
use tokio::{
|
|
||||||
sync::{
|
|
||||||
watch,
|
|
||||||
mpsc,
|
|
||||||
oneshot,
|
|
||||||
broadcast,
|
|
||||||
},
|
|
||||||
task::JoinHandle,
|
|
||||||
};
|
|
||||||
use crate::service::{
|
|
||||||
ExitStatus,
|
|
||||||
};
|
|
||||||
use std::{error, fmt};
|
|
||||||
use std::sync::Weak;
|
|
||||||
use std::any::Any;
|
|
||||||
use std::collections::{BTreeMap};
|
|
||||||
|
|
||||||
|
|
||||||
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
|
|
||||||
id_type!(BroadcastID; "Each broadcast message has a unique ID.");
|
|
||||||
|
|
||||||
mod supervisor; pub use supervisor::*;
|
|
||||||
mod resreq; pub use resreq::*;
|
|
||||||
mod obj; pub use obj::*;
|
|
||||||
|
|
||||||
mod events; pub use events::*;
|
|
@ -1,114 +0,0 @@
|
|||||||
//! broadcast object definitions
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
|
|
||||||
/// Object sent through the broadcast channel.
|
|
||||||
///
|
|
||||||
/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable.
|
|
||||||
#[derive(Clone)]
|
|
||||||
#[repr(transparent)]
|
|
||||||
pub struct ServiceEventObject(pub(super) Arc<dyn Any + Send + Sync + 'static>);
|
|
||||||
shim_debug!(ServiceEventObject);
|
|
||||||
|
|
||||||
/// A weak reference to a `ServiceEventObject`.
|
|
||||||
#[derive(Clone)]
|
|
||||||
#[repr(transparent)]
|
|
||||||
pub struct ServiceEventObjectRef(pub(super) Weak<dyn Any + Send + Sync + 'static>);
|
|
||||||
shim_debug!(ServiceEventObjectRef);
|
|
||||||
|
|
||||||
impl ServiceEventObjectRef
|
|
||||||
{
|
|
||||||
/// Try to upgrade to a concrete reference, and then clone the inner object.
|
|
||||||
pub fn try_clone(&self) -> Option<ServiceEventObject>
|
|
||||||
{
|
|
||||||
match self.0.upgrade()
|
|
||||||
{
|
|
||||||
Some(arc) => Some(ServiceEventObject(arc).clone()),
|
|
||||||
None => None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Try to upgrade to a concrete reference.
|
|
||||||
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
|
|
||||||
{
|
|
||||||
match self.0.upgrade()
|
|
||||||
{
|
|
||||||
Some(arc) => Ok(ServiceEventObject(arc)),
|
|
||||||
None => Err(self),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if the object has not been destroyed yet.
|
|
||||||
pub fn is_alive(&self) -> bool
|
|
||||||
{
|
|
||||||
self.0.strong_count() > 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl ServiceEventObject
|
|
||||||
{
|
|
||||||
pub fn clone_inner(&self) -> Self
|
|
||||||
{
|
|
||||||
Self(Arc::from(self.0.clone_dyn_any_sync()))
|
|
||||||
}
|
|
||||||
/// Get a weak reference counted handle to the object, without cloning the object itself.
|
|
||||||
pub fn clone_weak(&self) -> ServiceEventObjectRef
|
|
||||||
{
|
|
||||||
ServiceEventObjectRef(Arc::downgrade(&self.0))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to downcast the inner object to a concrete type and then clone it.
|
|
||||||
///
|
|
||||||
/// This will fail if:
|
|
||||||
/// * The downcasted type is invalid
|
|
||||||
#[inline] pub fn downcast_clone<T: Any + Clone + Send + Sync + 'static>(&self) -> Option<T>
|
|
||||||
{
|
|
||||||
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
|
|
||||||
}
|
|
||||||
/// Try to consume this instance into downcast.
|
|
||||||
///
|
|
||||||
/// This will fail if:
|
|
||||||
/// * The downcasted type is invalid
|
|
||||||
/// * There are other references to this object (created through `clone_ref()`.).
|
|
||||||
pub fn try_into_downcast<T: Any + Send + Sync + 'static>(self) -> Result<T, Self>
|
|
||||||
{
|
|
||||||
match Arc::downcast(self.0)
|
|
||||||
{
|
|
||||||
Ok(v) => match Arc::try_unwrap(v) {
|
|
||||||
Ok(v) => Ok(v),
|
|
||||||
Err(s) => Err(Self(s)),
|
|
||||||
},
|
|
||||||
Err(e) => Err(Self(e)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if there are any other references to this object
|
|
||||||
#[inline] pub fn is_unique(&self) -> bool
|
|
||||||
{
|
|
||||||
Arc::strong_count(&self.0) == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to downcast the object into a concrete type
|
|
||||||
#[inline] pub fn is<T: Any + Send + Sync + 'static>(&self) -> bool
|
|
||||||
{
|
|
||||||
self.0.as_ref().is::<T>()
|
|
||||||
}
|
|
||||||
/// Try to downcast the object into a concrete type
|
|
||||||
#[inline] pub fn downcast_ref<T: Any + Send + Sync + 'static>(&self) -> Option<&T>
|
|
||||||
{
|
|
||||||
self.0.as_ref().downcast_ref::<T>()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
|
|
||||||
pub struct NoObjectError;
|
|
||||||
|
|
||||||
impl error::Error for NoObjectError{}
|
|
||||||
impl fmt::Display for NoObjectError
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
||||||
{
|
|
||||||
write!(f, "there was no object broadcasted along with this ")
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,65 +0,0 @@
|
|||||||
//! Responses and requests for the state service(s).
|
|
||||||
//!
|
|
||||||
//! These are sent to `Supervisor` which then dispatches them accordingly.
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
|
|
||||||
/// The kind of request to send to the the service
|
|
||||||
#[derive(Debug)]
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum ServiceRequestKind
|
|
||||||
{
|
|
||||||
/// A no-op request.
|
|
||||||
None,
|
|
||||||
|
|
||||||
/// Test request.
|
|
||||||
#[cfg(debug_assertions)] EchoRequest(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The kind of response to expect from a service query, if any.
|
|
||||||
#[derive(Debug)]
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum ServiceResponseKind
|
|
||||||
{
|
|
||||||
/// Test response.
|
|
||||||
#[cfg(debug_assertions)] EchoResponse(String),
|
|
||||||
|
|
||||||
/// Empty response
|
|
||||||
None,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A response from a service to a specific query.
|
|
||||||
///
|
|
||||||
/// It is sent theough the `output` onehot channel in the `ServiceCommand` struct.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ServiceResponse(ServiceRequestKind);
|
|
||||||
|
|
||||||
impl ServiceResponse
|
|
||||||
{
|
|
||||||
/// An empty (default) response
|
|
||||||
#[inline] pub const fn none() -> Self
|
|
||||||
{
|
|
||||||
Self(ServiceRequestKind::None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A formed service request.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ServiceRequest
|
|
||||||
{
|
|
||||||
kind: ServiceRequestKind,
|
|
||||||
output: oneshot::Sender<ServiceResponse>, // If there is no response, this sender will just be dropped and the future impl can return `None` instead of `Some(response)`.
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServiceRequest
|
|
||||||
{
|
|
||||||
/// Create a new request
|
|
||||||
pub(in super) fn new(kind: ServiceRequestKind) -> (Self, oneshot::Receiver<ServiceResponse>)
|
|
||||||
{
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
(Self {
|
|
||||||
kind,
|
|
||||||
output: tx
|
|
||||||
}, rx)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,92 +0,0 @@
|
|||||||
//! Dispatching to state service task(s) through a supervisor
|
|
||||||
use super::*;
|
|
||||||
use tokio::time;
|
|
||||||
use std::{fmt, error};
|
|
||||||
use futures::prelude::*;
|
|
||||||
|
|
||||||
impl Supervisor
|
|
||||||
{
|
|
||||||
/// Dispatch a request to the supervisor to be passed through to a subtask.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// Returns a `Future` that can be awaited on to produce the value sent back by the task (if there is one).
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// * The first failure will be caused if sending to the supervisor fails.
|
|
||||||
/// * The 2nd failure will be caused if either the supervisor, or its delegated task panics before being able to respond, or if the task simply does not respond.
|
|
||||||
pub async fn dispatch_req(&mut self, kind: ServiceRequestKind) -> Result<impl Future<Output=Result<ServiceResponse, SupervisorDispatchError>> + 'static, SupervisorDispatchError>
|
|
||||||
{
|
|
||||||
let (req, rx) = ServiceRequest::new(kind);
|
|
||||||
self.pipe.send(req).await.map_err(|_| SupervisorDispatchError::Send)?;
|
|
||||||
Ok(rx.map_err(|_| SupervisorDispatchError::Recv))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// Returns the value sent back by the task, if there is one
|
|
||||||
pub async fn dispatch_and_wait(&mut self, kind: ServiceRequestKind) -> Result<ServiceResponse, SupervisorDispatchError>
|
|
||||||
{
|
|
||||||
Ok(self.dispatch_req(kind)
|
|
||||||
.await?
|
|
||||||
.await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
|
|
||||||
/// If the timeout expires before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// Returns the value sent back by the task, if there is one
|
|
||||||
pub async fn dispatch_and_wait_timeout(&mut self, kind: ServiceRequestKind, timeout: time::Duration) -> Result<ServiceResponse, SupervisorDispatchError>
|
|
||||||
{
|
|
||||||
let resp_wait = self.dispatch_req(kind)
|
|
||||||
.await?;
|
|
||||||
tokio::select! {
|
|
||||||
val = resp_wait => {
|
|
||||||
return Ok(val?);
|
|
||||||
}
|
|
||||||
_ = time::delay_for(timeout) => {
|
|
||||||
return Err(SupervisorDispatchError::Timeout("receiving response", Some(timeout)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
|
|
||||||
/// If the future `until` completes before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// Returns the value sent back by the task, if there is one
|
|
||||||
pub async fn dispatch_and_wait_until(&mut self, kind: ServiceRequestKind, until: impl Future) -> Result<ServiceResponse, SupervisorDispatchError>
|
|
||||||
{
|
|
||||||
let resp_wait = self.dispatch_req(kind)
|
|
||||||
.await?;
|
|
||||||
tokio::select! {
|
|
||||||
val = resp_wait => {
|
|
||||||
return Ok(val?);
|
|
||||||
}
|
|
||||||
_ = until => {
|
|
||||||
return Err(SupervisorDispatchError::Timeout("receiving response", None))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Error when dispatching a request to the supervisor
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum SupervisorDispatchError
|
|
||||||
{
|
|
||||||
Send, Recv, Timeout(&'static str, Option<tokio::time::Duration>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl error::Error for SupervisorDispatchError{}
|
|
||||||
impl fmt::Display for SupervisorDispatchError
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
Self::Send => write!(f, "dispatching the request failed"),
|
|
||||||
Self::Recv => write!(f, "receiving the response failed"),
|
|
||||||
Self::Timeout(msg, Some(duration)) => write!(f, "timeout on {} was reached ({:?})", msg, duration),
|
|
||||||
Self::Timeout(msg, _) => write!(f, "timeout on {} was reached", msg),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,309 +0,0 @@
|
|||||||
//! Handles spawning and restarting service task(s)
|
|
||||||
use super::*;
|
|
||||||
use tokio::sync::RwLock;
|
|
||||||
use std::mem::MaybeUninit;
|
|
||||||
use std::ops;
|
|
||||||
use futures::prelude::*;
|
|
||||||
use tokio::time;
|
|
||||||
use std::{fmt, error};
|
|
||||||
|
|
||||||
const SUPERVISOR_BACKLOG: usize = 32;
|
|
||||||
|
|
||||||
mod dispatch; pub use dispatch::*;
|
|
||||||
|
|
||||||
TODO: This all needs redoing when i'm actually lucid. This part seems okay but the rest of `service` needs to go and be replaced by something like this
|
|
||||||
|
|
||||||
/// Signal the shutdown method to the supervisor.
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
|
|
||||||
pub enum SupervisorControl
|
|
||||||
{
|
|
||||||
/// Normal working
|
|
||||||
Initialise,
|
|
||||||
/// Signal the subtask(s) to shut down, then wait for them and exit, with an optional timeout.
|
|
||||||
///
|
|
||||||
/// # Notes
|
|
||||||
/// If the timeout expires while waiting, then the mode is switched to `Drop`.
|
|
||||||
Signal(Option<time::Duration>),
|
|
||||||
/// Drop all handles and pipes to subtask(s) then immediately exit.
|
|
||||||
Drop,
|
|
||||||
/// Restart any and all subtask(s)
|
|
||||||
Restart,
|
|
||||||
|
|
||||||
/// Set the max task limit. Default is 0.
|
|
||||||
TaskLimit(usize),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for SupervisorControl
|
|
||||||
{
|
|
||||||
#[inline]
|
|
||||||
fn default() -> Self
|
|
||||||
{
|
|
||||||
Self::Initialise
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Supervisor responsible for spawning the state handler service.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(super) struct Supervisor
|
|
||||||
{
|
|
||||||
/// Handle for the supervisor task itself
|
|
||||||
handle: JoinHandle<ExitStatus>,
|
|
||||||
|
|
||||||
/// Watch sender for signalling shutdowns the supervisor task itself
|
|
||||||
shutdown: watch::Sender<SupervisorControl>,
|
|
||||||
|
|
||||||
/// The pipe to send requests to the supervisor's subtasks
|
|
||||||
pipe: mpsc::Sender<ServiceRequest>,
|
|
||||||
|
|
||||||
/// The initial receiver created from `broadcast_root`.
|
|
||||||
broadcast_receiver: broadcast::Receiver<ServiceEvent>,
|
|
||||||
|
|
||||||
/// Data shared between the supervisor's task and its controller instance here.
|
|
||||||
shared: Arc<SupervisorShared>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Object shared btweeen the Supervisor control instance and its supervisor task.
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct SupervisorShared
|
|
||||||
{
|
|
||||||
/// this is for filtering specific messages to specific subscribers
|
|
||||||
sub: RwLock<BTreeMap<ServiceEventKind, SESet<ServiceSubID>>>,
|
|
||||||
|
|
||||||
broadcast_root: broadcast::Sender<ServiceEvent>,
|
|
||||||
state: state::State,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A subscriber to supervisor task(s) event pump
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Subscriber
|
|
||||||
{
|
|
||||||
id: ServiceSubID,
|
|
||||||
|
|
||||||
/// For directed messages
|
|
||||||
spec: mpsc::Receiver<ServiceEvent>,
|
|
||||||
/// For broadcast messages
|
|
||||||
broad: broadcast::Receiver<ServiceEvent>,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl Supervisor
|
|
||||||
{
|
|
||||||
/// Attempt to send a control signal to the supervisor itself
|
|
||||||
pub fn signal_control(&self, sig: SupervisorControl) -> Result<(), watch::error::SendError<SupervisorControl>>
|
|
||||||
{
|
|
||||||
self.shutdown.broadcast(sig)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Drop all communications with background worker and wait for it to complete
|
|
||||||
pub async fn join_now(self) -> eyre::Result<()>
|
|
||||||
{
|
|
||||||
let handle = {
|
|
||||||
let Self { handle, ..} = self; // drop everything else
|
|
||||||
handle
|
|
||||||
};
|
|
||||||
handle.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if the background worker has not been dropped.
|
|
||||||
///
|
|
||||||
/// If this returns false it usually indicates a fatal error.
|
|
||||||
pub fn is_alive(&self) -> bool
|
|
||||||
{
|
|
||||||
Arc::strong_count(&self.shared) > 1
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a new supervisor for this state.
|
|
||||||
pub fn new(state: state::State) -> Self
|
|
||||||
{
|
|
||||||
let shutdown = watch::channel(Default::default());
|
|
||||||
let pipe = mpsc::channel(SUPERVISOR_BACKLOG);
|
|
||||||
let (broadcast_root, broadcast_receiver) = broadcast::channel(SUPERVISOR_BACKLOG);
|
|
||||||
|
|
||||||
let shared = Arc::new(SupervisorShared{
|
|
||||||
broadcast_root,
|
|
||||||
state,
|
|
||||||
});
|
|
||||||
|
|
||||||
let (shutdown_0, shutdown_1) = shutdown;
|
|
||||||
let (pipe_0, pipe_1) = pipe;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
shutdown: shutdown_0,
|
|
||||||
pipe: pipe_0,
|
|
||||||
broadcast_receiver,
|
|
||||||
shared: Arc::clone(&shared),
|
|
||||||
|
|
||||||
handle: tokio::spawn(async move {
|
|
||||||
let shared = shared;
|
|
||||||
ExitStatus::from(service_fn(SupervisorTaskState {
|
|
||||||
shared,
|
|
||||||
recv: pipe_1,
|
|
||||||
shutdown: shutdown_1,
|
|
||||||
}).await.or_else(|err| err.into_own_result()))
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The state held by the running superviser service
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct SupervisorTaskState
|
|
||||||
{
|
|
||||||
shutdown: watch::Receiver<SupervisorControl>,
|
|
||||||
recv: mpsc::Receiver<ServiceRequest>,
|
|
||||||
shared: Arc<SupervisorShared>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Detached supervisor server
|
|
||||||
async fn service_fn(SupervisorTaskState {shared, mut recv, mut shutdown}: SupervisorTaskState) -> Result<(), ServiceTerminationError>
|
|
||||||
{
|
|
||||||
impl Default for TerminationKind
|
|
||||||
{
|
|
||||||
#[inline]
|
|
||||||
fn default() -> Self
|
|
||||||
{
|
|
||||||
Self::Graceful
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The command stream to dispatch to the worker tasks
|
|
||||||
let command_dispatch = async {
|
|
||||||
while let Some(req) = recv.recv().await {
|
|
||||||
todo!("Dispatch to child(s)");
|
|
||||||
}
|
|
||||||
TerminationKind::Graceful
|
|
||||||
};
|
|
||||||
tokio::pin!(command_dispatch);
|
|
||||||
|
|
||||||
// The signal stream to be handled here
|
|
||||||
let signal_stream = async {
|
|
||||||
while let Some(value) = shutdown.recv().await
|
|
||||||
{
|
|
||||||
use SupervisorControl::*;
|
|
||||||
match value {
|
|
||||||
Initialise => info!("Initialised"),
|
|
||||||
Signal(None) => return TerminationKind::SignalHup,
|
|
||||||
Signal(Some(to)) => return TerminationKind::SignalTimeout(to),
|
|
||||||
Drop => return TerminationKind::Immediate,
|
|
||||||
Restart => todo!("not implemented"),
|
|
||||||
TaskLimit(_limit) => todo!("not implemented"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
TerminationKind::Graceful
|
|
||||||
};
|
|
||||||
tokio::pin!(signal_stream);
|
|
||||||
|
|
||||||
//loop {
|
|
||||||
tokio::select! {
|
|
||||||
sd_kind = &mut signal_stream => {
|
|
||||||
// We received a signal
|
|
||||||
|
|
||||||
Err(ServiceTerminationError::Signal(sd_kind))
|
|
||||||
}
|
|
||||||
disp_end = &mut command_dispatch => {
|
|
||||||
// The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so...
|
|
||||||
disp_end.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// }
|
|
||||||
|
|
||||||
}
|
|
||||||
/// The mannor in which the supervisor exited.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum TerminationKind
|
|
||||||
{
|
|
||||||
/// The child task(s) were signalled to stop and they were waited on.
|
|
||||||
SignalHup,
|
|
||||||
/// If there was a timeout specified, and that timeout expired, the message will be `SignalTimeout` instead of `SignalHup`.
|
|
||||||
SignalTimeout(time::Duration),
|
|
||||||
/// Immediately drop everything and exit
|
|
||||||
Immediate,
|
|
||||||
/// A non-signalled shutdown. There were no more watchers for the shutdown channel.
|
|
||||||
Graceful,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TerminationKind
|
|
||||||
{
|
|
||||||
/// Convert `TerminationKind::Graceful` into a non-error
|
|
||||||
fn strip_grace(self) -> Result<(), Self>
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
Self::Graceful => Ok(()),
|
|
||||||
e => Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl error::Error for TerminationKind{}
|
|
||||||
impl fmt::Display for TerminationKind
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
Self::SignalHup => write!(f, "children were signalled to shut down and compiled"),
|
|
||||||
Self::SignalTimeout(to) => write!(f, "children were signalled to shut but did not do so within the {:?} timeout", to),
|
|
||||||
Self::Immediate => write!(f, "children were dropped and an immediate exit was made"),
|
|
||||||
Self::Graceful => write!(f, "a graceful shutdown order was issued and compiled with"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// The error returned on a failed service termination.
|
|
||||||
#[derive(Debug)]
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum ServiceTerminationError
|
|
||||||
{
|
|
||||||
/// Was terminated by a signal.
|
|
||||||
Signal(TerminationKind),
|
|
||||||
/// Was terminated by a panic.
|
|
||||||
Panic,
|
|
||||||
/// There were no more commands being sent through, and the worker gracefully shut down.
|
|
||||||
Interest,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<TerminationKind> for Result<(), ServiceTerminationError>
|
|
||||||
{
|
|
||||||
fn from(from: TerminationKind) -> Self
|
|
||||||
{
|
|
||||||
ServiceTerminationError::Signal(from).into_own_result()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl ServiceTerminationError
|
|
||||||
{
|
|
||||||
fn into_own_result(self) -> Result<(), Self>
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
Self::Signal(term) => term.strip_grace().map_err(Self::Signal),
|
|
||||||
x => Err(x),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl error::Error for ServiceTerminationError
|
|
||||||
{
|
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)>
|
|
||||||
{
|
|
||||||
Some(match &self {
|
|
||||||
Self::Signal(ts) => ts,
|
|
||||||
_ => return None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl fmt::Display for ServiceTerminationError
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
Self::Signal(_) => write!(f, "shut down by signal"),
|
|
||||||
Self::Panic => write!(f, "shut down by panic. this is usually fatal"),
|
|
||||||
Self::Interest => write!(f, "all communications with this service stopped"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in new issue