restart state

new-idea
Avril 4 years ago
parent 69b0d073ed
commit 572bb748a3
Signed by: flanchan
GPG Key ID: 284488987C31F630

34
Cargo.lock generated

@ -1245,6 +1245,15 @@ version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232"
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.5"
@ -1263,6 +1272,21 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.118"
@ -1359,6 +1383,15 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallmap"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d61d73d5986b7f728a76234ca60f7826faa44dd9043d2954ca6583bfc7b875d"
dependencies = [
"rustc_version",
]
[[package]]
name = "smallvec"
version = "1.6.0"
@ -1802,6 +1835,7 @@ dependencies = [
"pretty_env_logger",
"serde",
"sha2",
"smallmap",
"smallvec",
"smolset",
"tokio",

@ -25,6 +25,7 @@ once_cell = "1.5.2"
pretty_env_logger = "0.4.0"
serde = {version = "1.0.118", features=["derive"]}
sha2 = "0.9.2"
smallmap = "1.3.0"
smallvec = {version = "1.6.0", features= ["union", "serde", "write"]}
smolset = "1.1"
tokio = {version = "0.2", features=["full"] }

@ -691,3 +691,115 @@ mod maybe_iter
where I: ExactSizeIterator{}
}
pub use maybe_iter::MaybeIter;
mod cancel {
use std::{
pin::Pin,
task::{Context, Poll},
convert::Infallible,
};
use std::{error,fmt};
use futures::Future;
/// A future that never completes
#[derive(Debug, Clone, Copy)]
pub struct NeverFuture;
impl Future for NeverFuture
{
type Output = Infallible;
#[inline] fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
/// Default error when a task is cancelled.
#[derive(Debug)]
pub struct CancellationError;
impl error::Error for CancellationError{}
impl fmt::Display for CancellationError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "task was cancelled")
}
}
}
pub use cancel::{
NeverFuture,
CancellationError,
};
/// Run a future until another one completes, 'cancelling' the running task.
///
/// # Usage
/// Normal usage to return `CancellationError`s.
/// ```
/// # use futures::Future;
/// # use crate::ext::*;
/// async fn do_something(cancel: impl Future) -> Result<(), CancellationError> {
/// with_cancel!(cancel => async move {
/// // do some async work
/// })?;
/// Ok(())
/// }
/// ```
/// Return specific errors
/// ```
/// # use futures::Future;
/// # use crate::ext::*;
/// # struct SomeError;
/// async fn do_something(cancel: impl Future) -> Result<(), SomeError> {
/// with_cancel!(cancel, SomeError => async move {
/// // do some async work
/// })?;
/// Ok(())
/// }
/// ```
/// Return the return of the cancel future itself
/// ```
/// # use futures::Future;
/// # use crate::ext::*;
/// # struct SomeError;
/// async fn do_something(cancel: impl Future<Output=String>) -> Result<(), String> {
/// with_cancel!(use cancel => async move {
/// // do some async work
/// })?;
/// Ok(())
/// }
/// ```
#[macro_export] macro_rules! with_cancel {
($cancel:expr => $normal:expr) => {
tokio::select! {
_ = $cancel => {
return Err($crate::ext::cancel::CancellationError.into());
}
x = $normal => {
x
}
}
};
($cancel:expr, $or:expr => $normal:expr) => {
tokio::select! {
_ = $cancel => {
return Err($or);
}
x = $normal => {
x
}
}
};
(use $cancel:expr => $normal:expr) => {
tokio::select! {
y = $cancel => {
return Err(y.into());
}
x = $normal => {
x
}
}
}
}

