initial commit

clone-session-service-objects
Avril 4 years ago
parent 66748ff65d
commit 0e68aa4f7f
Signed by: flanchan
GPG Key ID: 284488987C31F630

7
.gitignore vendored

@ -1,2 +1,9 @@
/target
*~
# Added by cargo
#
# already existing elements were commented out
#/target

1500
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -1,49 +1,10 @@
[package]
name = "yuurei"
description = "ghost text messaging"
version = "0.1.0"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
# Use `memmove` array inserter.
# Depenging on context this can perhaps be slower or perhaps be faster.
# Best to leave it off if you don't have explicit reason to use it.
experimental_inserter = []
[profile.release]
opt-level = 3
lto = "fat"
codegen-units = 1
panic = "unwind"
[dependencies]
tokio = {version = "0.2", features=["full"]}
async-trait = "0.1"
chrono = {version = "0.4.15", features=["serde"]}
uuid = { version = "0.8.1", features = ["v4", "serde"] }
once_cell = "1.4"
crypto = {version = "1.1.2", package= "cryptohelpers", features= ["serialise", "async", "sha256"]}
libc = "0.2.76"
byteorder = "1.3"
serde_cbor = "0.11.1"
serde = {version = "1.0", features= ["derive"]}
futures = "0.3"
pin-project = "0.4.23"
difference = "2.0.0"
color-eyre = "0.5.2"
hyper = "0.13.7"
cidr = {version = "0.1.1", features=["serde"]}
pretty_env_logger = "0.4.0"
log = "0.4.11"
pcre = "0.2.3"
generational-arena = "0.2.8"
khash = {version = "2.0.0", default-features=false}
hex-literal = "0.3.1"
lazy_static = "1.4.0"
smallmap = "1.1"
maud = "0.22.0"
[build-dependencies]
rustc_version = "0.2"

@ -1,2 +0,0 @@
* yuurei
Anonymous real-time single page textboard

@ -1 +0,0 @@
Remove move_via_unsafe. Splice is now faster

@ -1,24 +0,0 @@
extern crate rustc_version;
use rustc_version::{version, version_meta, Channel};
fn main() {
// Assert we haven't travelled back in time
assert!(version().unwrap().major >= 1);
// Set cfg flags depending on release channel
match version_meta().unwrap().channel {
Channel::Stable => {
println!("cargo:rustc-cfg=stable");
}
Channel::Beta => {
println!("cargo:rustc-cfg=beta");
}
Channel::Nightly => {
println!("cargo:rustc-cfg=nightly");
}
Channel::Dev => {
println!("cargo:rustc-cfg=dev");
}
}
}

@ -1,108 +0,0 @@
use std::{
fmt,
error,
};
use libc::c_void;
pub unsafe fn refer<'a, T>(src: &'a T) -> &'a [u8]
where T: ?Sized
{
std::slice::from_raw_parts(src as *const T as *const u8, std::mem::size_of_val(src))
}
pub unsafe fn refer_mut<'a, T>(src: &'a mut T) -> &'a mut [u8]
where T: ?Sized
{
std::slice::from_raw_parts_mut(src as *mut T as *mut u8, std::mem::size_of_val(src))
}
pub unsafe fn derefer<'a, T>(src: &'a [u8]) -> &'a T
{
assert!(src.len() >= std::mem::size_of::<T>());
&*(&src[0] as *const u8 as *const T)
}
pub unsafe fn derefer_mut<'a, T>(src: &'a mut [u8]) -> &'a mut T
{
assert!(src.len() >= std::mem::size_of::<T>());
&mut *(&mut src[0] as *mut u8 as *mut T)
}
mod old {
use super::*;
/// Represents a type that can be converted from bytes to itself
pub trait FromBytes: Sized
{
type Error;
fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Self,Self::Error>;
}
/// Represents a type that can be converted into bytes
pub trait IntoBytes:Sized
{
fn into_bytes(self) -> Box<[u8]>;
}
impl<T: Into<Box<[u8]>>> IntoBytes for T
{
#[inline] fn into_bytes(self) -> Box<[u8]>
{
self.into()
}
}
impl FromBytes for Vec<u8>
{
type Error = !;
#[inline] fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Self,Self::Error>
{
Ok(Vec::from(bytes.as_ref()))
}
}
impl FromBytes for Box<[u8]>
{
type Error = !;
#[inline] fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Self,Self::Error>
{
Ok(Vec::from(bytes.as_ref()).into_boxed_slice())
}
}
/// The error used when a `FromBytes` conversion fails because the buffer was not the correct size
#[derive(Debug)]
pub struct SizeError;
impl error::Error for SizeError{}
impl fmt::Display for SizeError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f,"buffer was not the correct size")
}
}
}
/// Copy slice of bytes only
///
/// # Notes
/// `dst` and `src` must not overlap. See [move_slice].
pub fn copy_slice(dst: &mut [u8], src: &[u8]) -> usize
{
let sz = std::cmp::min(dst.len(),src.len());
unsafe {
libc::memcpy(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
}
sz
}
/// Move slice of bytes only
///
/// # Notes
/// `dst` and `src` can overlap.
pub fn move_slice(dst: &mut [u8], src: &[u8]) -> usize
{
let sz = std::cmp::min(dst.len(),src.len());
unsafe {
libc::memmove(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
}
sz
}

@ -1,3 +0,0 @@
//! Caching interfaces
//TODO

@ -1,94 +0,0 @@
use super::*;
use std::{
borrow::Cow,
net::{
SocketAddr,
Ipv4Addr,
},
};
use tokio::{
time,
};
use cidr::Cidr;
//TODO: Use tokio Watcher instead, to allow hotreloading?
use once_cell::sync::OnceCell;
static INSTANCE: OnceCell<Config> = OnceCell::new();
/// Set the global config instance.
/// This can only be done once for the duration of the program's runtime
pub fn set(conf: Config)
{
INSTANCE.set(conf).expect("Failed to set global config state: Already set");
}
/// Get the global config instance.
/// `set()` must have been previously called or this function panics.
pub fn get() -> &'static Config
{
INSTANCE.get().expect("Global config tried to be accessed before it was set")
}
/// Get the global config instance or return default config instance.
pub fn get_or_default() -> Cow<'static, Config>
{
match INSTANCE.get() {
Some(e) => Cow::Borrowed(e),
_ => {
warn!("Config instance not initialised, using default.");
Cow::Owned(Config::default())
}
}
}
/// A salt for secure tripcode
pub type Salt = [u8; khash::salt::SIZE];
/// Config for this run
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Config
{
/// Name for nanashi
pub anon_name: Cow<'static, str>,
/// The server will listen on this address
pub listen: SocketAddr,
/// Accept all connections in this match
pub accept_mask: Vec<cidr::IpCidr>,
/// Deny all connections in this match, even if previously matched by allow
pub deny_mask: Vec<cidr::IpCidr>,
/// Accept by default
pub accept_default: bool,
/// The number of connections allowed to be processed at once on one route
pub dos_max: usize,
/// The timeout for any routing dispatch
pub req_timeout_local: Option<time::Duration>,
/// The timeout for *all* routing dispatchs
pub req_timeout_global: Option<time::Duration>,
/// The salt for secure tripcode
pub tripcode_salt: Salt,
}
impl Default for Config
{
#[inline]
fn default() -> Self
{
Self {
anon_name: Cow::Borrowed("Nanashi"),
listen: SocketAddr::from(([0,0,0,0], 8088)),
accept_mask: vec![cidr::Ipv4Cidr::new(Ipv4Addr::new(127,0,0,1), 32).unwrap().into()],
deny_mask: Vec::new(),
accept_default: false,
dos_max: 16,
req_timeout_local: Some(time::Duration::from_millis(500)),
req_timeout_global: Some(time::Duration::from_secs(1)),
tripcode_salt: hex!("770d64c6bf46da1a812d067fd224bbe671b7607d3274265abfcdda45c44ca3c1"),
}
}
}

@ -1,333 +0,0 @@
//! Extensions
use super::*;
use std::{
marker::PhantomData,
fmt,
ops,
};
/// Wrapper to derive debug for types that don't implement it.
#[repr(transparent)]
#[derive(Clone, PartialEq, Eq, Ord,PartialOrd, Hash)]
pub struct OpaqueDebug<T>(T);
impl<T> OpaqueDebug<T>
{
/// Create a new wrapper
#[inline] pub const fn new(value: T) -> Self
{
Self(value)
}
/// Consume into the value
#[inline] pub fn into_inner(self) -> T
{
self.0
}
}
impl<T> AsRef<T> for OpaqueDebug<T>
{
#[inline] fn as_ref(&self) -> &T
{
&self.0
}
}
impl<T> AsMut<T> for OpaqueDebug<T>
{
#[inline] fn as_mut(&mut self) -> &mut T
{
&mut self.0
}
}
impl<T> ops::Deref for OpaqueDebug<T>
{
type Target = T;
#[inline] fn deref(&self) -> &Self::Target
{
&self.0
}
}
impl<T> ops::DerefMut for OpaqueDebug<T>
{
#[inline] fn deref_mut(&mut self) -> &mut Self::Target
{
&mut self.0
}
}
impl<T> fmt::Debug for OpaqueDebug<T>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "<opaque value>")
}
}
/// A trait for types that can insert objects at their end.
pub trait BackInserter<T>
{
/// Insert an object at the end of this container
fn push_back(&mut self, value: T);
}
impl<T> BackInserter<T> for Vec<T>
{
#[inline]
fn push_back(&mut self, value: T)
{
self.push(value)
}
}
/// Absracts a closure for `BackInserter<T>`.
pub struct BackInsertPass<T,F>(F, PhantomData<T>)
where F: FnMut(T);
impl<T,F: FnMut(T)> BackInsertPass<T,F>
{
/// Create a new instance with this closure
#[inline] pub fn new(func: F) -> Self
{
Self(func, PhantomData)
}
}
impl<T, F: FnMut(T)> BackInserter<T> for BackInsertPass<T,F>
{
#[inline] fn push_back(&mut self, value: T)
{
self.0(value)
}
}
/// A `BackInserter<T>` that will only add a max capacity of items before it starts dropping input to its `push_back` function.
pub struct CappedBackInserter<'a, T>(&'a mut T, usize, usize)
where T: BackInserter<T>;
impl<'a, T> CappedBackInserter<'a, T>
where T: BackInserter<T>
{
/// Create a new instance with this max capacity
#[inline] pub fn new(from: &'a mut T, cap: usize) -> Self
{
Self(from, 0, cap)
}
/// The number of elements pushed so far
#[inline] pub fn len(&self) -> usize {
self.1
}
/// The max number of elemnts allowed to be pushed
#[inline] pub fn cap(&self) -> usize {
self.2
}
}
impl<'a, T> BackInserter<T> for CappedBackInserter<'a, T>
where T: BackInserter<T>
{
#[inline] fn push_back(&mut self, value: T)
{
if self.1 < self.2 {
self.0.push_back(value);
self.1+=1;
}
}
}
pub trait VecExt<T>
{
/// Insert many elements with exact size iterator
fn insert_exact<Ex, I: IntoIterator<Item = T, IntoIter = Ex>>(&mut self, location: usize, slice: I)
where Ex: ExactSizeIterator<Item = T>;
/// Insert many elements
fn insert_many<I: IntoIterator<Item =T>>(&mut self, location: usize, slice: I);
}
impl<T> VecExt<T> for Vec<T>
{
#[cfg(not(feature="experimental_inserter"))]
#[inline(always)]
fn insert_exact<Ex, I: IntoIterator<Item = T, IntoIter = Ex>>(&mut self, location: usize, slice: I)
where Ex: ExactSizeIterator<Item = T>
{
self.insert_many(location, slice)
}
#[cfg(feature="experimental_inserter")]
fn insert_exact<Ex, I: IntoIterator<Item = T, IntoIter = Ex>>(&mut self, location: usize, slice: I)
where Ex: ExactSizeIterator<Item = T>,
{
#[inline(never)]
#[cold]
fn panic_len(l1: usize, l2: usize) -> !
{
panic!("Location must be in range 0..{}, got {}", l1,l2)
}
#[inline(never)]
#[cold]
fn inv_sz() -> !
{
panic!("ExactSizeIterator returned invalid size");
}
if location >= self.len() {
panic_len(self.len(), location);
}
let mut slice = slice.into_iter();
let slen = slice.len();
match slen {
0 => return,
1 => {
self.insert(location, slice.next().unwrap());
return
},
_ => (),
};
self.reserve(slice.len());
unsafe {
let this = self.as_mut_ptr().add(location);
let len = self.len();
let rest = std::mem::size_of::<T>() * (location..len).len();
libc::memmove(this.add(slen) as *mut libc::c_void, this as *mut libc::c_void, rest);
let mut sent=0;
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut this = this;
for item in slice {
if sent >= slen {
inv_sz();
}
this.write(item);
this = this.add(1);
sent+=1;
}
if sent != slen {
inv_sz();
}
})) {
Err(e) => {
// memory at (location+sent)..slen is now invalid, move the old one back before allowing unwind to contine
libc::memmove(this.add(sent) as *mut libc::c_void, this.add(slen) as *mut libc::c_void, rest);
self.set_len(len + sent);
std::panic::resume_unwind(e)
},
_ => (),
}
self.set_len(len + sent);
}
}
#[inline]
fn insert_many<I: IntoIterator<Item =T>>(&mut self, location: usize, slice: I)
{
let slice = slice.into_iter();
match slice.size_hint() {
(0, Some(0)) | (0, None) => (),
(_, Some(bound)) | (bound, _) => self.reserve(bound),
};
self.splice(location..location, slice);
//let splice = self.split_off(location);
//self.extend(slice.chain(splice.into_iter()));
/*
// shift everything across, replacing with the new values
let splice: Vec<_> = self.splice(location.., slice).collect();
// ^ -- this allocation bugs me, but we violate aliasing rules if we don't somehow collect it before adding it back in so...
// add tail back
self.extend(splice);*/
}
}
#[cfg(test)]
mod tests
{
use super::*;
#[test]
fn vec_insert_exact()
{
let mut vec = vec![0,1,2,8,9,10];
vec.insert_exact(3, [3,4,5,6, 7].iter().copied());
assert_eq!(&vec[..],
&[0,1,2,3,4,5,6,7,8,9,10]
);
}
#[test]
fn vec_insert_exact_nt()
{
macro_rules! string {
($str:literal) => (String::from($str));
}
let mut vec = vec![
string!("Hello"),
string!("world"),
string!("foo"),
string!("uhh"),
];
let vec2 = vec![
string!("Hello"),
string!("world"),
string!("hi"),
string!("hello"),
string!("foo"),
string!("uhh"),
];
vec.insert_exact(2, vec![string!("hi"), string!("hello")]);
assert_eq!(&vec[..], &vec2[..]);
}
#[cfg(nightly)]
mod benchmatks
{
use super::super::*;
use test::{
Bencher, black_box,
};
#[cfg(not(feature="experimental_inserter"))]
#[bench]
fn move_exact(b: &mut Bencher)
{
let mut vec = vec![0,10,11,12];
let span = [0,1,2,3];
b.iter(|| {
black_box(vec.insert_exact(vec.len()/2, span.iter().copied()));
});
}
#[bench]
fn move_via_splice(b: &mut Bencher)
{
let mut vec = vec![0,10,11,12];
let span = [0,1,2,3];
b.iter(|| {
black_box(vec.insert_many(vec.len()/2, span.iter().copied()));
});
}
#[cfg(feature="experimental_inserter")]
#[bench]
fn move_via_unsafe(b: &mut Bencher)
{
let mut vec = vec![0,10,11,12];
let span = [0,1,2,3];
b.iter(|| {
black_box(vec.insert_exact(vec.len()/2, span.iter().copied()));
});
}
}
}

