route dispatch with timeout

legacy
Avril 4 years ago
parent ee3cdbf8bb
commit d06dee54eb
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -20,7 +20,7 @@ impl<T> OpaqueDebug<T>
} }
/// Consume into the value /// Consume into the value
#[inline] pub const fn into_inner(self) -> T #[inline] pub fn into_inner(self) -> T
{ {
self.0 self.0
} }

@ -10,10 +10,12 @@ use std::{
}; };
use tokio::{ use tokio::{
sync::{ sync::{
mpsc, mpsc::{
broadcast, self,
notify, error::SendTimeoutError,
},
}, },
time,
}; };
use futures::{ use futures::{
future::{ future::{
@ -96,66 +98,53 @@ impl Router
/// ///
/// # Returns /// # Returns
/// When one or more dispatchers match but faile, `Err` is returned. Inside the `Err` tuple is the amount of successful dispatches, and also a vector containing the indecies of the failed hook sends. /// When one or more dispatchers match but faile, `Err` is returned. Inside the `Err` tuple is the amount of successful dispatches, and also a vector containing the indecies of the failed hook sends.
pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef<str>, timeout: impl Future) -> Result<usize, (usize, Vec<Index>)> pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef<str>, timeout: Option<time::Duration>) -> Result<usize, (usize, Vec<Index>)>
{ {
let string = uri.as_ref(); let string = uri.as_ref();
let (tcx, trx) = broadcast::channel(1); let mut success=0usize;
tokio::pin!(timeout); let vec: Vec<_> =
let begin_to = tokio::sync::Barrier::new(self.routes.len()+1); future::join_all(self.routes.iter_mut()
let timeout = async { .filter_map(|(i, (a_method, route, sender))| {
timeout.await; match a_method {
begin_to.wait().await; Some(x) if x != method => None,
tcx.send(()); _ => {
}; if route.is_match(string) {
let mut timeouts = iter::once(trx) trace!("{:?} @{}: -> {}",i, route.as_string(), string);
.chain(iter::repeat_with(|| tcx.subscribe())); let timeout = timeout.clone();
let output = async { macro_rules! send {
let mut success=0usize; () => {
let vec: Vec<_> = match timeout {
future::join_all(self.routes.iter_mut() None => sender.send(string.to_owned()).await
.filter_map(|(i, (a_method, route, sender))| { .map_err(|e| SendTimeoutError::Closed(e.0)),
match a_method { Some(time) => sender.send_timeout(string.to_owned(), time).await
Some(x) if x != method => None,
_ => {
if route.is_match(string) {
trace!("{:?} @{}: -> {}",i, route.as_string(), string);
let mut timeout = timeouts.next().unwrap();
Some(async move {
match tokio::select!{
_ = timeout.recv() => {
None
}
s = sender.send(string.to_owned()) => {
Some(s)
}
} {
Some(Err(er)) => {
warn!("{:?}: Dispatch failed on hooked route for {}", i, er.0);
Err(i)
},
Some(_) => Ok(()),
None => {
warn!("{:?}: Dispatch timed out on hooked route", i);
Err(i)
},
} }
}) }
} else { };
None Some(async move {
} match send!() {
}, Err(SendTimeoutError::Closed(er)) => {
} error!("{:?}: Dispatch failed on hooked route for {}", i, er);
})).await.into_iter() Err(i)
.filter_map(|res| { },
if res.is_ok() { Err(SendTimeoutError::Timeout(er)) => {
success+=1; warn!("{:?}: Dispatch timed out on hooked route for {}", i, er);
} Err(i)
res.err() },
}).collect(); _ => Ok(()),
(success, vec) }
}; })
tokio::pin!(output); } else {
let (_, (success, vec)) = future::join(timeout, output).await; None
}
},
}
})).await.into_iter()
.filter_map(|res| {
if res.is_ok() {
success+=1;
}
res.err()
}).collect();
if vec.len() > 0 { if vec.len() > 0 {
Err((success, vec)) Err((success, vec))
} else { } else {

Loading…
Cancel
Save