failed single move

legacy
Avril 4 years ago
parent af6446b335
commit ee3cdbf8bb
Signed by: flanchan
GPG Key ID: 284488987C31F630

94
Cargo.lock generated

@ -126,6 +126,27 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bzip2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42b7c3cbf0fa9c1b82308d57191728ca0256cb821220f4e2fd410a72ade26e3b"
dependencies = [
"bzip2-sys",
"libc",
]
[[package]]
name = "bzip2-sys"
version = "0.1.9+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad3b39a260062fca31f7b0b12f207e8f2590a67d32ec7d59c20484b07ea7285e"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "cc"
version = "1.0.59"
@ -245,6 +266,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "enum-set"
version = "0.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf2bab91676279dd3148f1d28737631e98b3f6c662e444d700d9d6ccf6665df"
[[package]]
name = "env_logger"
version = "0.7.1"
@ -268,6 +295,18 @@ dependencies = [
"once_cell",
]
[[package]]
name = "filetime"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed85775dcc68644b5c950ac06a2b23768d3bc9390464151aaf27136998dcf9e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"winapi 0.3.9",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -400,6 +439,15 @@ dependencies = [
"slab",
]
[[package]]
name = "generational-arena"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e1d3b771574f62d0548cee0ad9057857e9fc25d7a3335f140c84f6acd0bf601"
dependencies = [
"cfg-if",
]
[[package]]
name = "generic-array"
version = "0.14.4"
@ -596,6 +644,18 @@ version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "755456fae044e6fa1ebbbd1b3e902ae19e73097ed4ed87bb79934a867c007bc3"
[[package]]
name = "libpcre-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ff3dd28ba96d6fe6752882f2f1b25ba8e1646448e79042442347cf3a92a6666"
dependencies = [
"bzip2",
"libc",
"pkg-config",
"tar",
]
[[package]]
name = "log"
version = "0.4.11"
@ -791,6 +851,17 @@ dependencies = [
"subtle",
]
[[package]]
name = "pcre"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f2cf69d4d3e55fac13ccbbf142e650bccbcd9cd2e5b5b69033df9e97d190562"
dependencies = [
"enum-set",
"libc",
"libpcre-sys",
]
[[package]]
name = "pin-project"
version = "0.4.23"
@ -1073,6 +1144,18 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tar"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "489997b7557e9a43e192c527face4feacc78bfbe6eed67fd55c4c9e381cba290"
dependencies = [
"filetime",
"libc",
"redox_syscall",
"xattr",
]
[[package]]
name = "termcolor"
version = "1.1.0"
@ -1325,6 +1408,15 @@ dependencies = [
"winapi-build",
]
[[package]]
name = "xattr"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c"
dependencies = [
"libc",
]
[[package]]
name = "yuurei"
version = "0.1.0"
@ -1337,10 +1429,12 @@ dependencies = [
"cryptohelpers",
"difference",
"futures",
"generational-arena",
"hyper",
"libc",
"log",
"once_cell",
"pcre",
"pin-project",
"pretty_env_logger",
"rustc_version",

@ -7,9 +7,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["experimental_inserter"]
# Use non-allocating array inserter
# Use `memmove` array inserter.
# Depenging on context this can perhaps be slower or perhaps be faster.
# Best to leave it off if you don't have explicit reason to use it.
experimental_inserter = []
[profile.release]
@ -37,6 +37,8 @@ hyper = "0.13.7"
cidr = {version = "0.1.1", features=["serde"]}
pretty_env_logger = "0.4.0"
log = "0.4.11"
pcre = "0.2.3"
generational-arena = "0.2.8"
[build-dependencies]
rustc_version = "0.2"

@ -0,0 +1 @@
Remove move_via_unsafe. Splice is now faster

@ -27,6 +27,18 @@ pub fn get() -> &'static Config
INSTANCE.get().expect("Global config tried to be accessed before it was set")
}
/// Get the global config instance or return default config instance.
pub fn get_or_default() -> Cow<'static, Config>
{
match INSTANCE.get() {
Some(e) => Cow::Borrowed(e),
_ => {
warn!("Config instance not initialised, using default.");
Cow::Owned(Config::default())
}
}
}
/// Config for this run
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Config
@ -39,7 +51,11 @@ pub struct Config
pub accept_mask: Vec<cidr::IpCidr>,
/// Deny all connections in this match, even if previously matched by allow
pub deny_mask: Vec<cidr::IpCidr>,
/// Accept by default
pub accept_default: bool,
/// The number of connections allowed to be processed at once on one route
pub dos_max: usize,
}
impl Default for Config
@ -49,9 +65,11 @@ impl Default for Config
{
Self {
anon_name: Cow::Borrowed("Nanashi"),
listen: SocketAddr::from(([127,0,0,1], 8088)),
listen: SocketAddr::from(([0,0,0,0], 8088)),
accept_mask: vec![cidr::Ipv4Cidr::new(Ipv4Addr::new(127,0,0,1), 32).unwrap().into()],
deny_mask: Vec::new(),
accept_default: false,
dos_max: 16,
}
}
}