@ -0,0 +1,218 @@
use super::*;
use std::marker::PhantomData;
use std::{fmt,error};
use std::ops::Deref;
/// A spec to validate the formatting of a string for.
pub trait FormatSpec
{
type Error: Into<eyre::Report>;
fn validate(s: &str) -> Result<(), Self::Error>;
}
/// A strongly validated string slice
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct FormattedStr<F: FormatSpec + ?Sized>(PhantomData<F>, str);
/// A strongly validated string
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FormattedString<F: FormatSpec + ?Sized>(String, PhantomData<F>);
impl<F: FormatSpec + ?Sized> FormattedStr<F>
{
/// Create a new instance without validating the input.
///
/// # Safety
/// You must be sure the input is of a valid state to `F`.
#[inline(always)] pub unsafe fn new_unchecked<'a>(s: &'a str) -> &'a Self
{
std::mem::transmute(s)
}
/// Create and validate a the format of a new instance.
pub fn new<'a>(s: &'a str) -> Result<&'a Self, F::Error>
{
F::validate(s).map(move |_| unsafe {Self::new_unchecked(s)})
}
/// Get the inner str
#[inline] pub fn as_str<'a>(&'a self) -> &'a str
{
&self.1
}
}
impl<F: FormatSpec + ?Sized> FormattedString<F>
{
/// Create a new instance without validating the input.
///
/// # Safety
/// You must be sure the input is of a valid state to `F`.
#[inline(always)] pub unsafe fn new_unchecked(s: String) -> Self
{
std::mem::transmute(s)
}
/// Create and validate a the format of a new instance.
pub fn new(s: String) -> Result<Self, F::Error>
{
F::validate(&s)
.map(move |_| unsafe {Self::new_unchecked(s)})
}
}
impl<F: FormatSpec + ?Sized> FormattedString<F>
{
/// As a formatted str
#[inline] pub fn as_str<'a>(&'a self) -> &'a FormattedStr<F>
{
unsafe { FormattedStr::new_unchecked(&self.0) }
}
}
impl<F: FormatSpec + ?Sized> AsRef<str> for FormattedStr<F>
{
#[inline] fn as_ref(&self) -> &str
{
self.as_str()
}
}
impl<F: FormatSpec + ?Sized> AsRef<FormattedStr<F>> for FormattedString<F>
{
#[inline] fn as_ref(&self) -> &FormattedStr<F> {
self.as_str()
}
}
impl<F: FormatSpec + ?Sized> Deref for FormattedString<F>
{
type Target = FormattedStr<F>;
#[inline] fn deref(&self) -> &Self::Target {
self.as_ref()
}
}
impl<'a,F: FormatSpec + ?Sized> From<&'a FormattedStr<F>> for &'a str
{
#[inline] fn from(from: &'a FormattedStr<F>) -> Self
{
&from.1
}
}
impl<'a,F: FormatSpec + ?Sized> TryFrom<&'a str> for &'a FormattedStr<F>
{
type Error = F::Error;
#[inline] fn try_from(from: &'a str) -> Result<Self, Self::Error>
{
FormattedStr::new(from)
}
}
impl<'a, F: FormatSpec + ?Sized> From<FormattedString<F>> for String
{
#[inline] fn from(from: FormattedString<F>) -> Self
{
from.0
}
}
impl<'a, F: FormatSpec + ?Sized> TryFrom<String> for FormattedString<F>
{
type Error = F::Error;
fn try_from(from: String) -> Result<Self, Self::Error>
{
Self::new(from)
}
}
pub mod formats
{
use super::*;
/// A hex string format specifier
#[derive(Debug)]
pub enum HexFormat{}
impl HexFormat
{
const HEX_MAP: &'static [u8] = b"1234567890abcdefABCDEF";
}
impl FormatSpec for HexFormat
{
type Error = HexFormatError;
fn validate(s: &str) -> Result<(), Self::Error> {
for (i, chr) in s.char_indices()
{
if !chr.is_ascii_alphanumeric() || !Self::HEX_MAP.contains(&(chr as u8)) {
return Err(HexFormatError(chr, i));
}
}
Ok(())
}
}
/// Error for an invalid hex string.
#[derive(Debug)]
pub struct HexFormatError(char, usize);
impl error::Error for HexFormatError{}
impl fmt::Display for HexFormatError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "invalid hex char {:?} at index {}", self.0, self.1)
}
}
/// A PEM formatted string.
#[derive(Debug, Clone, Copy)]
pub enum PEMFormat{}
#[derive(Debug)]
pub struct PEMFormatError;
impl error::Error for PEMFormatError{}
impl fmt::Display for PEMFormatError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "invalid PEM format")
}
}
impl FormatSpec for PEMFormat
{
type Error = PEMFormatError;
fn validate(_s: &str) -> Result<(), Self::Error> {
todo!()
}
}
pub type HexFormattedStr = FormattedStr<HexFormat>;
pub type HexFormattedString = FormattedString<HexFormat>;
pub type PEMFormattedStr = FormattedStr<PEMFormat>;
pub type PEMFormattedString = FormattedString<PEMFormat>;
#[cfg(test)]
mod tests
{
use super::*;
#[test]
fn hex_format()
{
let _invalid = HexFormattedStr::new("ab120982039840i ").expect_err("Invalidation");
let _valid = HexFormattedStr::new("abc123982095830495adcfDD").expect("Validation");
}
}
}

@ -25,10 +25,11 @@ use color_eyre::{
#[macro_use] mod ext; use ext::*;
mod bytes;
mod delta; //unused now, but tests still use it, so...
mod hard_format;
mod delta;
mod post;
mod state;
mod service;
fn install() -> eyre::Result<()>
{
@ -38,10 +39,13 @@ fn install() -> eyre::Result<()>
Ok(())
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
install()?;
println!("Hello, world!");
trace!("log initialised");
Ok(())
}

@ -0,0 +1,7 @@
use super::*;
#[derive(Debug)]
pub struct Post
{
}

@ -1,147 +1,2 @@
//! Session services
use super::*;
use tokio::{
task::JoinHandle,
sync::{
mpsc,
broadcast,
},
};
use std::{fmt,error};
use std::marker::{Send,Sync};
///// A boxed message that can be downcasted.
//pub type BoxedMessage = Box<dyn std::any::Any + Send + Sync + 'static>;
/// A handle to a service.
pub trait Service<T=ExitStatus>
where T: Send + 'static
{
/// The message type to send to the service.
type Message: Send + Sync + 'static;
/// The response to expect from the service.
type Response: Send + Sync + 'static;
/// Return the wait handle.
///
/// This method should drop the message pipeline.
fn wait_on(self) -> JoinHandle<T>;
/// An immutable reference to the pipe for sending message. Useful for service handle detaching (i.e. cloning the message input pipe).
fn message_in_ref(&self) -> &mpsc::Sender<Self::Message>;
/// The message pipe for sending messages to the service.
fn message_in(&mut self) -> &mut mpsc::Sender<Self::Message>;
/// The message pipe for receiving messages from the service.
fn message_out(&mut self) -> &mut broadcast::Receiver<Self::Response>;
/// Is the service alive? A `None` value means 'maybe', and is the default return.
///
/// # Note
/// This should not be considered an infallible indicator of if the service has crashed or not. A better method is attempting to send or receive a message and the sender/receiver returning an error.
#[inline] fn is_alive(&self) -> Option<bool>
{
None
}
}
/// Error returned when subscribing to a service
#[derive(Debug)]
#[non_exhaustive]
pub enum SubscribeError
{
/// The service's receive has already been dropped.
SenderDropped,
/// The service dropped the response oneshot channel.
NoResponse,
}
impl error::Error for SubscribeError{}
impl fmt::Display for SubscribeError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::SenderDropped => write!(f, "the service has already stopped"),
Self::NoResponse => write!(f, "the service declined to, or was unable to respond"),
}
}
}
#[macro_export] macro_rules! join_service {
($serv:expr) => ($serv.await.map_err(|_| $crate::service::ExitStatus::Abnormal)?)
}
/// How a service exited
#[derive(Debug)]
#[non_exhaustive]
pub enum ExitStatus<T=()>
{
/// A graceful exit with value
Graceful(T),
/// An abnormal exit (counted as error)
///
/// # Usage
/// Usually for panicked services, otherwise use `Self::Error`.
/// The macro `join_service!()` can be used to convert handle join errors into this
Abnormal,
/// Exit on an error report (counted as error)
Error(eyre::Report),
}
impl<T,E: Into<eyre::Report>> From<Result<T, E>> for ExitStatus<T>
{
fn from(from: Result<T, E>) -> Self
{
match from
{
Ok(v) => Self::Graceful(v),
Err(e) => Self::Error(e.into())
}
}
}
#[derive(Debug)]
/// The error `ExitStatus::Abnormal` converts to.
pub struct AbnormalExitError;
impl error::Error for AbnormalExitError{}
impl fmt::Display for AbnormalExitError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "service terminated in an abnormal way")
}
}
/*
impl<U, T: error::Error + Send+Sync+'static> From<T> for ExitStatus<U>
{
fn from(from: T) -> Self
{
Self::Error(from.into())
}
}*/
impl<T: Default> Default for ExitStatus<T>
{
#[inline]
fn default() -> Self
{
Self::Graceful(T::default())
}
}
impl<T> From<ExitStatus<T>> for eyre::Result<T>
{
fn from(from: ExitStatus<T>) -> Self
{
match from {
ExitStatus::Abnormal => Err(AbnormalExitError).with_note(|| "The background worker likely panicked"),
ExitStatus::Graceful(t) => Ok(t),
ExitStatus::Error(rep) => Err(rep).wrap_err(AbnormalExitError),
}
}
}