@ -1,165 +0,0 @@
//! HTML rendering
use super::*;
use std::{
fmt::{
self,
Display,
},
io,
};
pub struct HtmlTokenStream<W: io::Write>(W);
impl<W: io::Write> HtmlTokenStream<W>
{
fn push<T: DisplayHtml>(&mut self, token: T) -> io::Result<()>
{
write!(&mut self.0, "{}", display_html(&token)) //TODO: How to do async version of this?
}
}
/// Coerce a `DisplayHtml` value into an opaque implementor of `fmt::Display`.
#[inline] pub fn display_html<'a, T: DisplayHtml+?Sized>(from: &'a T) -> impl fmt::Display +'a
{
struct Wrap<'a, T: DisplayHtml+?Sized>(&'a T);
impl<'a,T> fmt::Display for Wrap<'a,T>
where T: DisplayHtml+?Sized
{
#[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
DisplayHtml::fmt(self.0, f)
}
}
Wrap(from)
}
/// Coerse a `Display` value into a `DisplayHtml` by HTML-escaping its output.
///
/// To output raw HTML, see `Safe<T>`.
#[inline] pub fn escape_html<'a, T: Display+?Sized>(from: &'a T) -> impl DisplayHtml +'a
{
struct Wrap<'a, T: Display+?Sized>(&'a T);
struct FilterStream<T,F>(T, F)
where F: FnMut(&mut T, char) -> fmt::Result;
impl<T,F> fmt::Write for FilterStream<T,F>
where T: fmt::Write,
F: FnMut(&mut T, char) -> fmt::Result
{
fn write_str(&mut self, s: &str) -> fmt::Result
{
for c in s.chars()
{
self.1(&mut self.0, c)?;
}
Ok(())
}
fn write_char(&mut self, c: char) -> fmt::Result
{
self.1(&mut self.0, c)
}
}
impl<'a,T> DisplayHtml for Wrap<'a,T>
where T: Display +?Sized
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
use smallmap::Map;
lazy_static! {
static ref HTML_EQ_TABLE: Map<char, &'static str> = {
let mut map = Map::new();
map.insert('&', "&amp;");
map.insert('<', "&lt;");
map.insert('>', "&gt;");
map.insert('"', "&quot;");
map
};
}
let mut output = FilterStream(f, |output, c| {
output.write_str(match HTML_EQ_TABLE.get(&c) {
Some(s) => s,
None => {
return output.write_char(c);
},
})
});
use fmt::Write;
write!(&mut output, "{}", self.0)
}
}
Wrap(from)
}
/// HTML-safe wrapper around
#[derive(Debug, PartialEq, Eq)]
pub struct Safe<T: Display>(T);
impl<T: Display> Safe<T>
{
/// Create a new instance with this value
#[inline] pub fn new(value: T) -> Self
{
Self(value)
}
/// Consume this instance into its inner value
pub fn into_inner(self) -> T
{
self.0
}
/// Gets a reference to the inner value
pub fn display(&self) -> &T
{
&self.0
}
}
impl<T: Display> From<T> for Safe<T>
{
#[inline] fn from(from: T) -> Self
{
Self(from)
}
}
/// A trait for displaying as html.
///
/// Any type implementing `Display` will implement `DisplayHtml` by HTML-escaping the output from it's display formatter.
/// You can also use `Safe<T>` to render as raw HTML.
pub trait DisplayHtml
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result;
#[inline] fn to_html(&self) -> String
{
display_html(self).to_string()
}
}
impl<T: Display> DisplayHtml for Safe<T>
{
#[inline] fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result
{
self.0.fmt(fmt)
}
#[inline] fn to_html(&self) -> String
{
self.0.to_string()
}
}
impl<T: ?Sized> DisplayHtml for T
where T: Display
{
#[inline] fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result
{
escape_html(self).fmt(fmt)
}
}

