serve
Avril 4 years ago
parent d44fa5c40b
commit abd9d2c941
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,6 +1,6 @@
[package] [package]
name = "markov" name = "markov"
version = "0.3.4" version = "0.4.0"
description = "Generate string of text from Markov chain fed by stdin" description = "Generate string of text from Markov chain fed by stdin"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"
@ -9,8 +9,13 @@ edition = "2018"
[features] [features]
default = ["compress-chain"] default = ["compress-chain"]
# Compress the chain data file when saved to disk
compress-chain = ["async-compression"] compress-chain = ["async-compression"]
# Enable the /api/ route
api = []
[profile.release] [profile.release]
opt-level = 3 opt-level = 3
lto = "fat" lto = "fat"

@ -0,0 +1,83 @@
//! For API calls if enabled
use super::*;
use std::{
fmt,
error,
iter,
convert::Infallible,
};
use futures::{
stream::{
self,
BoxStream,
StreamExt,
},
};
fn aggregate(mut body: impl Buf) -> Result<String, std::string::FromUtf8Error>
{
let mut output = Vec::new();
while body.has_remaining() {
let bytes = body.bytes();
output.extend_from_slice(&bytes[..]);
let cnt = bytes.len();
body.advance(cnt);
}
String::from_utf8(output)
}
pub async fn single(host: IpAddr, num: Option<usize>, body: impl Buf) -> Result<impl warp::Reply, warp::reject::Rejection>
{
single_stream(host, num, body).await
.map(|rx| Response::new(Body::wrap_stream(rx.map(move |x| {
info!("{} <- {:?}", host, x);
x
}))))
.map_err(warp::reject::custom)
}
async fn single_stream(host: IpAddr, num: Option<usize>, body: impl Buf) -> Result<BoxStream<'static, Result<String, Infallible>>, ApiError>
{
let body = aggregate(body)?;
info!("{} <- {:?}", host, &body[..]);
let mut chain = Chain::new();
feed::feed(&mut chain, body);
match num {
None => Ok(stream::iter(iter::once(Ok(chain.generate_str()))).boxed()),
Some(num) => {
let (mut tx, rx) = mpsc::channel(num);
tokio::spawn(async move {
for string in chain.str_iter_for(num) {
tx.send(string).await.expect("Failed to send string to body");
}
});
Ok(StreamExt::map(rx, |x| Ok::<_, Infallible>(x)).boxed())
}
}
}
#[derive(Debug)]
pub enum ApiError {
Body,
}
impl warp::reject::Reject for ApiError{}
impl error::Error for ApiError{}
impl std::fmt::Display for ApiError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Body => write!(f, "invalid data in request body"),
}
}
}
impl From<std::string::FromUtf8Error> for ApiError
{
fn from(_: std::string::FromUtf8Error) -> Self
{
Self::Body
}
}

@ -1,7 +1,7 @@
//! Feeding the chain //! Feeding the chain
use super::*; use super::*;
fn feed(chain: &mut Chain<String>, what: impl AsRef<str>) pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>)
{ {
chain.feed(what.as_ref().split_whitespace() chain.feed(what.as_ref().split_whitespace()
.filter(|word| !word.is_empty()) .filter(|word| !word.is_empty())

@ -39,14 +39,15 @@ use futures::{
join_all, join_all,
}, },
}; };
use cfg_if::cfg_if;
macro_rules! status { macro_rules! status {
($code:expr) => { ($code:expr) => {
::warp::http::status::StatusCode::from_u16($code).unwrap() ::warp::http::status::StatusCode::from_u16($code).unwrap()
}; };
} }
#[cfg(feature="api")]
mod api;
#[cfg(target_family="unix")] #[cfg(target_family="unix")]
mod signals; mod signals;
mod config; mod config;
@ -131,6 +132,37 @@ async fn main() {
}) })
.with(warp::log("markov::put")); .with(warp::log("markov::put"));
cfg_if!{
if #[cfg(feature="api")] {
let api = {
let api_single = {
let msz = state.config().max_gen_size;
warp::post()
.and(warp::path("single"))
.and(client_ip.clone())
.and(warp::path::param()
.map(move |sz: usize| {
if sz == 0 || (2..=msz).contains(&sz) {
Some(sz)
} else {
None
}
})
.or(warp::any().map(|| None))
.unify())
.and(warp::body::content_length_limit(state.config().max_content_length))
.and(warp::body::aggregate())
.and_then(api::single)
.with(warp::log("markov::api::single"))
};
warp::path("api")
.and(api_single)
};
}
}
let read = warp::get() let read = warp::get()
.and(chain.clone()) .and(chain.clone())
.and(warp::path("get")) .and(warp::path("get"))
@ -148,6 +180,8 @@ async fn main() {
}) })
.with(warp::log("markov::read")); .with(warp::log("markov::read"));
#[cfg(feature="api")]
let read = read.or(api);
#[cfg(target_family="unix")] #[cfg(target_family="unix")]
tasks.push(tokio::spawn(signals::handle(state.clone())).map(|res| res.expect("Signal handler panicked")).boxed()); tasks.push(tokio::spawn(signals::handle(state.clone())).map(|res| res.expect("Signal handler panicked")).boxed());

Loading…
Cancel
Save