diff --git a/Cargo.toml b/Cargo.toml index 1ad7862..991f24c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,13 +7,22 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["threaded"] +default = ["threaded", + "watcher_timeout"] -# Run with threads -threaded = ["tokio/rt-threaded"] +# Run with threads (TODO: rename to `threads`. What does this do?? We need `rt-threaded` for fs watcher to work...) +threaded = [] + +# FS watcher will have infinite backlog, instead of ignoring if it goes over its backlog. +# This can help DoS, but potentially cause OOM. +watcher_unlimited = [] + +# FS watcher hooks have a defined timeout +# This can help with DoS, and hooks that hog resources +watcher_timeout = [] [dependencies] -tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "fs"]} +tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "rt-threaded", "fs"]} notify = "5.0.0-pre.3" futures= "0.3" sexp = "1.1" diff --git a/src/config/parse.rs b/src/config/parse.rs index dda2592..f960afe 100644 --- a/src/config/parse.rs +++ b/src/config/parse.rs @@ -2,12 +2,10 @@ use super::*; use std::{ convert::TryFrom, - marker::Unpin, }; use tokio::{ prelude::*, fs::File, - io::AsyncRead, }; use sexp::{ Sexp, diff --git a/src/live.rs b/src/live.rs index 570aeea..446d53d 100644 --- a/src/live.rs +++ b/src/live.rs @@ -41,7 +41,6 @@ use tokio::{ }, task::{ JoinHandle, - self, }, time, }; @@ -60,6 +59,11 @@ use notify::{ /// An event to be passed to `Context`. pub type Event = event::EventKind; +/// Max event backlog (not used if `unlimited_watcher` is set) +const GLOBAL_BACKLOG: usize = 100; +/// Timeout for hook dispatches in seconds. Only set if `watcher_timeout` feature is enabled. +const GLOBAL_TIMEOUT: u64 = 5; + /// Decontruct a notify event into event kind and full paths. #[inline] fn deconstruct_event>(root: P,event: event::Event) -> (Vec, Event) @@ -111,7 +115,10 @@ where P: AsRef, impl Hook { /// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly. - const DISPATCH_TIMEOUT: Option = Some(time::Duration::from_secs(5)); + #[cfg(feature="watcher_timeout")] + const DISPATCH_TIMEOUT: Option = Some(time::Duration::from_secs(GLOBAL_TIMEOUT)); + #[cfg(not(feature="watcher_timeout"))] + const DISPATCH_TIMEOUT: Option = None; const DISPATCH_MAX_BACKLOG: usize = 16; /// Create a new `Hook` object, with a path, and an optional filter function. @@ -245,11 +252,21 @@ pub fn watch(path: impl AsRef) -> Oneesan let path=std::fs::canonicalize(&path).unwrap_or_else(|_| path.clone()); let hooks = Arc::clone(&hooks); tokio::spawn(async move { + #[cfg(feature="watcher_unlimited")] let (tx,mut rx) = mpsc::unbounded_channel(); - + #[cfg(not(feature="watcher_unlimited"))] + let (tx,mut rx) = mpsc::channel(GLOBAL_BACKLOG); + + let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| { + #[cfg(not(feature="watcher_unlimited"))] + let mut tx = tx.clone(); match res { - Ok(event) => if let Err(err) = tx.send(event) {warn!("Watcher failed to pass event: {}", err)}, + Ok(event) => if let Err(err) = { + #[cfg(feature="watcher_unlimited")] {tx.send(event)} + #[cfg(not(feature="watcher_unlimited"))] {tx.try_send(event)} + } + {warn!("Watcher failed to pass event: {}", err)}, Err(e) => error!("Watcher returned error: {}", e), } }).expect("Failed to initialise watcher"); diff --git a/src/log.rs b/src/log.rs index 5f90ef8..02705a0 100644 --- a/src/log.rs +++ b/src/log.rs @@ -11,7 +11,7 @@ use std::{ Write, }, borrow::Borrow, - sync::{Arc,RwLock}, + sync::{RwLock}, }; use once_cell::sync::OnceCell; @@ -99,7 +99,7 @@ impl Trace write!(&mut out, ":?")?; } if let Some(column) = self.column { - write!(&mut out, ":{}>", column)?; + write!(&mut out, ":{}", column)?; } else { write!(&mut out, ":?")?; } diff --git a/src/main.rs b/src/main.rs index 1fec57a..57e96dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,7 +29,7 @@ mod live; mod context; mod job; -//test +//This is a test function, when we have a real job server, remove it. async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Box> { let mut interval = time::interval(Duration::from_secs(10)); @@ -40,7 +40,7 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo println!("starting?"); loop { - let mut tick = interval.tick(); + let tick = interval.tick(); tokio::pin!(tick); loop { select!{ @@ -49,7 +49,7 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo println!("yes"); break; } - command = rx.recv() => { + _command = rx.recv() => { // We got interrupt, interpret `command` here. // `continue` to continue waiting on this interval, break to go to next, return to stop println!("no"); @@ -65,17 +65,40 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo fn print_stats() { + use recolored::Colorize; lazy_static! { static ref AUTHORS: String = env!("CARGO_PKG_AUTHORS").replace( ":", ", "); + }; - status!("This is the lolicron daemon version {} by {}", env!("CARGO_PKG_VERSION"), &AUTHORS[..]); + #[cfg(debug_assertions)] + lazy_static!{ + static ref BUILD_IDENT: recolored::ColoredString = "debug".bright_blue(); + } + #[cfg(not(debug_assertions))] + lazy_static!{ + static ref BUILD_IDENT: recolored::ColoredString = "release".bright_red(); + } + + use std::ops::Deref; + status!("This is the lolicron daemon version {} by {} ({} build)", env!("CARGO_PKG_VERSION"), &AUTHORS[..], BUILD_IDENT.deref()); status!("---"); - status!("Compiled with:"); - #[cfg(nightly)] status!(" +nightly"); - #[cfg(debug_assertions)] status!(" +debug_assertions"); + status!("Compiled with ({}, {}, {}, {}):", "on".bright_red(), "default on".red(), "off".bright_blue(), "default off".blue()); + + #[cfg(nightly)] status!(" +nightly".bright_red()); + #[cfg(debug_assertions)] status!(" +debug_assertions".red()); + status!("features:"); - #[cfg(feature="threaded")] status!(" +threaded"); + + #[cfg(feature="threaded")] status!(" +threaded".red()); + #[cfg(not(feature="threaded"))] status!(" -threaded".bright_blue()); + + #[cfg(feature="watcher_unlimited")] status!(" +watcher_unlimited".bright_red()); + #[cfg(not(feature="watcher_unlimited"))] status!(" -watcher_unlimited".blue()); + + #[cfg(feature="watcher_timeout")] status!(" +watcher_timeout".red()); + #[cfg(not(feature="watcher_timeout"))] status!(" -watcher_timeout".bright_blue()); + status!(""); status!("GPl'd with <3"); status!("Please enjoy"); @@ -94,7 +117,7 @@ async fn main() -> Result<(), Box> { let oneesan = live::watch("."); { - let mut recv = oneesan.hook("main.rs", live::filter::ALL).await; + let mut recv = oneesan.hook("src/main.rs", live::filter::ALL).await; while let Some(event) = recv.recv().await { important!("Got ev {:?}", event);