//! Feeding the chain

use super::*;

#[cfg(any(feature="feed-sentance", feature="split-sentance"))]
use sanitise::Sentance;

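/// The default bounds on the number of tokens accepted in a single feed:
/// a minimum of 2 tokens, with no upper limit.
///
/// ```
/// assert!((2..).contains(&2));  // the default lower bound admits 2 tokens
/// assert!(!(2..).contains(&1)); // but rejects single-token feeds
/// ```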
pub const DEFAULT_FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..; //TODO: Add to config somehow

/// Feed `what` into `chain`, provided the number of tokens falls within `bounds`.
///
/// # Tokenising
/// How tokens are split within this function, which operates on a single buffer, is determined largely by the features `split-sentance` and `feed-sentance`, which control the use of the `Sentance` API.
///
/// ## Pipeline
/// Since this is called on single buffers, it runs after the `split-newlines` tokenising if that feature is enabled, and thus the `Sentance` API can only operate on each separate line in that case, regardless of `always-aggregate`, `feed-sentance` or `split-sentance`.
/// This is the pipeline within this function alone, after any splitting on newlines:
///
/// * `feed-sentance`
///   * Feed the buffer through the sentence split tokeniser
///   * Feed the sentences through the word split tokeniser
///   * Feed each collection of words into the chain separately
/// * `split-sentance`
///   * Feed the buffer through the sentence split tokeniser
///   * Feed the sentences through the word split tokeniser
///   * Feed the flattened collection into the chain once, concatenated
/// * Neither
///   * Feed the buffer through the word split tokeniser
///   * Feed the collection into the chain
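///
/// # Example
/// A minimal sketch of intended usage, assuming the word-split path (neither
/// sentance feature enabled); the input values are illustrative only:
/// ```ignore
/// let mut chain = Chain::new();
/// // Two tokens satisfies the default lower bound of `DEFAULT_FEED_BOUNDS` (2..).
/// feed(&mut chain, "hello world", DEFAULT_FEED_BOUNDS);
/// // A single token falls outside the bounds and is ignored.
/// feed(&mut chain, "hello", DEFAULT_FEED_BOUNDS);
/// ```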
pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>, bounds: impl std::ops::RangeBounds<usize>)
{
    cfg_if! {
        if #[cfg(feature="feed-sentance")] {
            let map = Sentance::new_iter(&what) // get each sentence in the string
                .map(|what| what.words()
                     .map(|s| s.to_owned()).collect::<Vec<_>>());
            debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
            for map in map { // feed each sentence separately
                if bounds.contains(&map.len()) {
                    debug!("Feeding chain {} items", map.len());
                    chain.feed(map);
                } else {
                    debug!("Ignoring feed of invalid length {}: {:?}", map.len(), map);
                }
            }
        } else {
            cfg_if! {
                if #[cfg(feature="split-sentance")] {
                    let map = Sentance::new_iter(&what) // get each sentence in the string
                        .map(|what| what.words())
                        .flatten() // merge all sentences into one buffer
                        .map(|s| s.to_owned()).collect::<Vec<_>>();
                } else {
                    let map: Vec<_> = sanitise::words(what.as_ref()).map(ToOwned::to_owned)
                        .collect();
                }
            }
            debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
            if bounds.contains(&map.len()) {
                debug!("Feeding chain {} items", map.len());
                chain.feed(map);
            } else {
                debug!("Ignoring feed of invalid length {}: {:?}", map.len(), map);
            }
        }
    }
}

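/// Feed an entire request `body` into the chain held in `state`, returning the
/// number of bytes processed.
///
/// With `always-aggregate` enabled (or `split-newlines` disabled) the body is
/// buffered in full before feeding; otherwise it is streamed and fed line-by-line.
///
/// A hypothetical call site (names assumed for illustration):
/// ```ignore
/// let written = full(&addr, state, body_stream).await?;
/// ```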
pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = Result<impl Buf, impl std::error::Error + 'static>>) -> Result<usize, FillBodyError> {
    let mut written = 0usize;
    if_debug! {
        let timer = std::time::Instant::now();
    }
    let bounds = &state.config_cache().feed_bounds;
    macro_rules! feed {
        ($chain:expr, $buffer:ident) => {
            {
                let buffer = $buffer;
                feed($chain, &buffer, bounds)
            }
        }
    }

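    // Either aggregate the whole body into memory before feeding, or stream it
    // line-by-line, depending on the enabled features.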
    cfg_if! {
        if #[cfg(any(not(feature="split-newlines"), feature="always-aggregate"))] {
            let mut body = body;
            let mut buffer = Vec::new();
            while let Some(buf) = body.next().await {
                let mut body = buf.map_err(|_| FillBodyError)?;
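                // Drain this chunk: `Buf::bytes()` yields the current contiguous
                // slice, and `advance()` marks it as consumed.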
                while body.has_remaining() {
                    if body.bytes().len() > 0 {
                        buffer.extend_from_slice(body.bytes());
                        let cnt = body.bytes().len();
                        body.advance(cnt);
                        written += cnt;
                    }
                }
            }
            let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
            let buffer = state.inbound_filter().filter_cow(buffer);
            info!("{} -> {:?}", who, buffer);
            let mut chain = state.chain().write().await;
            cfg_if! {
                if #[cfg(feature="split-newlines")] {
                    for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) {
                        feed!(&mut chain, buffer);
                    }
                } else {
                    feed!(&mut chain, buffer);
                }
            }
        } else {
            use tokio::prelude::*;

            let reader = chunking::StreamReader::new(body.filter_map(|x| x.map(|mut x| x.to_bytes()).ok()));
            let mut lines = reader.lines();

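            // With `hog-buffer` enabled, the chain's write lock is acquired once and
            // held for the whole stream; otherwise it is re-acquired per non-empty line.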
            #[cfg(feature="hog-buffer")]
            let mut chain = state.chain().write().await;
            while let Some(line) = lines.next_line().await.map_err(|_| FillBodyError)? {
                let line = state.inbound_filter().filter_cow(&line);
                let line = line.trim();
                if !line.is_empty() {
                    #[cfg(not(feature="hog-buffer"))]
                    let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
                    feed!(&mut chain, line);
                    info!("{} -> {:?}", who, line);
                }
                written += line.len();
            }
        }
    }

    if_debug! {
        trace!("Write took {}ms", timer.elapsed().as_millis());
    }
    state.notify_save();
    Ok(written)
}

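/// Error returned when feeding the chain from a request body fails.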
#[derive(Debug)]
pub struct FillBodyError;

impl error::Error for FillBodyError {}
impl warp::reject::Reject for FillBodyError {}

impl fmt::Display for FillBodyError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "failed to feed chain with this data")
    }
}