From f2565088b4bc2cc0eaebcfc8807514e329b13341 Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 10 Oct 2020 21:20:33 +0100 Subject: [PATCH] fix fuse error --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api.rs | 4 ++-- src/chunking.rs | 10 ++++++---- src/feed.rs | 19 ++++++++++++++----- src/main.rs | 2 +- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index efeb4d7..0eeb643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -639,7 +639,7 @@ dependencies = [ [[package]] name = "markov" -version = "0.4.1" +version = "0.5.0" dependencies = [ "async-compression", "cfg-if 1.0.0", diff --git a/Cargo.toml b/Cargo.toml index 66c22fd..661860e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "markov" -version = "0.5.0" +version = "0.5.1" description = "Generate string of text from Markov chain fed by stdin" authors = ["Avril "] edition = "2018" diff --git a/src/api.rs b/src/api.rs index 98cee79..c6eeb7c 100644 --- a/src/api.rs +++ b/src/api.rs @@ -51,10 +51,10 @@ async fn single_stream(host: IpAddr, num: Option, body: impl Buf) -> Resu cfg_if! { if #[cfg(feature="split-newlines")] { for body in body.split('\n').filter(|line| !line.trim().is_empty()) { - feed::feed(&mut chain, body); + feed::feed(&mut chain, body, 1..); } }else { - feed::feed(&mut chain, body); + feed::feed(&mut chain, body, 1..); } } if_debug!{ diff --git a/src/chunking.rs b/src/chunking.rs index b43f358..47dcc30 100644 --- a/src/chunking.rs +++ b/src/chunking.rs @@ -17,6 +17,8 @@ use tokio::{ use futures::{ stream::{ Stream, + StreamExt, + Fuse, }, }; use pin_project::pin_project; @@ -27,7 +29,7 @@ pub struct StreamReader where I: Stream { #[pin] - source: I, + source: Fuse, buffer: Vec, } @@ -43,13 +45,13 @@ where I: Stream, /// Consume into the original stream pub fn into_inner(self) -> I { - self.source + self.source.into_inner() } /// Create a new instance with a buffer capacity pub fn with_capacity(source: I, cap: usize) -> Self { Self { - source, + source: source.fuse(), buffer: Vec::with_capacity(cap) } } @@ -57,7 +59,7 @@ where I: Stream, pub fn new(source: I) -> Self { Self { - source, + source: source.fuse(), buffer: Vec::new(), } } diff --git a/src/feed.rs b/src/feed.rs index fee939c..6f16027 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -1,13 +1,22 @@ //! Feeding the chain use super::*; -pub fn feed(chain: &mut Chain, what: impl AsRef) +const FEED_BOUNDS: std::ops::RangeFrom = 2..; //TODO: Add to config somehow + + +pub fn feed(chain: &mut Chain, what: impl AsRef, bounds: impl std::ops::RangeBounds) -> bool { let map = what.as_ref().split_whitespace() .filter(|word| !word.is_empty()) .map(|s| s.to_owned()).collect::>(); - if map.len() > 0 { + debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds"); + if bounds.contains(&map.len()) { chain.feed(map); + true + } + else { + debug!("Ignoring feed of invalid length {}", map.len()); + false } } @@ -38,10 +47,10 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream {:?}", who, line); } written+=line.len(); diff --git a/src/main.rs b/src/main.rs index 6925d8d..338a772 100644 --- a/src/main.rs +++ b/src/main.rs @@ -140,7 +140,7 @@ async fn main() { async move { feed::full(&host, state, buf).await .map(|_| warp::reply::with_status(warp::reply(), status!(201))) - .map_err(warp::reject::custom) + .map_err(|_| warp::reject::not_found()) //(warp::reject::custom) //TODO: Recover rejection filter down below for custom error return } }) .with(warp::log("markov::put"));