added /sentance/ chunking

serve
Avril 4 years ago
parent 31ee30db9e
commit 633466b901
Signed by: flanchan
GPG Key ID: 284488987C31F630

Cargo.lock (generated)

@@ -650,6 +650,7 @@ dependencies = [
  "log",
  "lzzzz",
  "markov 1.1.0",
+ "once_cell",
  "pin-project",
  "pretty_env_logger",
  "serde",
@@ -1269,9 +1270,9 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
 
 [[package]]
 name = "smallmap"
-version = "1.1.3"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2adda73259bbc3ff84f711425ebfb8c90e9dd32a12b05c6528dd49244ea8230f"
+checksum = "97ce78b988fb0df3b438d106942c0c2438849ecf40e3418af55044f96d27514d"
 dependencies = [
  "rustc_version",
 ]

@@ -1,6 +1,6 @@
 [package]
 name = "markov"
-version = "0.5.4"
+version = "0.6"
 description = "Generate string of text from Markov chain fed by stdin"
 authors = ["Avril <flanchan@cumallover.me>"]
 edition = "2018"
@@ -63,5 +63,6 @@ toml = "0.5.6"
 async-compression = {version = "0.3.5", features=["tokio-02", "bzip2"], optional=true}
 pin-project = "0.4.26"
 libc = "0.2.79"
-smallmap = "1.1.3"
+smallmap = "1.1.5"
 lazy_static = "1.4.0"
+once_cell = "1.4.1"

@@ -2,5 +2,8 @@ bindpoint = '127.0.0.1:8001'
 file = 'chain.dat'
 max_content_length = 4194304
 max_gen_size = 256
-#save_interval_secs = 15
+#save_interval_secs = 2
 trust_x_forwarded_for = false
+
+[filter]
+exclude = "<>)([]/"

@@ -1,2 +0,0 @@
-//! /sentance/
-use super::*;

@@ -25,6 +25,28 @@ pub struct Config
     pub max_gen_size: usize,
     pub save_interval_secs: Option<NonZeroU64>,
     pub trust_x_forwarded_for: bool,
+    #[serde(default)]
+    pub filter: FilterConfig,
+}
+
+#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
+pub struct FilterConfig
+{
+    exclude: String,
+}
+
+impl FilterConfig
+{
+    pub fn get_filter(&self) -> sanitise::filter::Filter
+    {
+        let filt: sanitise::filter::Filter = self.exclude.parse().unwrap();
+        if !filt.is_empty()
+        {
+            warn!("Loaded exclude filter: {:?}", filt.iter().collect::<String>());
+        }
+        filt
+    }
 }
 
 impl Default for Config
@@ -39,6 +61,7 @@ impl Default for Config
             max_gen_size: 256,
             save_interval_secs: Some(unsafe{NonZeroU64::new_unchecked(2)}),
             trust_x_forwarded_for: false,
+            filter: Default::default(),
         }
     }
 }
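
Note: the #[serde(default)] attribute means a config file without a [filter] table still deserializes; the default FilterConfig carries an empty exclude string, so the resulting filter passes text through untouched. A minimal illustration (hypothetical snippet, not part of the commit, using the Filter type added later in this diff):

    let filter = FilterConfig::default().get_filter();
    assert!(filter.is_empty());                              // empty exclude string -> empty filter
    assert_eq!(filter.filter_cow("unchanged"), "unchanged"); // borrowed back as-is, no allocation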

@@ -0,0 +1,27 @@
//! Extensions
use std::{
iter,
};
pub trait StringJoinExt: Sized
{
fn join<P: AsRef<str>>(self, sep: P) -> String;
}
impl<I,T> StringJoinExt for I
where I: IntoIterator<Item=T>,
T: AsRef<str>
{
fn join<P: AsRef<str>>(self, sep: P) -> String
{
let mut string = String::new();
for (first, s) in iter::successors(Some(true), |_| Some(false)).zip(self.into_iter())
{
if !first {
string.push_str(sep.as_ref());
}
string.push_str(s.as_ref());
}
string
}
}
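
Usage sketch for the new StringJoinExt trait (hypothetical snippet, not in the commit); this is the same pattern sentance.rs uses below to join several generated strings:

    use ext::StringJoinExt;
    let joined = (0..3).map(|n| n.to_string()).join(", ");   // any IntoIterator of AsRef<str> items
    assert_eq!(joined, "0, 1, 2");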

@@ -44,6 +44,15 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
     if_debug! {
         let timer = std::time::Instant::now();
     }
+    macro_rules! feed {
+        ($chain:expr, $buffer:ident, $bounds:expr) => {
+            {
+                let buffer = $buffer;
+                feed($chain, &buffer, $bounds)
+            }
+        }
+    }
+
     cfg_if!{
         if #[cfg(any(not(feature="split-newlines"), feature="always-aggregate"))] {
             let mut body = body;
@@ -60,15 +69,17 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
                 }
             }
             let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
+            let buffer = state.filter().filter_cow(buffer);
             info!("{} -> {:?}", who, buffer);
             let mut chain = state.chain().write().await;
             cfg_if! {
                 if #[cfg(feature="split-newlines")] {
                     for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) {
-                        feed(&mut chain, buffer, FEED_BOUNDS);
+                        feed!(&mut chain, buffer, FEED_BOUNDS);
                     }
                 } else {
-                    feed(&mut chain, buffer, FEED_BOUNDS);
+                    feed!(&mut chain, buffer, FEED_BOUNDS);
                 }
             }
@@ -81,12 +92,13 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
             #[cfg(feature="hog-buffer")]
             let mut chain = state.chain().write().await;
             while let Some(line) = lines.next_line().await.map_err(|_| FillBodyError)? {
+                let line = state.filter().filter_cow(&line);
                 let line = line.trim();
                 if !line.is_empty() {
                     #[cfg(not(feature="hog-buffer"))]
                     let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
-                    feed(&mut chain, line, FEED_BOUNDS);
+                    feed!(&mut chain, line, FEED_BOUNDS);
                     info!("{} -> {:?}", who, line);
                 }
                 written+=line.len();
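
The new feed! macro rebinds its argument to a local before calling feed(), presumably so the same call site works whether the line is a plain &str or the Cow<str> returned by state.filter().filter_cow(..). Roughly, feed!(&mut chain, line, FEED_BOUNDS) expands to:

    {
        let buffer = line;                        // may be &str or Cow<'_, str>
        feed(&mut chain, &buffer, FEED_BOUNDS)
    }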

@@ -2,7 +2,7 @@
 use super::*;
 
 #[derive(Debug)]
-pub struct GenBodyError(String);
+pub struct GenBodyError(pub String);
 
 impl error::Error for GenBodyError{}
 impl fmt::Display for GenBodyError

@@ -1,4 +1,5 @@
 #![feature(split_inclusive)]
+#![feature(min_const_generics)]
 #![allow(dead_code)]
@@ -60,6 +61,9 @@ macro_rules! status {
     };
 }
 
+mod ext;
+use ext::*;
+mod util;
 mod sanitise;
 mod bytes;
 mod chunking;
@@ -76,6 +80,7 @@ use forwarded_list::XForwardedFor;
 mod feed;
 mod gen;
+mod sentance;
 
 #[tokio::main]
 async fn main() {
@@ -135,8 +140,8 @@
     };
     let push = warp::put()
-        .and(chain.clone())
         .and(warp::path("put"))
+        .and(chain.clone())
         .and(client_ip.clone())
         .and(warp::body::content_length_limit(state.config().max_content_length))
         .and(warp::body::stream())
@@ -173,9 +178,6 @@ async fn main() {
             .and_then(api::single)
             .with(warp::log("markov::api::single"))
         };
-        let sentance = warp::post()
-            .and(warp::path("sentance")); //TODO: sanitise::Sentance::new_iter the body line
-
         warp::path("api")
             .and(single)
             .recover(api::error::rejection)
@@ -183,11 +185,12 @@
         }
     }
     let read = warp::get()
         .and(chain.clone())
-        .and(warp::path("get"))
         .and(client_ip.clone())
-        .and(warp::path::param().map(|opt: usize| Some(opt)).or(warp::any().map(|| Option::<usize>::None)).unify())
+        .and(warp::path::param().map(|opt: usize| Some(opt))
+             .or(warp::path::end().map(|| Option::<usize>::None)).unify())
         .and_then(|state: State, host: IpAddr, num: Option<usize>| {
             async move {
                 let (tx, rx) = mpsc::channel(state.config().max_gen_size);
@@ -199,7 +202,33 @@ async fn main() {
             }
         })
         .with(warp::log("markov::read"));
+    let sentance = warp::get()
+        .and(warp::path("sentance")) //TODO: sanitise::Sentance::new_iter the body line
+        .and(chain.clone())
+        .and(client_ip.clone())
+        .and(warp::path::param().map(|opt: usize| Some(opt))
+             .or(warp::path::end().map(|| Option::<usize>::None)).unify())
+        .and_then(|state: State, host: IpAddr, num: Option<usize>| {
+            async move {
+                let (tx, rx) = mpsc::channel(state.config().max_gen_size);
+                tokio::spawn(sentance::body(state, num, tx));
+                Ok::<_, std::convert::Infallible>(Response::new(Body::wrap_stream(rx.map(move |mut x| {
+                    info!("{} (sentance) <- {:?}", host, x);
+                    // match x.chars().last() {
+                    //     Some(chr) if sanitise::is_sentance_boundary(chr) => {
+                    //         x.push(' ');
+                    //     },
+                    //     _ => (),
+                    // }
+                    x.push(' ');
+                    Ok::<_, std::convert::Infallible>(x)
+                }))))
+            }
+        })
+        .with(warp::log("markov::read::sentance"));
+
+    let read = warp::path("get").and(read.or(sentance));
     #[cfg(feature="api")]
     let read = read.or(api);
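
Informal summary of the resulting routes (read from the filters above; not part of the commit):

    PUT /put               feed text into the chain (body now runs through the exclude filter first)
    GET /get               stream generated lines, as before
    GET /get/<n>           stream <n> generated lines
    GET /get/sentance      stream one generated sentence
    GET /get/sentance/<n>  stream up to <n> sentences, space-separated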

@@ -0,0 +1 @@
avril@eientei.880:1602382403

@@ -0,0 +1,260 @@
//! Filter out characters and such
use smallmap::Map as SmallMap;
use std::{
borrow::Cow,
fmt,
iter::{
self,
FromIterator,
},
str,
};
use once_cell::sync::OnceCell;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Filter(SmallMap<char, ()>);
impl<const N: usize> From<[char; N]> for Filter
{
fn from(from: [char; N]) -> Self
{
let mut map = SmallMap::with_capacity(1 + (N / 256));
for &chr in from.iter()
{
map.insert(chr, ());
}
Self(map)
}
}
impl<'a> From<&'a [char]> for Filter
{
fn from(from: &'a [char]) -> Self
{
let mut map = SmallMap::new();
for &chr in from.iter()
{
map.insert(chr, ());
}
Self(map)
}
}
impl<'a> From<&'a str> for Filter
{
fn from(from: &'a str) -> Self
{
let mut output = Self::new();
output.insert(from.chars());
output
}
}
impl str::FromStr for Filter
{
type Err = std::convert::Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self::from(s))
}
}
impl fmt::Display for Filter
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
use std::fmt::Write;
for chr in self.iter()
{
f.write_char(chr)?;
}
Ok(())
}
}
pub struct FilterKeyIter<'a>(smallmap::iter::Iter<'a, char, ()>, usize);
impl<'a> Iterator for FilterKeyIter<'a>
{
type Item = char;
fn next(&mut self) -> Option<Self::Item>
{
self.0.next().map(|&(x, _)| x)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.1, Some(self.1))
}
}
impl<'a> iter::FusedIterator for FilterKeyIter<'a>{}
impl<'a> iter::ExactSizeIterator for FilterKeyIter<'a>{}
impl Filter
{
pub fn new() -> Self
{
Self(SmallMap::new())
}
pub fn insert<I: IntoIterator<Item=char>>(&mut self, from: I)
{
for from in from.into_iter()
{
self.0.insert(from, ());
}
}
pub fn remove<I: IntoIterator<Item=char>>(&mut self, from: I)
{
for from in from.into_iter()
{
self.0.remove(&from);
}
}
pub fn len(&self) -> usize
{
self.0.len()
}
pub fn is_empty(&self) -> bool
{
//TODO: impl this in smallmap itself
self.len() == 0
}
pub fn iter(&self) -> impl Iterator<Item=char> + '_
{
self.0.iter()
.copied()
.map(|(x, _)| x)
//FilterKeyIter(self.0.iter(), self.0.len())
}
/// Should this character be filtered?
#[inline] pub fn check(&self, chr: char) -> bool
{
self.0.get(&chr).is_some()
}
pub fn filter<'a, I: IntoIterator<Item=char>>(&'a self, from_iter: I) -> FilterIter<'a, I::IntoIter>
where I::IntoIter: 'a
{
FilterIter(&self, from_iter.into_iter().fuse())
}
pub fn filter_cow<'a>(&self, string: &'a (impl AsRef<str> + 'a + ?Sized)) -> Cow<'a, str>
{
let string = string.as_ref();
if self.is_empty() {
return Cow::Borrowed(string);
}
let mut output = Cow::Borrowed(string);
let mut i=0;
for chr in string.chars()
{
if self.check(chr) {
output.to_mut().remove(i);
} else {
i+=1;
}
}
output
}
pub fn filter_str<'a, T: AsRef<str>+'a>(&'a self, string: &'a T) -> FilterStr<'a>
{
FilterStr(string.as_ref(), self, OnceCell::new())
}
}
impl FromIterator<char> for Filter
{
fn from_iter<I: IntoIterator<Item=char>>(iter: I) -> Self
{
let mut output= Self::new();
output.insert(iter);
output
}
}
impl<'a> FilterStr<'a>
{
pub fn as_str(&self) -> &str
{
fn fmt(this: &FilterStr<'_>) -> String
{
let chars = this.0.chars();
let mut f: String = crate::util::hint_cap(&chars);
for chr in chars {
if !this.1.check(chr) {
f.push(chr);
}
}
f
}
&self.2.get_or_init(|| fmt(&self))[..]
}
}
pub struct FilterStr<'a>(&'a str, &'a Filter, OnceCell<String>);
impl<'a> fmt::Display for FilterStr<'a>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "{}", self.as_str())
}
}
impl<'a> FilterStr<'a>
{
pub fn filter(&self) -> &Filter
{
&self.1
}
}
pub struct FilterIter<'a, I>(&'a Filter, iter::Fuse<I>);
impl<'a, I: Iterator<Item=char>> Iterator for FilterIter<'a, I>
{
type Item = char;
fn next(&mut self) -> Option<Self::Item>
{
loop {
break match self.1.next() {
Some(chr) if !self.0.check(chr) => Some(chr),
None => None,
_ => continue,
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let (_, high) = self.1.size_hint();
(0, high)
}
}
impl<'a, I> FilterIter<'a, I>
{
pub fn filter(&self) -> &Filter
{
self.0
}
}
impl<'a, I: Iterator<Item=char>> iter::FusedIterator for FilterIter<'a, I>{}
#[cfg(test)]
mod tests
{
use super::*;
#[test]
fn filter_cow()
{
let filter: Filter = " hi".chars().collect();
let string = "abcdef ghi jk1\nhian";
assert_eq!(filter.filter_str(&string).to_string(), filter.filter_cow(&string).to_string());
assert_eq!(filter.filter_cow(&string).to_string(), filter.filter(string.chars()).collect::<String>());
}
}
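
Usage sketch for the new Filter type (hypothetical snippet, not part of the commit): it parses from the config's exclude string and strips those characters from input, only allocating when something is actually removed:

    use std::borrow::Cow;
    let filter: Filter = "<>)([]/".parse().unwrap();    // same exclude set as the sample config above
    assert_eq!(filter.filter_cow("[hello] <world>/"), "hello world");
    assert!(matches!(filter.filter_cow("clean line"), Cow::Borrowed(_)));  // nothing removed, no copy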

@@ -8,6 +8,9 @@ mod sentance;
 pub use sentance::*;
 mod word;
 pub use word::*;
+
+pub mod filter;
+
 /*
 pub fn take_sentance<T: AsyncBufRead+ ?Sized + Unpin, U: AsyncWrite + ?Sized + Unpin>(from: &mut T, to: &mut U) -> Result<usize, Error>
 {

@@ -0,0 +1,25 @@
//! /sentance/
use super::*;
pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), gen::GenBodyError>
{
let string = {
let chain = state.chain().read().await;
if chain.is_empty() {
return Ok(());
}
match num {
None => chain.generate_str(),
Some(num) => (0..num).map(|_| chain.generate_str()).join("\n"),
}
};
debug!("Taking {:?} from {:?}", num, string);
for sen in sanitise::Sentance::new_iter(&string).take(num.unwrap_or(1))
{
output.send(sen.to_owned()).await.map_err(|e| gen::GenBodyError(e.0))?;
}
Ok(())
}

@@ -11,6 +11,7 @@ use config::Config;
 pub struct State
 {
     config: Arc<Config>, //to avoid cloning config
+    exclude: Arc<sanitise::filter::Filter>,
     chain: Arc<RwLock<Chain<String>>>,
     save: Arc<Notify>,
@@ -20,11 +21,17 @@ pub struct State
 impl State
 {
+    pub fn filter(&self) -> &sanitise::filter::Filter
+    {
+        &self.exclude
+    }
+
     pub fn new(config: Config, chain: Arc<RwLock<Chain<String>>>, save: Arc<Notify>) -> Self
     {
         let (shutdown, shutdown_recv) = watch::channel(false);
         Self {
-            config: Arc::new(config),
+            exclude: Arc::new(config.filter.get_filter()),
+            config: Arc::new(config),
             chain,
             save,
             shutdown: Arc::new(shutdown),

@@ -0,0 +1,41 @@
//! Utils
pub trait NewCapacity: Sized
{
fn new() -> Self;
fn with_capacity(cap: usize) -> Self;
}
impl NewCapacity for String
{
fn new() -> Self
{
Self::new()
}
fn with_capacity(cap: usize) -> Self
{
Self::with_capacity(cap)
}
}
impl<T> NewCapacity for Vec<T>
{
fn new() -> Self
{
Self::new()
}
fn with_capacity(cap: usize) -> Self
{
Self::with_capacity(cap)
}
}
pub fn hint_cap<T: NewCapacity, I: Iterator>(iter: &I) -> T
{
match iter.size_hint() {
(0, Some(0)) | (0, None) => T::new(),
(_, Some(x)) | (x, _) => T::with_capacity(x)
}
}
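
Small illustration of hint_cap (hypothetical snippet, not in the commit), which FilterStr::as_str() uses above to pre-size its output buffer from the iterator's size_hint:

    let chars = "abcdef".chars();
    let mut out: String = crate::util::hint_cap(&chars); // with_capacity taken from the size hint
    out.extend(chars);
    assert_eq!(out, "abcdef");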