From 51a5d0aeba279dc0d35602a00ab8a17056468955 Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 11 Oct 2020 03:12:41 +0100 Subject: [PATCH] start sentance, sanitise --- Cargo.toml | 2 +- markov.toml | 2 +- src/api.rs | 99 -------------------------------------------- src/api/error.rs | 55 ++++++++++++++++++++++++ src/api/mod.rs | 32 ++++++++++++++ src/api/sentance.rs | 2 + src/api/single.rs | 47 +++++++++++++++++++++ src/feed.rs | 2 +- src/main.rs | 8 +++- src/sanitise/mod.rs | 37 +++++++++++++++++ src/sanitise/word.rs | 45 ++++++++++++++++++++ 11 files changed, 227 insertions(+), 104 deletions(-) delete mode 100644 src/api.rs create mode 100644 src/api/error.rs create mode 100644 src/api/mod.rs create mode 100644 src/api/sentance.rs create mode 100644 src/api/single.rs create mode 100644 src/sanitise/mod.rs create mode 100644 src/sanitise/word.rs diff --git a/Cargo.toml b/Cargo.toml index 661860e..2ad06f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["compress-chain", "split-newlines"] +default = ["compress-chain", "split-newlines", "api"] # Compress the chain data file when saved to disk compress-chain = ["async-compression"] diff --git a/markov.toml b/markov.toml index 6090632..dc0a048 100644 --- a/markov.toml +++ b/markov.toml @@ -2,5 +2,5 @@ bindpoint = '127.0.0.1:8001' file = 'chain.dat' max_content_length = 4194304 max_gen_size = 256 -#save_interval_secs = 2 +#save_interval_secs = 15 trust_x_forwarded_for = false diff --git a/src/api.rs b/src/api.rs deleted file mode 100644 index c6eeb7c..0000000 --- a/src/api.rs +++ /dev/null @@ -1,99 +0,0 @@ -//! For API calls if enabled -use super::*; -use std::{ - fmt, - error, - iter, - convert::Infallible, -}; -use futures::{ - stream::{ - self, - BoxStream, - StreamExt, - }, -}; - -#[inline] fn aggregate(mut body: impl Buf) -> Result -{ - /*let mut output = Vec::new(); - while body.has_remaining() { - let bytes = body.bytes(); - output.extend_from_slice(&bytes[..]); - let cnt = bytes.len(); - body.advance(cnt); -}*/ - - std::str::from_utf8(&body.to_bytes()).map(ToOwned::to_owned) -} - -pub async fn single(host: IpAddr, num: Option, body: impl Buf) -> Result -{ - single_stream(host, num, body).await - .map(|rx| Response::new(Body::wrap_stream(rx.map(move |x| { - info!("{} <- {:?}", host, x); - x - })))) - .map_err(warp::reject::custom) -} - -//TODO: Change to stream impl like normal `feed` has, instead of taking aggregate? -async fn single_stream(host: IpAddr, num: Option, body: impl Buf) -> Result>, ApiError> -{ - let body = aggregate(body)?; - info!("{} <- {:?}", host, &body[..]); - - let mut chain = Chain::new(); - - if_debug! { - let timer = std::time::Instant::now(); - } - cfg_if! { - if #[cfg(feature="split-newlines")] { - for body in body.split('\n').filter(|line| !line.trim().is_empty()) { - feed::feed(&mut chain, body, 1..); - } - }else { - feed::feed(&mut chain, body, 1..); - } - } - if_debug!{ - trace!("Write took {}ms", timer.elapsed().as_millis()); - } - match num { - None => Ok(stream::iter(iter::once(Ok(chain.generate_str()))).boxed()), - Some(num) => { - let (mut tx, rx) = mpsc::channel(num); - tokio::spawn(async move { - for string in chain.str_iter_for(num) { - tx.send(string).await.expect("Failed to send string to body"); - } - }); - Ok(StreamExt::map(rx, |x| Ok::<_, Infallible>(x)).boxed()) - } - } -} - -#[derive(Debug)] -pub enum ApiError { - Body, -} -impl warp::reject::Reject for ApiError{} -impl error::Error for ApiError{} -impl std::fmt::Display for ApiError -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result - { - match self { - Self::Body => write!(f, "invalid data in request body"), - } - } -} - -impl From for ApiError -{ - fn from(_: std::str::Utf8Error) -> Self - { - Self::Body - } -} diff --git a/src/api/error.rs b/src/api/error.rs new file mode 100644 index 0000000..97aa6e0 --- /dev/null +++ b/src/api/error.rs @@ -0,0 +1,55 @@ +//! API errors +//use super::*; +use std::{ + error, + fmt, +}; +use warp::{ + Rejection, + Reply, +}; + +#[derive(Debug)] +pub enum ApiError { + Body, +} + +impl ApiError +{ + #[inline] fn error_code(&self) -> warp::http::StatusCode + { + status!(match self { + Self::Body => 422, + }) + } +} + +impl warp::reject::Reject for ApiError{} +impl error::Error for ApiError{} +impl std::fmt::Display for ApiError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::Body => write!(f, "invalid data in request body"), + } + } +} + +impl From for ApiError +{ + fn from(_: std::str::Utf8Error) -> Self + { + Self::Body + } +} + +// Handles API rejections +pub async fn rejection(err: Rejection) -> Result +{ + if let Some(api) = err.find::() { + Ok(warp::reply::with_status(format!("ApiError: {}\n", api), api.error_code())) + } else { + Err(err) + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs new file mode 100644 index 0000000..cfd7fb1 --- /dev/null +++ b/src/api/mod.rs @@ -0,0 +1,32 @@ +//! For API calls if enabled +use super::*; +use std::{ + iter, + convert::Infallible, +}; +use futures::{ + stream::{ + self, + BoxStream, + StreamExt, + }, +}; +pub mod error; +use error::ApiError; + +mod single; + +#[inline] fn aggregate(mut body: impl Buf) -> Result +{ + std::str::from_utf8(&body.to_bytes()).map(ToOwned::to_owned) +} + +pub async fn single(host: IpAddr, num: Option, body: impl Buf) -> Result +{ + single::single_stream(host, num, body).await + .map(|rx| Response::new(Body::wrap_stream(rx.map(move |x| { + info!("{} <- {:?}", host, x); + x + })))) + .map_err(warp::reject::custom) +} diff --git a/src/api/sentance.rs b/src/api/sentance.rs new file mode 100644 index 0000000..9f8f411 --- /dev/null +++ b/src/api/sentance.rs @@ -0,0 +1,2 @@ +//! /sentance/ +use super::*; diff --git a/src/api/single.rs b/src/api/single.rs new file mode 100644 index 0000000..572656b --- /dev/null +++ b/src/api/single.rs @@ -0,0 +1,47 @@ +//! Handler for /single/ +use super::*; + +//TODO: Change to stream impl like normal `feed` has, instead of taking aggregate? +pub async fn single_stream(host: IpAddr, num: Option, body: impl Buf) -> Result>, ApiError> +{ + let body = aggregate(body)?; + info!("{} <- {:?}", host, &body[..]); + + let mut chain = Chain::new(); + + if_debug! { + let timer = std::time::Instant::now(); + } + cfg_if! { + if #[cfg(feature="split-newlines")] { + for body in body.split('\n').filter(|line| !line.trim().is_empty()) { + feed::feed(&mut chain, body, 1..); + } + }else { + feed::feed(&mut chain, body, 1..); + } + } + if_debug!{ + trace!("Write took {}ms", timer.elapsed().as_millis()); + } + if chain.is_empty() { + Ok(stream::empty().boxed()) + } else { + match num { + + None => Ok(stream::iter(iter::once(Ok(chain.generate_str()))).boxed()), + Some(num) => { + let (mut tx, rx) = mpsc::channel(num); + tokio::spawn(async move { + for string in chain.str_iter_for(num) { + if let Err(e) = tx.send(string).await { + error!("Failed to send string to body, aborting: {:?}", e.0); + break; + } + } + }); + Ok(StreamExt::map(rx, |x| Ok::<_, Infallible>(x)).boxed()) + } + } + } +} diff --git a/src/feed.rs b/src/feed.rs index 6f16027..0a9eb8f 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -57,7 +57,7 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream(from: &mut T, to: &mut U) -> Result +{ + todo!() +} + + +#[derive(Debug)] +pub enum Error { + +} +impl error::Error for Error{} +impl fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + Ok(()) + } +} + diff --git a/src/sanitise/word.rs b/src/sanitise/word.rs new file mode 100644 index 0000000..0730282 --- /dev/null +++ b/src/sanitise/word.rs @@ -0,0 +1,45 @@ +//! Word splitting +use super::*; +use std::{ + borrow::Borrow, +}; + +#[derive(Debug, PartialEq, Eq)] +#[repr(transparent)] +pub struct Word(str); + +impl Word +{ + pub fn new<'a>(from: &'a str) -> &'a Self + { + unsafe { + std::mem::transmute(from) + } + } +} + +impl<'a> From<&'a str> for &'a Word +{ + fn from(from: &'a str) -> Self + { + Word::new(from) + } +} + +impl AsRef for Word +{ + fn as_ref(&self) -> &str + { + &self.0 + } +} + +impl AsRef for str +{ + fn as_ref(&self) -> &Word + { + Word::new(self) + } +} + +//impl Borrow<>