@ -0,0 +1,13 @@
use super::*;
use std::collections::HashMap;
use post::Post;
#[derive(Debug, Hash)]
pub struct User;
#[derive(Debug)]
pub struct State
{
entries: HashMap<User, MaybeVec<Post>>
}

@ -1,33 +0,0 @@
//! Open post body
use super::*;
use tokio::{
sync::{
watch,
mpsc,
},
};
use once_cell::sync::OnceCell;
/// An open post body
#[derive(Debug)]
pub struct Karada
{
scape: Vec<char>,
body_cache: String,
//TODO: Pub/sub? Or should that be in some other place? Probably.
}
impl Karada
{
fn invalidate_cache(&mut self)
{
self.body_cache = self.scape.iter().copied().collect();
}
/// Apply this delta to the body.
pub fn apply_delta(&mut self, delta: &delta::Delta)
{
delta.insert(&mut self.scape);
self.invalidate_cache();
}
}

@ -1,155 +0,0 @@
//! Creating immutable images of state.
use super::*;
use std::{error,fmt};
/// An image of the entire post container
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Freeze
{
posts: Vec<post::Post>,
}
impl From<Freeze> for Imouto
{
#[inline] fn from(from: Freeze) -> Self
{
Self::from_freeze(from)
}
}
impl TryFrom<Imouto> for Freeze
{
type Error = FreezeError;
#[inline] fn try_from(from: Imouto) -> Result<Self, Self::Error>
{
from.try_into_freeze()
}
}
/// Error returned when a freeze operation fails
#[derive(Debug)]
pub struct FreezeError{
held: Option<Arc<RwLock<post::Post>>>,
}
impl FreezeError
{
/// The post associated with this error, if there is one.
pub fn post(&self) -> Option<&Arc<RwLock<post::Post>>>
{
self.held.as_ref()
}
}
impl error::Error for FreezeError{}
impl fmt::Display for FreezeError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f,"Failed to create freeze image")?;
if let Some(aref) = &self.held
{
let cnt = Arc::strong_count(&aref) - 1;
if cnt > 0 {
write!(f, "Post reference still held in {} other places.", cnt)
} else {
write!(f, "Post reference was still held in another place at the time, but no longer is.")
}
} else {
Ok(())
}
}
}
impl Imouto
{
/// Create a serialisable image of this store by cloning each post into it.
pub async fn freeze(&self) -> Freeze
{
let read = &self.all;
let mut sis = Freeze{
posts: Vec::with_capacity(read.len()),
};
for (_, post) in read.iter()
{
sis.posts.push(post.read().await.clone());
}
sis
}
/// Consume into a serialisable image of this store.
///
/// # Fails
/// If references to any posts are still held elsewhere.
pub fn try_into_freeze(self) -> Result<Freeze, FreezeError>
{
let read = self.all;
let mut sis = Freeze{
posts: Vec::with_capacity(read.len()),
};
for post in read.into_iter()
{
sis.posts.push(match Arc::try_unwrap(post) {
Ok(val) => val.into_inner(),
Err(arc) => return Err(FreezeError{held: Some(arc)}),
// Err(_arc) => panic!("Reference to post is still being used"),//arc.read().await.clone(), // something else holds the reference, we can't consume it.
});
}
Ok(sis)
}
/// Consume into a serialisable image of this store.
///
/// # Panics
/// If references to any posts are still held elsewhere.
pub fn into_freeze(self) -> Freeze
{
self.try_into_freeze().expect("Failed to consume into freeze")
}
/// Create a new store from a serialisable image of one by cloning each post in it
pub fn unfreeze(freeze: &Freeze) -> Self
{
let mut posts = Arena::new();
let mut user_map = HashMap::new();
for post in freeze.posts.iter()
{
let idx = posts.insert(Arc::new(RwLock::new(post.clone())));
user_map.entry(post.owner().clone())
.or_insert_with(|| MaybeVec::new())
.push(idx);
}
Self {
all: posts,
user_map,
}
}
/// Create a new store by consuming serialisable image of one by cloning each post in it
pub fn from_freeze(freeze: Freeze) -> Self
{
let mut posts = Arena::new();
let mut user_map = HashMap::new();
for post in freeze.posts.into_iter()
{
let mapping = user_map.entry(post.owner().clone())
.or_insert_with(|| MaybeVec::new());
let idx = posts.insert(Arc::new(RwLock::new(post)));
mapping.push(idx);
}
Self {
all: posts,
user_map,
}
}
}

