//! Feeding the chain use super::*; #[cfg(any(feature="feed-sentance", feature="split-sentance"))] use sanitise::Sentance; #[allow(unused_imports)] use futures::stream; pub const DEFAULT_FEED_BOUNDS: std::ops::RangeFrom = 2..; /// Feed `what` into `chain`, at least `bounds` tokens. /// /// # Tokenising /// How the tokens are split within this function that operates on single buffers is determined largely by the features `split-sentance` and `feed-sentance` determining the use of the sentance API. /// /// ## Pipeline /// Since this is called on single buffers, it happens after the `split-newlines` tokenising if it's enabled, and thus the sentance API is only able to operate on each seperate line if that feature is enabled, regardless of `always-aggre`, or `feed-sentance` or `split-sentance`. /// This is the pipeline for just within this function, after splitting through newlines if enabled. /// /// * `feed-sentance` /// ** Feed the buffer through the sentance split tokeniser /// ** Feed the sentances through the word split tokeniser /// ** Feed each collection of words into the chain seperately /// * `split-sentance` /// ** Feed the buffer through the sentance split tokeniser /// ** Feed the sentances through the word split tokeniser /// ** Feed the flattened collection into the chain once, concatenated. /// * Neither /// ** Feed the buffer through the word split tokeniser /// ** Feed the collection into the chain pub fn feed(chain: &mut Chain, what: impl AsRef, bounds: impl std::ops::RangeBounds) { cfg_if! { if #[cfg(feature="feed-sentance")] { let map = Sentance::new_iter(&what) //get each sentance in string .map(|what| what.words() .map(|s| s.to_owned()).collect::>()); debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds"); for map in map {// feed each sentance seperately if bounds.contains(&map.len()) { debug!("Feeding chain {} items", map.len()); chain.feed(map); } else { debug!("Ignoring feed of invalid length {}: {:?}", map.len(), map); } } } else { cfg_if!{ if #[cfg(feature="split-sentance")] { let map = Sentance::new_iter(&what) //get each sentance in string .map(|what| what.words()) .flatten() // add all into one buffer .map(|s| s.to_owned()).collect::>(); } else { let map: Vec<_> = sanitise::words(what.as_ref()).map(ToOwned::to_owned) .collect(); } } debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds"); if bounds.contains(&map.len()) { //debug!("Feeding chain {} items", map.len()); chain.feed(map); } else { debug!("Ignoring feed of invalid length {}: {:?}", map.len(), map); } } } } pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream>) -> Result { let mut written = 0usize; if_debug! { let timer = std::time::Instant::now(); } //let bounds = &state.config_cache().feed_bounds; macro_rules! feed { ($buffer:expr) => { { let buffer = $buffer; state.chain_write(buffer).await.map_err(|_| FillBodyError)?; } } } cfg_if!{ if #[cfg(any(not(feature="split-newlines"), feature="always-aggregate"))] { let mut body = body; let mut buffer = Vec::new(); while let Some(buf) = body.next().await { let mut body = buf.map_err(|_| FillBodyError)?; while body.has_remaining() { if body.bytes().len() > 0 { buffer.extend_from_slice(body.bytes()); let cnt = body.bytes().len(); body.advance(cnt); written += cnt; } } } let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?; let buffer = state.inbound_filter().filter_cow(buffer); info!("{} -> {:?}", who, buffer); cfg_if! { if #[cfg(feature="split-newlines")] { feed!(stream::iter(buffer.split('\n').filter(|line| !line.trim().is_empty()) .map(|x| x.to_owned()))) } else { feed!(stream::once(async move{buffer.into_owned()})); } } } else { use tokio::prelude::*; let reader = chunking::StreamReader::new(body.filter_map(|x| x.map(|mut x| x.to_bytes()).ok())); let lines = reader.lines(); feed!(lines.filter_map(|x| x.ok().and_then(|line| { let line = state.inbound_filter().filter_cow(&line); let line = line.trim(); if !line.is_empty() { //#[cfg(not(feature="hog-buffer"))] //let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right? info!("{} -> {:?}", who, line); written+=line.len(); Some(line.to_owned()) } else { None } }))); } } if_debug! { trace!("Write took {}ms", timer.elapsed().as_millis()); } Ok(written) } #[derive(Debug)] pub struct FillBodyError; impl error::Error for FillBodyError{} impl warp::reject::Reject for FillBodyError{} impl fmt::Display for FillBodyError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to feed chain with this data") } }