@ -1,66 +0,0 @@
//! Handles post and user identity.
use super::*;
use std::{
fmt,
};
use uuid::Uuid;
/// A globally unique post identifier.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[derive(Serialize, Deserialize)]
pub struct PostID(Uuid);
impl PostID
{
/// Generate a new `PostID`.
pub fn new() -> Self
{
Self(Uuid::new_v4())
}
}
impl fmt::Display for PostID
{
#[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
self.0.fmt(f) //TODO: Change to use kana-hash.
}
}
/// A user's data, if set.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
#[derive(Serialize, Deserialize)]
pub struct User
{
pub name: Option<String>,
pub email: Option<String>,
pub tripcode: Option<tripcode::Tripcode>,
}
impl User
{
/// The name string of this instance, or the default name.
pub fn name_str(&self) -> &str
{
self.name.as_ref().map(|x| &x[..]).unwrap_or(&config::get().anon_name[..])
}
/// The email string of this instance, if it is set
pub fn email_str(&self) -> Option<&str>
{
self.email.as_ref().map(|x| &x[..])
}
// No str for tripcode because it's not just a string (yet)
/// Anon with no detials.
pub const fn anon() -> Self
{
Self {
name: None,
email: None,
tripcode: None,
}
}
}

@ -1,92 +1,3 @@
#![cfg_attr(nightly, feature(option_unwrap_none))]
#![cfg_attr(nightly, feature(never_type))]
#![allow(dead_code)]
#![cfg_attr(nightly, feature(test))]
#[cfg(all(nightly, test))] extern crate test;
#[macro_use] extern crate log;
use async_trait::async_trait;
use serde::{
Serialize, Deserialize,
};
use color_eyre::{
eyre,
Help,
SectionExt,
};
use futures::{
FutureExt as _,
prelude::*,
};
use hex_literal::hex;
use lazy_static::lazy_static;
macro_rules! cfg_debug {
(if {$($if:tt)*} else {$($else:tt)*}) => {
{
#[cfg(debug_assertions)] {
$($if)*
}
#[cfg(not(debug_assertions))] {
$($else)*
}
}
};
(if $if:expr) => {
{
#[cfg(debug_assertions)] $if
}
};
(else $else:expr) => {
{
#[cfg(not(debug_assertions))] $else
}
};
}
mod ext;
use ext::*;
mod bytes;
mod suspend;
mod regex;
mod cache;
mod config;
mod tripcode;
mod identity;
mod post;
mod state;
mod html;
mod web;
#[tokio::main]
async fn main() -> Result<(), eyre::Report>{
color_eyre::install()?;
pretty_env_logger::init();
trace!("Setting default config");
config::set(Default::default());
web::serve(Default::default()).await?;
info!("Server shutdown gracefully");
/*
let mut vec = vec![vec![1, 0, 0],
vec![0, 0, 1]];
let span = vec![vec![0, 1, 0],
vec![1, 0, 1],
vec![0, 1, 0]];
for _ in 0..10000 {
vec.insert_exact(1, span.iter().cloned());
}*/
Ok(())
fn main() {
println!("Hello, world!");
}

@ -1,156 +0,0 @@
//! Structure for updating posts in-place
use super::*;
use std::{
fmt,
hash::{
Hash,Hasher,
},
};
use chrono::prelude::*;
/// Represents all valid timestamps for a post.
///
/// Usually in UTC, pls keep it in Utc...
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
pub struct PostTimestamp
{
pub opened: DateTime<Utc>,
pub closed: Option<DateTime<Utc>>,
pub last_edit: DateTime<Utc>,
}
impl PostTimestamp
{
/// Create a new timestamp for a new post
pub fn new() -> Self
{
let now = Utc::now();
Self {
opened: now.clone(),
closed: None,
last_edit: now,
}
}
}
impl Hash for PostTimestamp {
fn hash<H: Hasher>(&self, state: &mut H) {
self.opened.hash(state);
self.closed.hash(state);
self.last_edit.hash(state);
}
}
impl fmt::Display for PostTimestamp
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
writeln!(f, " Opened - {}", self.opened)?;
writeln!(f, " Edited - {}", self.last_edit)?;
write!(f, " Closed - ")?;
if let Some(closed) = &self.closed {
writeln!(f, "{}", closed)?;
} else {
writeln!(f, "Stil open")?;
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// A closed and finished post. The inverse of `Imouto`.
pub struct Static
{
id: identity::PostID,
user: identity::User,
title: Option<String>,
karada: String,
timestamp: PostTimestamp,
/// Hash of the rest of the post data. . . .
hash: crypto::sha256::Sha256Hash,
}
impl Static
{
/// Compute the hash for this instance
pub fn into_hashed(mut self) -> Self
{
self.hash = Default::default();
let bytes = serde_cbor::to_vec(&self).expect("Failed to serialise static post to CBOR");
Self {
hash: crypto::sha256::compute_slice(&bytes),
..self
}
}
/// Validate the internal hash of this instance
pub fn validate_hash(&self) -> bool
{
self.clone().into_hashed().hash == self.hash
}
}
use suspend::{
Suspendable,
SuspendStream,
};
#[async_trait]
impl Suspendable for Static
{
async fn suspend<S: SuspendStream +Send+Sync + ?Sized>(self, into: &mut S) -> Result<(), suspend::Error>
{
let mut output = suspend::Object::new();
output.insert("post-static", self);
into.set_object(output).await
}
async fn load<S: SuspendStream +Send+Sync+ ?Sized>(from: &mut S) -> Result<Self, suspend::Error>
{
let mut input = from.get_object().await?.ok_or(suspend::Error::BadObject)?;
input.try_get("post-static").ok_or(suspend::Error::BadObject)
}
}
#[cfg(test)]
mod tests
{
use super::*;
#[tokio::test]
async fn static_post_ser()
{
let mut output = suspend::MemorySuspendStream::new();
let post1 = Static {
id: identity::PostID::new(),
user: Default::default(),
title: None,
karada: "hello world".to_owned(),
timestamp: PostTimestamp::new(),
hash: Default::default(),
}.into_hashed();
let post1c = post1.clone();
assert!(post1.validate_hash());
post1.suspend(&mut output).await.expect("Suspend failed");
let buffer = output.buffer().clone();
let post2 = Static::load(&mut output).await.expect("Suspend load failed");
assert_eq!(post1c, post2);
assert!(post1c.validate_hash());
assert!(post2.validate_hash());
let buffer2 = suspend::oneshot(post2.clone()).await.expect("Oneshot failed");
assert_eq!(&buffer2[..], &buffer[..]);
let post3: Static = suspend::single(buffer2).await.expect("Single failed");
assert!(post3.validate_hash());
assert_eq!(post3, post2);
}
}

@ -1,127 +0,0 @@
//! PCRE
use super::*;
use pcre::{
Pcre,
};
use std::{
sync::{
Mutex,
},
error,
fmt,
};
#[derive(Debug)]
pub struct Regex(Mutex<Pcre>, String);
/// A regex error
#[derive(Debug)]
pub struct Error(pcre::CompilationError);
impl error::Error for Error{}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f,"regex failed to compile: ")?;
self.0.fmt(f)
}
}
impl From<pcre::CompilationError> for Error
{
#[inline] fn from(from: pcre::CompilationError) -> Self
{
Self(from)
}
}
pub struct Iter<'a>(Option<pcre::Match<'a>>, usize);
impl<'a> Iterator for Iter<'a>
{
type Item = &'a str;
fn next(&mut self) -> Option<Self::Item> {
if let Some(m) = &mut self.0 {
if self.1 >= m.string_count() {
None
} else {
let string = m.group(self.1);
self.1 +=1;
Some(string)
}
} else {
None
}
}
#[inline] fn size_hint(&self) -> (usize, Option<usize>) {
(self.len(), Some(self.len()))
}
}
impl<'a> std::iter::FusedIterator for Iter<'a>{}
impl<'a> ExactSizeIterator for Iter<'a>
{
fn len(&self) -> usize
{
match &self.0 {
Some(m) => m.string_count(),
None => 0,
}
}
}
impl Regex
{
/// Create a new regular expression from a string
pub fn new<I>(from: impl AsRef<str>, opt: I) -> Result<Self, Error>
where I: IntoIterator<Item=pcre::CompileOption>
{
let string = from.as_ref();
Ok(Self(Mutex::new(Pcre::compile_with_options(string, &opt.into_iter().collect())?), string.to_owned()))
}
/// Test against string
pub fn test(&self, string: impl AsRef<str>) -> bool
{
let mut regex = self.0.lock().unwrap();
regex.exec(string.as_ref()).is_some()
}
/// Get the groups of a match against string
pub fn exec_ref<'a>(&mut self, string: &'a (impl AsRef<str> + ?Sized)) -> Iter<'a>
{
let regex = self.0.get_mut().unwrap();
Iter(regex.exec(string.as_ref()), 0)
}
/// Get the groups as an owned string vector from map against string
pub fn exec(&self, string: impl AsRef<str>) -> Vec<String>
{
let mut regex = self.0.lock().unwrap();
Iter(regex.exec(string.as_ref()), 0).map(std::borrow::ToOwned::to_owned).collect()
}
/// Get the string used to create this regex
#[inline] pub fn as_str(&self) -> &str{
&self.1[..]
}
}
impl From<Regex> for Pcre
{
#[inline] fn from(from: Regex) -> Self
{
from.0.into_inner().unwrap()
}
}
impl fmt::Display for Regex
{
#[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "{}", self.1)
}
}