@ -1,69 +0,0 @@
use super::*;
use std::collections::HashMap;
use generational_arena::{
Arena, Index as ArenaIndex,
};
use std::sync::Arc;
use tokio::sync::RwLock;
use std::ops::{Deref, DerefMut};
pub mod session;
pub mod user;
pub mod post;
pub mod body;
mod service; pub use service::*;
mod freeze; pub use freeze::*;
/// Entire post state container
#[derive(Debug)]
pub struct Imouto
{
all: Arena<Arc<RwLock<post::Post>>>,
user_map: HashMap<user::UserID, MaybeVec<ArenaIndex>>,
}
impl Imouto
{
/// Create a new empty container
pub fn new() -> Self
{
Self {
all: Arena::new(),
user_map: HashMap::new(),
}
}
}
#[derive(Debug)]
/// Entire program state
struct Oneesan
{
posts: RwLock<Imouto>,
}
/// Shares whole program state
#[derive(Debug, Clone)]
pub struct State(Arc<Oneesan>);
impl State
{
/// Create a new empty state.
pub fn new() -> Self
{
Self(Arc::new(Oneesan {
posts: RwLock::new(Imouto::new()),
}))
}
/// Get a reference to the post state container
pub async fn imouto(&self) -> tokio::sync::RwLockReadGuard<'_, Imouto>
{
self.0.posts.read().await
}
/// Get a mutable reference to the post state container
pub async fn imouto_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, Imouto>
{
self.0.posts.write().await
}
}

