cleanup, added more stuffs

master
Avril 4 years ago
parent fc6c45baf2
commit ec0bf8e24a
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -7,13 +7,22 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["threaded"] default = ["threaded",
"watcher_timeout"]
# Run with threads # Run with threads (TODO: rename to `threads`. What does this do?? We need `rt-threaded` for fs watcher to work...)
threaded = ["tokio/rt-threaded"] threaded = []
# FS watcher will have infinite backlog, instead of ignoring if it goes over its backlog.
# This can help DoS, but potentially cause OOM.
watcher_unlimited = []
# FS watcher hooks have a defined timeout
# This can help with DoS, and hooks that hog resources
watcher_timeout = []
[dependencies] [dependencies]
tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "fs"]} tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "rt-threaded", "fs"]}
notify = "5.0.0-pre.3" notify = "5.0.0-pre.3"
futures= "0.3" futures= "0.3"
sexp = "1.1" sexp = "1.1"

@ -2,12 +2,10 @@
use super::*; use super::*;
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
marker::Unpin,
}; };
use tokio::{ use tokio::{
prelude::*, prelude::*,
fs::File, fs::File,
io::AsyncRead,
}; };
use sexp::{ use sexp::{
Sexp, Sexp,

@ -41,7 +41,6 @@ use tokio::{
}, },
task::{ task::{
JoinHandle, JoinHandle,
self,
}, },
time, time,
}; };
@ -60,6 +59,11 @@ use notify::{
/// An event to be passed to `Context`. /// An event to be passed to `Context`.
pub type Event = event::EventKind; pub type Event = event::EventKind;
/// Max event backlog (not used if `unlimited_watcher` is set)
const GLOBAL_BACKLOG: usize = 100;
/// Timeout for hook dispatches in seconds. Only set if `watcher_timeout` feature is enabled.
const GLOBAL_TIMEOUT: u64 = 5;
/// Decontruct a notify event into event kind and full paths. /// Decontruct a notify event into event kind and full paths.
#[inline] #[inline]
fn deconstruct_event<P: AsRef<Path>>(root: P,event: event::Event) -> (Vec<PathBuf>, Event) fn deconstruct_event<P: AsRef<Path>>(root: P,event: event::Event) -> (Vec<PathBuf>, Event)
@ -111,7 +115,10 @@ where P: AsRef<Path>,
impl Hook { impl Hook {
/// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly. /// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly.
const DISPATCH_TIMEOUT: Option<time::Duration> = Some(time::Duration::from_secs(5)); #[cfg(feature="watcher_timeout")]
const DISPATCH_TIMEOUT: Option<time::Duration> = Some(time::Duration::from_secs(GLOBAL_TIMEOUT));
#[cfg(not(feature="watcher_timeout"))]
const DISPATCH_TIMEOUT: Option<time::Duration> = None;
const DISPATCH_MAX_BACKLOG: usize = 16; const DISPATCH_MAX_BACKLOG: usize = 16;
/// Create a new `Hook` object, with a path, and an optional filter function. /// Create a new `Hook` object, with a path, and an optional filter function.
@ -245,11 +252,21 @@ pub fn watch(path: impl AsRef<Path>) -> Oneesan
let path=std::fs::canonicalize(&path).unwrap_or_else(|_| path.clone()); let path=std::fs::canonicalize(&path).unwrap_or_else(|_| path.clone());
let hooks = Arc::clone(&hooks); let hooks = Arc::clone(&hooks);
tokio::spawn(async move { tokio::spawn(async move {
#[cfg(feature="watcher_unlimited")]
let (tx,mut rx) = mpsc::unbounded_channel(); let (tx,mut rx) = mpsc::unbounded_channel();
#[cfg(not(feature="watcher_unlimited"))]
let (tx,mut rx) = mpsc::channel(GLOBAL_BACKLOG);
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| { let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| {
#[cfg(not(feature="watcher_unlimited"))]
let mut tx = tx.clone();
match res { match res {
Ok(event) => if let Err(err) = tx.send(event) {warn!("Watcher failed to pass event: {}", err)}, Ok(event) => if let Err(err) = {
#[cfg(feature="watcher_unlimited")] {tx.send(event)}
#[cfg(not(feature="watcher_unlimited"))] {tx.try_send(event)}
}
{warn!("Watcher failed to pass event: {}", err)},
Err(e) => error!("Watcher returned error: {}", e), Err(e) => error!("Watcher returned error: {}", e),
} }
}).expect("Failed to initialise watcher"); }).expect("Failed to initialise watcher");

@ -11,7 +11,7 @@ use std::{
Write, Write,
}, },
borrow::Borrow, borrow::Borrow,
sync::{Arc,RwLock}, sync::{RwLock},
}; };
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
@ -99,7 +99,7 @@ impl Trace
write!(&mut out, ":?")?; write!(&mut out, ":?")?;
} }
if let Some(column) = self.column { if let Some(column) = self.column {
write!(&mut out, ":{}>", column)?; write!(&mut out, ":{}", column)?;
} else { } else {
write!(&mut out, ":?")?; write!(&mut out, ":?")?;
} }

