From d06dee54eb36b6834b317d3e15712469711f051a Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 16 Sep 2020 18:18:21 +0100 Subject: [PATCH] route dispatch with timeout --- src/ext.rs | 2 +- src/web/route.rs | 109 +++++++++++++++++++++-------------------------- 2 files changed, 50 insertions(+), 61 deletions(-) diff --git a/src/ext.rs b/src/ext.rs index 2a7d7c0..93f716b 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -20,7 +20,7 @@ impl OpaqueDebug } /// Consume into the value - #[inline] pub const fn into_inner(self) -> T + #[inline] pub fn into_inner(self) -> T { self.0 } diff --git a/src/web/route.rs b/src/web/route.rs index 14b76d2..638cbf3 100644 --- a/src/web/route.rs +++ b/src/web/route.rs @@ -10,10 +10,12 @@ use std::{ }; use tokio::{ sync::{ - mpsc, - broadcast, - notify, + mpsc::{ + self, + error::SendTimeoutError, + }, }, + time, }; use futures::{ future::{ @@ -96,66 +98,53 @@ impl Router /// /// # Returns /// When one or more dispatchers match but faile, `Err` is returned. Inside the `Err` tuple is the amount of successful dispatches, and also a vector containing the indecies of the failed hook sends. - pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef, timeout: impl Future) -> Result)> + pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef, timeout: Option) -> Result)> { let string = uri.as_ref(); - let (tcx, trx) = broadcast::channel(1); - tokio::pin!(timeout); - let begin_to = tokio::sync::Barrier::new(self.routes.len()+1); - let timeout = async { - timeout.await; - begin_to.wait().await; - tcx.send(()); - }; - let mut timeouts = iter::once(trx) - .chain(iter::repeat_with(|| tcx.subscribe())); - let output = async { - let mut success=0usize; - let vec: Vec<_> = - future::join_all(self.routes.iter_mut() - .filter_map(|(i, (a_method, route, sender))| { - match a_method { - Some(x) if x != method => None, - _ => { - if route.is_match(string) { - trace!("{:?} @{}: -> {}",i, route.as_string(), string); - let mut timeout = timeouts.next().unwrap(); - Some(async move { - match tokio::select!{ - _ = timeout.recv() => { - None - } - s = sender.send(string.to_owned()) => { - Some(s) - } - } { - Some(Err(er)) => { - warn!("{:?}: Dispatch failed on hooked route for {}", i, er.0); - Err(i) - }, - Some(_) => Ok(()), - None => { - warn!("{:?}: Dispatch timed out on hooked route", i); - Err(i) - }, + let mut success=0usize; + let vec: Vec<_> = + future::join_all(self.routes.iter_mut() + .filter_map(|(i, (a_method, route, sender))| { + match a_method { + Some(x) if x != method => None, + _ => { + if route.is_match(string) { + trace!("{:?} @{}: -> {}",i, route.as_string(), string); + let timeout = timeout.clone(); + macro_rules! send { + () => { + match timeout { + None => sender.send(string.to_owned()).await + .map_err(|e| SendTimeoutError::Closed(e.0)), + Some(time) => sender.send_timeout(string.to_owned(), time).await } - }) - } else { - None - } - }, - } - })).await.into_iter() - .filter_map(|res| { - if res.is_ok() { - success+=1; - } - res.err() - }).collect(); - (success, vec) - }; - tokio::pin!(output); - let (_, (success, vec)) = future::join(timeout, output).await; + } + }; + Some(async move { + match send!() { + Err(SendTimeoutError::Closed(er)) => { + error!("{:?}: Dispatch failed on hooked route for {}", i, er); + Err(i) + }, + Err(SendTimeoutError::Timeout(er)) => { + warn!("{:?}: Dispatch timed out on hooked route for {}", i, er); + Err(i) + }, + _ => Ok(()), + } + }) + } else { + None + } + }, + } + })).await.into_iter() + .filter_map(|res| { + if res.is_ok() { + success+=1; + } + res.err() + }).collect(); if vec.len() > 0 { Err((success, vec)) } else {