From ee3cdbf8bbcd82155c54d76fe1b3a544cc75598d Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 16 Sep 2020 18:10:36 +0100 Subject: [PATCH] failed single move --- Cargo.lock | 94 +++++++++++++++++++++++++ Cargo.toml | 8 ++- TODO | 1 + src/config.rs | 22 +++++- src/ext.rs | 80 ++++++++++++++++++++-- src/main.rs | 3 +- src/regex.rs | 127 ++++++++++++++++++++++++++++++++++ src/web/error.rs | 44 ++++++++++++ src/web/mod.rs | 72 ++++++++++++++------ src/web/route.rs | 174 +++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 592 insertions(+), 33 deletions(-) create mode 100644 TODO create mode 100644 src/regex.rs create mode 100644 src/web/error.rs create mode 100644 src/web/route.rs diff --git a/Cargo.lock b/Cargo.lock index 1eb6007..01da0fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,6 +126,27 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +[[package]] +name = "bzip2" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b7c3cbf0fa9c1b82308d57191728ca0256cb821220f4e2fd410a72ade26e3b" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.9+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad3b39a260062fca31f7b0b12f207e8f2590a67d32ec7d59c20484b07ea7285e" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cc" version = "1.0.59" @@ -245,6 +266,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "enum-set" +version = "0.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf2bab91676279dd3148f1d28737631e98b3f6c662e444d700d9d6ccf6665df" + [[package]] name = "env_logger" version = "0.7.1" @@ -268,6 +295,18 @@ dependencies = [ "once_cell", ] +[[package]] +name = "filetime" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed85775dcc68644b5c950ac06a2b23768d3bc9390464151aaf27136998dcf9e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "winapi 0.3.9", +] + [[package]] name = "fnv" version = "1.0.7" @@ -400,6 +439,15 @@ dependencies = [ "slab", ] +[[package]] +name = "generational-arena" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d3b771574f62d0548cee0ad9057857e9fc25d7a3335f140c84f6acd0bf601" +dependencies = [ + "cfg-if", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -596,6 +644,18 @@ version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "755456fae044e6fa1ebbbd1b3e902ae19e73097ed4ed87bb79934a867c007bc3" +[[package]] +name = "libpcre-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ff3dd28ba96d6fe6752882f2f1b25ba8e1646448e79042442347cf3a92a6666" +dependencies = [ + "bzip2", + "libc", + "pkg-config", + "tar", +] + [[package]] name = "log" version = "0.4.11" @@ -791,6 +851,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "pcre" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f2cf69d4d3e55fac13ccbbf142e650bccbcd9cd2e5b5b69033df9e97d190562" +dependencies = [ + "enum-set", + "libc", + "libpcre-sys", +] + [[package]] name = "pin-project" version = "0.4.23" @@ -1073,6 +1144,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tar" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "489997b7557e9a43e192c527face4feacc78bfbe6eed67fd55c4c9e381cba290" +dependencies = [ + "filetime", + "libc", + "redox_syscall", + "xattr", +] + [[package]] name = "termcolor" version = "1.1.0" @@ -1325,6 +1408,15 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "xattr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" +dependencies = [ + "libc", +] + [[package]] name = "yuurei" version = "0.1.0" @@ -1337,10 +1429,12 @@ dependencies = [ "cryptohelpers", "difference", "futures", + "generational-arena", "hyper", "libc", "log", "once_cell", + "pcre", "pin-project", "pretty_env_logger", "rustc_version", diff --git a/Cargo.toml b/Cargo.toml index 710ff17..ea14a10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["experimental_inserter"] - -# Use non-allocating array inserter +# Use `memmove` array inserter. +# Depenging on context this can perhaps be slower or perhaps be faster. +# Best to leave it off if you don't have explicit reason to use it. experimental_inserter = [] [profile.release] @@ -37,6 +37,8 @@ hyper = "0.13.7" cidr = {version = "0.1.1", features=["serde"]} pretty_env_logger = "0.4.0" log = "0.4.11" +pcre = "0.2.3" +generational-arena = "0.2.8" [build-dependencies] rustc_version = "0.2" diff --git a/TODO b/TODO new file mode 100644 index 0000000..6f4f13a --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +Remove move_via_unsafe. Splice is now faster diff --git a/src/config.rs b/src/config.rs index b6accb7..7722b8f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,18 @@ pub fn get() -> &'static Config INSTANCE.get().expect("Global config tried to be accessed before it was set") } +/// Get the global config instance or return default config instance. +pub fn get_or_default() -> Cow<'static, Config> +{ + match INSTANCE.get() { + Some(e) => Cow::Borrowed(e), + _ => { + warn!("Config instance not initialised, using default."); + Cow::Owned(Config::default()) + } + } +} + /// Config for this run #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Config @@ -39,7 +51,11 @@ pub struct Config pub accept_mask: Vec, /// Deny all connections in this match, even if previously matched by allow pub deny_mask: Vec, - + /// Accept by default + pub accept_default: bool, + + /// The number of connections allowed to be processed at once on one route + pub dos_max: usize, } impl Default for Config @@ -49,9 +65,11 @@ impl Default for Config { Self { anon_name: Cow::Borrowed("Nanashi"), - listen: SocketAddr::from(([127,0,0,1], 8088)), + listen: SocketAddr::from(([0,0,0,0], 8088)), accept_mask: vec![cidr::Ipv4Cidr::new(Ipv4Addr::new(127,0,0,1), 32).unwrap().into()], deny_mask: Vec::new(), + accept_default: false, + dos_max: 16, } } } diff --git a/src/ext.rs b/src/ext.rs index 8217e8f..2a7d7c0 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -2,8 +2,72 @@ use super::*; use std::{ marker::PhantomData, + fmt, + ops, }; +/// Wrapper to derive debug for types that don't implement it. +#[repr(transparent)] +#[derive(Clone, PartialEq, Eq, Ord,PartialOrd, Hash)] +pub struct OpaqueDebug(T); + +impl OpaqueDebug +{ + /// Create a new wrapper + #[inline] pub const fn new(value: T) -> Self + { + Self(value) + } + + /// Consume into the value + #[inline] pub const fn into_inner(self) -> T + { + self.0 + } +} + +impl AsRef for OpaqueDebug +{ + #[inline] fn as_ref(&self) -> &T + { + &self.0 + } +} + +impl AsMut for OpaqueDebug +{ + #[inline] fn as_mut(&mut self) -> &mut T + { + &mut self.0 + } +} + +impl ops::Deref for OpaqueDebug +{ + type Target = T; + #[inline] fn deref(&self) -> &Self::Target + { + &self.0 + } +} + +impl ops::DerefMut for OpaqueDebug +{ + #[inline] fn deref_mut(&mut self) -> &mut Self::Target + { + &mut self.0 + } +} + +impl fmt::Debug for OpaqueDebug +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "") + } +} + + /// A trait for types that can insert objects at their end. pub trait BackInserter { @@ -81,7 +145,7 @@ pub trait VecExt { /// Insert many elements with exact size iterator fn insert_exact>(&mut self, location: usize, slice: I) - where Ex: ExactSizeIterator + std::panic::UnwindSafe; + where Ex: ExactSizeIterator; /// Insert many elements fn insert_many>(&mut self, location: usize, slice: I); @@ -93,13 +157,13 @@ impl VecExt for Vec #[cfg(not(feature="experimental_inserter"))] #[inline(always)] fn insert_exact>(&mut self, location: usize, slice: I) - where Ex: ExactSizeIterator + std::panic::UnwindSafe + where Ex: ExactSizeIterator { self.insert_many(location, slice) } - #[cfg(feature="experimental_inserter")] + #[cfg(feature="experimental_inserter")] fn insert_exact>(&mut self, location: usize, slice: I) - where Ex: ExactSizeIterator + std::panic::UnwindSafe, + where Ex: ExactSizeIterator, { #[inline(never)] #[cold] @@ -164,6 +228,7 @@ impl VecExt for Vec self.set_len(len + sent); } } + #[inline] fn insert_many>(&mut self, location: usize, slice: I) { let slice = slice.into_iter(); @@ -240,7 +305,7 @@ mod tests let mut vec = vec![0,10,11,12]; let span = [0,1,2,3]; b.iter(|| { - black_box(vec.insert_exact(2, span.iter().copied())); + black_box(vec.insert_exact(vec.len()/2, span.iter().copied())); }); } #[bench] @@ -250,9 +315,10 @@ mod tests let span = [0,1,2,3]; b.iter(|| { - black_box(vec.insert_many(2, span.iter().copied())); + black_box(vec.insert_many(vec.len()/2, span.iter().copied())); }); } + #[cfg(feature="experimental_inserter")] #[bench] fn move_via_unsafe(b: &mut Bencher) @@ -260,7 +326,7 @@ mod tests let mut vec = vec![0,10,11,12]; let span = [0,1,2,3]; b.iter(|| { - black_box(vec.insert_exact(2, span.iter().copied())); + black_box(vec.insert_exact(vec.len()/2, span.iter().copied())); }); } } diff --git a/src/main.rs b/src/main.rs index ed8a1d6..a228cd0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,6 +24,7 @@ use ext::*; mod bytes; mod suspend; +mod regex; mod cache; mod config; @@ -43,7 +44,7 @@ async fn main() -> Result<(), eyre::Report>{ config::set(Default::default()); - web::serve(web::State::new()).await?; + web::serve(Default::default()).await?; info!("Server shutdown gracefully"); /* diff --git a/src/regex.rs b/src/regex.rs new file mode 100644 index 0000000..bb52a5f --- /dev/null +++ b/src/regex.rs @@ -0,0 +1,127 @@ +//! PCRE +use super::*; +use pcre::{ + Pcre, +}; +use std::{ + sync::{ + Mutex, + }, + error, + fmt, +}; + +#[derive(Debug)] +pub struct Regex(Mutex, String); + +/// A regex error +#[derive(Debug)] +pub struct Error(pcre::CompilationError); + +impl error::Error for Error{} + +impl fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f,"regex failed to compile: ")?; + self.0.fmt(f) + } +} + +impl From for Error +{ + #[inline] fn from(from: pcre::CompilationError) -> Self + { + Self(from) + } +} + +pub struct Iter<'a>(Option>, usize); + +impl<'a> Iterator for Iter<'a> +{ + type Item = &'a str; + fn next(&mut self) -> Option { + if let Some(m) = &mut self.0 { + if self.1 >= m.string_count() { + None + } else { + let string = m.group(self.1); + self.1 +=1; + Some(string) + } + } else { + None + } + } + + #[inline] fn size_hint(&self) -> (usize, Option) { + (self.len(), Some(self.len())) + } +} + +impl<'a> std::iter::FusedIterator for Iter<'a>{} +impl<'a> ExactSizeIterator for Iter<'a> +{ + fn len(&self) -> usize + { + match &self.0 { + Some(m) => m.string_count(), + None => 0, + } + } +} + +impl Regex +{ + /// Create a new regular expression from a string + pub fn new(from: impl AsRef, opt: I) -> Result + where I: IntoIterator + { + let string = from.as_ref(); + Ok(Self(Mutex::new(Pcre::compile_with_options(string, &opt.into_iter().collect())?), string.to_owned())) + } + + /// Test against string + pub fn test(&self, string: impl AsRef) -> bool + { + let mut regex = self.0.lock().unwrap(); + regex.exec(string.as_ref()).is_some() + } + + /// Get the groups of a match against string + pub fn exec_ref<'a>(&mut self, string: &'a (impl AsRef + ?Sized)) -> Iter<'a> + { + let regex = self.0.get_mut().unwrap(); + Iter(regex.exec(string.as_ref()), 0) + } + + /// Get the groups as an owned string vector from map against string + pub fn exec(&self, string: impl AsRef) -> Vec + { + let mut regex = self.0.lock().unwrap(); + Iter(regex.exec(string.as_ref()), 0).map(std::borrow::ToOwned::to_owned).collect() + } + + /// Get the string used to create this regex + #[inline] pub fn as_str(&self) -> &str{ + &self.1[..] + } +} + +impl From for Pcre +{ + #[inline] fn from(from: Regex) -> Self + { + from.0.into_inner().unwrap() + } +} + +impl fmt::Display for Regex +{ + #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "{}", self.1) + } +} diff --git a/src/web/error.rs b/src/web/error.rs new file mode 100644 index 0000000..5833135 --- /dev/null +++ b/src/web/error.rs @@ -0,0 +1,44 @@ +//! Internal errors +use super::*; +use std::{ + fmt, + error, + net::SocketAddr, +}; + +#[derive(Debug)] +#[non_exhaustive] +pub enum Error { + Denied(SocketAddr, bool), + Unknown, +} + +impl Error +{ + /// Print this error as a warning + #[inline(never)] #[cold] pub fn warn(self) -> Self + { + warn!("{}", self); + self + } + + /// Print this error as info + #[inline(never)] #[cold] pub fn info(self) -> Self + { + info!("{}", self); + self + } +} + +impl error::Error for Error{} +impl fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::Denied(sock, true) => write!(f, "denied connection (explicit): {}", sock), + Self::Denied(sock, _) => write!(f, "denied connection (implicit): {}", sock), + _ => write!(f, "unknown error"), + } + } +} diff --git a/src/web/mod.rs b/src/web/mod.rs index b6a0ef1..bf892aa 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -19,24 +19,52 @@ use hyper::{ use futures::{ TryStreamExt as _, }; +use cidr::{ + Cidr, +}; + +pub mod error; +pub mod route; #[derive(Debug)] pub struct State { - + config: config::Config, } impl State { - pub fn new() -> Self + pub fn new(config: config::Config) -> Self { - Self{} + Self{ + config + } } } +impl Default for State +{ + #[inline] + fn default() -> Self + { + Self{config: config::get().clone()} + } +} -async fn test(req: Request) -> Result, !> +fn mask_contains(mask: &[cidr::IpCidr], value: &std::net::IpAddr) -> bool { + for mask in mask.iter() + { + if mask.contains(value) { + return true; + } + } + false +} + +async fn handle_conn(state: Arc, req: Request) -> Result, error::Error> +{ + //TODO: Create client, route, and such Ok(Response::new("Hi".into())) } @@ -44,31 +72,35 @@ pub async fn serve(state: State) -> Result<(), eyre::Report> { let state = Arc::new(state); - let (addr, accept, deny) = { - let cfg =config::get(); - (cfg.listen, - cfg.accept_mask.clone(), - cfg.deny_mask.clone()) - }; - let service = make_service_fn(|conn: &AddrStream| { let state = Arc::clone(&state); let remote_addr = conn.remote_addr(); - // TODO: Match config allow and deny masks - - + let remote_ip = remote_addr.ip(); + let denied = mask_contains(&state.config.deny_mask[..], &remote_ip); + let allowed = mask_contains(&state.config.accept_mask[..], &remote_ip); async move { - Ok::<_, !>(service_fn(test)) - } + if denied { + Err(error::Error::Denied(remote_addr, true).warn()) + } else if allowed || state.config.accept_default { + trace!("Accepted conn: {}", remote_addr); + + Ok(service_fn(move |req: Request| { + handle_conn(Arc::clone(&state), req) + })) + } else { + Err(error::Error::Denied(remote_addr,false).info()) + } + } }); - let server = Server::bind(&addr).serve(service) - .with_graceful_shutdown(async { + let server = Server::bind(&state.config.listen).serve(service) + .with_graceful_shutdown(async { tokio::signal::ctrl_c().await.expect("Failed to catch SIGINT"); + info!("Going down for shutdown now!"); }); - + server.await?; - + Ok(()) } diff --git a/src/web/route.rs b/src/web/route.rs new file mode 100644 index 0000000..14b76d2 --- /dev/null +++ b/src/web/route.rs @@ -0,0 +1,174 @@ +//! Basic router +use super::*; +use hyper::{ + Method, +}; +use std::{ + fmt, + marker::Send, + iter, +}; +use tokio::{ + sync::{ + mpsc, + broadcast, + notify, + }, +}; +use futures::{ + future::{ + self, + Future, + }, +}; +use generational_arena::{ + Index, + Arena, +}; + +pub trait UriRoute +{ + fn is_match(&self, uri: &str) -> bool; + #[inline] fn as_string(&self) -> &str + { + "" + } +} + +impl UriRoute for str +{ + #[inline] fn is_match(&self, uri: &str) -> bool { + self.eq(uri) + } + #[inline] fn as_string(&self) -> &str { + self + } +} + +impl> UriRoute for T +{ + #[inline] fn is_match(&self, uri: &str) -> bool { + self.as_ref().eq(uri) + } + #[inline] fn as_string(&self) -> &str { + self.as_ref() + } +} + +impl UriRoute for regex::Regex +{ + #[inline] fn is_match(&self, uri: &str) -> bool { + self.test(uri) + } + #[inline] fn as_string(&self) -> &str { + self.as_str() + } +} + +/// Contains a routing table +#[derive(Debug)] +pub struct Router +{ + routes: Arena<(Option, OpaqueDebug>, mpsc::Sender)>, +} + +impl Router +{ + /// Create an empty routing table + pub fn new() -> Self + { + Self{ + routes: Arena::new(), + } + } + + /// Push a new route into the router. + /// + /// # Returns + /// The hook's new index, and the receiver that `dispatch()` sends to. + pub fn hook(&mut self, method: Option, uri: Uri) -> (Index, mpsc::Receiver) + { + let (tx, rx) = mpsc::channel(config::get_or_default().dos_max); + (self.routes.insert((method, OpaqueDebug::new(Box::new(uri)), tx)), rx) + } + + /// Dispatch the URI location across this router, sending to all that match it. + /// + /// # Returns + /// When one or more dispatchers match but faile, `Err` is returned. Inside the `Err` tuple is the amount of successful dispatches, and also a vector containing the indecies of the failed hook sends. + pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef, timeout: impl Future) -> Result)> + { + let string = uri.as_ref(); + let (tcx, trx) = broadcast::channel(1); + tokio::pin!(timeout); + let begin_to = tokio::sync::Barrier::new(self.routes.len()+1); + let timeout = async { + timeout.await; + begin_to.wait().await; + tcx.send(()); + }; + let mut timeouts = iter::once(trx) + .chain(iter::repeat_with(|| tcx.subscribe())); + let output = async { + let mut success=0usize; + let vec: Vec<_> = + future::join_all(self.routes.iter_mut() + .filter_map(|(i, (a_method, route, sender))| { + match a_method { + Some(x) if x != method => None, + _ => { + if route.is_match(string) { + trace!("{:?} @{}: -> {}",i, route.as_string(), string); + let mut timeout = timeouts.next().unwrap(); + Some(async move { + match tokio::select!{ + _ = timeout.recv() => { + None + } + s = sender.send(string.to_owned()) => { + Some(s) + } + } { + Some(Err(er)) => { + warn!("{:?}: Dispatch failed on hooked route for {}", i, er.0); + Err(i) + }, + Some(_) => Ok(()), + None => { + warn!("{:?}: Dispatch timed out on hooked route", i); + Err(i) + }, + } + }) + } else { + None + } + }, + } + })).await.into_iter() + .filter_map(|res| { + if res.is_ok() { + success+=1; + } + res.err() + }).collect(); + (success, vec) + }; + tokio::pin!(output); + let (_, (success, vec)) = future::join(timeout, output).await; + if vec.len() > 0 { + Err((success, vec)) + } else { + Ok(success) + } + } + + /// Attempt to unhook these hooks. If one or more of the provided indecies does not exist in the routing table, it is ignored. + pub fn unhook(&mut self, items: I) + where I: IntoIterator + { + for item in items.into_iter() { + self.routes.remove(item); + } + } +}