@ -29,7 +29,7 @@ mod live;
mod context; mod context;
mod job; mod job;
//test //This is a test function, when we have a real job server, remove it.
async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Box<dyn std::error::Error>> async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Box<dyn std::error::Error>>
{ {
let mut interval = time::interval(Duration::from_secs(10)); let mut interval = time::interval(Duration::from_secs(10));
@ -40,7 +40,7 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo
println!("starting?"); println!("starting?");
loop { loop {
let mut tick = interval.tick(); let tick = interval.tick();
tokio::pin!(tick); tokio::pin!(tick);
loop { loop {
select!{ select!{
@ -49,7 +49,7 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo
println!("yes"); println!("yes");
break; break;
} }
command = rx.recv() => { _command = rx.recv() => {
// We got interrupt, interpret `command` here. // We got interrupt, interpret `command` here.
// `continue` to continue waiting on this interval, break to go to next, return to stop // `continue` to continue waiting on this interval, break to go to next, return to stop
println!("no"); println!("no");
@ -65,17 +65,40 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo
fn print_stats() fn print_stats()
{ {
use recolored::Colorize;
lazy_static! { lazy_static! {
static ref AUTHORS: String = env!("CARGO_PKG_AUTHORS").replace( ":", ", "); static ref AUTHORS: String = env!("CARGO_PKG_AUTHORS").replace( ":", ", ");
}; };
status!("This is the lolicron daemon version {} by {}", env!("CARGO_PKG_VERSION"), &AUTHORS[..]); #[cfg(debug_assertions)]
lazy_static!{
static ref BUILD_IDENT: recolored::ColoredString = "debug".bright_blue();
}
#[cfg(not(debug_assertions))]
lazy_static!{
static ref BUILD_IDENT: recolored::ColoredString = "release".bright_red();
}
use std::ops::Deref;
status!("This is the lolicron daemon version {} by {} ({} build)", env!("CARGO_PKG_VERSION"), &AUTHORS[..], BUILD_IDENT.deref());
status!("---"); status!("---");
status!("Compiled with:"); status!("Compiled with ({}, {}, {}, {}):", "on".bright_red(), "default on".red(), "off".bright_blue(), "default off".blue());
#[cfg(nightly)] status!(" +nightly");
#[cfg(debug_assertions)] status!(" +debug_assertions"); #[cfg(nightly)] status!(" +nightly".bright_red());
#[cfg(debug_assertions)] status!(" +debug_assertions".red());
status!("features:"); status!("features:");
#[cfg(feature="threaded")] status!(" +threaded");
#[cfg(feature="threaded")] status!(" +threaded".red());
#[cfg(not(feature="threaded"))] status!(" -threaded".bright_blue());
#[cfg(feature="watcher_unlimited")] status!(" +watcher_unlimited".bright_red());
#[cfg(not(feature="watcher_unlimited"))] status!(" -watcher_unlimited".blue());
#[cfg(feature="watcher_timeout")] status!(" +watcher_timeout".red());
#[cfg(not(feature="watcher_timeout"))] status!(" -watcher_timeout".bright_blue());
status!(""); status!("");
status!("GPl'd with <3"); status!("GPl'd with <3");
status!("Please enjoy"); status!("Please enjoy");
@ -94,7 +117,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let oneesan = live::watch("."); let oneesan = live::watch(".");
{ {
let mut recv = oneesan.hook("main.rs", live::filter::ALL).await; let mut recv = oneesan.hook("src/main.rs", live::filter::ALL).await;
while let Some(event) = recv.recv().await while let Some(event) = recv.recv().await
{ {
important!("Got ev {:?}", event); important!("Got ev {:?}", event);

Loading…
Cancel
Save