@ -1,8 +0,0 @@
//! Container state for all posts
use super::*;
/// Contains all posts in an instance, open or closed.
pub struct Oneesan
{
}

@ -1,255 +0,0 @@
//! Deltas and applying them
use super::*;
use difference::{
Changeset,
Difference,
};
const MAX_SINGLE_DELTA_SIZE: usize = 14;
const DELTA_BREAK: &str = "";
/// Infer all deltas needed to be sequentially applied to `orig` to transform it to `new`, return the number inserted into `output`.
pub fn infer_deltas<T: BackInserter<Delta> + ?Sized>(output: &mut T, orig: &str, new: &str) -> usize
{
let set = Changeset::new(orig, new, DELTA_BREAK);
let mut done=0;
let mut position =0;
for diff in set.diffs.into_iter() {
match diff {
Difference::Same(string) => {
position += string.len();
},
Difference::Rem(string) => {
output.push_back(Delta {
location: position,
kind: DeltaKind::RemoveAhead{span_len: string.len()},
});
done+=1;
},
Difference::Add(string) => {
let mut passer = BackInsertPass::new(|(span, span_len)| {
output.push_back(Delta {
location: position,
kind: DeltaKind::Insert {
span, span_len,
},
});
position+= usize::from(span_len);
done+=1;
});
delta_span_many(&mut passer, string.chars());
},
}
}
done
}
/// Create a delta span from an input iterator.
///
/// This function can take no more than `min(255, MAX_SINGLE_DELTA_SIZE)` chars from the input. The number of chars inserted is also returned as `u8`.
pub(super) fn delta_span<I>(from: I) -> ([char; MAX_SINGLE_DELTA_SIZE], u8)
where I: IntoIterator<Item = char>
{
let mut output: [char; MAX_SINGLE_DELTA_SIZE] = Default::default();
let mut sz: u8 = 0;
for (d, s) in output.iter_mut().zip(from.into_iter().take(usize::from(u8::MAX)))
{
*d = s;
sz += 1;
}
(output, sz)
}
/// Create as many delta spans needed from an input iterator.
///
/// This function can take as many inputs as needed, and outputs the needed amount of spans into `into`, and then returns the number added.
pub(super) fn delta_span_many<T: BackInserter<([char; MAX_SINGLE_DELTA_SIZE], u8)> + ?Sized, I>(into: &mut T, from: I) -> usize
where I: IntoIterator<Item = char>
{
let mut iter = from.into_iter();
let mut added=0;
loop {
match delta_span(&mut iter) {
(_, 0) => break,
other => into.push_back(other),
}
added+=1;
}
added
}
/// Information about the delta to be applied in `Delta`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum DeltaKind
{
/// Append to the end of body. Equivilant to `Insert` with a location at `karada.scape.len()` (the end of the buffer).
Append{
span: [char; MAX_SINGLE_DELTA_SIZE],
span_len: u8,
},
/// Insert `span_len` chars from `span` into body starting *at* `location` and moving ahead
Insert{
span: [char; MAX_SINGLE_DELTA_SIZE],
span_len: u8,
},
/// Remove `span_len` chars from `location`.
RemoveAhead{
span_len: usize,
},
/// Remove `span_len` chars up to but not including `location`.
RemoveBehind{
span_len: usize,
},
/// Remove everything from `location` to the end.
Truncate,
/// Remove everything from 0 to `location`.
Shift,
/// Remove char at `location`
RemoveSingle,
/// Remove entire post body
Clear,
}
/// A delta to apply to `Karada`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct Delta
{
/// Location to insert into. This is the INclusive range of: 0..=(karada.scape.len()).
///
/// Insertions off the end of the buffer are to be appened instead.
location: usize,
/// The kind of delta to insert
kind: DeltaKind,
}
/// Static assertion: `MAX_SINGLE_DELTA_SIZE` can fit into `u8`.
const _: [u8;(MAX_SINGLE_DELTA_SIZE < (!0u8 as usize)) as usize] = [0];
impl Delta
{
pub fn insert(&self, inserter: &mut MessageSpan)
{
match self.kind {
DeltaKind::Append{span, span_len} => {
inserter.extend_from_slice(&span[..usize::from(span_len)]);
},
DeltaKind::Insert{span, span_len} => {
let span = &span[..usize::from(span_len)];
if self.location == inserter.len() {
inserter.extend_from_slice(span);
} else if span.len() == 1 {
inserter.insert(self.location, span[0]);
} else if span.len() > 1 {
inserter.insert_exact(self.location, span.iter().copied());
}
},
DeltaKind::RemoveAhead{span_len} => {
inserter.drain(self.location..(self.location+span_len));
},
DeltaKind::RemoveBehind{span_len} => {
let span_st = self.location.checked_sub(span_len).unwrap_or(0);
inserter.drain(span_st..self.location);
},
DeltaKind::RemoveSingle => {
inserter.remove(self.location);
},
DeltaKind::Clear => inserter.clear(),
DeltaKind::Truncate => inserter.truncate(self.location),
DeltaKind::Shift => ((),inserter.drain(..self.location)).0,
}
}
}
#[cfg(test)]
mod tests
{
use super::*;
#[test]
fn infer_and_insert()
{
let orig = "the quick brown fox jumped over the lazy dog !!!!";
let new = "the quick brown dog jumped over the lazy fox twice";
let mut deltas = Vec::new();
infer_deltas(&mut deltas, orig, new);
println!("{:#?}", deltas);
let output: String = {
let mut scape: Vec<_> = orig.chars().collect();
for delta in deltas.into_iter() {
delta.insert(&mut scape);
}
scape.into_iter().collect()
};
assert_eq!(&output[..], &new[..]);
}
macro_rules! insert {
($expects:literal; $start:literal, $ins:literal, $where:expr) => {
{
let mut message: Vec<char> = $start.chars().collect();
let delta = {
let (span, span_len) = delta_span($ins.chars());
Delta {
location: $where,
kind: DeltaKind::Insert{span, span_len},
}
};
println!("from: {:?}", message);
println!("delta: {:?}", delta);
delta.insert(&mut message);
assert_eq!(&message.into_iter().collect::<String>()[..], $expects);
}
};
}
#[test]
fn insert_body()
{
insert!("123456789"; "126789", "345", 2);
}
#[test]
fn insert_end()
{
insert!("123456789"; "1289", "34567", 2);
}
#[test]
fn insert_end_rev()
{
insert!("123456789"; "1234569", "78", 6);
}
#[test]
fn insert_begin_rev()
{
insert!("123456789"; "1456789", "23", 1);
}
#[test]
fn insert_begin()
{
insert!("123456789"; "789", "123456", 0);
}
#[test]
fn insert_end_f()
{
insert!("123456789"; "123", "456789", 3);
}
#[test]
fn insert_end_f_rev()
{
insert!("123456789"; "1234567", "89", 7);
}
#[test]
fn insert_single()
{
insert!("123456789"; "12346789", "5",4);
}
}

@ -1,26 +0,0 @@
//! Local state change error
use super::*;
use std::{
error,
fmt,
};
/// Local state updating errors
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
BroadcastUpdate,
Unknown,
}
impl error::Error for Error{}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::BroadcastUpdate => write!(f, "failed to broadcast state body update"),
_ => write!(f, "unknown error"),
}
}
}

@ -1,79 +0,0 @@
//! Hosts `Imouto`'s `Kokoro`.
use super::*;
use std::{
pin::Pin,
task::{
Poll,
Context,
},
mem,
};
use pin_project::{
pin_project,
};
use futures::{
future::Future,
};
use tokio::{
task,
sync::{
oneshot,
mpsc,
},
};
#[derive(Debug)]
pub enum Worker
{
Attached(task::JoinHandle<Result<(), eyre::Report>>),
/// Worker has not been attached yet
Suspended(Kokoro),
/// Worker filed to spawn,
Crashed,
}
/// Host this `Kokoro`
async fn work(mut kokoro: Kokoro, mut recv: mpsc::Receiver<Vec<Delta>>) -> Result<(), eyre::Report>
{
while let Some(new) = recv.recv().await {
kokoro.apply(new).await?;
}
Ok(())
}
impl Worker
{
/// Host this suspended state worker with a channel to receive updates from.
///
/// # Panics
/// If we are not in the `Suspended` state.
pub fn host(&mut self, recv: mpsc::Receiver<Vec<Delta>>) -> WorkerHandle
{
match mem::replace(self, Self::Crashed) {
Self::Suspended(kokoro) => {
let (tx, rx) = oneshot::channel();
*self = Self::Attached(task::spawn(async move {
let vl = work(kokoro, recv).await;
tx.send(()).unwrap();
vl
}));
WorkerHandle(rx)
},
_ => panic!("Attempted to host non-suspended worker"),
}
}
}
/// A handle on a spawned worker
#[pin_project]
pub struct WorkerHandle(#[pin] oneshot::Receiver<()>);
impl Future for WorkerHandle
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.0.poll(cx).map(|x| x.unwrap())
}
}

