start sentance, sanitise

serve
Avril 4 years ago
parent a4fd1ddfbb
commit 51a5d0aeba
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -8,7 +8,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["compress-chain", "split-newlines"] default = ["compress-chain", "split-newlines", "api"]
# Compress the chain data file when saved to disk # Compress the chain data file when saved to disk
compress-chain = ["async-compression"] compress-chain = ["async-compression"]

@ -2,5 +2,5 @@ bindpoint = '127.0.0.1:8001'
file = 'chain.dat' file = 'chain.dat'
max_content_length = 4194304 max_content_length = 4194304
max_gen_size = 256 max_gen_size = 256
#save_interval_secs = 2 #save_interval_secs = 15
trust_x_forwarded_for = false trust_x_forwarded_for = false

@ -1,99 +0,0 @@
//! For API calls if enabled
use super::*;
use std::{
fmt,
error,
iter,
convert::Infallible,
};
use futures::{
stream::{
self,
BoxStream,
StreamExt,
},
};
#[inline] fn aggregate(mut body: impl Buf) -> Result<String, std::str::Utf8Error>
{
/*let mut output = Vec::new();
while body.has_remaining() {
let bytes = body.bytes();
output.extend_from_slice(&bytes[..]);
let cnt = bytes.len();
body.advance(cnt);
}*/
std::str::from_utf8(&body.to_bytes()).map(ToOwned::to_owned)
}
pub async fn single(host: IpAddr, num: Option<usize>, body: impl Buf) -> Result<impl warp::Reply, warp::reject::Rejection>
{
single_stream(host, num, body).await
.map(|rx| Response::new(Body::wrap_stream(rx.map(move |x| {
info!("{} <- {:?}", host, x);
x
}))))
.map_err(warp::reject::custom)
}
//TODO: Change to stream impl like normal `feed` has, instead of taking aggregate?
async fn single_stream(host: IpAddr, num: Option<usize>, body: impl Buf) -> Result<BoxStream<'static, Result<String, Infallible>>, ApiError>
{
let body = aggregate(body)?;
info!("{} <- {:?}", host, &body[..]);
let mut chain = Chain::new();
if_debug! {
let timer = std::time::Instant::now();
}
cfg_if! {
if #[cfg(feature="split-newlines")] {
for body in body.split('\n').filter(|line| !line.trim().is_empty()) {
feed::feed(&mut chain, body, 1..);
}
}else {
feed::feed(&mut chain, body, 1..);
}
}
if_debug!{
trace!("Write took {}ms", timer.elapsed().as_millis());
}
match num {
None => Ok(stream::iter(iter::once(Ok(chain.generate_str()))).boxed()),
Some(num) => {
let (mut tx, rx) = mpsc::channel(num);
tokio::spawn(async move {
for string in chain.str_iter_for(num) {
tx.send(string).await.expect("Failed to send string to body");
}
});
Ok(StreamExt::map(rx, |x| Ok::<_, Infallible>(x)).boxed())
}
}
}
#[derive(Debug)]
pub enum ApiError {
Body,
}
impl warp::reject::Reject for ApiError{}
impl error::Error for ApiError{}
impl std::fmt::Display for ApiError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Body => write!(f, "invalid data in request body"),
}
}
}
impl From<std::str::Utf8Error> for ApiError
{
fn from(_: std::str::Utf8Error) -> Self
{
Self::Body
}
}

@ -0,0 +1,55 @@
//! API errors
//use super::*;
use std::{
error,
fmt,
};
use warp::{
Rejection,
Reply,
};
#[derive(Debug)]
pub enum ApiError {
Body,
}
impl ApiError
{
#[inline] fn error_code(&self) -> warp::http::StatusCode
{
status!(match self {
Self::Body => 422,
})
}
}
impl warp::reject::Reject for ApiError{}
impl error::Error for ApiError{}
impl std::fmt::Display for ApiError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Body => write!(f, "invalid data in request body"),
}
}
}
impl From<std::str::Utf8Error> for ApiError
{
fn from(_: std::str::Utf8Error) -> Self
{
Self::Body
}
}
// Handles API rejections
pub async fn rejection(err: Rejection) -> Result<impl Reply, Rejection>
{
if let Some(api) = err.find::<ApiError>() {
Ok(warp::reply::with_status(format!("ApiError: {}\n", api), api.error_code()))
} else {
Err(err)
}
}

@ -0,0 +1,32 @@
//! For API calls if enabled
use super::*;
use std::{
iter,
convert::Infallible,
};
use futures::{
stream::{
self,
BoxStream,
StreamExt,
},
};
pub mod error;
use error::ApiError;
mod single;
#[inline] fn aggregate(mut body: impl Buf) -> Result<String, std::str::Utf8Error>
{
std::str::from_utf8(&body.to_bytes()).map(ToOwned::to_owned)
}
pub async fn single(host: IpAddr, num: Option<usize>, body: impl Buf) -> Result<impl warp::Reply, warp::reject::Rejection>
{
single::single_stream(host, num, body).await
.map(|rx| Response::new(Body::wrap_stream(rx.map(move |x| {
info!("{} <- {:?}", host, x);
x
}))))
.map_err(warp::reject::custom)
}

