nothing of note

master
Avril 3 years ago
parent c326a63259
commit c491d50315

@ -1,5 +1,12 @@
//! datse server over HTTP
use super::*;
use futures::{
future::{
OptionFuture,
FutureExt,
},
stream,
};
use warp::{
Filter,
hyper::body::Bytes,

@ -1,4 +1,6 @@
//! Handles sessions and maintaining them
//!
//! Each active and authed client has a `Session` object associated with it. Clients can auth as multiple users within these sessions. Sessions expire after being inactive for their ttl
use super::*;
use tokio::{
time::{
@ -11,6 +13,8 @@ use tokio::{
RwLock,
RwLockReadGuard,
RwLockWriteGuard,
watch,
},
};
use std::collections::{
@ -40,6 +44,7 @@ struct Inner
{
id: SessionID,
ttl_send: watch::Sender<Duration>,
ttl: Duration,
users: RwLock<Vec<UserID>>,
}
@ -64,8 +69,63 @@ struct Inner
#[derive(Debug)]
pub struct SessionLock<'a>(Arc<Inner>, PhantomData<&'a Session>);
/// Lets a borrowed `SessionLock` be passed anywhere an `impl AsRef<SessionID>`
/// is expected (e.g. `Sessions::remove`), yielding the locked session's ID.
impl<'a, 'b> AsRef<SessionID> for &'b SessionLock<'a>
where 'a: 'b
{
    #[inline] fn as_ref(&self) -> &SessionID
    {
        let lock: &SessionLock = self;
        &lock.0.id
    }
}
/// Identity impl: a `SessionID` trivially borrows as itself.
///
/// This allows APIs that take `impl AsRef<SessionID>` to accept a bare ID
/// directly, without callers needing a wrapper.
impl AsRef<SessionID> for SessionID
{
#[inline] fn as_ref(&self) -> &SessionID
{
self
}
}
//impl session (`Inner`) methods on `SessionLock`
impl<'a> SessionLock<'a>
{
    /// The unique ID of the locked session.
    pub fn id(&self) -> &SessionID
    {
        &self.0.id
    }
    /// The ttl this session was created with.
    pub fn ttl(&self) -> &Duration
    {
        &self.0.ttl
    }
    /// Direct access to the lock guarding this session's authed users.
    pub fn users(&self) -> &RwLock<Vec<UserID>>
    {
        &self.0.users
    }
    /// Auth a user with this ID within the session.
    pub async fn add_user(&mut self, id: UserID)
    {
        let mut guard = self.0.users.write().await;
        guard.push(id)
    }
    /// Run `clo` once for each user currently authed in this session.
    pub async fn with_users<F>(&self, mut clo: F)
    where F: FnMut(&UserID)
    {
        let guard = self.0.users.read().await;
        guard.iter().for_each(|user| clo(user));
    }
    /// Is a user with this ID authed in this session?
    pub async fn has_user(&self, id: impl AsRef<UserID>) -> bool
    {
        let needle = id.as_ref();
        self.0.users.read().await.iter().any(|user| user == needle)
    }
    /// De-auth every user matching this ID from the session.
    pub async fn remove_user(&mut self, id: impl AsRef<UserID>)
    {
        let target = id.as_ref();
        self.0.users.write().await.retain(move |user| user != target);
    }
}
/// A `Session` object.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Session(Weak<Inner>);
impl Session
@ -86,6 +146,18 @@ impl Session
{
self.0.upgrade().map(|x| SessionLock(x, PhantomData))
}
/// Check to see if this session has not yet been destroyed.
#[inline] pub fn is_alive(&self) -> bool
{
self.0.strong_count()>0
}
/// Check to see if this session is still alive, but has been removed from its pool and is awaiting destruction.
///
/// A zombie is a session whose backing data can still be locked (a strong
/// reference exists somewhere) while `cont` no longer tracks its ID — i.e. the
/// TTL timer or an explicit `remove()` already took it out of the container.
///
/// Returns `false` both for fully-dead sessions (which cannot be locked) and
/// for live sessions that are still present in `cont`.
pub async fn is_zombie(&self, cont: &Sessions) -> bool
{
    // FIX: the containment check must be negated — a session still *in* the
    // pool is healthy, not a zombie. The previous code returned `true` for
    // healthy pooled sessions and `false` for actual zombies.
    OptionFuture::from(self.lock().map(|ses| async move { !cont.sessions.read().await.contains_key(&ses.0.id) })).await.unwrap_or(false)
}
}
/// A container of `Session` objects.
@ -110,13 +182,54 @@ impl Sessions
///
/// # Locking
/// When the TTL of this session expires, and the `Sessions` container has not been dropped, then the container's write lock is acquired to remove the session from the container. The task completes immediately after, releasing the lock after the single remove operation.
#[inline] fn detach_ttl_timer(&self, ses: Arc<Inner>)
///
/// # Timing-out & cancelling
/// The client's session can refresh this expiration timer task by sending a new ttl to its `ttl_send` watch sender.
/// Usually this should be the same ttl which is set within the session's inner, but it can really be anything.
#[inline] fn detach_ttl_timer(&self, ses: Arc<Inner>, mut ttl: watch::Receiver<Duration>)
{
let intern = Arc::downgrade(&self.sessions);
let ses = Arc::downgrade(&ses);
tokio::spawn(async move {
time::delay_for(ses.ttl).await;
if let Some(intern) = intern.upgrade() {
intern.write().await.remove(&ses.id);
let timed_out = if let Some(mut tm) = ttl.recv().await {
loop {
tokio::select! {
_ = time::delay_for(tm) => {
break true; // We timed out
}
nttl = ttl.recv() => {
tm = match nttl {
Some(nttl) => nttl, // Client session refreshed its ttl
_ => return, // Client session was dropped. Return here because there's no reason to try to upgrade `ses` for removal now, it has been dropped and therefore must have been removed already.
};
}
}
}
} else {
return // Client session was dropped before we could even spawn this task. No reason to try to upgrade `ses` for removal now, it has been dropped and therefore must have been removed already.
};
if let Some(ses) = ses.upgrade() {
if timed_out {
// We timed out
info!("Session {} timed out, removing", ses.id);
} else {
// There was an error / somehow the session was dropped?
error!("Impossible error: TTL timer for session {} failed to communicate with session. Attempting removal anyway", ses.id);
}
if let Some(intern) = intern.upgrade() {
if intern.write().await.remove(&ses.id).is_some() {
trace!("Removed session {} from container, now dropping upgraded reference.", ses.id);
} else{
warn!("Failed to remove valid and alive session {} from alive container, this shouldn't happen and indicates a bug that we're working on the wrong container", ses.id);
}
} else {
// Any still-alive sessions are zombies after we free our upgraded session reference here
trace!("Failed to upgrade reference to container, it has been dropped. Exiting");
}
} else {
trace!("Session was dropped as we were about to remove it.");
}
});
}
@ -145,15 +258,17 @@ impl Sessions
/// If this happens, the session creation attempt should be considered to have failed, the request should return an error, and a log should be outputted informing the user that she configured the Session control TTL incorrectly; this is a configuration error.
pub async fn insert_new_with_id(&mut self, cfg: &Settings, id: SessionID) -> Session
{
let ttl = Duration::from_millis(cfg.auth_token_ttl_millis.jitter());
let (ttl_send, rx) = watch::channel(ttl);
let ses = Arc::new(Inner {
id: id.clone(),
ttl: Duration::from_millis(cfg.auth_token_ttl_millis.jitter()),
ttl, ttl_send,
users: RwLock::new(Vec::new()),
});
let output = Session(Arc::downgrade(&ses));
self.sessions.write().await.insert(id.clone(), Arc::clone(&ses));
self.detach_ttl_timer(ses);
self.detach_ttl_timer(ses, rx);
output
}
@ -168,4 +283,10 @@ impl Sessions
.map(Arc::downgrade)
.map(Session)
}
/// Remove a session with this ID from the container.
///
/// Any outstanding `Session` handles for it become zombies until their last
/// strong reference is dropped. Removing an ID that is not present is a no-op.
pub async fn remove(&mut self, ses: impl AsRef<SessionID>)
{
    let key = ses.as_ref();
    let mut map = self.sessions.write().await;
    map.remove(key);
}
}

@ -142,7 +142,7 @@ pub struct State
auth_tokens: RwLock<AuthContainer>,
//TODO: user auths, public keys, hashed passwords, etc.
sessions: Sessions,
sessions: RwLock<Sessions>,
settings: Settings,
}
@ -153,11 +153,17 @@ impl State
{
Self {
auth_tokens: RwLock::new(AuthContainer::new()),
sessions: Sessions::new(),
sessions: RwLock::new(Sessions::new()),
backend: RwLock::new(backend),
settings,
}
}
/// The session container
///
/// Borrow the `RwLock`-guarded `Sessions` container owned by this state.
/// Callers take the read or write lock themselves as needed.
pub fn sessions(&self) -> &RwLock<Sessions>
{
&self.sessions
}
/// The web server settings
pub fn cfg(&self) -> &Settings

Loading…
Cancel
Save