diff --git a/src/ext.rs b/src/ext.rs index ba53044..aeb363e 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -528,3 +528,8 @@ pub type MaybeVec = smallvec::SmallVec<[T; 1]>; })? }; } + +/// Ignore the value of this expression, returning unit `()` value. +#[macro_export] macro_rules! ignore { + ($expr:expr) => ({let _ = $expr;}); +} diff --git a/src/main.rs b/src/main.rs index 8cc63a2..9c8455a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ #![cfg_attr(feature="nightly", feature(test))] +#![cfg_attr(feature="nightly", feature(never_type))] #![allow(dead_code)] @@ -13,6 +14,7 @@ mod bytes; mod delta; mod state; +mod service; #[tokio::main] async fn main() { diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..8178975 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,69 @@ +//! Session services +use super::*; +use tokio::{ + task::JoinHandle, + sync::{ + mpsc, + broadcast, + }, +}; +use std::{fmt,error}; +use std::marker::{Send,Sync}; + +///// A boxed message that can be downcasted. +//pub type BoxedMessage = Box; + +/// A handle to a service. +pub trait Service + where T: Send + 'static +{ + /// The message type to send to the service. + type Message: Send + Sync + 'static; + /// The response to expect from the service. + type Response: Send + Sync + 'static; + + /// Return the wait handle. + /// + /// This method should drop the message pipeline. + fn wait_on(self) -> JoinHandle; + + /// An immutable reference to the pipe for sending message. Useful for service handle detaching (i.e. cloning the message input pipe). + fn message_in_ref(&self) -> &mpsc::Sender; + + /// The message pipe for sending messages to the service. + fn message_in(&mut self) -> &mut mpsc::Sender; + /// The message pipe for receiving messages from the service. + fn message_out(&mut self) -> &mut broadcast::Receiver; + + /// Is the service alive? A `None` value means 'maybe', and is the default return. + /// + /// # Note + /// This should not be considered an infallible indicator of if the service has crashed or not. A better method is attempting to send or receive a message and the sender/receiver returning an error. + #[inline] fn is_alive(&self) -> Option + { + None + } +} + +/// Error returned when subscribing to a service +#[derive(Debug)] +#[non_exhaustive] +pub enum SubscribeError +{ + /// The service's receive has already been dropped. + SenderDropped, + /// The service dropped the response oneshot channel. + NoResponse, +} + +impl error::Error for SubscribeError{} +impl fmt::Display for SubscribeError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::SenderDropped => write!(f, "the service has already stopped"), + Self::NoResponse => write!(f, "the service declined to, or was unable to respond"), + } + } +} diff --git a/src/state/mod.rs b/src/state/mod.rs index 2980136..1d0aacc 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -24,7 +24,6 @@ pub struct Imouto impl Imouto { - /// Create a new empty container pub fn new() -> Self { diff --git a/src/state/session.rs b/src/state/session.rs index 7f7aabf..820fbd1 100644 --- a/src/state/session.rs +++ b/src/state/session.rs @@ -1,5 +1,15 @@ //! Session for each connected user use super::*; +use tokio::{ + sync::{ + mpsc, + broadcast, + oneshot, + }, + task::JoinHandle, +}; +use std::sync::Arc; +use service::SubscribeError; id_type!(SessionID; "A unique session ID, not bound to a user."); @@ -13,30 +23,123 @@ impl SessionID } #[derive(Debug)] -pub struct Session +pub enum SessionResponse +{ + Closed(SessionID), +} + +#[derive(Debug)] +pub enum SessionCommand +{ + Shutdown, + /// Subscribe to the session's message pump. + Subscribe(oneshot::Sender>), + + /// Take this websocket connection. + //TODO: websockets + Connect(!), +} + +/// Metadata for a session, scored across its service and handle(s) +#[derive(Debug)] +struct SessionMetadata { id: SessionID, user: user::User, + +} + +/// A single connected session. +/// Hold the service for this session, its ID, and (TODO) its websocket connection. +#[derive(Debug)] +pub struct Session +{ + meta: Arc, + + tx: mpsc::Sender, + rx: broadcast::Receiver, + handle: JoinHandle<()>, +} + +async fn service_task(meta: Arc, state: state::State, response_pair: (broadcast::Sender, mpsc::Receiver)) +{ + let (tx, mut rx) = response_pair; + while let Some(command) = rx.recv().await + { + match command + { + SessionCommand::Shutdown => break, + SessionCommand::Subscribe(out) => ignore!(out.send(tx.subscribe())), + + _ => todo!() + } + } + + let _ = tx.send(SessionResponse::Closed(meta.id.clone())); } impl Session { /// Create a new session object - pub fn create(user: user::User) -> Self + pub fn create(user: user::User, state: state::State) -> Self { - Self { + let id = SessionID::generate(); + let meta =Arc::new(SessionMetadata{ user, - id: SessionID::generate(), + id, + }); + let (handle, tx, rx) = { + let (s_tx, s_rx) = broadcast::channel(16); + let (r_tx, r_rx) = mpsc::channel(16); + (tokio::spawn(service_task(Arc::clone(&meta), state, (s_tx, r_rx))), + r_tx, s_rx) + }; + Self { + meta, + handle, + tx, rx, } } /// The randomly generated ID of this session, irrespective of the user of this session. #[inline] pub fn session_id(&self) -> &SessionID { - &self.id + &self.meta.id } /// The unique user ID of this session pub fn user_id(&self) -> user::UserID { - self.user.id_for_session(self) + self.meta.user.id_for_session(self) + } + + /// Ask the service to subscribe to it. + pub async fn subscribe(&mut self) -> Result, SubscribeError> + { + let (tx, rx) = oneshot::channel(); + self.tx.send(SessionCommand::Subscribe(tx)).await.map_err(|_| SubscribeError::SenderDropped)?; + + rx.await.map_err(|_| SubscribeError::NoResponse) + } +} + +impl service::Service for Session +{ + type Message = SessionCommand; + type Response = SessionResponse; + + #[inline] fn wait_on(self) -> JoinHandle<()> { + self.handle + } + #[inline] fn message_in_ref(&self) -> &mpsc::Sender { + &self.tx + } + #[inline] fn message_in(&mut self) -> &mut mpsc::Sender { + &mut self.tx + } + #[inline] fn message_out(&mut self) -> &mut broadcast::Receiver { + &mut self.rx + } + + #[inline] fn is_alive(&self) -> Option { + Some(Arc::strong_count(&self.meta) > 1) } }