@ -0,0 +1,2 @@
//! /sentance/
use super::*;

@ -0,0 +1,47 @@
//! Handler for /single/
use super::*;
//TODO: Change to stream impl like normal `feed` has, instead of taking aggregate?
pub async fn single_stream(host: IpAddr, num: Option<usize>, body: impl Buf) -> Result<BoxStream<'static, Result<String, Infallible>>, ApiError>
{
let body = aggregate(body)?;
info!("{} <- {:?}", host, &body[..]);
let mut chain = Chain::new();
if_debug! {
let timer = std::time::Instant::now();
}
cfg_if! {
if #[cfg(feature="split-newlines")] {
for body in body.split('\n').filter(|line| !line.trim().is_empty()) {
feed::feed(&mut chain, body, 1..);
}
}else {
feed::feed(&mut chain, body, 1..);
}
}
if_debug!{
trace!("Write took {}ms", timer.elapsed().as_millis());
}
if chain.is_empty() {
Ok(stream::empty().boxed())
} else {
match num {
None => Ok(stream::iter(iter::once(Ok(chain.generate_str()))).boxed()),
Some(num) => {
let (mut tx, rx) = mpsc::channel(num);
tokio::spawn(async move {
for string in chain.str_iter_for(num) {
if let Err(e) = tx.send(string).await {
error!("Failed to send string to body, aborting: {:?}", e.0);
break;
}
}
});
Ok(StreamExt::map(rx, |x| Ok::<_, Infallible>(x)).boxed())
}
}
}
}

@ -57,7 +57,7 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
} else { } else {
use tokio::prelude::*; use tokio::prelude::*;
let reader = chunking::StreamReader::new(body.map(|x| x.map(|mut x| x.to_bytes()).unwrap_or_default())); let reader = chunking::StreamReader::new(body.filter_map(|x| x.map(|mut x| x.to_bytes()).ok()));
let mut lines = reader.lines(); let mut lines = reader.lines();
#[cfg(feature="hog-buffer")] #[cfg(feature="hog-buffer")]

@ -57,6 +57,7 @@ macro_rules! status {
}; };
} }
mod sanitise;
mod bytes; mod bytes;
mod chunking; mod chunking;
#[cfg(feature="api")] #[cfg(feature="api")]
@ -149,7 +150,7 @@ async fn main() {
cfg_if!{ cfg_if!{
if #[cfg(feature="api")] { if #[cfg(feature="api")] {
let api = { let api = {
let api_single = { let single = {
let msz = state.config().max_gen_size; let msz = state.config().max_gen_size;
warp::post() warp::post()
.and(warp::path("single")) .and(warp::path("single"))
@ -169,9 +170,12 @@ async fn main() {
.and_then(api::single) .and_then(api::single)
.with(warp::log("markov::api::single")) .with(warp::log("markov::api::single"))
}; };
let sentance = warp::post()
.and(warp::path("sentance"));
warp::path("api") warp::path("api")
.and(api_single) .and(single)
.recover(api::error::rejection)
}; };
} }
} }

@ -0,0 +1,37 @@
//! Sanitisers
use super::*;
use std::{
marker::Unpin,
error,
fmt,
};
use tokio::{
prelude::*,
io::{
AsyncRead,
AsyncBufRead
},
};
mod word;
pub use word::*;
pub fn take_sentance<T: AsyncBufRead+ ?Sized + Unpin, U: AsyncWrite + ?Sized + Unpin>(from: &mut T, to: &mut U) -> Result<usize, Error>
{
todo!()
}
#[derive(Debug)]
pub enum Error {
}
impl error::Error for Error{}
impl fmt::Display for Error
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
Ok(())
}
}

@ -0,0 +1,45 @@
//! Word splitting
use super::*;
use std::{
borrow::Borrow,
};
#[derive(Debug, PartialEq, Eq)]
#[repr(transparent)]
pub struct Word(str);
impl Word
{
pub fn new<'a>(from: &'a str) -> &'a Self
{
unsafe {
std::mem::transmute(from)
}
}
}
impl<'a> From<&'a str> for &'a Word
{
fn from(from: &'a str) -> Self
{
Word::new(from)
}
}
impl AsRef<str> for Word
{
fn as_ref(&self) -> &str
{
&self.0
}
}
impl AsRef<Word> for str
{
fn as_ref(&self) -> &Word
{
Word::new(self)
}
}
//impl Borrow<>
Loading…
Cancel
Save