parent
f66f5a6565
commit
13cb323ed3
@ -0,0 +1,303 @@
|
|||||||
|
//! Global state service
|
||||||
|
use super::*;
|
||||||
|
use tokio::{
|
||||||
|
sync::{
|
||||||
|
watch,
|
||||||
|
mpsc,
|
||||||
|
oneshot,
|
||||||
|
broadcast,
|
||||||
|
},
|
||||||
|
task::JoinHandle,
|
||||||
|
};
|
||||||
|
use crate::service::{
|
||||||
|
ExitStatus,
|
||||||
|
};
|
||||||
|
use std::{error, fmt};
|
||||||
|
use std::sync::Weak;
|
||||||
|
use std::any::Any;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
|
||||||
|
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
|
||||||
|
|
||||||
|
/// Signal the shutdown method to the supervisor.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
|
||||||
|
pub enum SupervisorShutdownStatus
|
||||||
|
{
|
||||||
|
/// Signal the subtask(s) to shut down, then wait for them and exit.
|
||||||
|
Signal,
|
||||||
|
/// Drop all handles and pipes to subtask(s) then immediately exit.
|
||||||
|
Drop,
|
||||||
|
/// Keep working
|
||||||
|
Normal,
|
||||||
|
/// Restart any and all subtask(s)
|
||||||
|
Restart,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SupervisorShutdownStatus
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn default() -> Self
|
||||||
|
{
|
||||||
|
SupervisorShutdownStatus::Normal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The kind of command to send to the the service
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ServiceCommandKind
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ServiceResponseKind
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ServiceResponse(Option<ServiceCommandKind>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ServiceCommand
|
||||||
|
{
|
||||||
|
kind: ServiceCommandKind,
|
||||||
|
output: oneshot::Sender<ServiceResponse>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Supervisor responsible for spawning the state handler service.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct Supervisor
|
||||||
|
{
|
||||||
|
handle: JoinHandle<ExitStatus>,
|
||||||
|
|
||||||
|
shutdown: watch::Sender<SupervisorShutdownStatus>,
|
||||||
|
|
||||||
|
pipe: mpsc::Sender<ServiceCommand>,
|
||||||
|
sub: BTreeMap<ServiceEventKind, ServiceEvent>
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Object sent through the broadcast channel.
|
||||||
|
///
|
||||||
|
/// These objects can be cloned and downcasted, but can also be atomically refcounted if that is more desireable.
|
||||||
|
///
|
||||||
|
/// # Cloning
|
||||||
|
/// The implementation of `Clone` for this instance clones the inner object, not the refcount. Use `clone_ref()` to *just* clone the refcount.
|
||||||
|
/// To downcast and clone the inner object without an extra `Arc` allocation, use `downcast_clone()` or `downcast().map(Clone::clone)`.
|
||||||
|
pub struct ServiceEventObject(Arc<dyn AnyCloneable + Send + Sync + 'static>);
|
||||||
|
shim_debug!(ServiceEventObject);
|
||||||
|
/// A weak reference to a `ServiceEventObject`.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ServiceEventObjectRef(Weak<dyn AnyCloneable + Send + Sync + 'static>);
|
||||||
|
shim_debug!(ServiceEventObjectRef);
|
||||||
|
|
||||||
|
/// Wrapper used when broadcasting to prevent useless inner object clones.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct EventObjectRefCloneWrap(ServiceEventObject);
|
||||||
|
|
||||||
|
impl Clone for EventObjectRefCloneWrap
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(self.0.clone_ref())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for ServiceEventObject
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(Arc::from(self.0.as_ref().clone_dyn()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceEventObjectRef
|
||||||
|
{
|
||||||
|
/// Try to upgrade to a concrete reference, and then clone the inner object.
|
||||||
|
pub fn try_clone(&self) -> Option<ServiceEventObject>
|
||||||
|
{
|
||||||
|
match self.0.upgrade()
|
||||||
|
{
|
||||||
|
Some(arc) => Some(ServiceEventObject(arc).clone()),
|
||||||
|
None => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Try to upgrade to a concrete reference.
|
||||||
|
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
|
||||||
|
{
|
||||||
|
match self.0.upgrade()
|
||||||
|
{
|
||||||
|
Some(arc) => Ok(ServiceEventObject(arc)),
|
||||||
|
None => Err(self),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl ServiceEventObject
|
||||||
|
{
|
||||||
|
/// Get an owned reference counted handle to the object, without cloning the object itself.
|
||||||
|
pub fn clone_ref(&self) -> Self
|
||||||
|
{
|
||||||
|
Self(Arc::clone(&self.0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a weak reference counted handle to the object, without cloning the object itself.
|
||||||
|
pub fn clone_weak(&self) -> ServiceEventObjectRef
|
||||||
|
{
|
||||||
|
ServiceEventObjectRef(Arc::downgrade(&self.0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to downcast the inner object to a concrete type and then clone it.
|
||||||
|
///
|
||||||
|
/// This will fail if:
|
||||||
|
/// * The downcasted type is invalid
|
||||||
|
#[inline] pub fn downcast_clone<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<T>
|
||||||
|
{
|
||||||
|
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
|
||||||
|
}
|
||||||
|
/// Try to consume this instance into downcast.
|
||||||
|
///
|
||||||
|
/// This will fail if:
|
||||||
|
/// * The downcasted type is invalid
|
||||||
|
/// * There are other references to this object (created through `clone_ref()`.).
|
||||||
|
pub fn try_into_downcast<T: AnyCloneable + Send + Sync + 'static>(self) -> Result<T, Self>
|
||||||
|
{
|
||||||
|
match Arc::downcast(self.0)
|
||||||
|
{
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => Self(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if there are any other references to this object
|
||||||
|
#[inline] pub fn is_unique(&self) -> bool
|
||||||
|
{
|
||||||
|
Arc::strong_count(&self.0) == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to downcast the object into a concrete type
|
||||||
|
#[inline] pub fn is<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<&T>
|
||||||
|
{
|
||||||
|
self.0.is()
|
||||||
|
}
|
||||||
|
/// Try to downcast the object into a concrete type
|
||||||
|
#[inline] pub fn downcast_ref<T: AnyCloneable + Send + Sync + 'static>(&self) -> Option<&T>
|
||||||
|
{
|
||||||
|
self.0.downcast_ref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The kind of event outputted from a state service's broadcast stream
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
|
||||||
|
pub enum ServiceEventKind
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Does nothing.
|
||||||
|
///
|
||||||
|
/// # Associated object
|
||||||
|
/// None.
|
||||||
|
KeepAlive,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An event outputted from a state service's broadcast stream
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ServiceEvent
|
||||||
|
{
|
||||||
|
kind: ServiceEventKind,
|
||||||
|
directed: Option<MaybeVec<ServiceSubID>>,
|
||||||
|
obj: Option<EventObjectRefCloneWrap>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceEvent
|
||||||
|
{
|
||||||
|
/// The kind of this event.
|
||||||
|
pub fn kind(&self) -> ServiceEventKind
|
||||||
|
{
|
||||||
|
self.kind
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if this event is for you
|
||||||
|
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
|
||||||
|
{
|
||||||
|
if let Some(yes) = self.directed.as_ref() {
|
||||||
|
yes == whom
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Check if this event is directed to anyone
|
||||||
|
pub fn is_directed(&self) -> bool
|
||||||
|
{
|
||||||
|
self.directed.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check who this event is directed to.
|
||||||
|
///
|
||||||
|
/// If it is not directed, an empty slice will be returned.
|
||||||
|
pub fn directed_to(&self) -> &[&ServiceSubID]
|
||||||
|
{
|
||||||
|
match self.directed.as_ref()
|
||||||
|
{
|
||||||
|
Some(yes) => &yes[..],
|
||||||
|
None => &[],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the object, if there is one.
|
||||||
|
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
|
||||||
|
{
|
||||||
|
self.obj.as_ref().map(|x| x.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a mutable reference to the object, if there is one.
|
||||||
|
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
|
||||||
|
{
|
||||||
|
self.obj.as_mut().map(|x| x.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to consume into the inner object. If there is no object, return self.
|
||||||
|
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
|
||||||
|
{
|
||||||
|
match self.obj
|
||||||
|
{
|
||||||
|
Some(obj) => Ok(obj),
|
||||||
|
None => Err(self),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ServiceEvent> for Option<ServiceEventObject>
|
||||||
|
{
|
||||||
|
#[inline] fn from(from: ServiceEvent) -> Self
|
||||||
|
{
|
||||||
|
from.obj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<ServiceEvent> for ServiceEventObject
|
||||||
|
{
|
||||||
|
type Error = NoObjectError;
|
||||||
|
|
||||||
|
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
|
||||||
|
{
|
||||||
|
match from.obj
|
||||||
|
{
|
||||||
|
Some(obj) => Ok(obj.0),
|
||||||
|
None => Err(NoObjectError),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
|
||||||
|
pub struct NoObjectError;
|
||||||
|
|
||||||
|
impl error::Error for NoObjectError{}
|
||||||
|
impl fmt::Display for NoObjectError
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
write!(f, "there was no object broadcasted along with this ")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in new issue