@ -1,101 +0,0 @@
//! Mutating post body
use super::*;
/// A message that can be mutated by deltas.
pub type MessageSpan = Vec<char>;
/// Contains post deltas and an intermediate representation of the still-open post body.
/// Created and modified with `Kokoro` worker instances.
///
/// This should not be created by itself, instead `Kokoro` should create instances of this, so that it can retain the `watch::Sender` and other such things.
#[derive(Debug)]
pub struct Karada
{
/// The post body so far as a vector of `char`s.
pub(super) scape: Arc<RwLock<MessageSpan>>,
/// All applied deltas so far. Last applied one is at the end.
pub(super) deltas: Arc<RwLock<Vec<Delta>>>,
/// the latest render of the whole body string. Updated whenever a delta(s) are applied atomically.
pub(super) current_body: watch::Receiver<String>,
}
impl Karada
{
/// Clone the body string
pub fn body(&self) -> String
{
self.current_body.borrow().to_owned()
}
/// Create the deltas required to atomically transform current body into `new`.
///
/// Return the number of deltas added to `output`.
pub fn create_body_deltas<T: BackInserter<Delta> + ?Sized, U: AsRef<str>>(&self, output: &mut T, new: U) -> usize
{
let body = self.body();
let new = new.as_ref();
delta::infer_deltas(output, &body[..], new)
}
/// Consume this instance into a suspension
///
/// This will only acquire locks if needed, but since they might be needed, it must be awaited in case of `Kokoro` instances potentially owning the data.
pub async fn into_suspended(self) -> Suspension
{
let scape: String = {
let scape = self.scape;
match Arc::try_unwrap(scape) { //try to unwrap if possible, to avoid acquiring useless lock
Ok(scape) => scape.into_inner().into_iter().collect(),
Err(scape) => scape.read().await.iter().collect(),
}
};
let deltas: Vec<Delta> = {
let deltas = self.deltas;
match Arc::try_unwrap(deltas) {
Ok(deltas) => deltas.into_inner(),
Err(deltas) => deltas.read().await.clone(),
}
};
Suspension{scape,deltas}
}
pub(super) fn from_suspended(susp: Suspension, current_body: watch::Receiver<String>) -> Self
{
Self {
scape: Arc::new(RwLock::new(susp.scape.chars().collect())),
deltas: Arc::new(RwLock::new(susp.deltas)),
current_body,
}
}
}
/// Suspension of [`Karada`](Karada).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Suspension
{
pub(super) scape: String,
pub(super) deltas: Vec<Delta>,
}
use suspend::{
Suspendable,
SuspendStream,
};
#[async_trait]
impl Suspendable for Suspension
{
async fn suspend<S: SuspendStream +Send+Sync + ?Sized>(self, into: &mut S) -> Result<(), suspend::Error>
{
let mut output = suspend::Object::new();
output.insert("post-dynamic", self);
into.set_object(output).await
}
async fn load<S: SuspendStream +Send+Sync+ ?Sized>(from: &mut S) -> Result<Self, suspend::Error>
{
let mut input = from.get_object().await?.ok_or(suspend::Error::BadObject)?;
input.try_get("post-dynamic").ok_or(suspend::Error::BadObject)
}
}

@ -1,128 +0,0 @@
//! Handles updating posts
use super::*;
use std::{
sync::Arc,
};
use tokio::{
sync::{
RwLock,
watch,
},
};
mod karada;
pub use karada::*;
mod delta;
pub use delta::*;
mod work;
pub use work::*;
//mod host;
//pub use host::*;
mod error;
pub use error::Error as LocalError;
/// An open, as yet unfinied post
#[derive(Debug)]
pub struct Imouto
{
id: identity::PostID,
user: identity::User,
title: Option<String>,
karada: Karada,
worker: Kokoro,
timestamp: post::PostTimestamp, //Note that `closed` should always be `None` in `Imouto`. We use this for post lifetimes and such
}
use suspend::{
Suspendable,
SuspendStream,
};
#[async_trait]
impl Suspendable for Imouto
{
async fn suspend<S: SuspendStream +Send+Sync + ?Sized>(self, into: &mut S) -> Result<(), suspend::Error>
{
let mut output = suspend::Object::new();
output.insert("id", self.id);
output.insert("user", self.user);
output.insert("title", self.title);
output.insert("body", self.worker.into_suspended().await); // consume worker if possible
//output.insert("hash", self.hash);
output.insert("timestamp", self.timestamp);
into.set_object(output).await
}
async fn load<S: SuspendStream +Send+Sync+ ?Sized>(from: &mut S) -> Result<Self, suspend::Error>
{
let mut input = from.get_object().await?.ok_or(suspend::Error::BadObject)?;
macro_rules! get {
($name:literal) => {
input.try_get($name).ok_or_else(|| suspend::Error::MissingObject(std::borrow::Cow::Borrowed($name)))?
};
}
let id = get!("id");
let user = get!("user");
let title = get!("title");
let body = get!("body");
//let hash = get!("hash");
let timestamp = get!("timestamp");
let (karada, worker) = {
let mut kokoro = Kokoro::from_suspended(body);
(kokoro.spawn().unwrap(), kokoro)
};
Ok(Self {
id,
user,
title,
karada,
worker,
//hash,
timestamp,
})
}
}
#[cfg(test)]
mod tests
{
use super::*;
#[tokio::test]
async fn basic_ser()
{
let mut output = suspend::MemorySuspendStream::new();
let mut lolis = std::iter::repeat_with(|| {
let (karada, kokoro) = {
let mut kokoro = Kokoro::new();
(kokoro.spawn().unwrap(), kokoro)
};
Imouto {
id: identity::PostID::new(),
user: Default::default(),
title: None,
karada,
worker: kokoro,
timestamp: post::PostTimestamp::new()
}
});
let imouto = lolis.next().unwrap();
imouto.suspend(&mut output).await.expect("Suspension failed");
let imouto2 = lolis.next().unwrap();
let imouto3 = Imouto::load(&mut output).await.expect("Load failed");
assert_eq!(imouto2.karada.body(), imouto3.karada.body());
}
}

@ -1,116 +0,0 @@
//! Worker that mutates `Kokoro`.
use super::*;
use tokio::{
sync::{
watch,
oneshot,
},
};
/// Handles working on `Karada` instances.
#[derive(Debug)]
pub struct Kokoro
{
scape: Arc<RwLock<MessageSpan>>,
deltas: Arc<RwLock<Vec<Delta>>>,
update_body: watch::Sender<String>,
body_recv: Option<watch::Receiver<String>>,
}
impl Kokoro
{
/// Create a new instance. This instance can spawn a `Karada` instance.
pub fn new() -> Self
{
let (tx, rx) = watch::channel(String::new());
Self {
scape: Arc::new(RwLock::new(MessageSpan::new())),
deltas: Arc::new(RwLock::new(Vec::new())),
update_body: tx,
body_recv: Some(rx),
}
}
/// Create a new worker instance from a suspension of `Karada`.
pub fn from_suspended(susp: Suspension) -> Self
{
let span = susp.scape.chars().collect();
let (tx, rx) = watch::channel(susp.scape);
Self {
scape: Arc::new(RwLock::new(span)),
deltas: Arc::new(RwLock::new(susp.deltas)),
update_body: tx,
body_recv: Some(rx),
}
}
/// Consume this instance into a suspension
///
/// This will only acquire locks if needed, but since they might be needed, it must be awaited in case of `Kokoro` instances potentially owning the data.
pub async fn into_suspended(self) -> Suspension
{
let scape: String = {
let scape = self.scape;
match Arc::try_unwrap(scape) { //try to unwrap if possible, to avoid acquiring useless lock
Ok(scape) => scape.into_inner().into_iter().collect(),
Err(scape) => scape.read().await.iter().collect(),
}
};
let deltas: Vec<Delta> = {
let deltas = self.deltas;
match Arc::try_unwrap(deltas) {
Ok(deltas) => deltas.into_inner(),
Err(deltas) => deltas.read().await.clone(),
}
};
Suspension{scape,deltas}
}
/// Spawn one `Karada` instance. If this has already been called, it returns `None`.
pub fn spawn(&mut self) -> Option<Karada>
{
self.body_recv.take()
.map(|current_body| {
Karada {
scape: Arc::clone(&self.scape),
deltas: Arc::clone(&self.deltas),
current_body,
}
})
}
/// Spawn a clone `Karada` into a new instance. This function can be called many times to yield `Karada` instances that are all identical and controlled by this instance.
///
/// # Panics
/// If `spawn` was previously called.
pub fn spawn_clone(&self) -> Karada
{
Karada {
scape: Arc::clone(&self.scape),
deltas: Arc::clone(&self.deltas),
current_body: self.body_recv.as_ref().unwrap().clone(),
}
}
/// Apply a delta to this instance.
pub async fn apply<I: IntoIterator<Item= Delta>>(&mut self, delta: I) -> Result<(), error::Error>
{
let (mut scape, mut deltas) = tokio::join!{
self.scape.write(),
self.deltas.write(),
};
for delta in delta.into_iter() {
// Only start mutating now that both locks are writable. Is this needed, or can we do the mutation concurrently?
delta.insert(scape.as_mut());
deltas.push(delta);
}
self.update_body.broadcast(scape.iter().collect()).map_err(|_| error::Error::BroadcastUpdate)
}
}

@ -1,29 +0,0 @@
//! Structures for handling global and local post state.
use super::*;
mod global;
pub use global::*;
mod local;
pub use local::*;
#[derive(Debug)]
/// An open or closed post
pub enum Post
{
Open(Imouto),
Closed(post::Static),
}
impl Post
{
/// Is this post open
#[inline] pub fn is_open(&self) -> bool
{
if let Self::Open(_) = self {
true
} else {
false
}
}
}