@ -2,8 +2,72 @@
use super::*;
use std::{
marker::PhantomData,
fmt,
ops,
};
/// Wrapper to derive debug for types that don't implement it.
#[repr(transparent)]
#[derive(Clone, PartialEq, Eq, Ord,PartialOrd, Hash)]
pub struct OpaqueDebug<T>(T);
impl<T> OpaqueDebug<T>
{
/// Create a new wrapper
#[inline] pub const fn new(value: T) -> Self
{
Self(value)
}
/// Consume into the value
#[inline] pub const fn into_inner(self) -> T
{
self.0
}
}
impl<T> AsRef<T> for OpaqueDebug<T>
{
#[inline] fn as_ref(&self) -> &T
{
&self.0
}
}
impl<T> AsMut<T> for OpaqueDebug<T>
{
#[inline] fn as_mut(&mut self) -> &mut T
{
&mut self.0
}
}
impl<T> ops::Deref for OpaqueDebug<T>
{
type Target = T;
#[inline] fn deref(&self) -> &Self::Target
{
&self.0
}
}
impl<T> ops::DerefMut for OpaqueDebug<T>
{
#[inline] fn deref_mut(&mut self) -> &mut Self::Target
{
&mut self.0
}
}
impl<T> fmt::Debug for OpaqueDebug<T>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "<opaque value>")
}
}
/// A trait for types that can insert objects at their end.
pub trait BackInserter<T>
{
@ -81,7 +145,7 @@ pub trait VecExt<T>
{
/// Insert many elements with exact size iterator
fn insert_exact<Ex, I: IntoIterator<Item = T, IntoIter = Ex>>(&mut self, location: usize, slice: I)
where Ex: ExactSizeIterator<Item = T> + std::panic::UnwindSafe;
where Ex: ExactSizeIterator<Item = T>;
/// Insert many elements
fn insert_many<I: IntoIterator<Item =T>>(&mut self, location: usize, slice: I);
@ -93,13 +157,13 @@ impl<T> VecExt<T> for Vec<T>
#[cfg(not(feature="experimental_inserter"))]
#[inline(always)]
fn insert_exact<Ex, I: IntoIterator<Item = T, IntoIter = Ex>>(&mut self, location: usize, slice: I)
where Ex: ExactSizeIterator<Item = T> + std::panic::UnwindSafe
where Ex: ExactSizeIterator<Item = T>
{
self.insert_many(location, slice)
}
#[cfg(feature="experimental_inserter")]
#[cfg(feature="experimental_inserter")]
fn insert_exact<Ex, I: IntoIterator<Item = T, IntoIter = Ex>>(&mut self, location: usize, slice: I)
where Ex: ExactSizeIterator<Item = T> + std::panic::UnwindSafe,
where Ex: ExactSizeIterator<Item = T>,
{
#[inline(never)]
#[cold]
@ -164,6 +228,7 @@ impl<T> VecExt<T> for Vec<T>
self.set_len(len + sent);
}
}
#[inline]
fn insert_many<I: IntoIterator<Item =T>>(&mut self, location: usize, slice: I)
{
let slice = slice.into_iter();
@ -240,7 +305,7 @@ mod tests
let mut vec = vec![0,10,11,12];
let span = [0,1,2,3];
b.iter(|| {
black_box(vec.insert_exact(2, span.iter().copied()));
black_box(vec.insert_exact(vec.len()/2, span.iter().copied()));
});
}
#[bench]
@ -250,9 +315,10 @@ mod tests
let span = [0,1,2,3];
b.iter(|| {
black_box(vec.insert_many(2, span.iter().copied()));
black_box(vec.insert_many(vec.len()/2, span.iter().copied()));
});
}
#[cfg(feature="experimental_inserter")]
#[bench]
fn move_via_unsafe(b: &mut Bencher)
@ -260,7 +326,7 @@ mod tests
let mut vec = vec![0,10,11,12];
let span = [0,1,2,3];
b.iter(|| {
black_box(vec.insert_exact(2, span.iter().copied()));
black_box(vec.insert_exact(vec.len()/2, span.iter().copied()));
});
}
}

