serve
Avril 4 years ago
parent ecc8854e44
commit 57c74bd411
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -0,0 +1,49 @@
//! Feeding the chain
use super::*;
fn feed(chain: &mut Chain<String>, what: impl AsRef<str>)
{
chain.feed(what.as_ref().split_whitespace()
.filter(|word| !word.is_empty())
.map(|s| s.to_owned()).collect::<Vec<_>>());
}
pub async fn full(who: &IpAddr, state: State, mut body: impl Unpin + Stream<Item = Result<impl Buf, impl std::error::Error + 'static>>) -> Result<usize, FillBodyError> {
let mut buffer = Vec::new();
let mut written = 0usize;
while let Some(buf) = body.next().await {
let mut body = buf.map_err(|_| FillBodyError)?;
while body.has_remaining() {
buffer.extend_from_slice(body.bytes());
let cnt = body.bytes().len();
body.advance(cnt);
written += cnt;
}
}
if !buffer.ends_with(&[b'\n']) { // probably useless eh?
buffer.push(b'\n');
}
let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
info!("{} -> {:?}", who, buffer);
let mut chain = state.chain().write().await;
feed(&mut chain, buffer);
state.notify_save();
Ok(written)
}
#[derive(Debug)]
pub struct FillBodyError;
impl error::Error for FillBodyError{}
impl warp::reject::Reject for FillBodyError{}
impl fmt::Display for FillBodyError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to feed chain with this data")
}
}

@ -0,0 +1,33 @@
//! Generating the strings
use super::*;
#[derive(Debug)]
pub struct GenBodyError(String);
impl error::Error for GenBodyError{}
impl fmt::Display for GenBodyError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to write {:?} to body", self.0)
}
}
pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), GenBodyError>
{
let chain = state.chain().read().await;
if !chain.is_empty() {
match num {
Some(num) if num < state.config().max_gen_size => {
//This could DoS `full_body` and writes, potentially.
for string in chain.str_iter_for(num) {
output.send(string).await.map_err(|e| GenBodyError(e.0))?;
}
},
_ => output.send(chain.generate_str()).await.map_err(|e| GenBodyError(e.0))?,
}
}
Ok(())
}

@ -46,74 +46,9 @@ mod save;
mod forwarded_list; mod forwarded_list;
use forwarded_list::XForwardedFor; use forwarded_list::XForwardedFor;
#[derive(Debug)] mod feed;
pub struct FillBodyError; mod gen;
impl error::Error for FillBodyError{}
impl warp::reject::Reject for FillBodyError{}
impl fmt::Display for FillBodyError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to feed chain with this data")
}
}
async fn full_body(who: &IpAddr, state: State, mut body: impl Unpin + Stream<Item = Result<impl Buf, impl std::error::Error + 'static>>) -> Result<usize, FillBodyError> {
let mut buffer = Vec::new();
let mut written = 0usize;
while let Some(buf) = body.next().await {
let mut body = buf.map_err(|_| FillBodyError)?;
while body.has_remaining() {
buffer.extend_from_slice(body.bytes());
let cnt = body.bytes().len();
body.advance(cnt);
written += cnt;
}
}
let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
info!("{} -> {:?}", who, buffer);
let mut chain = state.chain().write().await;
chain.feed(&buffer.split_whitespace()
.filter(|word| !word.is_empty())
.map(|s| s.to_owned()).collect::<Vec<_>>());
state.notify_save();
Ok(written)
}
#[derive(Debug)]
pub struct GenBodyError(String);
impl error::Error for GenBodyError{}
impl fmt::Display for GenBodyError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to write {:?} to body", self.0)
}
}
async fn gen_body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), GenBodyError>
{
let chain = state.chain().read().await;
if !chain.is_empty() {
match num {
Some(num) if num < state.config().max_gen_size => {
//This could DoS `full_body` and writes, potentially.
for string in chain.str_iter_for(num) {
output.send(string).await.map_err(|e| GenBodyError(e.0))?;
}
},
_ => output.send(chain.generate_str()).await.map_err(|e| GenBodyError(e.0))?,
}
}
Ok(())
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
@ -176,7 +111,7 @@ async fn main() {
.and(warp::body::stream()) .and(warp::body::stream())
.and_then(|state: State, host: IpAddr, buf| { .and_then(|state: State, host: IpAddr, buf| {
async move { async move {
full_body(&host, state, buf).await feed::full(&host, state, buf).await
.map(|_| warp::reply::with_status(warp::reply(), status!(201))) .map(|_| warp::reply::with_status(warp::reply(), status!(201)))
.map_err(warp::reject::custom) .map_err(warp::reject::custom)
} }
@ -191,7 +126,7 @@ async fn main() {
.and_then(|state: State, host: IpAddr, num: Option<usize>| { .and_then(|state: State, host: IpAddr, num: Option<usize>| {
async move { async move {
let (tx, rx) = mpsc::channel(state.config().max_gen_size); let (tx, rx) = mpsc::channel(state.config().max_gen_size);
tokio::spawn(gen_body(state, num, tx)); tokio::spawn(gen::body(state, num, tx));
Ok::<_, std::convert::Infallible>(Response::new(Body::wrap_stream(rx.map(move |x| { Ok::<_, std::convert::Infallible>(Response::new(Body::wrap_stream(rx.map(move |x| {
info!("{} <- {:?}", host, x); info!("{} <- {:?}", host, x);
Ok::<_, std::convert::Infallible>(x) Ok::<_, std::convert::Infallible>(x)

Loading…
Cancel
Save