You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
genmarkov/src/feed.rs

121 lines
3.4 KiB

4 years ago
//! Feeding the chain
use super::*;
4 years ago
use sanitise::Sentance;
4 years ago
4 years ago
/// Range of word counts a feed must fall within to be handed to the chain: two words or more.
///
/// Passed as the `bounds` argument of every `feed` call made by `full`.
const FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..; //TODO: Add to config somehow
4 years ago
pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>, bounds: impl std::ops::RangeBounds<usize>)
4 years ago
{
4 years ago
cfg_if! {
if #[cfg(feature="split-sentance")] {
let map = Sentance::new_iter(&what) //get each sentance in string
.map(|what| what.words()
4 years ago
.filter(|word| !word.is_empty())
.map(|s| s.to_owned()).collect::<Vec<_>>());
debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
for map in map {// feed each sentance seperately
if bounds.contains(&map.len()) {
chain.feed(map);
}
else {
debug!("Ignoring feed of invalid length {}", map.len());
}
}
} else {
let map = Sentance::new_iter(&what) //get each sentance in string
.map(|what| what.words()
4 years ago
.filter(|word| !word.is_empty()))
.flatten() // add all into one buffer
.map(|s| s.to_owned()).collect::<Vec<_>>();
debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
if bounds.contains(&map.len()) {
chain.feed(map);
}
else {
debug!("Ignoring feed of invalid length {}", map.len());
}
}
4 years ago
}
4 years ago
}
pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = Result<impl Buf, impl std::error::Error + 'static>>) -> Result<usize, FillBodyError> {
4 years ago
let mut written = 0usize;
if_debug! {
let timer = std::time::Instant::now();
4 years ago
}
cfg_if!{
if #[cfg(any(not(feature="split-newlines"), feature="always-aggregate"))] {
let mut body = body;
let mut buffer = Vec::new();
while let Some(buf) = body.next().await {
let mut body = buf.map_err(|_| FillBodyError)?;
while body.has_remaining() {
if body.bytes().len() > 0 {
buffer.extend_from_slice(body.bytes());
let cnt = body.bytes().len();
body.advance(cnt);
written += cnt;
}
}
}
let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
info!("{} -> {:?}", who, buffer);
let mut chain = state.chain().write().await;
cfg_if! {
if #[cfg(feature="split-newlines")] {
for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) {
4 years ago
feed(&mut chain, buffer, FEED_BOUNDS);
}
} else {
4 years ago
feed(&mut chain, buffer, FEED_BOUNDS);
}
4 years ago
}
} else {
use tokio::prelude::*;
4 years ago
let reader = chunking::StreamReader::new(body.filter_map(|x| x.map(|mut x| x.to_bytes()).ok()));
let mut lines = reader.lines();
#[cfg(feature="hog-buffer")]
let mut chain = state.chain().write().await;
while let Some(line) = lines.next_line().await.map_err(|_| FillBodyError)? {
let line = line.trim();
if !line.is_empty() {
#[cfg(not(feature="hog-buffer"))]
let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
4 years ago
feed(&mut chain, line, FEED_BOUNDS);
info!("{} -> {:?}", who, line);
}
written+=line.len();
}
4 years ago
}
}
if_debug!{
trace!("Write took {}ms", timer.elapsed().as_millis());
}
4 years ago
state.notify_save();
Ok(written)
4 years ago
}
/// Error returned by `full` when feeding the chain from a request body fails.
///
/// A unit type: it carries no information about the underlying cause, which `full` discards
/// when mapping failures to this value.
#[derive(Debug)]
pub struct FillBodyError;
// Empty impl: no methods are overridden, so the trait's provided defaults apply.
impl error::Error for FillBodyError{}
// Empty impl of warp's `Reject` trait for this type.
// NOTE(review): presumably so request handlers can reject with this error — confirm at the call sites.
impl warp::reject::Reject for FillBodyError{}
impl fmt::Display for FillBodyError
{
    /// Writes the fixed, human-readable description of this error to `f`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // The message has no arguments, so it is written directly as a string slice.
        f.write_str("failed to feed chain with this data")
    }
}