fix fuse error

serve
Avril 4 years ago
parent 206d38300a
commit f2565088b4
Signed by: flanchan
GPG Key ID: 284488987C31F630

2
Cargo.lock generated

@ -639,7 +639,7 @@ dependencies = [
[[package]] [[package]]
name = "markov" name = "markov"
version = "0.4.1" version = "0.5.0"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"cfg-if 1.0.0", "cfg-if 1.0.0",

@ -1,6 +1,6 @@
[package] [package]
name = "markov" name = "markov"
version = "0.5.0" version = "0.5.1"
description = "Generate string of text from Markov chain fed by stdin" description = "Generate string of text from Markov chain fed by stdin"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"

@ -51,10 +51,10 @@ async fn single_stream(host: IpAddr, num: Option<usize>, body: impl Buf) -> Resu
cfg_if! { cfg_if! {
if #[cfg(feature="split-newlines")] { if #[cfg(feature="split-newlines")] {
for body in body.split('\n').filter(|line| !line.trim().is_empty()) { for body in body.split('\n').filter(|line| !line.trim().is_empty()) {
feed::feed(&mut chain, body); feed::feed(&mut chain, body, 1..);
} }
}else { }else {
feed::feed(&mut chain, body); feed::feed(&mut chain, body, 1..);
} }
} }
if_debug!{ if_debug!{

@ -17,6 +17,8 @@ use tokio::{
use futures::{ use futures::{
stream::{ stream::{
Stream, Stream,
StreamExt,
Fuse,
}, },
}; };
use pin_project::pin_project; use pin_project::pin_project;
@ -27,7 +29,7 @@ pub struct StreamReader<I, T>
where I: Stream<Item=T> where I: Stream<Item=T>
{ {
#[pin] #[pin]
source: I, source: Fuse<I>,
buffer: Vec<u8>, buffer: Vec<u8>,
} }
@ -43,13 +45,13 @@ where I: Stream<Item=T>,
/// Consume into the original stream /// Consume into the original stream
pub fn into_inner(self) -> I pub fn into_inner(self) -> I
{ {
self.source self.source.into_inner()
} }
/// Create a new instance with a buffer capacity /// Create a new instance with a buffer capacity
pub fn with_capacity(source: I, cap: usize) -> Self pub fn with_capacity(source: I, cap: usize) -> Self
{ {
Self { Self {
source, source: source.fuse(),
buffer: Vec::with_capacity(cap) buffer: Vec::with_capacity(cap)
} }
} }
@ -57,7 +59,7 @@ where I: Stream<Item=T>,
pub fn new(source: I) -> Self pub fn new(source: I) -> Self
{ {
Self { Self {
source, source: source.fuse(),
buffer: Vec::new(), buffer: Vec::new(),
} }
} }

@ -1,13 +1,22 @@
//! Feeding the chain //! Feeding the chain
use super::*; use super::*;
pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>) const FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..; //TODO: Add to config somehow
pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>, bounds: impl std::ops::RangeBounds<usize>) -> bool
{ {
let map = what.as_ref().split_whitespace() let map = what.as_ref().split_whitespace()
.filter(|word| !word.is_empty()) .filter(|word| !word.is_empty())
.map(|s| s.to_owned()).collect::<Vec<_>>(); .map(|s| s.to_owned()).collect::<Vec<_>>();
if map.len() > 0 { debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
if bounds.contains(&map.len()) {
chain.feed(map); chain.feed(map);
true
}
else {
debug!("Ignoring feed of invalid length {}", map.len());
false
} }
} }
@ -38,10 +47,10 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
cfg_if! { cfg_if! {
if #[cfg(feature="split-newlines")] { if #[cfg(feature="split-newlines")] {
for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) { for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) {
feed(&mut chain, buffer); feed(&mut chain, buffer, FEED_BOUNDS);
} }
} else { } else {
feed(&mut chain, buffer); feed(&mut chain, buffer, FEED_BOUNDS);
} }
} }
@ -59,7 +68,7 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
#[cfg(not(feature="hog-buffer"))] #[cfg(not(feature="hog-buffer"))]
let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right? let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
feed(&mut chain, line); feed(&mut chain, line, FEED_BOUNDS);
info!("{} -> {:?}", who, line); info!("{} -> {:?}", who, line);
} }
written+=line.len(); written+=line.len();

@ -140,7 +140,7 @@ async fn main() {
async move { async move {
feed::full(&host, state, buf).await feed::full(&host, state, buf).await
.map(|_| warp::reply::with_status(warp::reply(), status!(201))) .map(|_| warp::reply::with_status(warp::reply(), status!(201)))
.map_err(warp::reject::custom) .map_err(|_| warp::reject::not_found()) //(warp::reject::custom) //TODO: Recover rejection filter down below for custom error return
} }
}) })
.with(warp::log("markov::put")); .with(warp::log("markov::put"));

Loading…
Cancel
Save