//! Feeding the chain
use super::*;
#[cfg(any(feature = "feed-sentance", feature = "split-sentance"))]
use sanitise::Sentance;
/// Token counts accepted for a single feed into the chain: two or more.
///
/// Passed as the `bounds` argument of every `feed` call made by `full`.
const FEED_BOUNDS: std::ops::RangeFrom<usize> = std::ops::RangeFrom { start: 2 }; //TODO: Add to config somehow
/// Feed `what` into `chain`, at least `bounds` tokens.
///
/// # Tokenising
/// How the tokens are split within this function that operates on single buffers is determined largely by the features `split-sentance` and `feed-sentance` determining the use of the sentance API.
///
/// ## Pipeline
/// Since this is called on single buffers, it happens after the `split-newlines` tokenising if it's enabled, and thus the sentance API is only able to operate on each seperate line if that feature is enabled, regardless of `always-aggre`, or `feed-sentance` or `split-sentance`.
/// This is the pipeline for just within this function, after splitting through newlines if enabled.
///
/// * `feed-sentance`
/// ** Feed the buffer through the sentance split tokeniser
/// ** Feed the sentances through the word split tokeniser
/// ** Feed each collection of words into the chain seperately
/// * `split-sentance`
/// ** Feed the buffer through the sentance split tokeniser
/// ** Feed the sentances through the word split tokeniser
/// ** Feed the flattened collection into the chain once, concatenated.
/// * Neither
/// ** Feed the buffer through the word split tokeniser
/// ** Feed the collection into the chain
pub fn feed ( chain : & mut Chain < String > , what : impl AsRef < str > , bounds : impl std ::ops ::RangeBounds < usize > )
{
cfg_if ! {
if #[ cfg(feature= " feed-sentance " ) ] {
let map = Sentance ::new_iter ( & what ) //get each sentance in string
. map ( | what | what . words ( )
. map ( | s | s . to_owned ( ) ) . collect ::< Vec < _ > > ( ) ) ;
debug_assert! ( ! bounds . contains ( & 0 ) , "Cannot allow 0 size feeds" ) ;
for map in map { // feed each sentance seperately
if bounds . contains ( & map . len ( ) ) {
chain . feed ( map ) ;
}
else {
debug ! ( "Ignoring feed of invalid length {}" , map . len ( ) ) ;
}
}
} else {
cfg_if ! {
if #[ cfg(feature= " split-sentance " ) ] {
let map = Sentance ::new_iter ( & what ) //get each sentance in string
. map ( | what | what . words ( ) )
. flatten ( ) // add all into one buffer
. map ( | s | s . to_owned ( ) ) . collect ::< Vec < _ > > ( ) ;
} else {
let map : Vec < _ > = sanitise ::Word ::new_iter ( what . as_ref ( ) ) . map ( ToOwned ::to_owned )
. collect ( ) ;
}
}
debug_assert! ( ! bounds . contains ( & 0 ) , "Cannot allow 0 size feeds" ) ;
if bounds . contains ( & map . len ( ) ) {
chain . feed ( map ) ;
}
else {
debug ! ( "Ignoring feed of invalid length {}" , map . len ( ) ) ;
}
}
}
}
pub async fn full ( who : & IpAddr , state : State , body : impl Unpin + Stream < Item = Result < impl Buf , impl std ::error ::Error + ' static > > ) -> Result < usize , FillBodyError > {
let mut written = 0 usize ;
if_debug ! {
let timer = std ::time ::Instant ::now ( ) ;
}
macro_rules! feed {
( $chain :expr , $buffer :ident , $bounds :expr ) = > {
{
let buffer = $buffer ;
feed ( $chain , & buffer , $bounds )
}
}
}
cfg_if ! {
if #[ cfg(any(not(feature= " split-newlines " ), feature= " always-aggregate " )) ] {
let mut body = body ;
let mut buffer = Vec ::new ( ) ;
while let Some ( buf ) = body . next ( ) . await {
let mut body = buf . map_err ( | _ | FillBodyError ) ? ;
while body . has_remaining ( ) {
if body . bytes ( ) . len ( ) > 0 {
buffer . extend_from_slice ( body . bytes ( ) ) ;
let cnt = body . bytes ( ) . len ( ) ;
body . advance ( cnt ) ;
written + = cnt ;
}
}
}
let buffer = std ::str ::from_utf8 ( & buffer [ .. ] ) . map_err ( | _ | FillBodyError ) ? ;
let buffer = state . inbound_filter ( ) . filter_cow ( buffer ) ;
info ! ( "{} -> {:?}" , who , buffer ) ;
let mut chain = state . chain ( ) . write ( ) . await ;
cfg_if ! {
if #[ cfg(feature= " split-newlines " ) ] {
for buffer in buffer . split ( '\n' ) . filter ( | line | ! line . trim ( ) . is_empty ( ) ) {
feed ! ( & mut chain , buffer , FEED_BOUNDS ) ;
}
} else {
feed ! ( & mut chain , buffer , FEED_BOUNDS ) ;
}
}
} else {
use tokio ::prelude ::* ;
let reader = chunking ::StreamReader ::new ( body . filter_map ( | x | x . map ( | mut x | x . to_bytes ( ) ) . ok ( ) ) ) ;
let mut lines = reader . lines ( ) ;
#[ cfg(feature= " hog-buffer " ) ]
let mut chain = state . chain ( ) . write ( ) . await ;
while let Some ( line ) = lines . next_line ( ) . await . map_err ( | _ | FillBodyError ) ? {
let line = state . inbound_filter ( ) . filter_cow ( & line ) ;
let line = line . trim ( ) ;
if ! line . is_empty ( ) {
#[ cfg(not(feature= " hog-buffer " )) ]
let mut chain = state . chain ( ) . write ( ) . await ; // Acquire mutex once per line? Is this right?
feed ! ( & mut chain , line , FEED_BOUNDS ) ;
info ! ( "{} -> {:?}" , who , line ) ;
}
written + = line . len ( ) ;
}
}
}
if_debug ! {
trace ! ( "Write took {}ms" , timer . elapsed ( ) . as_millis ( ) ) ;
}
state . notify_save ( ) ;
Ok ( written )
}
#[ derive(Debug) ]
pub struct FillBodyError ;
impl error ::Error for FillBodyError { }
impl warp ::reject ::Reject for FillBodyError { }
impl fmt ::Display for FillBodyError
{
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result
{
write! ( f , "failed to feed chain with this data" )
}
}