#![allow(dead_code)] #[macro_use] extern crate log; use chain::{ Chain, }; use warp::{ Filter, Buf, reply::Response, }; use hyper::Body; use std::{ sync::Arc, fmt, error, net::{ SocketAddr, IpAddr, }, }; use tokio::{ sync::{ RwLock, mpsc, Notify, }, stream::{Stream,StreamExt,}, }; use serde::{ Serialize, Deserialize }; macro_rules! status { ($code:expr) => { ::warp::http::status::StatusCode::from_u16($code).unwrap() }; } mod config; mod state; use state::State; mod save; mod forwarded_list; use forwarded_list::XForwardedFor; mod feed; mod gen; #[tokio::main] async fn main() { pretty_env_logger::init(); let config = match config::load().await { Some(v) => v, _ => { let cfg = config::Config::default(); #[cfg(debug_assertions)] { if let Err(err) = cfg.save(config::DEFAULT_FILE_LOCATION).await { error!("Failed to create default config file: {}", err); } } cfg }, }; trace!("Using config {:?}", config); let chain = Arc::new(RwLock::new(match save::load(&config.file).await { Ok(chain) => { info!("Loaded chain from {:?}", config.file); chain }, Err(e) => { warn!("Failed to load chain, creating new"); trace!("Error: {}", e); Chain::new() }, })); { let (state, chain, saver) = { let save_when = Arc::new(Notify::new()); let state = State::new(config, Arc::clone(&chain), Arc::clone(&save_when)); let state2 = state.clone(); let saver = tokio::spawn(save::host(state.clone())); let chain = warp::any().map(move || state.clone()); (state2, chain, saver) }; let client_ip = if state.config().trust_x_forwarded_for { warp::header("x-forwarded-for") .map(|ip: XForwardedFor| ip) .and_then(|x: XForwardedFor| async move { x.into_first().ok_or_else(|| warp::reject::not_found()) }) .or(warp::filters::addr::remote() .and_then(|x: Option| async move { x.map(|x| x.ip()).ok_or_else(|| warp::reject::not_found()) })) .unify().boxed() } else { warp::filters::addr::remote().and_then(|x: Option| async move {x.map(|x| x.ip()).ok_or_else(|| warp::reject::not_found())}).boxed() }; let push = warp::put() .and(chain.clone()) .and(warp::path("put")) .and(client_ip.clone()) .and(warp::body::content_length_limit(state.config().max_content_length)) .and(warp::body::stream()) .and_then(|state: State, host: IpAddr, buf| { async move { feed::full(&host, state, buf).await .map(|_| warp::reply::with_status(warp::reply(), status!(201))) .map_err(warp::reject::custom) } }) .with(warp::log("markov::put")); let read = warp::get() .and(chain.clone()) .and(warp::path("get")) .and(client_ip.clone()) .and(warp::path::param().map(|opt: usize| Some(opt)).or(warp::any().map(|| Option::::None)).unify()) .and_then(|state: State, host: IpAddr, num: Option| { async move { let (tx, rx) = mpsc::channel(state.config().max_gen_size); tokio::spawn(gen::body(state, num, tx)); Ok::<_, std::convert::Infallible>(Response::new(Body::wrap_stream(rx.map(move |x| { info!("{} <- {:?}", host, x); Ok::<_, std::convert::Infallible>(x) })))) } }) .with(warp::log("markov::read")); let (addr, server) = warp::serve(push .or(read)) .bind_with_graceful_shutdown(state.config().bindpoint, async move { tokio::signal::ctrl_c().await.unwrap(); state.shutdown(); }); info!("Server bound on {:?}", addr); server.await; // Cleanup async move { trace!("Cleanup"); saver.await.expect("Saver panicked"); } }.await; info!("Shut down gracefully") }