@ -1,37 +0,0 @@
use super::*;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OpenPost
{
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClosedPost
{
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PostKind
{
Open(OpenPost),
Closed(ClosedPost),
}
/// A post
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Post
{
owner: user::UserID,
kind: PostKind
}
impl Post
{
/// The ID of the owning user & session of this post.
pub fn owner(&self) -> &user::UserID
{
&self.owner
}
}

@ -1,141 +0,0 @@
//! Controls the broadcasting of events sent from the state service task
use super::*;
/// The kind of event outputted from a state service's broadcast stream
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub enum ServiceEventKind
{
/// Does nothing.
///
/// # Associated object
/// `()`.
Ping,
/// Does nothing.
///
/// # Associated object
/// None.
KeepAlive,
}
cfg_if!{
if #[cfg(debug_assertions)] {
/// Type used for directed array.
/// Currently testing `smolset` over eagerly allocating.
pub(super) type SESet<T> = smolset::SmolSet<[T; 1]>;
} else {
pub(super) type SESet<T> = std::collections::HashSet<T>;
}
}
/// An event outputted from a state service's broadcast stream
#[derive(Debug, Clone)]
pub struct ServiceEvent
{
bc_id: BroadcastID,
kind: ServiceEventKind,
directed: Option<SESet<ServiceSubID>>,
obj: Option<ServiceEventObject>,
}
impl ServiceEvent
{
/// Create a new event to be broadcast
fn new<T>(kind: ServiceEventKind, directed: Option<impl IntoIterator<Item=ServiceSubID>>, obj: Option<T>) -> Self
where T: Any + Send + Sync + 'static
{
Self {
bc_id: BroadcastID::id_new(),
kind,
directed: directed.map(|x| x.into_iter().collect()).and_then(|n: SESet<_>| if n.len() < 1 {
None
} else {
Some(n)
}),
obj: obj.map(|x| ServiceEventObject(Arc::new(x))),
}
}
#[inline] pub fn id(&self) -> &BroadcastID
{
&self.bc_id
}
/// The kind of this event.
#[inline] pub fn kind(&self) -> ServiceEventKind
{
self.kind
}
/// Check if this event is for you
pub fn is_directed_for(&self, whom: &ServiceSubID) -> bool
{
if let Some(yes) = self.directed.as_ref() {
yes.contains(whom)
} else {
false
}
}
/// Check if this event is directed to anyone
pub fn is_directed(&self) -> bool
{
self.directed.is_some()
}
/// Check who this event is directed to.
///
/// If it is not directed, an empty slice will be returned.
pub fn directed_to(&self) -> impl Iterator<Item = &'_ ServiceSubID> + '_
{
match self.directed.as_ref()
{
Some(yes) => MaybeIter::many(yes.iter()),
None => MaybeIter::none(),
}
}
/// Get a reference to the object, if there is one.
pub fn obj_ref(&self) -> Option<&ServiceEventObject>
{
self.obj.as_ref()
}
/// Get a mutable reference to the object, if there is one.
pub fn obj_mut(&mut self) -> Option<&mut ServiceEventObject>
{
self.obj.as_mut()
}
/// Try to consume into the inner object. If there is no object, return self.
pub fn try_into_object(self) -> Result<ServiceEventObject, Self>
{
match self.obj
{
Some(obj) => Ok(obj),
None => Err(self),
}
}
}
impl From<ServiceEvent> for Option<ServiceEventObject>
{
#[inline] fn from(from: ServiceEvent) -> Self
{
from.obj
}
}
impl TryFrom<ServiceEvent> for ServiceEventObject
{
type Error = NoObjectError;
#[inline] fn try_from(from: ServiceEvent) -> Result<Self, Self::Error>
{
match from.obj
{
Some(obj) => Ok(obj),
None => Err(NoObjectError),
}
}
}

@ -1,28 +0,0 @@
//! Global state service
use super::*;
use tokio::{
sync::{
watch,
mpsc,
oneshot,
broadcast,
},
task::JoinHandle,
};
use crate::service::{
ExitStatus,
};
use std::{error, fmt};
use std::sync::Weak;
use std::any::Any;
use std::collections::{BTreeMap};
id_type!(ServiceSubID; "Optional ID for filtering directed broadcast messages");
id_type!(BroadcastID; "Each broadcast message has a unique ID.");
mod supervisor; pub use supervisor::*;
mod resreq; pub use resreq::*;
mod obj; pub use obj::*;
mod events; pub use events::*;

@ -1,114 +0,0 @@
//! broadcast object definitions
use super::*;
/// Object sent through the broadcast channel.
///
/// These objects can be cloned and downcasted, becaause they are atomically refcounted if that is more desireable.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObject(pub(super) Arc<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObject);
/// A weak reference to a `ServiceEventObject`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ServiceEventObjectRef(pub(super) Weak<dyn Any + Send + Sync + 'static>);
shim_debug!(ServiceEventObjectRef);
impl ServiceEventObjectRef
{
/// Try to upgrade to a concrete reference, and then clone the inner object.
pub fn try_clone(&self) -> Option<ServiceEventObject>
{
match self.0.upgrade()
{
Some(arc) => Some(ServiceEventObject(arc).clone()),
None => None
}
}
/// Try to upgrade to a concrete reference.
pub fn upgrade(self) -> Result<ServiceEventObject, Self>
{
match self.0.upgrade()
{
Some(arc) => Ok(ServiceEventObject(arc)),
None => Err(self),
}
}
/// Check if the object has not been destroyed yet.
pub fn is_alive(&self) -> bool
{
self.0.strong_count() > 0
}
}
impl ServiceEventObject
{
pub fn clone_inner(&self) -> Self
{
Self(Arc::from(self.0.clone_dyn_any_sync()))
}
/// Get a weak reference counted handle to the object, without cloning the object itself.
pub fn clone_weak(&self) -> ServiceEventObjectRef
{
ServiceEventObjectRef(Arc::downgrade(&self.0))
}
/// Try to downcast the inner object to a concrete type and then clone it.
///
/// This will fail if:
/// * The downcasted type is invalid
#[inline] pub fn downcast_clone<T: Any + Clone + Send + Sync + 'static>(&self) -> Option<T>
{
self.downcast_ref::<T>().map(|x| *x.clone_dyn_any().downcast().unwrap())
}
/// Try to consume this instance into downcast.
///
/// This will fail if:
/// * The downcasted type is invalid
/// * There are other references to this object (created through `clone_ref()`.).
pub fn try_into_downcast<T: Any + Send + Sync + 'static>(self) -> Result<T, Self>
{
match Arc::downcast(self.0)
{
Ok(v) => match Arc::try_unwrap(v) {
Ok(v) => Ok(v),
Err(s) => Err(Self(s)),
},
Err(e) => Err(Self(e)),
}
}
/// Check if there are any other references to this object
#[inline] pub fn is_unique(&self) -> bool
{
Arc::strong_count(&self.0) == 1
}
/// Try to downcast the object into a concrete type
#[inline] pub fn is<T: Any + Send + Sync + 'static>(&self) -> bool
{
self.0.as_ref().is::<T>()
}
/// Try to downcast the object into a concrete type
#[inline] pub fn downcast_ref<T: Any + Send + Sync + 'static>(&self) -> Option<&T>
{
self.0.as_ref().downcast_ref::<T>()
}
}
#[derive(Debug)]
/// Error returned from trying to extract an object from a `ServiceEvent` which has none.
pub struct NoObjectError;
impl error::Error for NoObjectError{}
impl fmt::Display for NoObjectError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "there was no object broadcasted along with this ")
}
}

@ -1,65 +0,0 @@
//! Responses and requests for the state service(s).
//!
//! These are sent to `Supervisor` which then dispatches them accordingly.
use super::*;
/// The kind of request to send to the the service
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceRequestKind
{
/// A no-op request.
None,
/// Test request.
#[cfg(debug_assertions)] EchoRequest(String),
}
/// The kind of response to expect from a service query, if any.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceResponseKind
{
/// Test response.
#[cfg(debug_assertions)] EchoResponse(String),
/// Empty response
None,
}
/// A response from a service to a specific query.
///
/// It is sent theough the `output` onehot channel in the `ServiceCommand` struct.
#[derive(Debug)]
pub struct ServiceResponse(ServiceRequestKind);
impl ServiceResponse
{
/// An empty (default) response
#[inline] pub const fn none() -> Self
{
Self(ServiceRequestKind::None)
}
}
/// A formed service request.
#[derive(Debug)]
pub struct ServiceRequest
{
kind: ServiceRequestKind,
output: oneshot::Sender<ServiceResponse>, // If there is no response, this sender will just be dropped and the future impl can return `None` instead of `Some(response)`.
}
impl ServiceRequest
{
/// Create a new request
pub(in super) fn new(kind: ServiceRequestKind) -> (Self, oneshot::Receiver<ServiceResponse>)
{
let (tx, rx) = oneshot::channel();
(Self {
kind,
output: tx
}, rx)
}
}

