You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
yuurei/src/state/session.rs

146 lines
3.3 KiB

//! Session for each connected user
use super::*;
use tokio::{
sync::{
mpsc,
broadcast,
oneshot,
},
task::JoinHandle,
};
use std::sync::Arc;
use crate::service::{self, SubscribeError};
id_type!(SessionID; "A unique session ID, not bound to a user.");
impl SessionID
{
/// Generate a random session ID.
#[inline] fn generate() -> Self
{
Self::id_new()
}
}
#[derive(Debug)]
pub enum SessionResponse
{
Closed(SessionID),
}
#[derive(Debug)]
pub enum SessionCommand
{
Shutdown,
/// Subscribe to the session's message pump.
Subscribe(oneshot::Sender<broadcast::Receiver<SessionResponse>>),
/// Take this websocket connection.
//TODO: websockets
Connect(!),
}
/// Metadata for a session, scored across its service and handle(s)
#[derive(Debug)]
struct SessionMetadata
{
id: SessionID,
user: user::User,
}
/// A single connected session.
/// Hold the service for this session, its ID, and (TODO) its websocket connection.
#[derive(Debug)]
pub struct Session
{
meta: Arc<SessionMetadata>,
tx: mpsc::Sender<SessionCommand>,
rx: broadcast::Receiver<SessionResponse>,
handle: JoinHandle<()>,
}
async fn service_task(meta: Arc<SessionMetadata>, state: state::State, response_pair: (broadcast::Sender<SessionResponse>, mpsc::Receiver<SessionCommand>))
{
let (tx, mut rx) = response_pair;
while let Some(command) = rx.recv().await
{
match command
{
SessionCommand::Shutdown => break,
SessionCommand::Subscribe(out) => ignore!(out.send(tx.subscribe())),
_ => todo!()
}
}
let _ = tx.send(SessionResponse::Closed(meta.id.clone()));
}
impl Session
{
/// Create a new session object
pub fn create(user: user::User, state: state::State) -> Self
{
let id = SessionID::generate();
let meta =Arc::new(SessionMetadata{
user,
id,
});
let (handle, tx, rx) = {
let (s_tx, s_rx) = broadcast::channel(16);
let (r_tx, r_rx) = mpsc::channel(16);
(tokio::spawn(service_task(Arc::clone(&meta), state, (s_tx, r_rx))),
r_tx, s_rx)
};
Self {
meta,
handle,
tx, rx,
}
}
/// The randomly generated ID of this session, irrespective of the user of this session.
#[inline] pub fn session_id(&self) -> &SessionID
{
&self.meta.id
}
/// The unique user ID of this session
pub fn user_id(&self) -> user::UserID
{
self.meta.user.id_for_session(self)
}
/// Ask the service to subscribe to it.
pub async fn subscribe(&mut self) -> Result<broadcast::Receiver<SessionResponse>, SubscribeError>
{
let (tx, rx) = oneshot::channel();
self.tx.send(SessionCommand::Subscribe(tx)).await.map_err(|_| SubscribeError::SenderDropped)?;
rx.await.map_err(|_| SubscribeError::NoResponse)
}
}
// impl service::Service for Session
// {
// type Message = SessionCommand;
// type Response = SessionResponse;
// #[inline] fn wait_on(self) -> JoinHandle<()> {
// self.handle
// }
// #[inline] fn message_in_ref(&self) -> &mpsc::Sender<Self::Message> {
// &self.tx
// }
// #[inline] fn message_in(&mut self) -> &mut mpsc::Sender<Self::Message> {
// &mut self.tx
// }
// #[inline] fn message_out(&mut self) -> &mut broadcast::Receiver<Self::Response> {
// &mut self.rx
// }
// #[inline] fn is_alive(&self) -> Option<bool> {
// Some(Arc::strong_count(&self.meta) > 1)
// }
// }