#![ feature(split_inclusive) ]
#![ allow(dead_code) ]
#[ macro_use ] extern crate log ;
use chain ::{
Chain ,
} ;
use warp ::{
Filter ,
Buf ,
reply ::Response ,
} ;
use hyper ::Body ;
use std ::{
sync ::Arc ,
fmt ,
error ,
net ::{
SocketAddr ,
IpAddr ,
} ,
} ;
use tokio ::{
sync ::{
RwLock ,
mpsc ,
Notify ,
} ,
stream ::{ Stream , StreamExt , } ,
} ;
use serde ::{
Serialize ,
Deserialize
} ;
use futures ::{
future ::{
FutureExt ,
BoxFuture ,
join_all ,
} ,
} ;
use lazy_static ::lazy_static ;
use cfg_if ::cfg_if ;
/// Emits the wrapped tokens only when the crate is built with `debug_assertions`.
///
/// The tokens are forwarded verbatim into a `cfg_if!` that has a single
/// `#[cfg(debug_assertions)]` branch and no `else`, so in builds without
/// debug assertions the invocation expands to nothing.
macro_rules! if_debug {
    ($($body:tt)*) => {
        cfg_if::cfg_if! {
            if #[cfg(debug_assertions)] {
                $($body)*
            }
        }
    };
}
/// Converts a numeric HTTP status code into a `warp::http::status::StatusCode`.
///
/// # Panics
/// The expansion calls `unwrap()` on the result of `StatusCode::from_u16`, so it
/// panics if that conversion returns an error for the given code.
macro_rules! status {
    ($numeric:expr) => {
        ::warp::http::status::StatusCode::from_u16($numeric).unwrap()
    };
}
// Crate-internal modules. The notes below only record how `main` in this file uses them.
mod ext ;
use ext ::* ;
mod util ;
mod sanitise ;
mod bytes ;
mod chunking ;
// Compiled only when the `feature` predicate in the attribute below holds;
// `main` takes `api::single` and `api::error::rejection` from it.
#[ cfg(feature= " api " ) ]
mod api ;
// Compiled only for the target family named in the attribute below;
// `main` spawns `signals::handle` from it.
#[ cfg(target_family= " unix " ) ]
mod signals ;
// Provides `config::load`, `config::Config` and `config::DEFAULT_FILE_LOCATION`.
mod config ;
// Provides `State`, which `main` clones into the request handlers and background tasks.
mod state ;
use state ::State ;
// Provides `save::load` (reads the chain from `config.file`) and the `save::host` task spawned by `main`.
mod save ;
// Provides `XForwardedFor`, the type `main` extracts from the `x-forwarded-for` header.
mod forwarded_list ;
use forwarded_list ::XForwardedFor ;
// Provides `feed::full`, called by the `PUT /put` handler.
mod feed ;
// Provides `gen::body`, spawned by the `GET /get` handler.
mod gen ;
// Provides `sentance::body`, spawned by the `GET /get/sentance` handler.
mod sentance ;
/// Log level that `init_log` exports as `RUST_LOG` when that variable is not set.
const DEFAULT_LOG_LEVEL : & str = "warn" ;
fn init_log ( )
{
let level = match std ::env ::var_os ( "RUST_LOG" ) {
None = > {
std ::env ::set_var ( "RUST_LOG" , DEFAULT_LOG_LEVEL ) ;
std ::borrow ::Cow ::Borrowed ( std ::ffi ::OsStr ::new ( DEFAULT_LOG_LEVEL ) )
} ,
Some ( w ) = > std ::borrow ::Cow ::Owned ( w ) ,
} ;
pretty_env_logger ::init ( ) ;
trace ! ( "Initialising `genmarkov` ({}) v{} with log level {:?}.\n\tMade by {} with <3.\n\tLicensed with GPL v3 or later" ,
std ::env ::args ( ) . next ( ) . unwrap ( ) ,
env! ( "CARGO_PKG_VERSION" ) ,
level ,
env! ( "CARGO_PKG_AUTHORS" ) ) ;
}
/// Program entry point.
///
/// Initialises logging, loads the configuration (falling back to
/// `config::Config::default()`), loads the chain from `config.file` (falling back to
/// `Chain::new()`), builds the warp routes, serves them until ctrl-c is received or
/// binding fails, and finally waits for the background tasks collected in `tasks`.
#[ tokio::main ]
async fn main ( ) {
init_log ( ) ;
// Anything other than `Some` from `config::load` falls back to the default configuration.
let config = match config ::load ( ) . await {
Some ( v ) = > v ,
_ = > {
let cfg = config ::Config ::default ( ) ;
// Debug builds only: try to write the default config to `DEFAULT_FILE_LOCATION`.
// A failure is logged and otherwise ignored.
#[ cfg(debug_assertions) ]
{
if let Err ( err ) = cfg . save ( config ::DEFAULT_FILE_LOCATION ) . await {
error ! ( "Failed to create default config file: {}" , err ) ;
}
}
cfg
} ,
} ;
trace ! ( "Using config {:?}" , config ) ;
// Load the persisted chain; a load failure is not fatal, a fresh `Chain` is used instead.
let chain = Arc ::new ( RwLock ::new ( match save ::load ( & config . file ) . await {
Ok ( chain ) = > {
info ! ( "Loaded chain from {:?}" , config . file ) ;
chain
} ,
Err ( e ) = > {
warn ! ( "Failed to load chain, creating new" ) ;
trace ! ( "Error: {}" , e ) ;
Chain ::new ( )
} ,
} ) ) ;
// This block's tail expression is the cleanup future; it is awaited at the closing `} . await`.
{
// Background futures that are joined during cleanup.
let mut tasks = Vec ::< BoxFuture < ' static , ( ) > > ::new ( ) ;
// Build the shared `State`, spawn the saver task, and create a filter that hands a
// clone of the state to each request. Note that the returned `chain` is that filter
// and shadows the chain `Arc` declared above.
let ( state , chain ) = {
let save_when = Arc ::new ( Notify ::new ( ) ) ;
let state = State ::new ( config ,
Arc ::clone ( & chain ) ,
Arc ::clone ( & save_when ) ) ;
let state2 = state . clone ( ) ;
let saver = tokio ::spawn ( save ::host ( state . clone ( ) ) ) ;
let chain = warp ::any ( ) . map ( move | | state . clone ( ) ) ;
// A panic inside the saver task is re-raised here when the task is joined.
tasks . push ( saver . map ( | res | res . expect ( "Saver panicked" ) ) . boxed ( ) ) ;
( state2 , chain )
} ;
// Filter extracting the client's IP address. When `trust_x_forwarded_for` is set, the
// first entry of the `x-forwarded-for` header is preferred and the socket's remote
// address is the fallback; otherwise only the remote address is used. A request for
// which no address can be determined is rejected with `not_found`.
let client_ip = if state . config ( ) . trust_x_forwarded_for {
warp ::header ( "x-forwarded-for" )
. map ( | ip : XForwardedFor | ip )
. and_then ( | x : XForwardedFor | async move { x . into_first ( ) . ok_or_else ( | | warp ::reject ::not_found ( ) ) } )
. or ( warp ::filters ::addr ::remote ( )
. and_then ( | x : Option < SocketAddr > | async move { x . map ( | x | x . ip ( ) ) . ok_or_else ( | | warp ::reject ::not_found ( ) ) } ) )
. unify ( ) . boxed ( )
} else {
warp ::filters ::addr ::remote ( ) . and_then ( | x : Option < SocketAddr > | async move { x . map ( | x | x . ip ( ) ) . ok_or_else ( | | warp ::reject ::not_found ( ) ) } ) . boxed ( )
} ;
// `PUT /put`: streams the request body (limited to `max_content_length`) into
// `feed::full`. Success replies with status 201; every error from `feed::full` is
// currently turned into a `not_found` rejection (see the TODO below).
let push = warp ::put ( )
. and ( warp ::path ( "put" ) )
. and ( chain . clone ( ) )
. and ( client_ip . clone ( ) )
. and ( warp ::body ::content_length_limit ( state . config ( ) . max_content_length ) )
. and ( warp ::body ::stream ( ) )
. and_then ( | state : State , host : IpAddr , buf | {
async move {
feed ::full ( & host , state , buf ) . await
. map ( | _ | warp ::reply ::with_status ( warp ::reply ( ) , status ! ( 201 ) ) )
. map_err ( | _ | warp ::reject ::not_found ( ) ) //(warp::reject::custom) //TODO: Recover rejection filter down below for custom error return
}
} )
. with ( warp ::log ( "markov::put" ) ) ;
// Optional API routes, only built when the cfg predicate below holds.
cfg_if ! {
if #[ cfg(feature= " api " ) ] {
let api = {
// `POST /api/single[/<size>]`, handled by `api::single`.
let single = {
let msz = state . config ( ) . max_gen_size ;
warp ::post ( )
. and ( warp ::path ( "single" ) )
. and ( client_ip . clone ( ) )
// Optional size segment: `0` and anything in `2..=max_gen_size` are passed on
// as `Some`; any other parsed value becomes `None`, and so does a request for
// which the parameter filter does not match (via the `or` fallback).
. and ( warp ::path ::param ( )
. map ( move | sz : usize | {
if sz = = 0 | | ( 2 ..= msz ) . contains ( & sz ) {
Some ( sz )
} else {
None
}
} )
. or ( warp ::any ( ) . map ( | | None ) )
. unify ( ) )
. and ( warp ::body ::content_length_limit ( state . config ( ) . max_content_length ) )
. and ( warp ::body ::aggregate ( ) )
. and_then ( api ::single )
. with ( warp ::log ( "markov::api::single" ) )
} ;
// Rejections raised under `/api` are converted by `api::error::rejection`.
warp ::path ( "api" )
. and ( single )
. recover ( api ::error ::rejection )
} ;
}
}
// `GET` with an optional numeric path segment (mounted under `/get` further down).
// `gen::body` is spawned as a producer and its output is streamed back as the response
// body: each item is trimmed, empty items are dropped, the rest are logged and
// terminated with a newline.
// NOTE(review): `trim_in_place` is not defined in this view (presumably an extension from `ext`).
// NOTE(review): tokio's bounded `mpsc::channel` panics on a capacity of 0 - confirm that
// `max_gen_size` can never be zero.
let read = warp ::get ( )
. and ( chain . clone ( ) )
. and ( client_ip . clone ( ) )
. and ( warp ::path ::param ( ) . map ( | opt : usize | Some ( opt ) )
. or ( warp ::path ::end ( ) . map ( | | Option ::< usize > ::None ) ) . unify ( ) )
. and_then ( | state : State , host : IpAddr , num : Option < usize > | {
async move {
let ( tx , rx ) = mpsc ::channel ( state . config ( ) . max_gen_size ) ;
tokio ::spawn ( gen ::body ( state , num , tx ) ) ;
Ok ::< _ , std ::convert ::Infallible > ( Response ::new ( Body ::wrap_stream ( rx . filter_map ( move | mut x | {
if x . trim_in_place ( ) . len ( ) ! = 0 {
info ! ( "{} <- {:?}" , host , x ) ;
x . push ( '\n' ) ;
Some ( Ok ::< _ , std ::convert ::Infallible > ( x ) )
} else {
None
}
} ) ) ) )
}
} )
. with ( warp ::log ( "markov::read" ) ) ;
// Same shape as `read`, but under the `sentance` path segment, produced by
// `sentance::body`, and with items separated by a space instead of a newline.
let sentance = warp ::get ( )
. and ( warp ::path ( "sentance" ) ) //TODO: sanitise::Sentance::new_iter the body line
. and ( chain . clone ( ) )
. and ( client_ip . clone ( ) )
. and ( warp ::path ::param ( ) . map ( | opt : usize | Some ( opt ) )
. or ( warp ::path ::end ( ) . map ( | | Option ::< usize > ::None ) ) . unify ( ) )
. and_then ( | state : State , host : IpAddr , num : Option < usize > | {
async move {
let ( tx , rx ) = mpsc ::channel ( state . config ( ) . max_gen_size ) ;
tokio ::spawn ( sentance ::body ( state , num , tx ) ) ;
Ok ::< _ , std ::convert ::Infallible > ( Response ::new ( Body ::wrap_stream ( rx . filter_map ( move | mut x | {
if x . trim_in_place ( ) . len ( ) ! = 0 {
info ! ( "{} (sentance) <- {:?}" , host , x ) ;
x . push ( ' ' ) ;
Some ( Ok ::< _ , std ::convert ::Infallible > ( x ) )
} else {
None
}
} ) ) ) )
}
} )
. with ( warp ::log ( "markov::read::sentance" ) ) ;
// Mount both read routes under `/get`; `read` is tried before `sentance`.
let read = warp ::path ( "get" ) . and ( read . or ( sentance ) ) ;
// When built, the API routes are added alongside (not underneath) `/get`.
#[ cfg(feature= " api " ) ]
let read = read . or ( api ) ;
// Signal handling task; like the saver, its panic is re-raised when joined.
#[ cfg(target_family= " unix " ) ]
tasks . push ( tokio ::spawn ( signals ::handle ( state . clone ( ) ) ) . map ( | res | res . expect ( "Signal handler panicked" ) ) . boxed ( ) ) ;
// Bind and run the server.
// NOTE(review): `require_send` is defined outside this view; presumably it asserts that
// the wrapped future is `Send`.
require_send ( async {
let server = {
let s2 = AssertNotSend ::new ( state . clone ( ) ) ; //temp clone the Arcs here for shutdown if server fails to bind, assert they cannot remain cloned across an await boundary.
// The third argument is the shutdown trigger: it waits for ctrl-c and then calls
// `state.shutdown()`.
match bind ::try_serve ( warp ::serve ( push
. or ( read ) ) ,
state . config ( ) . bindpoint . clone ( ) ,
async move {
tokio ::signal ::ctrl_c ( ) . await . unwrap ( ) ;
state . shutdown ( ) ;
} ) {
Ok ( ( addr , server ) ) = > {
info ! ( "Server bound on {:?}" , addr ) ;
server
} ,
Err ( err ) = > {
// Binding failed: call `shutdown()` on the cloned state and leave this async
// block without serving.
error ! ( "Failed to bind server: {}" , err ) ;
s2 . into_inner ( ) . shutdown ( ) ;
return ;
} ,
}
} ;
server . await ;
} ) . await ;
// Cleanup
// Tail expression of the enclosing block: wait for every background task to finish.
async move {
trace ! ( "Cleanup" ) ;
join_all ( tasks ) . await ;
}
} . await ;
info ! ( "Shut down gracefully" )
}
// Provides `bind::try_serve`, which `main` uses to bind the warp server to the configured bindpoint.
mod bind ;