diff --git a/Cargo.toml b/Cargo.toml index 91e2b5c..3e74899 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,9 @@ default = ["compress-chain"] # Compress the chain data file when saved to disk compress-chain = ["async-compression"] +# Treat each new line as a new set to feed instead of feeding the whole data at once +split-newlines = [] + # Enable the /api/ route api = [] diff --git a/Makefile b/Makefile index 32c7e43..892a3a0 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ reinstall: uninstall rm -f /var/nginx/markov.dat rc-service markov start sleep 0.2 - curl -X PUT -d @default http://127.0.0.1:8001/put + curl -X PUT --data-binary @default http://127.0.0.1:8001/put uninstall: -rc-service markov stop diff --git a/src/api.rs b/src/api.rs index 95b1415..54e36ba 100644 --- a/src/api.rs +++ b/src/api.rs @@ -43,7 +43,15 @@ async fn single_stream(host: IpAddr, num: Option, body: impl Buf) -> Resu info!("{} <- {:?}", host, &body[..]); let mut chain = Chain::new(); - feed::feed(&mut chain, body); + cfg_if!{ + if #[cfg(feature="split-newlines")] { + for body in body.split('\n').filter(|line| !line.trim().is_empty()) { + feed::feed(&mut chain, body); + } + }else { + feed::feed(&mut chain, body); + } + } match num { None => Ok(stream::iter(iter::once(Ok(chain.generate_str()))).boxed()), Some(num) => { diff --git a/src/feed.rs b/src/feed.rs index 2b90da3..4b2281a 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -3,34 +3,43 @@ use super::*; pub fn feed(chain: &mut Chain, what: impl AsRef) { - chain.feed(what.as_ref().split_whitespace() - .filter(|word| !word.is_empty()) - .map(|s| s.to_owned()).collect::>()); + let map = what.as_ref().split_whitespace() + .filter(|word| !word.is_empty()) + .map(|s| s.to_owned()).collect::>(); + if map.len() > 0 { + chain.feed(map); + } } pub async fn full(who: &IpAddr, state: State, mut body: impl Unpin + Stream>) -> Result { let mut buffer = Vec::new(); let mut written = 0usize; + //TODO: Change to pushing lines to mpsc channel, instead of manually aggregating. while let Some(buf) = body.next().await { let mut body = buf.map_err(|_| FillBodyError)?; while body.has_remaining() { if body.bytes().len() > 0 { - buffer.extend_from_slice(body.bytes()); // XXX: what the fuck is wrong with this??? why it eat spaces???? + buffer.extend_from_slice(body.bytes()); let cnt = body.bytes().len(); body.advance(cnt); written += cnt; } } } - if !buffer.ends_with(&[b'\n']) { // probably useless eh? - buffer.push(b'\n'); - } let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?; info!("{} -> {:?}", who, buffer); let mut chain = state.chain().write().await; - feed(&mut chain, buffer); + cfg_if! { + if #[cfg(feature="split-newlines")] { + for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) { + feed(&mut chain, buffer); + } + } else { + feed(&mut chain, buffer); + } + } state.notify_save(); Ok(written) }