@ -1,92 +0,0 @@
//! Dispatching to state service task(s) through a supervisor
use super::*;
use tokio::time;
use std::{fmt, error};
use futures::prelude::*;
impl Supervisor
{
/// Dispatch a request to the supervisor to be passed through to a subtask.
///
/// # Returns
/// Returns a `Future` that can be awaited on to produce the value sent back by the task (if there is one).
///
/// # Errors
/// * The first failure will be caused if sending to the supervisor fails.
/// * The 2nd failure will be caused if either the supervisor, or its delegated task panics before being able to respond, or if the task simply does not respond.
pub async fn dispatch_req(&mut self, kind: ServiceRequestKind) -> Result<impl Future<Output=Result<ServiceResponse, SupervisorDispatchError>> + 'static, SupervisorDispatchError>
{
let (req, rx) = ServiceRequest::new(kind);
self.pipe.send(req).await.map_err(|_| SupervisorDispatchError::Send)?;
Ok(rx.map_err(|_| SupervisorDispatchError::Recv))
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait(&mut self, kind: ServiceRequestKind) -> Result<ServiceResponse, SupervisorDispatchError>
{
Ok(self.dispatch_req(kind)
.await?
.await?)
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
/// If the timeout expires before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait_timeout(&mut self, kind: ServiceRequestKind, timeout: time::Duration) -> Result<ServiceResponse, SupervisorDispatchError>
{
let resp_wait = self.dispatch_req(kind)
.await?;
tokio::select! {
val = resp_wait => {
return Ok(val?);
}
_ = time::delay_for(timeout) => {
return Err(SupervisorDispatchError::Timeout("receiving response", Some(timeout)))
}
}
}
/// Dispatch a request to the supervisor to be passed through to a subtask and then wait for a response from it.
/// If the future `until` completes before a response from the server is received, then the operation will cancel and the error returned will be `SupervisorDispatchError::Timeout`.
///
/// # Returns
/// Returns the value sent back by the task, if there is one
pub async fn dispatch_and_wait_until(&mut self, kind: ServiceRequestKind, until: impl Future) -> Result<ServiceResponse, SupervisorDispatchError>
{
let resp_wait = self.dispatch_req(kind)
.await?;
tokio::select! {
val = resp_wait => {
return Ok(val?);
}
_ = until => {
return Err(SupervisorDispatchError::Timeout("receiving response", None))
}
}
}
}
/// Error when dispatching a request to the supervisor
#[derive(Debug)]
pub enum SupervisorDispatchError
{
Send, Recv, Timeout(&'static str, Option<tokio::time::Duration>),
}
impl error::Error for SupervisorDispatchError{}
impl fmt::Display for SupervisorDispatchError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Send => write!(f, "dispatching the request failed"),
Self::Recv => write!(f, "receiving the response failed"),
Self::Timeout(msg, Some(duration)) => write!(f, "timeout on {} was reached ({:?})", msg, duration),
Self::Timeout(msg, _) => write!(f, "timeout on {} was reached", msg),
}
}
}

