From 57c74bd4117fbd9c42bbbafa17652bba34f0e290 Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 8 Oct 2020 13:19:03 +0100 Subject: [PATCH] reorganise --- src/feed.rs | 49 +++++++++++++++++++++++++++++++++++ src/gen.rs | 33 ++++++++++++++++++++++++ src/main.rs | 73 +++-------------------------------------------------- 3 files changed, 86 insertions(+), 69 deletions(-) create mode 100644 src/feed.rs create mode 100644 src/gen.rs diff --git a/src/feed.rs b/src/feed.rs new file mode 100644 index 0000000..8c849e3 --- /dev/null +++ b/src/feed.rs @@ -0,0 +1,49 @@ +//! Feeding the chain +use super::*; + +fn feed(chain: &mut Chain, what: impl AsRef) +{ + chain.feed(what.as_ref().split_whitespace() + .filter(|word| !word.is_empty()) + .map(|s| s.to_owned()).collect::>()); +} + +pub async fn full(who: &IpAddr, state: State, mut body: impl Unpin + Stream>) -> Result { + let mut buffer = Vec::new(); + + let mut written = 0usize; + while let Some(buf) = body.next().await { + let mut body = buf.map_err(|_| FillBodyError)?; + while body.has_remaining() { + buffer.extend_from_slice(body.bytes()); + let cnt = body.bytes().len(); + body.advance(cnt); + written += cnt; + } + } + if !buffer.ends_with(&[b'\n']) { // probably useless eh? + buffer.push(b'\n'); + } + let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?; + info!("{} -> {:?}", who, buffer); + let mut chain = state.chain().write().await; + feed(&mut chain, buffer); + + state.notify_save(); + Ok(written) +} + + +#[derive(Debug)] +pub struct FillBodyError; + +impl error::Error for FillBodyError{} +impl warp::reject::Reject for FillBodyError{} +impl fmt::Display for FillBodyError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "failed to feed chain with this data") + } +} + diff --git a/src/gen.rs b/src/gen.rs new file mode 100644 index 0000000..6c1e4e5 --- /dev/null +++ b/src/gen.rs @@ -0,0 +1,33 @@ +//! Generating the strings +use super::*; + +#[derive(Debug)] +pub struct GenBodyError(String); + +impl error::Error for GenBodyError{} +impl fmt::Display for GenBodyError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "failed to write {:?} to body", self.0) + } +} + + +pub async fn body(state: State, num: Option, mut output: mpsc::Sender) -> Result<(), GenBodyError> +{ + let chain = state.chain().read().await; + if !chain.is_empty() { + match num { + Some(num) if num < state.config().max_gen_size => { + //This could DoS `full_body` and writes, potentially. + for string in chain.str_iter_for(num) { + output.send(string).await.map_err(|e| GenBodyError(e.0))?; + } + }, + _ => output.send(chain.generate_str()).await.map_err(|e| GenBodyError(e.0))?, + } + } + Ok(()) +} + diff --git a/src/main.rs b/src/main.rs index 42652d7..1eb0a8f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,74 +46,9 @@ mod save; mod forwarded_list; use forwarded_list::XForwardedFor; -#[derive(Debug)] -pub struct FillBodyError; +mod feed; +mod gen; -impl error::Error for FillBodyError{} -impl warp::reject::Reject for FillBodyError{} -impl fmt::Display for FillBodyError -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result - { - write!(f, "failed to feed chain with this data") - } -} - - -async fn full_body(who: &IpAddr, state: State, mut body: impl Unpin + Stream>) -> Result { - let mut buffer = Vec::new(); - - let mut written = 0usize; - while let Some(buf) = body.next().await { - let mut body = buf.map_err(|_| FillBodyError)?; - while body.has_remaining() { - buffer.extend_from_slice(body.bytes()); - let cnt = body.bytes().len(); - body.advance(cnt); - written += cnt; - } - } - - let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?; - info!("{} -> {:?}", who, buffer); - let mut chain = state.chain().write().await; - chain.feed(&buffer.split_whitespace() - .filter(|word| !word.is_empty()) - .map(|s| s.to_owned()).collect::>()); - - state.notify_save(); - Ok(written) -} - -#[derive(Debug)] -pub struct GenBodyError(String); - -impl error::Error for GenBodyError{} -impl fmt::Display for GenBodyError -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result - { - write!(f, "failed to write {:?} to body", self.0) - } -} - - -async fn gen_body(state: State, num: Option, mut output: mpsc::Sender) -> Result<(), GenBodyError> -{ - let chain = state.chain().read().await; - if !chain.is_empty() { - match num { - Some(num) if num < state.config().max_gen_size => { - //This could DoS `full_body` and writes, potentially. - for string in chain.str_iter_for(num) { - output.send(string).await.map_err(|e| GenBodyError(e.0))?; - } - }, - _ => output.send(chain.generate_str()).await.map_err(|e| GenBodyError(e.0))?, - } - } - Ok(()) -} #[tokio::main] async fn main() { pretty_env_logger::init(); @@ -176,7 +111,7 @@ async fn main() { .and(warp::body::stream()) .and_then(|state: State, host: IpAddr, buf| { async move { - full_body(&host, state, buf).await + feed::full(&host, state, buf).await .map(|_| warp::reply::with_status(warp::reply(), status!(201))) .map_err(warp::reject::custom) } @@ -191,7 +126,7 @@ async fn main() { .and_then(|state: State, host: IpAddr, num: Option| { async move { let (tx, rx) = mpsc::channel(state.config().max_gen_size); - tokio::spawn(gen_body(state, num, tx)); + tokio::spawn(gen::body(state, num, tx)); Ok::<_, std::convert::Infallible>(Response::new(Body::wrap_stream(rx.map(move |x| { info!("{} <- {:?}", host, x); Ok::<_, std::convert::Infallible>(x)