hacked together some session service shit

clone-session-service-objects
Avril 3 years ago
parent 3bd8ff68be
commit f66f5a6565
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -528,3 +528,8 @@ pub type MaybeVec<T> = smallvec::SmallVec<[T; 1]>;
})?
};
}
/// Ignore the value of this expression, returning unit `()` value.
#[macro_export] macro_rules! ignore {
($expr:expr) => ({let _ = $expr;});
}

@ -1,4 +1,5 @@
#![cfg_attr(feature="nightly", feature(test))]
#![cfg_attr(feature="nightly", feature(never_type))]
#![allow(dead_code)]
@ -13,6 +14,7 @@ mod bytes;
mod delta;
mod state;
mod service;
#[tokio::main]
async fn main() {

@ -0,0 +1,69 @@
//! Session services
use super::*;
use tokio::{
task::JoinHandle,
sync::{
mpsc,
broadcast,
},
};
use std::{fmt,error};
use std::marker::{Send,Sync};
///// A boxed message that can be downcasted.
//pub type BoxedMessage = Box<dyn std::any::Any + Send + Sync + 'static>;
/// A handle to a service.
pub trait Service<T=()>
where T: Send + 'static
{
/// The message type to send to the service.
type Message: Send + Sync + 'static;
/// The response to expect from the service.
type Response: Send + Sync + 'static;
/// Return the wait handle.
///
/// This method should drop the message pipeline.
fn wait_on(self) -> JoinHandle<T>;
/// An immutable reference to the pipe for sending message. Useful for service handle detaching (i.e. cloning the message input pipe).
fn message_in_ref(&self) -> &mpsc::Sender<Self::Message>;
/// The message pipe for sending messages to the service.
fn message_in(&mut self) -> &mut mpsc::Sender<Self::Message>;
/// The message pipe for receiving messages from the service.
fn message_out(&mut self) -> &mut broadcast::Receiver<Self::Response>;
/// Is the service alive? A `None` value means 'maybe', and is the default return.
///
/// # Note
/// This should not be considered an infallible indicator of if the service has crashed or not. A better method is attempting to send or receive a message and the sender/receiver returning an error.
#[inline] fn is_alive(&self) -> Option<bool>
{
None
}
}
/// Error returned when subscribing to a service
#[derive(Debug)]
#[non_exhaustive]
pub enum SubscribeError
{
/// The service's receive has already been dropped.
SenderDropped,
/// The service dropped the response oneshot channel.
NoResponse,
}
impl error::Error for SubscribeError{}
impl fmt::Display for SubscribeError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::SenderDropped => write!(f, "the service has already stopped"),
Self::NoResponse => write!(f, "the service declined to, or was unable to respond"),
}
}
}

@ -24,7 +24,6 @@ pub struct Imouto
impl Imouto
{
/// Create a new empty container
pub fn new() -> Self
{

@ -1,5 +1,15 @@
//! Session for each connected user
use super::*;
use tokio::{
sync::{
mpsc,
broadcast,
oneshot,
},
task::JoinHandle,
};
use std::sync::Arc;
use service::SubscribeError;
id_type!(SessionID; "A unique session ID, not bound to a user.");
@ -13,30 +23,123 @@ impl SessionID
}
#[derive(Debug)]
pub struct Session
pub enum SessionResponse
{
Closed(SessionID),
}
#[derive(Debug)]
pub enum SessionCommand
{
Shutdown,
/// Subscribe to the session's message pump.
Subscribe(oneshot::Sender<broadcast::Receiver<SessionResponse>>),
/// Take this websocket connection.
//TODO: websockets
Connect(!),
}
/// Metadata for a session, scored across its service and handle(s)
#[derive(Debug)]
struct SessionMetadata
{
id: SessionID,
user: user::User,
}
/// A single connected session.
/// Hold the service for this session, its ID, and (TODO) its websocket connection.
#[derive(Debug)]
pub struct Session
{
meta: Arc<SessionMetadata>,
tx: mpsc::Sender<SessionCommand>,
rx: broadcast::Receiver<SessionResponse>,
handle: JoinHandle<()>,
}
async fn service_task(meta: Arc<SessionMetadata>, state: state::State, response_pair: (broadcast::Sender<SessionResponse>, mpsc::Receiver<SessionCommand>))
{
let (tx, mut rx) = response_pair;
while let Some(command) = rx.recv().await
{
match command
{
SessionCommand::Shutdown => break,
SessionCommand::Subscribe(out) => ignore!(out.send(tx.subscribe())),
_ => todo!()
}
}
let _ = tx.send(SessionResponse::Closed(meta.id.clone()));
}
impl Session
{
/// Create a new session object
pub fn create(user: user::User) -> Self
pub fn create(user: user::User, state: state::State) -> Self
{
Self {
let id = SessionID::generate();
let meta =Arc::new(SessionMetadata{
user,
id: SessionID::generate(),
id,
});
let (handle, tx, rx) = {
let (s_tx, s_rx) = broadcast::channel(16);
let (r_tx, r_rx) = mpsc::channel(16);
(tokio::spawn(service_task(Arc::clone(&meta), state, (s_tx, r_rx))),
r_tx, s_rx)
};
Self {
meta,
handle,
tx, rx,
}
}
/// The randomly generated ID of this session, irrespective of the user of this session.
#[inline] pub fn session_id(&self) -> &SessionID
{
&self.id
&self.meta.id
}
/// The unique user ID of this session
pub fn user_id(&self) -> user::UserID
{
self.user.id_for_session(self)
self.meta.user.id_for_session(self)
}
/// Ask the service to subscribe to it.
pub async fn subscribe(&mut self) -> Result<broadcast::Receiver<SessionResponse>, SubscribeError>
{
let (tx, rx) = oneshot::channel();
self.tx.send(SessionCommand::Subscribe(tx)).await.map_err(|_| SubscribeError::SenderDropped)?;
rx.await.map_err(|_| SubscribeError::NoResponse)
}
}
impl service::Service for Session
{
type Message = SessionCommand;
type Response = SessionResponse;
#[inline] fn wait_on(self) -> JoinHandle<()> {
self.handle
}
#[inline] fn message_in_ref(&self) -> &mpsc::Sender<Self::Message> {
&self.tx
}
#[inline] fn message_in(&mut self) -> &mut mpsc::Sender<Self::Message> {
&mut self.tx
}
#[inline] fn message_out(&mut self) -> &mut broadcast::Receiver<Self::Response> {
&mut self.rx
}
#[inline] fn is_alive(&self) -> Option<bool> {
Some(Arc::strong_count(&self.meta) > 1)
}
}

Loading…
Cancel
Save