@ -1,533 +0,0 @@
//! Handles suspending and reloading posts
use super::*;
use std::{
marker::{
PhantomData,
Unpin,
Send,
Sync,
},
io,
collections::HashMap,
ops::Range,
borrow::{
Cow,
Borrow,
},
error,
fmt,
};
use tokio::{
prelude::*,
io::{
AsyncRead,
AsyncWrite,
},
};
/// Represents an opaque bytes stream of serialised data to insert into `SuspendStream`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Object
{
data: Vec<u8>,
data_instances: HashMap<Cow<'static, str>, Range<usize>>,
}
fn calc_len<T,I,U>(from: I) -> U
where I: IntoIterator<Item = T>,
T: Borrow<Range<U>>,
U: std::cmp::Ord + Default + Copy,
{
let mut max = Default::default();
for i in from.into_iter()
{
let i = i.borrow().end;
if i > max {
max = i
}
}
max
}
impl Object
{
/// Create a new empty `Object`.
#[inline] pub fn new() -> Self
{
Self{data: Vec::new(), data_instances: HashMap::new()}
}
/// Are the internal mappings of this object valid?
pub fn validate(&self) -> bool
{
let len = self.data.len();
for (_, range) in self.data_instances.iter() {
if range.end > len {//range.start + range.end > len {
return false;
}
}
true
}
/// Try to get a value of type `T` from `name`.
pub fn get<'a, T>(&'a mut self, name: impl Borrow<str>) -> Option<T>
where T: Deserialize<'a>
{
if let Some(bytes) = self.get_bytes(name) {
serde_cbor::from_slice(&bytes[..]).expect("Failed to deserialize CBOR value")
} else {
None
}
}
/// Try to get a value of type `T` from `name`.
pub fn try_get<'a, T>(&'a mut self, name: impl Borrow<str>) -> Option<T>
where T: Deserialize<'a>
{
if let Some(bytes) = self.get_bytes(name) {
serde_cbor::from_slice(&bytes[..]).ok()
} else {
None
}
}
/// Serialize and insert `value` into the stream with `name`.
pub fn insert<T>(&mut self, name: impl Into<Cow<'static, str>>, value: T)
where T: Serialize
{
let len = self.data.len();
match serde_cbor::to_writer(&mut self.data, &value) {
Ok(()) => {
let nlen = self.data.len();
self.data_instances.insert(name.into(), len..nlen).unwrap_none();
},
Err(err) => {
self.data.resize(len, 0); //Roll back result
panic!("Failed inserting CBOR object: {}", err) //TODO: Return Err instead of panic
},
}
}
/// Insert bytes directly with this name
pub fn insert_bytes(&mut self, name: impl Into<Cow<'static, str>>, bytes: impl AsRef<[u8]>)
{
let bytes= bytes.as_ref();
let start = self.data.len();
self.data.extend_from_slice(bytes);
let end = self.data.len();
self.data_instances.insert(name.into(), start..end).unwrap_none();
}
/// Insert a value's bytes directly with this name
pub unsafe fn insert_value_raw<T,U>(&mut self, name: U, value: &T)
where T: ?Sized,
U: Into<Cow<'static, str>>
{
self.insert_bytes(name, bytes::refer(value))
}
/// Try to get the bytes specified by name
pub fn get_bytes(&self, name: impl Borrow<str>) -> Option<&[u8]>
{
if let Some(range) = self.data_instances.get(name.borrow()) {
Some(&self.data[range.clone()])
} else {
None
}
}
/// Try to get the value spcified by name
///
/// # Panics
/// If `T` cannot fit into the size of the range
pub unsafe fn get_value_raw<T>(&self, name: impl Borrow<str>) -> Option<&T>
{
self.get_bytes(name).map(|x| bytes::derefer(x))
}
/// Try to get the value specified by name. Will return `None` if `T` cannot fit in the returned bytes.
pub unsafe fn try_get_value_raw<T>(&self, name: impl Borrow<str>) -> Option<&T>
{
if let Some(bytes) = self.get_bytes(name) {
if bytes.len() >= std::mem::size_of::<T>() {
Some(bytes::derefer(bytes))
} else {
#[cfg(debug_assertions)] eprintln!("Warning! Likely data corruption as {} (size {}) cannot fit into {} bytes",std::any::type_name::<T>(), std::mem::size_of::<T>(), bytes.len());
None
}
} else {
None
}
}
/// Consume into the data
pub fn into_bytes(self) -> Box<[u8]>
{
let mut output = Vec::new(); //TOOO: Get cap
debug_assert!(self.validate(), "passing invalid object to serialise");
macro_rules! bin {
($bytes:expr) => {
{
let bytes = $bytes;
output.extend_from_slice(bytes.as_ref());
}
};
(usize $value:expr) => {
{
use std::convert::TryInto;
use byteorder::{
WriteBytesExt,
LittleEndian,
};
let val: u64 = $value.try_into().expect("Value could not fit into `u64`");
WriteBytesExt::write_u64::<LittleEndian>(&mut output,val).expect("Failed to append `u64` to output buffer");
}
};
}
bin!(usize self.data_instances.len());
for (name, &Range{start, end}) in self.data_instances.iter() {
let name = name.as_bytes();
bin!(usize name.len());
bin!(name);
bin!(usize start);
bin!(usize end);
}
bin!(usize self.data.len()); //for additional checks
output.extend(self.data);
output.into_boxed_slice()
}
/// Read an object from a stream
pub async fn from_stream<T>(input: &mut T) -> io::Result<Self>
where T: AsyncRead + Unpin + ?Sized
{
let mut ext_buf = Vec::new();
macro_rules! bin {
(usize) => {
{
use std::convert::TryFrom;
let value: u64 = input.read_u64().await?;
usize::try_from(value).map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "u64 cannot fit in usize"))?
}
};
($size:expr) => {
{
let sz = $size;
ext_buf.resize(sz, 0);
assert_eq!(input.read_exact(&mut ext_buf[..sz]).await?, ext_buf.len());
&ext_buf[..sz]
}
}
}
let entries = bin!(usize);
let mut instances = HashMap::with_capacity(entries);
for _i in 0..entries
{
let name_len = bin!(usize);
let name_bytes = bin!(name_len);
let name_str = std::str::from_utf8(name_bytes).map_err(|_| io::Error::new(io::ErrorKind::InvalidData,"item name was corrupted"))?;
let start = bin!(usize);
let end = bin!(usize);
instances.insert(Cow::Owned(name_str.to_owned()), start..end);
}
let expected_len = bin!(usize);
if expected_len != calc_len(instances.iter().map(|(_, v)| v))
{
return Err(io::Error::new(io::ErrorKind::InvalidData, "expected and read sizes differ"));
}
let mut data = vec![0; expected_len];
assert_eq!(input.read_exact(&mut data[..]).await?, expected_len);
Ok(Self {
data,
data_instances: instances,
})
}
/// Write this instance into an async stream
pub async fn into_stream<T>(&self, output: &mut T) -> io::Result<usize>
where T: AsyncWrite + Unpin + ?Sized
{
//eprintln!("{}: {:?}", self.data.len(), self);
debug_assert!(self.validate(), "passing invalid object to serialise");
let mut written=0usize;
macro_rules! bin {
($bytes:expr) => {
{
let bytes = $bytes;
output.write_all(bytes).await?;
written+=bytes.len();
}
};
(usize $value:expr) => {
{
use std::convert::TryInto;
let val: u64 = $value.try_into().map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "size cannot fit in u64"))?;
output.write_u64(val).await?;
written += std::mem::size_of::<u64>();
}
};
}
unsafe {
bin!(usize self.data_instances.len());
for (name, &Range{start, end}) in self.data_instances.iter() {
let name = name.as_bytes();
bin!(usize name.len());
bin!(name);
bin!(usize start);
bin!(usize end);
}
}
bin!(usize self.data.len()); //for additional checks
bin!(&self.data);
Ok(written)
}
}
/// A suspend stream represents a stream of objects of _the same type_. Can be any number of them, but they all must be for the same type.
#[async_trait]
pub trait SuspendStream
{
/// Write an object into the opaque stream.
async fn set_object(&mut self, obj: Object) -> Result<(), Error>;
/// Read an object from the opaque stream.
async fn get_object(&mut self) -> Result<Option<Object>, Error>;
}
/// An error that occoured in a suspend operation
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
BadObject,
MissingObject(Cow<'static, str>),
Corruption,
IO(io::Error),
Other(eyre::Report),
Unknown,
}
impl error::Error for Error
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(match &self {
Self::IO(io) => io,
_ => return None,
})
}
}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::BadObject => write!(f, "unexpected object in stream"),
Self::MissingObject(string) => write!(f, "missing object from stream: {}", string),
Self::Corruption => write!(f, "data stream corruption"),
Self::IO(_) => write!(f, "i/o error"),
Self::Other(report) => write!(f, "internal: {}", report),
_ => write!(f, "unknown error"),
}
}
}
impl From<io::Error> for Error
{
#[inline] fn from(from: io::Error) -> Self
{
Self::IO(from)
}
}
impl From<eyre::Report> for Error
{
fn from(from: eyre::Report) -> Self
{
Self::Other(from)
}
}
/// A suspendable type, that can save and reload its data atomically
#[async_trait]
pub trait Suspendable: Sized
{
async fn suspend<S: SuspendStream + Send + Sync+ ?Sized>(self, into: &mut S) -> Result<(), Error>;
async fn load<S: SuspendStream + Send+ Sync+?Sized>(from: &mut S) -> Result<Self, Error>;
}
/// An in-memory `SuspendStream`.
#[derive(Debug, Clone)]
pub struct MemorySuspendStream(Vec<u8>);
impl MemorySuspendStream
{
/// Create a new empty instance
pub fn new() -> Self
{
Self(Vec::new())
}
/// Create from a vector of bytes
pub fn from_bytes(from: impl Into<Vec<u8>>) -> Self
{
Self(from.into())
}
/// Create from a slice of bytes
pub fn from_slice(from: impl AsRef<[u8]>) -> Self
{
Self(Vec::from(from.as_ref()))
}
/// Return the internal bytes
pub fn into_bytes(self) -> Vec<u8>
{
self.0
}
/// The internal buffer
pub fn buffer(&self) -> &Vec<u8>
{
&self.0
}
/// The internal buffer
pub fn buffer_mut(&mut self) -> &mut Vec<u8>
{
&mut self.0
}
}
impl AsRef<[u8]> for MemorySuspendStream
{
fn as_ref(&self) -> &[u8]
{
&self.0[..]
}
}
impl AsMut<[u8]> for MemorySuspendStream
{
fn as_mut(&mut self) -> &mut [u8]
{
&mut self.0[..]
}
}
impl From<Vec<u8>> for MemorySuspendStream
{
#[inline] fn from(from: Vec<u8>) -> Self
{
Self(from.into())
}
}
impl From<Box<[u8]>> for MemorySuspendStream
{
fn from(from: Box<[u8]>) -> Self
{
Self::from_bytes(from)
}
}
impl From<MemorySuspendStream> for Box<[u8]>
{
fn from(from: MemorySuspendStream) -> Self
{
from.0.into()
}
}
impl From<MemorySuspendStream> for Vec<u8>
{
#[inline] fn from(from: MemorySuspendStream) -> Self
{
from.0
}
}
#[async_trait]
impl SuspendStream for MemorySuspendStream
{
async fn get_object(&mut self) -> Result<Option<Object>, Error> {
if self.0.len() ==0 {
return Ok(None);
}
let mut ptr = &self.0[..];
let vl = Object::from_stream(&mut ptr).await?;
let diff = (ptr.as_ptr() as usize) - ((&self.0[..]).as_ptr() as usize);
self.0.drain(0..diff);
Ok(Some(vl))
}
async fn set_object(&mut self, obj: Object) -> Result<(), Error> {
obj.into_stream(&mut self.0).await?;
Ok(())
}
}
/// Suspend a single object to memory
pub async fn oneshot<T: Suspendable>(value: T) -> Result<Vec<u8>, Error>
{
let mut output = MemorySuspendStream::new();
value.suspend(&mut output).await?;
Ok(output.into_bytes())
}
/// Load a single value from memory
pub async fn single<T: Suspendable>(from: impl AsRef<[u8]>) -> Result<T, Error>
{
struct BorrowedStream<'a>(&'a [u8]);
#[async_trait]
impl<'a> SuspendStream for BorrowedStream<'a>
{
async fn get_object(&mut self) -> Result<Option<Object>, Error> {
if self.0.len() ==0 {
return Ok(None);
}
let mut ptr = &self.0[..];
let vl = Object::from_stream(&mut ptr).await?;
let diff = (ptr.as_ptr() as usize) - ((&self.0[..]).as_ptr() as usize);
self.0 = &self.0[diff..];
Ok(Some(vl))
}
async fn set_object(&mut self, _: Object) -> Result<(), Error> {
panic!("Cannot write to borrowed stream")
}
}
let bytes = from.as_ref();
let mut stream = BorrowedStream(bytes);
T::load(&mut stream).await
}