@ -1,309 +0,0 @@
//! Handles spawning and restarting service task(s)
use super::*;
use tokio::sync::RwLock;
use std::mem::MaybeUninit;
use std::ops;
use futures::prelude::*;
use tokio::time;
use std::{fmt, error};
const SUPERVISOR_BACKLOG: usize = 32;
mod dispatch; pub use dispatch::*;
TODO: This all needs redoing when i'm actually lucid. This part seems okay but the rest of `service` needs to go and be replaced by something like this
/// Signal the shutdown method to the supervisor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
pub enum SupervisorControl
{
/// Normal working
Initialise,
/// Signal the subtask(s) to shut down, then wait for them and exit, with an optional timeout.
///
/// # Notes
/// If the timeout expires while waiting, then the mode is switched to `Drop`.
Signal(Option<time::Duration>),
/// Drop all handles and pipes to subtask(s) then immediately exit.
Drop,
/// Restart any and all subtask(s)
Restart,
/// Set the max task limit. Default is 0.
TaskLimit(usize),
}
impl Default for SupervisorControl
{
#[inline]
fn default() -> Self
{
Self::Initialise
}
}
/// Supervisor responsible for spawning the state handler service.
#[derive(Debug)]
pub(super) struct Supervisor
{
/// Handle for the supervisor task itself
handle: JoinHandle<ExitStatus>,
/// Watch sender for signalling shutdowns the supervisor task itself
shutdown: watch::Sender<SupervisorControl>,
/// The pipe to send requests to the supervisor's subtasks
pipe: mpsc::Sender<ServiceRequest>,
/// The initial receiver created from `broadcast_root`.
broadcast_receiver: broadcast::Receiver<ServiceEvent>,
/// Data shared between the supervisor's task and its controller instance here.
shared: Arc<SupervisorShared>,
}
/// Object shared btweeen the Supervisor control instance and its supervisor task.
#[derive(Debug)]
struct SupervisorShared
{
/// this is for filtering specific messages to specific subscribers
sub: RwLock<BTreeMap<ServiceEventKind, SESet<ServiceSubID>>>,
broadcast_root: broadcast::Sender<ServiceEvent>,
state: state::State,
}
/// A subscriber to supervisor task(s) event pump
#[derive(Debug)]
pub struct Subscriber
{
id: ServiceSubID,
/// For directed messages
spec: mpsc::Receiver<ServiceEvent>,
/// For broadcast messages
broad: broadcast::Receiver<ServiceEvent>,
}
impl Supervisor
{
/// Attempt to send a control signal to the supervisor itself
pub fn signal_control(&self, sig: SupervisorControl) -> Result<(), watch::error::SendError<SupervisorControl>>
{
self.shutdown.broadcast(sig)?;
Ok(())
}
/// Drop all communications with background worker and wait for it to complete
pub async fn join_now(self) -> eyre::Result<()>
{
let handle = {
let Self { handle, ..} = self; // drop everything else
handle
};
handle.await?;
Ok(())
}
/// Check if the background worker has not been dropped.
///
/// If this returns false it usually indicates a fatal error.
pub fn is_alive(&self) -> bool
{
Arc::strong_count(&self.shared) > 1
}
/// Create a new supervisor for this state.
pub fn new(state: state::State) -> Self
{
let shutdown = watch::channel(Default::default());
let pipe = mpsc::channel(SUPERVISOR_BACKLOG);
let (broadcast_root, broadcast_receiver) = broadcast::channel(SUPERVISOR_BACKLOG);
let shared = Arc::new(SupervisorShared{
broadcast_root,
state,
});
let (shutdown_0, shutdown_1) = shutdown;
let (pipe_0, pipe_1) = pipe;
Self {
shutdown: shutdown_0,
pipe: pipe_0,
broadcast_receiver,
shared: Arc::clone(&shared),
handle: tokio::spawn(async move {
let shared = shared;
ExitStatus::from(service_fn(SupervisorTaskState {
shared,
recv: pipe_1,
shutdown: shutdown_1,
}).await.or_else(|err| err.into_own_result()))
}),
}
}
}
/// The state held by the running superviser service
#[derive(Debug)]
struct SupervisorTaskState
{
shutdown: watch::Receiver<SupervisorControl>,
recv: mpsc::Receiver<ServiceRequest>,
shared: Arc<SupervisorShared>,
}
/// Detached supervisor server
async fn service_fn(SupervisorTaskState {shared, mut recv, mut shutdown}: SupervisorTaskState) -> Result<(), ServiceTerminationError>
{
impl Default for TerminationKind
{
#[inline]
fn default() -> Self
{
Self::Graceful
}
}
// The command stream to dispatch to the worker tasks
let command_dispatch = async {
while let Some(req) = recv.recv().await {
todo!("Dispatch to child(s)");
}
TerminationKind::Graceful
};
tokio::pin!(command_dispatch);
// The signal stream to be handled here
let signal_stream = async {
while let Some(value) = shutdown.recv().await
{
use SupervisorControl::*;
match value {
Initialise => info!("Initialised"),
Signal(None) => return TerminationKind::SignalHup,
Signal(Some(to)) => return TerminationKind::SignalTimeout(to),
Drop => return TerminationKind::Immediate,
Restart => todo!("not implemented"),
TaskLimit(_limit) => todo!("not implemented"),
}
}
TerminationKind::Graceful
};
tokio::pin!(signal_stream);
//loop {
tokio::select! {
sd_kind = &mut signal_stream => {
// We received a signal
Err(ServiceTerminationError::Signal(sd_kind))
}
disp_end = &mut command_dispatch => {
// The command dispatch exited first, the logical error is `Graceful`. But it's not really an error, so...
disp_end.into()
}
}
// }
}
/// The mannor in which the supervisor exited.
#[derive(Debug)]
pub enum TerminationKind
{
/// The child task(s) were signalled to stop and they were waited on.
SignalHup,
/// If there was a timeout specified, and that timeout expired, the message will be `SignalTimeout` instead of `SignalHup`.
SignalTimeout(time::Duration),
/// Immediately drop everything and exit
Immediate,
/// A non-signalled shutdown. There were no more watchers for the shutdown channel.
Graceful,
}
impl TerminationKind
{
/// Convert `TerminationKind::Graceful` into a non-error
fn strip_grace(self) -> Result<(), Self>
{
match self {
Self::Graceful => Ok(()),
e => Err(e),
}
}
}
impl error::Error for TerminationKind{}
impl fmt::Display for TerminationKind
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::SignalHup => write!(f, "children were signalled to shut down and compiled"),
Self::SignalTimeout(to) => write!(f, "children were signalled to shut but did not do so within the {:?} timeout", to),
Self::Immediate => write!(f, "children were dropped and an immediate exit was made"),
Self::Graceful => write!(f, "a graceful shutdown order was issued and compiled with"),
}
}
}
/// The error returned on a failed service termination.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServiceTerminationError
{
/// Was terminated by a signal.
Signal(TerminationKind),
/// Was terminated by a panic.
Panic,
/// There were no more commands being sent through, and the worker gracefully shut down.
Interest,
}
impl From<TerminationKind> for Result<(), ServiceTerminationError>
{
fn from(from: TerminationKind) -> Self
{
ServiceTerminationError::Signal(from).into_own_result()
}
}
impl ServiceTerminationError
{
fn into_own_result(self) -> Result<(), Self>
{
match self {
Self::Signal(term) => term.strip_grace().map_err(Self::Signal),
x => Err(x),
}
}
}
impl error::Error for ServiceTerminationError
{
fn source(&self) -> Option<&(dyn error::Error + 'static)>
{
Some(match &self {
Self::Signal(ts) => ts,
_ => return None,
})
}
}
impl fmt::Display for ServiceTerminationError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Signal(_) => write!(f, "shut down by signal"),
Self::Panic => write!(f, "shut down by panic. this is usually fatal"),
Self::Interest => write!(f, "all communications with this service stopped"),
}
}
}