@ -24,6 +24,7 @@ use ext::*;
mod bytes;
mod suspend;
mod regex;
mod cache;
mod config;
@ -43,7 +44,7 @@ async fn main() -> Result<(), eyre::Report>{
config::set(Default::default());
web::serve(web::State::new()).await?;
web::serve(Default::default()).await?;
info!("Server shutdown gracefully");
/*

@ -0,0 +1,127 @@
//! PCRE
use super::*;
use pcre::{
Pcre,
};
use std::{
sync::{
Mutex,
},
error,
fmt,
};
#[derive(Debug)]
pub struct Regex(Mutex<Pcre>, String);
/// A regex error
#[derive(Debug)]
pub struct Error(pcre::CompilationError);
impl error::Error for Error{}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f,"regex failed to compile: ")?;
self.0.fmt(f)
}
}
impl From<pcre::CompilationError> for Error
{
#[inline] fn from(from: pcre::CompilationError) -> Self
{
Self(from)
}
}
pub struct Iter<'a>(Option<pcre::Match<'a>>, usize);
impl<'a> Iterator for Iter<'a>
{
type Item = &'a str;
fn next(&mut self) -> Option<Self::Item> {
if let Some(m) = &mut self.0 {
if self.1 >= m.string_count() {
None
} else {
let string = m.group(self.1);
self.1 +=1;
Some(string)
}
} else {
None
}
}
#[inline] fn size_hint(&self) -> (usize, Option<usize>) {
(self.len(), Some(self.len()))
}
}
impl<'a> std::iter::FusedIterator for Iter<'a>{}
impl<'a> ExactSizeIterator for Iter<'a>
{
fn len(&self) -> usize
{
match &self.0 {
Some(m) => m.string_count(),
None => 0,
}
}
}
impl Regex
{
/// Create a new regular expression from a string
pub fn new<I>(from: impl AsRef<str>, opt: I) -> Result<Self, Error>
where I: IntoIterator<Item=pcre::CompileOption>
{
let string = from.as_ref();
Ok(Self(Mutex::new(Pcre::compile_with_options(string, &opt.into_iter().collect())?), string.to_owned()))
}
/// Test against string
pub fn test(&self, string: impl AsRef<str>) -> bool
{
let mut regex = self.0.lock().unwrap();
regex.exec(string.as_ref()).is_some()
}
/// Get the groups of a match against string
pub fn exec_ref<'a>(&mut self, string: &'a (impl AsRef<str> + ?Sized)) -> Iter<'a>
{
let regex = self.0.get_mut().unwrap();
Iter(regex.exec(string.as_ref()), 0)
}
/// Get the groups as an owned string vector from map against string
pub fn exec(&self, string: impl AsRef<str>) -> Vec<String>
{
let mut regex = self.0.lock().unwrap();
Iter(regex.exec(string.as_ref()), 0).map(std::borrow::ToOwned::to_owned).collect()
}
/// Get the string used to create this regex
#[inline] pub fn as_str(&self) -> &str{
&self.1[..]
}
}
impl From<Regex> for Pcre
{
#[inline] fn from(from: Regex) -> Self
{
from.0.into_inner().unwrap()
}
}
impl fmt::Display for Regex
{
#[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "{}", self.1)
}
}

@ -0,0 +1,44 @@
//! Internal errors
use super::*;
use std::{
fmt,
error,
net::SocketAddr,
};
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
Denied(SocketAddr, bool),
Unknown,
}
impl Error
{
/// Print this error as a warning
#[inline(never)] #[cold] pub fn warn(self) -> Self
{
warn!("{}", self);
self
}
/// Print this error as info
#[inline(never)] #[cold] pub fn info(self) -> Self
{
info!("{}", self);
self
}
}
impl error::Error for Error{}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Denied(sock, true) => write!(f, "denied connection (explicit): {}", sock),
Self::Denied(sock, _) => write!(f, "denied connection (implicit): {}", sock),
_ => write!(f, "unknown error"),
}
}
}

@ -19,24 +19,52 @@ use hyper::{
use futures::{
TryStreamExt as _,
};
use cidr::{
Cidr,
};
pub mod error;
pub mod route;
#[derive(Debug)]
pub struct State
{
config: config::Config,
}
impl State
{
pub fn new() -> Self
pub fn new(config: config::Config) -> Self
{
Self{}
Self{
config
}
}
}
impl Default for State
{
#[inline]
fn default() -> Self
{
Self{config: config::get().clone()}
}
}
async fn test(req: Request<Body>) -> Result<Response<Body>, !>
fn mask_contains(mask: &[cidr::IpCidr], value: &std::net::IpAddr) -> bool
{
for mask in mask.iter()
{
if mask.contains(value) {
return true;
}
}
false
}
async fn handle_conn(state: Arc<State>, req: Request<Body>) -> Result<Response<Body>, error::Error>
{
//TODO: Create client, route, and such
Ok(Response::new("Hi".into()))
}
@ -44,31 +72,35 @@ pub async fn serve(state: State) -> Result<(), eyre::Report>
{
let state = Arc::new(state);
let (addr, accept, deny) = {
let cfg =config::get();
(cfg.listen,
cfg.accept_mask.clone(),
cfg.deny_mask.clone())
};
let service = make_service_fn(|conn: &AddrStream| {
let state = Arc::clone(&state);
let remote_addr = conn.remote_addr();
// TODO: Match config allow and deny masks
let remote_ip = remote_addr.ip();
let denied = mask_contains(&state.config.deny_mask[..], &remote_ip);
let allowed = mask_contains(&state.config.accept_mask[..], &remote_ip);
async move {
Ok::<_, !>(service_fn(test))
}
if denied {
Err(error::Error::Denied(remote_addr, true).warn())
} else if allowed || state.config.accept_default {
trace!("Accepted conn: {}", remote_addr);
Ok(service_fn(move |req: Request<Body>| {
handle_conn(Arc::clone(&state), req)
}))
} else {
Err(error::Error::Denied(remote_addr,false).info())
}
}
});
let server = Server::bind(&addr).serve(service)
.with_graceful_shutdown(async {
let server = Server::bind(&state.config.listen).serve(service)
.with_graceful_shutdown(async {
tokio::signal::ctrl_c().await.expect("Failed to catch SIGINT");
info!("Going down for shutdown now!");
});
server.await?;
Ok(())
}

@ -0,0 +1,174 @@
//! Basic router
use super::*;
use hyper::{
Method,
};
use std::{
fmt,
marker::Send,
iter,
};
use tokio::{
sync::{
mpsc,
broadcast,
notify,
},
};
use futures::{
future::{
self,
Future,
},
};
use generational_arena::{
Index,
Arena,
};
pub trait UriRoute
{
fn is_match(&self, uri: &str) -> bool;
#[inline] fn as_string(&self) -> &str
{
""
}
}
impl UriRoute for str
{
#[inline] fn is_match(&self, uri: &str) -> bool {
self.eq(uri)
}
#[inline] fn as_string(&self) -> &str {
self
}
}
impl<T: AsRef<str>> UriRoute for T
{
#[inline] fn is_match(&self, uri: &str) -> bool {
self.as_ref().eq(uri)
}
#[inline] fn as_string(&self) -> &str {
self.as_ref()
}
}
impl UriRoute for regex::Regex
{
#[inline] fn is_match(&self, uri: &str) -> bool {
self.test(uri)
}
#[inline] fn as_string(&self) -> &str {
self.as_str()
}
}
/// Contains a routing table
#[derive(Debug)]
pub struct Router
{
routes: Arena<(Option<Method>, OpaqueDebug<Box<dyn UriRoute + Send + 'static>>, mpsc::Sender<String>)>,
}
impl Router
{
/// Create an empty routing table
pub fn new() -> Self
{
Self{
routes: Arena::new(),
}
}
/// Push a new route into the router.
///
/// # Returns
/// The hook's new index, and the receiver that `dispatch()` sends to.
pub fn hook<Uri: UriRoute + Send + 'static>(&mut self, method: Option<Method>, uri: Uri) -> (Index, mpsc::Receiver<String>)
{
let (tx, rx) = mpsc::channel(config::get_or_default().dos_max);
(self.routes.insert((method, OpaqueDebug::new(Box::new(uri)), tx)), rx)
}
/// Dispatch the URI location across this router, sending to all that match it.
///
/// # Returns
/// When one or more dispatchers match but faile, `Err` is returned. Inside the `Err` tuple is the amount of successful dispatches, and also a vector containing the indecies of the failed hook sends.
pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef<str>, timeout: impl Future) -> Result<usize, (usize, Vec<Index>)>
{
let string = uri.as_ref();
let (tcx, trx) = broadcast::channel(1);
tokio::pin!(timeout);
let begin_to = tokio::sync::Barrier::new(self.routes.len()+1);
let timeout = async {
timeout.await;
begin_to.wait().await;
tcx.send(());
};
let mut timeouts = iter::once(trx)
.chain(iter::repeat_with(|| tcx.subscribe()));
let output = async {
let mut success=0usize;
let vec: Vec<_> =
future::join_all(self.routes.iter_mut()
.filter_map(|(i, (a_method, route, sender))| {
match a_method {
Some(x) if x != method => None,
_ => {
if route.is_match(string) {
trace!("{:?} @{}: -> {}",i, route.as_string(), string);
let mut timeout = timeouts.next().unwrap();
Some(async move {
match tokio::select!{
_ = timeout.recv() => {
None
}
s = sender.send(string.to_owned()) => {
Some(s)
}
} {
Some(Err(er)) => {
warn!("{:?}: Dispatch failed on hooked route for {}", i, er.0);
Err(i)
},
Some(_) => Ok(()),
None => {
warn!("{:?}: Dispatch timed out on hooked route", i);
Err(i)
},
}
})
} else {
None
}
},
}
})).await.into_iter()
.filter_map(|res| {
if res.is_ok() {
success+=1;
}
res.err()
}).collect();
(success, vec)
};
tokio::pin!(output);
let (_, (success, vec)) = future::join(timeout, output).await;
if vec.len() > 0 {
Err((success, vec))
} else {
Ok(success)
}
}
/// Attempt to unhook these hooks. If one or more of the provided indecies does not exist in the routing table, it is ignored.
pub fn unhook<I>(&mut self, items: I)
where I: IntoIterator<Item = Index>
{
for item in items.into_iter() {
self.routes.remove(item);
}
}
}
Loading…
Cancel
Save