@ -1,109 +0,0 @@
//! Tripcode. TODO: Use kana-hash
use super::*;
use khash::{
ctx::{
self,
Algorithm,
},
salt,
};
use std::{
fmt,
error,
};
#[derive(Debug)]
pub struct Error(khash::error::Error);
impl From<khash::error::Error> for Error
{
#[inline] fn from(from: khash::error::Error) -> Self
{
Self(from)
}
}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to compute tripcode")
}
}
impl error::Error for Error
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(&self.0)
}
}
///A kana-hash or special tripcode
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[derive(Serialize, Deserialize)]
pub enum Tripcode
{
/// A secure tripcode computed with the user set configured salt
Secure(String),
/// A normal tripcode computed with the default embedded salt
Normal(String),
/// A special tripcode with a pre-set string
Special(String),
}
impl Tripcode
{
/// Generate a new normal tripcode
pub fn new_normal(from: impl AsRef<[u8]>) -> Result<Self, Error>
{
Ok(Self::Normal(khash::generate(&ctx::Context::new(Algorithm::Sha256Truncated, salt::Salt::internal()), from)?))
}
/// Generate a new `secure' tripcode.
///
/// # Panics
/// If global config has not been set yet.
pub fn new_secure(from: impl AsRef<[u8]>) -> Result<Self, Error>
{
Ok(Self::Secure(khash::generate(&ctx::Context::new(Algorithm::Sha256Truncated, salt::Salt::fixed(config::get().tripcode_salt)), from)?))
}
/// Get the internal string representing the tripcode.
///
/// # Notes
/// This does not include the prefixes.
pub fn as_str(&self) -> &str
{
match self {
Self::Secure(s) | Self::Normal(s) | Self::Special(s) => s.as_str()
}
}
/// Consume the instance returning the inner string.
///
/// # Notes
/// This does not include the prefixes.
pub fn into_inner(self) -> String
{
match self {
Self::Secure(s) | Self::Normal(s) | Self::Special(s) => s
}
}
}
impl fmt::Display for Tripcode
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Secure(sec) => write!(f, "!!{}", sec),
Self::Normal(nor) => write!(f, "!{}", nor),
Self::Special(spec) => write!(f, "{}", spec),
}
}
}

@ -1,61 +0,0 @@
//! Internal errors
use super::*;
use std::{
fmt,
error,
net::SocketAddr,
};
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
Denied(SocketAddr, bool),
TimeoutReached,
NoResponse,
Unknown,
}
#[derive(Debug)]
pub struct HandleError;
impl Error
{
/// Print this error as a warning
#[inline(never)] #[cold] pub fn warn(self) -> Self
{
warn!("{}", self);
self
}
/// Print this error as info
#[inline(never)] #[cold] pub fn info(self) -> Self
{
info!("{}", self);
self
}
}
impl error::Error for Error{}
impl error::Error for HandleError{}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Denied(sock, true) => write!(f, "denied connection (explicit): {}", sock),
Self::Denied(sock, _) => write!(f, "denied connection (implicit): {}", sock),
Self::TimeoutReached => write!(f, "timeout reached"),
Self::NoResponse => write!(f, "no handler for this request"),
_ => write!(f, "unknown error"),
}
}
}
impl fmt::Display for HandleError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "handle response had already been sent or timed out by the time we tried to access it")
}
}