@ -1,145 +0,0 @@
//! Session for each connected user
use super::*;
use tokio::{
sync::{
mpsc,
broadcast,
oneshot,
},
task::JoinHandle,
};
use std::sync::Arc;
use crate::service::{self, SubscribeError};
id_type!(SessionID; "A unique session ID, not bound to a user.");
impl SessionID
{
/// Generate a random session ID.
#[inline] fn generate() -> Self
{
Self::id_new()
}
}
#[derive(Debug)]
pub enum SessionResponse
{
Closed(SessionID),
}
#[derive(Debug)]
pub enum SessionCommand
{
Shutdown,
/// Subscribe to the session's message pump.
Subscribe(oneshot::Sender<broadcast::Receiver<SessionResponse>>),
/// Take this websocket connection.
//TODO: websockets
Connect(!),
}
/// Metadata for a session, scored across its service and handle(s)
#[derive(Debug)]
struct SessionMetadata
{
id: SessionID,
user: user::User,
}
/// A single connected session.
/// Hold the service for this session, its ID, and (TODO) its websocket connection.
#[derive(Debug)]
pub struct Session
{
meta: Arc<SessionMetadata>,
tx: mpsc::Sender<SessionCommand>,
rx: broadcast::Receiver<SessionResponse>,
handle: JoinHandle<()>,
}
async fn service_task(meta: Arc<SessionMetadata>, state: state::State, response_pair: (broadcast::Sender<SessionResponse>, mpsc::Receiver<SessionCommand>))
{
let (tx, mut rx) = response_pair;
while let Some(command) = rx.recv().await
{
match command
{
SessionCommand::Shutdown => break,
SessionCommand::Subscribe(out) => ignore!(out.send(tx.subscribe())),
_ => todo!()
}
}
let _ = tx.send(SessionResponse::Closed(meta.id.clone()));
}
impl Session
{
/// Create a new session object
pub fn create(user: user::User, state: state::State) -> Self
{
let id = SessionID::generate();
let meta =Arc::new(SessionMetadata{
user,
id,
});
let (handle, tx, rx) = {
let (s_tx, s_rx) = broadcast::channel(16);
let (r_tx, r_rx) = mpsc::channel(16);
(tokio::spawn(service_task(Arc::clone(&meta), state, (s_tx, r_rx))),
r_tx, s_rx)
};
Self {
meta,
handle,
tx, rx,
}
}
/// The randomly generated ID of this session, irrespective of the user of this session.
#[inline] pub fn session_id(&self) -> &SessionID
{
&self.meta.id
}
/// The unique user ID of this session
pub fn user_id(&self) -> user::UserID
{
self.meta.user.id_for_session(self)
}
/// Ask the service to subscribe to it.
pub async fn subscribe(&mut self) -> Result<broadcast::Receiver<SessionResponse>, SubscribeError>
{
let (tx, rx) = oneshot::channel();
self.tx.send(SessionCommand::Subscribe(tx)).await.map_err(|_| SubscribeError::SenderDropped)?;
rx.await.map_err(|_| SubscribeError::NoResponse)
}
}
// impl service::Service for Session
// {
// type Message = SessionCommand;
// type Response = SessionResponse;
// #[inline] fn wait_on(self) -> JoinHandle<()> {
// self.handle
// }
// #[inline] fn message_in_ref(&self) -> &mpsc::Sender<Self::Message> {
// &self.tx
// }
// #[inline] fn message_in(&mut self) -> &mut mpsc::Sender<Self::Message> {
// &mut self.tx
// }
// #[inline] fn message_out(&mut self) -> &mut broadcast::Receiver<Self::Response> {
// &mut self.rx
// }
// #[inline] fn is_alive(&self) -> Option<bool> {
// Some(Arc::strong_count(&self.meta) > 1)
// }
// }

@ -1,106 +0,0 @@
//! Used to determine which post belongs to who.
//!
//! Mostly for determining if a poster owns a post.
use super::*;
use std::{
net::SocketAddr,
};
use cryptohelpers::sha256;
/// A user's unique ID.
///
/// This is composed by the user's address and their session ID.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct UserID(SocketAddr, session::SessionID);
static COUNTER: GlobalCounter = GlobalCounter::new();
impl UserID
{
/// Generate a token from this instance.
///
/// User tokens are deterministically generated and can be deterministically verified.
pub fn generate_token(&self) -> u64
{
let cnt = COUNTER.get();
let mut trunc = [0u8; std::mem::size_of::<u64>()];
let hash = GloballySalted::new(self).compute_sha256_hash();
bytes::move_slice(&mut trunc[..], hash.as_ref());
u64::from_le_bytes(trunc) ^ cnt
}
/// Validate a token for this instance created with `generate_token`.
pub fn validate_token(&self, val: u64) -> bool
{
let mut trunc = [0u8; std::mem::size_of::<u64>()];
let hash = GloballySalted::new(self).compute_sha256_hash();
bytes::move_slice(&mut trunc[..], hash.as_ref());
COUNTER.valid(u64::from_le_bytes(trunc) ^ val)
}
}
/// A user not bound to a session.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct User
{
addr: SocketAddr,
}
impl User
{
/// Get the user ID for this session.
pub fn id_for_session(&self, session: &session::Session) -> UserID
{
UserID(self.addr, session.session_id().clone())
}
}
#[cfg(test)]
mod tests
{
use super::*;
use tokio::sync::mpsc;
use tokio::time;
use std::net::SocketAddrV4;
use std::net::Ipv4Addr;
#[tokio::test]
async fn counter_tokens()
{
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 80));
let usr = User{addr};
let ses = session::Session::create(usr);
let id = ses.user_id();
let (mut tx, mut rx) = mpsc::channel(5);
let task = tokio::spawn(async move {
let id = ses.user_id();
while let Some(token) = rx.recv().await {
if !id.validate_token(token) {
panic!("Failed to validate token {:x} for id {:?}", token, id);
} else {
eprintln!("Token {:x} valid for id {:?}", token, id);
}
}
});
for x in 1..=10
{
if x % 2 == 0 {
time::delay_for(time::Duration::from_millis(10 * x)).await;
}
if tx.send(id.generate_token()).await.is_err() {
eprintln!("Failed to send to task");
break;
}
}
drop(tx);
task.await.expect("Background validate task failed");
}
}
Loading…
Cancel
Save