You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
yuurei/src/state/service.rs

332 lines
7.7 KiB

//! Global state service
use super::*;
use tokio::{
sync::{
watch,
mpsc,
oneshot,
broadcast,
},
task::JoinHandle,
};
use crate::service::{
ExitStatus,
};
use std::{error, fmt};
use std::sync::Weak;
use std::any::Any;
use std::collections::{BTreeMap};
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
id_type!(BroadcastID; "Each broadcast message has a unique ID.");
/// Signal the shutdown method to the supervisor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
pub enum SupervisorShutdownStatus
{
/// Signal the subtask(s) to shut down, then wait for them and exit.
Signal,
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Keep working
Normal,
/// Restart any and all subtask(s)
Restart,
}
impl Default for SupervisorShutdownStatus
{
#[inline]
fn default() -> Self
{
SupervisorShutdownStatus::Normal
}
}
/// The kind of command to send to the the service
#[derive(Debug)]
pub enum ServiceCommandKind
{
}
#[derive(Debug)]
pub enum ServiceResponseKind
{
}
#[derive(Debug)]
pub struct ServiceResponse(Option<ServiceCommandKind>);
#[derive(Debug)]
pub struct ServiceCommand
{
kind: ServiceCommandKind,
output: oneshot::Sender<ServiceResponse>,
}
/// Supervisor responsible for spawning the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
handle: JoinHandle<ExitStatus>,
shutdown: watch::Sender<SupervisorShutdownStatus>,
pipe: mpsc::Sender<ServiceCommand>,
sub: BTreeMap<ServiceEventKind, ServiceEvent>
}
/// Object sent through the broadcast channel.
///
/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObject(Arc<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObject);
/// A weak reference to a `ServiceEventObject`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObjectRef(Weak<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObjectRef);
impl ServiceEventObjectRef
{
/// Try to upgrade to a concrete reference, and then clone the inner object.
pub fn try_clone(&self) -> Option<ServiceEventObject>
{
match self.0.upgrade()
{
Some(arc) => Some(ServiceEventObject(arc).clone()),
None => None
}
}
/// Try to upgrade to a concrete reference.
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
{
match self.0.upgrade()
{
Some(arc) => Ok(ServiceEventObject(arc)),
None => Err(self),
}
}
/// Check if the object has not been destroyed yet.
pub fn is_alive(&self) -> bool
{
self.0.strong_count() > 0
}
}
impl ServiceEventObject
{
pub fn clone_inner(&self) -> Self
{
Self(Arc::from(self.0.clone_dyn_any_sync()))
}
/// Get a weak reference counted handle to the object, without cloning the object itself.
pub fn clone_weak(&self) -> ServiceEventObjectRef
{
ServiceEventObjectRef(Arc::downgrade(&self.0))
}
/// Try to downcast the inner object to a concrete type and then clone it.
///
/// This will fail if:
/// * The downcasted type is invalid
#[inline] pub fn downcast_clone<T: Any + Clone + Send + Sync + 'static>(&self) -> Option<T>
{
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
}
/// Try to consume this instance into downcast.
///
/// This will fail if:
/// * The downcasted type is invalid
/// * There are other references to this object (created through `clone_ref()`.).
pub fn try_into_downcast<T: Any + Send + Sync + 'static>(self) -> Result<T, Self>
{
match Arc::downcast(self.0)
{
Ok(v) => match Arc::try_unwrap(v) {
Ok(v) => Ok(v),
Err(s) => Err(Self(s)),
},
Err(e) => Err(Self(e)),
}
}
/// Check if there are any other references to this object
#[inline] pub fn is_unique(&self) -> bool
{
Arc::strong_count(&self.0) == 1
}
/// Try to downcast the object into a concrete type
#[inline] pub fn is<T: Any + Send + Sync + 'static>(&self) -> bool
{
self.0.as_ref().is::<T>()
}
/// Try to downcast the object into a concrete type
#[inline] pub fn downcast_ref<T: Any + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.as_ref().downcast_ref::<T>()
}
}
/// The kind of event outputted from a state service's broadcast stream
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub enum ServiceEventKind
{
/// Does nothing.
///
/// # Associated object
/// `()`.
Ping,
/// Does nothing.
///
/// # Associated object
/// None.
KeepAlive,
}
cfg_if!{
if #[cfg(debug_assertions)] {
/// Type used for directed array.
/// Currently testing `smolset` over eagerly allocating.
type SESet<T> = smolset::SmolSet<[T; 1]>;
} else {
type SESet<T> = std::collections::HashSet<T>;
}
}
/// An event outputted from a state service's broadcast stream
#[derive(Debug, Clone)]
pub struct ServiceEvent
{
bc_id: BroadcastID,
kind: ServiceEventKind,
directed: Option<SESet<ServiceSubID>>,
obj: Option<ServiceEventObject>,
}
impl ServiceEvent
{
/// Create a new event to be broadcast
fn new<T>(kind: ServiceEventKind, directed: Option<impl IntoIterator<Item=ServiceSubID>>, obj: Option<T>) -> Self
where T: Any + Send + Sync + 'static
{
Self {
bc_id: BroadcastID::id_new(),
kind,
directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 {
None
} else {
Some(n)
}),
obj: obj.map(|x| ServiceEventObject(Arc::new(x))),
}
}
#[inline] pub fn id(&self) -> &BroadcastID
{
&self.bc_id
}
/// The kind of this event.
#[inline] pub fn kind(&self) -> ServiceEventKind
{
self.kind
}
/// Check if this event is for you
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
{
if let Some(yes) = self.directed.as_ref() {
yes.contains(whom)
} else {
false
}
}
/// Check if this event is directed to anyone
pub fn is_directed(&self) -> bool
{
self.directed.is_some()
}
/// Check who this event is directed to.
///
/// If it is not directed, an empty slice will be returned.
pub fn directed_to(&self) -> impl Iterator<Item = &'_ ServiceSubID> + '_
{
match self.directed.as_ref()
{
Some(yes) => MaybeIter::many(yes.iter()),
None => MaybeIter::none(),
}
}
/// Get a reference to the object, if there is one.
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
{
self.obj.as_ref()
}
/// Get a mutable reference to the object, if there is one.
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
{
self.obj.as_mut()
}
/// Try to consume into the inner object. If there is no object, return self.
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
{
match self.obj
{
Some(obj) => Ok(obj),
None => Err(self),
}
}
}
impl From<ServiceEvent> for Option<ServiceEventObject>
{
#[inline] fn from(from: ServiceEvent) -> Self
{
from.obj
}
}
impl TryFrom<ServiceEvent> for ServiceEventObject
{
type Error = NoObjectError;
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
{
match from.obj
{
Some(obj) => Ok(obj),
None => Err(NoObjectError),
}
}
}
#[derive(Debug)]
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
pub struct NoObjectError;
impl error::Error for NoObjectError{}
impl fmt::Display for NoObjectError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "there was no object broadcasted along with this ")
}
}