@ -1,269 +0,0 @@
//! Handle web serving and managing state of web clients
use super::*;
use std::{
sync::{
Arc,
Weak,
},
marker::{
Send, Sync,
},
iter,
};
use hyper::{
service::{
make_service_fn,
service_fn,
},
server::{
Server,
conn::AddrStream,
},
Request,
Response,
Body,
};
use futures::{
TryStreamExt as _,
};
use cidr::{
Cidr,
};
use tokio::{
sync::{
RwLock,
mpsc,
},
};
pub mod error;
pub mod route;
/// A unique ID generated each time a request is sent through router.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Nonce(uuid::Uuid);
#[derive(Debug, Clone)]
pub struct Handle
{
state: Arc<State>,
nonce: Nonce,
req: Arc<Request<Body>>,
/// We can let multiple router hooks mutate body if they desire. Such as adding headers, etc.
resp: Arc<RwLock<Response<Body>>>,
}
impl Handle
{
/// Attempt to upgrade the response handle into a potentially mutateable `Response`.
///
/// Function fails if the reference count to the response has expired (i.e. the response has been sent or timed out already)
pub fn access_response(&self) -> Result<Arc<RwLock<Response<Body>>>, error::HandleError>
{
Ok(self.resp.clone())
//self.resp.upgrade().ok_or(error::HandleError)
}
/// Replace the response with a new one if possible.
///
/// Fails if `access_response()` fails.
pub async fn set_response(&self, rsp: Response<Body>) -> Result<Response<Body>, error::HandleError>
{
use std::ops::DerefMut;
match self.access_response() {
Ok(resp) => Ok(std::mem::replace(resp.write().await.deref_mut(), rsp)),
Err(err) => Err(err),
}
}
}
/// Contains all web-server state
#[derive(Debug)]
pub struct State
{
config: config::Config,
router: RwLock<route::Router<Handle>>,
}
impl State
{
/// Create a new state with this specific config instance.
///
/// # Notes
/// You'll almost always want to use the *global* config instance, in which case use `default()` to create this.
pub fn new(config: config::Config) -> Self
{
Self{
config,
router: RwLock::new(route::Router::new()),
}
}
}
impl Default for State
{
#[inline]
fn default() -> Self
{
Self::new(config::get().clone())
}
}
fn mask_contains(mask: &[cidr::IpCidr], value: &std::net::IpAddr) -> bool
{
for mask in mask.iter()
{
if mask.contains(value) {
return true;
}
}
false
}
fn handle_test(state: Arc<State>) -> tokio::task::JoinHandle<()>
{
tokio::task::spawn(async move {
let (hook, mut recv) = {
let mut router = state.router.write().await;
router.hook(None, route::PrefixRouter::new("/hello"))
};
while let Some((uri, handle)) = recv.recv().await
{
match handle.set_response(Response::builder()
.status(200)
.body(format!("Hello world! You are at {}", uri).into())
.unwrap()).await {
Ok(_) => (),
Err(e) => {
error!("{}", e);
break;
},
}
}
{
let mut router = state.router.write().await;
router.unhook(iter::once(hook));
}
})
}
async fn handle_conn(state: Arc<State>, req: Request<Body>) -> Result<Response<Body>, error::Error>
{
let response = Arc::new(RwLock::new(Response::new(Body::empty())));
let nonce = Nonce(uuid::Uuid::new_v4());
let req = Arc::new(req);
let resp_num = {
let resp = Arc::clone(&response);
async {
let mut route = state.router.write().await;
let handle = Handle {
state: state.clone(),
nonce,
req: Arc::clone(&req),
resp,
};
match route.dispatch(req.method(), req.uri().path(), handle, state.config.req_timeout_local).await {
Ok(num) => {
num
},
Err((num, _)) => {
num
},
}
}
};
tokio::pin!(resp_num);
match match state.config.req_timeout_global {
Some(timeout) => tokio::time::timeout(timeout, resp_num).await,
None => Ok(resp_num.await),
} {
Ok(0) => {
// No handlers matched this
trace!(" x {}", req.uri().path());
Ok(Response::builder()
.status(404)
.body("404 not found".into())
.unwrap())
},
Ok(_) => {
let resp = {
let mut resp = response;
loop {
match Arc::try_unwrap(resp) {
Err(e) => {
resp = e;
tokio::task::yield_now().await;
},
Ok(n) => break n,
}
}
};
Ok(resp.into_inner())
},
Err(_) => {
// Timeout reached
Err(error::Error::TimeoutReached.info())
},
}
}
pub async fn serve(state: State) -> Result<(), eyre::Report>
{
cfg_debug!(if {
if &state.config != config::get() {
panic!("Our config is not the same as global? This is unsound.");
}
} else {
if &state.config != config::get() {
warn!("Our config is not the same as global? This is unsound.");
}
});
let h = {
let state = Arc::new(state);
let h = handle_test(state.clone());
let service = make_service_fn(|conn: &AddrStream| {
let state = Arc::clone(&state);
let remote_addr = conn.remote_addr();
let remote_ip = remote_addr.ip();
let denied = mask_contains(&state.config.deny_mask[..], &remote_ip);
let allowed = mask_contains(&state.config.accept_mask[..], &remote_ip);
async move {
if denied {
Err(error::Error::Denied(remote_addr, true).warn())
} else if allowed || state.config.accept_default {
trace!("Accepted conn: {}", remote_addr);
Ok(service_fn(move |req: Request<Body>| {
handle_conn(Arc::clone(&state), req)
}))
} else {
Err(error::Error::Denied(remote_addr,false).info())
}
}
});
let server = Server::bind(&state.config.listen).serve(service)
.with_graceful_shutdown(async {
tokio::signal::ctrl_c().await.expect("Failed to catch SIGINT");
info!("Going down for shutdown now!");
});
server.await?;
// remove all handles now
let mut wr= state.router.write().await;
wr.clear();
h
};
trace!("server down");
h.await?;
Ok(())
}

@ -1,254 +0,0 @@
//! Basic router
use super::*;
use hyper::{
Method,
};
use std::{
fmt,
marker::{
Send,
Sync,
},
iter,
};
use tokio::{
sync::{
mpsc::{
self,
error::SendTimeoutError,
},
},
time,
};
use futures::{
future::{
self,
Future,
},
};
use generational_arena::{
Index,
Arena,
};
pub trait UriRoute
{
fn is_match(&self, uri: &str) -> bool;
#[inline] fn as_string(&self) -> &str
{
""
}
#[inline] fn type_name(&self) -> &str
{
std::any::type_name::<Self>()
}
#[inline] fn mutate_uri(&self, uri: String) -> String
{
uri
}
}
impl UriRoute for str
{
#[inline] fn is_match(&self, uri: &str) -> bool {
self.eq(uri)
}
#[inline] fn as_string(&self) -> &str {
self
}
}
impl<T: AsRef<str>> UriRoute for T
{
#[inline] fn is_match(&self, uri: &str) -> bool {
self.as_ref().eq(uri)
}
#[inline] fn as_string(&self) -> &str {
self.as_ref()
}
}
impl UriRoute for regex::Regex
{
#[inline] fn is_match(&self, uri: &str) -> bool {
self.test(uri)
}
#[inline] fn as_string(&self) -> &str {
self.as_str()
}
}
/// A router for all under a prefix
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct PrefixRouter(String);
impl PrefixRouter
{
/// Create a new instance with this string
pub fn new(string: impl Into<String>) -> Self
{
Self(string.into())
}
}
impl UriRoute for PrefixRouter
{
#[inline] fn is_match(&self, uri: &str) -> bool {
uri.starts_with(self.0.as_str())
}
#[inline] fn as_string(&self) -> &str {
self.0.as_str()
}
fn mutate_uri(&self, mut uri: String) -> String {
uri.replace_range(..self.0.len(), "");
uri
}
}
impl fmt::Display for PrefixRouter
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "{}*", self.0)
}
}
/// Contains a routing table
#[derive(Debug)]
pub struct Router<T: Send>
{
routes: Arena<(Option<Method>, OpaqueDebug<Box<dyn UriRoute + Send + Sync + 'static>>, mpsc::Sender<(String, T)>)>,
}
impl<T: Send> fmt::Display for Router<T>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "Router {{ routes: ")?;
for (i, (method, route, _)) in self.routes.iter() {
writeln!(f, "\t ({:?} => ({:?}, {} ({:?}))),", i, method, route.type_name(), route.as_string())?;
}
write!(f, "}}")
}
}
impl<T: Send + Clone> Router<T>
{
/// Create an empty routing table
pub fn new() -> Self
{
Self {
routes: Arena::new(),
}
}
/// Push a new route into the router.
///
/// # Returns
/// The hook's new index, and the receiver that `dispatch()` sends to.
pub fn hook<Uri: UriRoute + Send + Sync + 'static>(&mut self, method: Option<Method>, uri: Uri) -> (Index, mpsc::Receiver<(String, T)>)
{
let (tx, rx) = mpsc::channel(config::get_or_default().dos_max);
(self.routes.insert((method, OpaqueDebug::new(Box::new(uri)), tx)), rx)
}
/// Remove all hooks
pub fn clear(&mut self)
{
self.routes.clear();
}
/// Dispatch the URI location across this router, sending to all that match it.
///
/// # Timeout
/// The timeout is waited on the *individual* dispatches. If you want a global timeout, please timeout the future returned by this function instead.
/// Timed-out dispatches are counted the same as sending errors.
///
/// # Returns
/// When one or more dispatchers match but faile, `Err` is returned. Inside the `Err` tuple is the amount of successful dispatches, and also a vector containing the indecies of the failed hook sends.
pub async fn dispatch(&mut self, method: &Method, uri: impl AsRef<str>, nonce: T, timeout: Option<time::Duration>) -> Result<usize, (usize, Vec<Index>)>
{
let string = uri.as_ref();
let mut success=0usize;
let vec: Vec<_> =
future::join_all(self.routes.iter_mut()
.filter_map(|(i, (a_method, route, sender))| {
match a_method {
Some(x) if x != method => None,
_ => {
if route.is_match(string) {
trace!("{:?} `{}`: -> {}",i, route.as_string(), string);
let timeout = timeout.clone();
let nonce= nonce.clone();
macro_rules! send {
() => {
{
let string = route.mutate_uri(string.to_owned());
match timeout {
None => sender.send((string, nonce)).await
.map_err(|e| SendTimeoutError::Closed(e.0)),
Some(time) => sender.send_timeout((string, nonce), time).await
}
}
}
};
Some(async move {
match send!() {
Err(SendTimeoutError::Closed(er)) => {
error!("{:?}: Dispatch failed on hooked route for `{}`", i, er.0);
Err(i)
},
Err(SendTimeoutError::Timeout(er)) => {
warn!("{:?}: Dispatch timed out on hooked route for `{}`", i, er.0);
Err(i)
},
_ => Ok(()),
}
})
} else {
None
}
},
}
})).await.into_iter()
.filter_map(|res| {
if res.is_ok() {
success+=1;
}
res.err()
}).collect();
if vec.len() > 0 {
Err((success, vec))
} else {
Ok(success)
}
}
/// Forcefully dispatch `uri` on hook `which`, regardless of method or URI matching.
///
/// # Returns
/// If `which` is not contained within the table, immediately returns `None`, otherwise returns a future that completes when the dispatch is complete.
/// Note: This future must be `await`ed for the dispatch to happen.
pub fn dispatch_force(&mut self, which: Index, uri: String, nonce: T, timeout: Option<time::Duration>) -> Option<impl Future<Output = Result<(), SendTimeoutError<(String, T)>>> + '_>
{
self.routes.get_mut(which).map(move |(_,_,send)| {
match timeout {
Some(timeout) => send.send_timeout((uri, nonce), timeout).boxed(),
None => send.send((uri, nonce)).map(|res| res.map_err(|e| SendTimeoutError::Closed(e.0))).boxed(),
}
})
}
/// Attempt to unhook these hooks. If one or more of the provided indecies does not exist in the routing table, it is ignored.
pub fn unhook<I>(&mut self, items: I)
where I: IntoIterator<Item = Index>
{
for item in items.into_iter() {
self.routes.remove(item);
}
}
}
Loading…
Cancel
Save