//! Session for each connected user use super::*; use tokio::{ sync::{ mpsc, broadcast, oneshot, }, task::JoinHandle, }; use std::sync::Arc; use crate::service::{self, SubscribeError}; id_type!(SessionID; "A unique session ID, not bound to a user."); impl SessionID { /// Generate a random session ID. #[inline] fn generate() -> Self { Self::id_new() } } #[derive(Debug)] pub enum SessionResponse { Closed(SessionID), } #[derive(Debug)] pub enum SessionCommand { Shutdown, /// Subscribe to the session's message pump. Subscribe(oneshot::Sender>), /// Take this websocket connection. //TODO: websockets Connect(!), } /// Metadata for a session, scored across its service and handle(s) #[derive(Debug)] struct SessionMetadata { id: SessionID, user: user::User, } /// A single connected session. /// Hold the service for this session, its ID, and (TODO) its websocket connection. #[derive(Debug)] pub struct Session { meta: Arc, tx: mpsc::Sender, rx: broadcast::Receiver, handle: JoinHandle<()>, } async fn service_task(meta: Arc, state: state::State, response_pair: (broadcast::Sender, mpsc::Receiver)) { let (tx, mut rx) = response_pair; while let Some(command) = rx.recv().await { match command { SessionCommand::Shutdown => break, SessionCommand::Subscribe(out) => ignore!(out.send(tx.subscribe())), _ => todo!() } } let _ = tx.send(SessionResponse::Closed(meta.id.clone())); } impl Session { /// Create a new session object pub fn create(user: user::User, state: state::State) -> Self { let id = SessionID::generate(); let meta =Arc::new(SessionMetadata{ user, id, }); let (handle, tx, rx) = { let (s_tx, s_rx) = broadcast::channel(16); let (r_tx, r_rx) = mpsc::channel(16); (tokio::spawn(service_task(Arc::clone(&meta), state, (s_tx, r_rx))), r_tx, s_rx) }; Self { meta, handle, tx, rx, } } /// The randomly generated ID of this session, irrespective of the user of this session. #[inline] pub fn session_id(&self) -> &SessionID { &self.meta.id } /// The unique user ID of this session pub fn user_id(&self) -> user::UserID { self.meta.user.id_for_session(self) } /// Ask the service to subscribe to it. pub async fn subscribe(&mut self) -> Result, SubscribeError> { let (tx, rx) = oneshot::channel(); self.tx.send(SessionCommand::Subscribe(tx)).await.map_err(|_| SubscribeError::SenderDropped)?; rx.await.map_err(|_| SubscribeError::NoResponse) } } // impl service::Service for Session // { // type Message = SessionCommand; // type Response = SessionResponse; // #[inline] fn wait_on(self) -> JoinHandle<()> { // self.handle // } // #[inline] fn message_in_ref(&self) -> &mpsc::Sender { // &self.tx // } // #[inline] fn message_in(&mut self) -> &mut mpsc::Sender { // &mut self.tx // } // #[inline] fn message_out(&mut self) -> &mut broadcast::Receiver { // &mut self.rx // } // #[inline] fn is_alive(&self) -> Option { // Some(Arc::strong_count(&self.meta) > 1) // } // }