From 6ae2afa8c32fc3f2d1d29f30b6d9bc12bf252266 Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 2 Aug 2020 19:43:56 +0100 Subject: [PATCH] watcher works TODO: Make it handle paths better, relative ones and such --- Cargo.lock | 45 +++++++-- Cargo.toml | 3 +- src/live.rs | 269 ++++++++++++++++++++++++++++++++++++++++++++++++++-- src/log.rs | 135 +++++++++++++++++++++++--- src/main.rs | 34 +++++-- src/util.rs | 85 +++++++++++++++++ 6 files changed, 534 insertions(+), 37 deletions(-) create mode 100644 src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 7980158..6accdc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,11 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "anymap" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33954243bd79057c2de7338850b85983a44588021f8a5fee574a8888c6de4344" + [[package]] name = "atty" version = "0.2.14" @@ -46,6 +52,27 @@ dependencies = [ "time", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + [[package]] name = "filetime" version = "0.2.10" @@ -66,9 +93,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "fsevent" -version = "0.4.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" +checksum = "97f347202c95c98805c216f9e1df210e8ebaec9fdb2365700a43c10797a35e63" dependencies = [ "bitflags", "fsevent-sys", @@ -76,9 +103,9 @@ dependencies = [ [[package]] name = "fsevent-sys" -version = "2.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +checksum = "77a29c77f1ca394c3e73a9a5d24cfcabb734682d9634fc398f2204a63c994120" dependencies = [ "libc", ] @@ -205,9 +232,9 @@ dependencies = [ [[package]] name = "inotify" -version = "0.7.1" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f" +checksum = "46dd0a94b393c730779ccfd2a872b67b1eb67be3fc33082e733bdb38b5fde4d4" dependencies = [ "bitflags", "inotify-sys", @@ -352,11 +379,13 @@ dependencies = [ [[package]] name = "notify" -version = "4.0.15" +version = "5.0.0-pre.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80ae4a7688d1fab81c5bf19c64fc8db920be8d519ce6336ed4e7efe024724dbd" +checksum = "77d03607cf88b4b160ba0e9ed425fff3cee3b55ac813f0c685b3a3772da37d0e" dependencies = [ + "anymap", "bitflags", + "crossbeam-channel", "filetime", "fsevent", "fsevent-sys", diff --git a/Cargo.toml b/Cargo.toml index 4a87add..214b668 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,11 +9,12 @@ edition = "2018" [features] default = ["threaded"] +# Run with threads threaded = ["tokio/rt-threaded"] [dependencies] tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "fs"]} -notify = "4.0" +notify = "5.0.0-pre.3" futures= "0.3" sexp = "1.1" once_cell = "1.4" diff --git a/src/live.rs b/src/live.rs index 5978163..fe2fe38 100644 --- a/src/live.rs +++ b/src/live.rs @@ -1,28 +1,283 @@ -//! Live config reloader +//! Live config reloader. Handles hooks and filesystem wathcers use super::*; use std::{ path::{ PathBuf, + Path, }, + sync::Arc, + fmt, + marker::{ + Send,Sync, + }, + ops::{ + Deref, + }, +}; +use tokio::{ + sync::{ + RwLock, + Mutex, + oneshot, + mpsc::{ + self, + error::SendTimeoutError, + }, + }, + task::{ + JoinHandle, + self, + }, + time, +}; +use futures::{ + future::{ + join_all, + }, +}; +use notify::{ + Watcher, + RecursiveMode, + RecommendedWatcher, + event, }; /// An event to be passed to `Context`. -#[derive(Debug)] -pub enum Event +pub type Event = event::EventKind; + +/// Decontruct a notify event into event kind and full paths. +#[inline] +fn deconstruct_event(event: event::Event) -> (Vec, Event) { + // Dunno if this is needed + (event.paths.into_iter().map(|x| std::fs::canonicalize(&x).unwrap_or(x)).collect(), event.kind) +} + +/// An event filter function +pub type EventFilterFn = Box bool + Send + Sync>; + +/// Represents one path hook +pub struct Hook { + path: PathBuf, + sender: Mutex>, + filter: Option, +} + +impl std::fmt::Debug for Hook +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "(path: {:?}, sender: {:?}, filter: {})", self.path, self.sender, self.filter.is_some()) + } +} + + +impl Hook { + /// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly. + const DISPATCH_TIMEOUT: Option = Some(time::Duration::from_secs(5)); + const DISPATCH_MAX_BACKLOG: usize = 16; + + /// Create a new `Hook` object, with a path, and an optional filter function. + /// Return the hook and the receiver for the hook. + pub fn new(path: P, filter: Option) -> (Self, mpsc::Receiver) + where F: Fn(&Event) -> bool + Send + Sync + 'static, + P: AsRef + { + let (tx, rx) = mpsc::channel(Self::DISPATCH_MAX_BACKLOG); + + (Self{ + path: std::fs::canonicalize(&path).unwrap_or(path.as_ref().to_owned()), + sender: Mutex::new(tx), + filter: filter.map(|f| Box::new(f) as EventFilterFn), + }, rx) + } + /// Force an event dispatch on this hook. + pub async fn dispatch(&self, event: Event) -> Result<(), SendTimeoutError> + { + debug!(" dispatching on {:?}", &self.path); + let mut lock =self.sender.lock().await; + + match &Self::DISPATCH_TIMEOUT { + Some(timeout) => lock.send_timeout(event, timeout.clone()).await?, + None => lock.send(event).await.map_err(|e| mpsc::error::SendTimeoutError::Closed(e.0))?, + }; + Ok(()) + } + + /// Try to dispatch on this hook, if needed. + pub async fn try_dispatch(&self, path: impl AsRef, event: &Event) -> Result> + { + Ok(if self.path == path.as_ref() { + self.dispatch(event.to_owned()).await?; + true + } else { + false + }) + } + + /// Try to dispatch on one of many paths + pub async fn try_dispatch_many(&self, paths: T, event: &Event) -> Result> + where T: AsRef<[P]>, + P: AsRef + { + for path in paths.as_ref().iter() { + if self.path == path.as_ref() { + self.dispatch(event.to_owned()).await?; + return Ok(true); + } + } + Ok(false) + } +} + +/// A hook container for `Oneesan` +#[derive(Debug, Clone)] +pub struct HookContainer(Arc>>); + +impl HookContainer +{ + /// Add a hook to this container. The worker will be pushed the new hook. + /// + /// Dropping the receiver will remove the hook. + /// # Note + /// This function does not check the path, and may add hooks that are never fired. + pub async fn hook(&self, path: P, filter: Option) -> mpsc::Receiver + where F: Fn(&Event) -> bool + Send + Sync + 'static, + P: AsRef + + { + let mut hooks = self.0.write().await; + let (hook, recv) = Hook::new(path,filter); + hooks.push(hook); + recv + } } -/// A watcher context, we hook specific `Context`s here, to be dispatched to on file change +/// A watcher context, we hook specific paths here, to be dispatched to on file change #[derive(Debug)] pub struct Oneesan { path: PathBuf, - //TODO: Hooks + + hooks: HookContainer, + shutdown: PhantomDrop, fn (oneshot::Sender<()>) -> ()>, + handle: JoinHandle<()>, + +} + +impl Deref for Oneesan +{ + type Target = HookContainer; + fn deref(&self) -> &Self::Target + { + &self.hooks + } +} + +impl Oneesan +{ + /// Add a hook to this container if `path` is inside this instance. The worker will be pushed the new hook if the check passes, if not, will return `None`. + /// + /// Dropping the receiver will remove the hook. + pub async fn try_hook(&self, path: P, filter: Option) -> Option> + where F: Fn(&Event) -> bool + Send + Sync + 'static, + P: AsRef + { + let path = path.as_ref(); + if self.path.starts_with(&path) { + Some(self.hook(path, filter).await) + } else { + None + } + } + /// Join this instance + pub fn shutdown(self) -> JoinHandle<()> + { + self.handle + } } /// Start watching this path for changes of files -pub fn watch(path: PathBuf) -> Oneesan +pub fn watch(path: impl AsRef) -> Oneesan { - todo!() + let path = path.as_ref().to_owned(); + let hooks = Arc::new(RwLock::new(Vec::new())); + let (shutdown, mut shutdown_rx) = oneshot::channel(); + + let handle = { + let path=path.clone(); + let hooks = Arc::clone(&hooks); + tokio::spawn(async move { + let (tx,mut rx) = mpsc::unbounded_channel(); + + let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| { + match res { + Ok(event) => if let Err(err) = tx.send(event) {warn!("Watcher failed to pass event: {}", err)}, + Err(e) => error!("Watcher returned error: {}", e), + } + }).expect("Failed to initialise watcher"); + + debug!("Watcher initialised, starting for path {:?}", &path); + watcher.watch(path, RecursiveMode::Recursive).expect("Failed to start watcher"); + + let work = async { + while let Some(event) = rx.recv().await { + debug!(" -> {:?}", event); + let (paths, event) = deconstruct_event(event); + + let closed: Vec = { //XXX: Change to generational arena + let hooks = hooks.read().await; + + let mut i=0; + join_all(hooks.iter().map(|hook: &Hook| hook.try_dispatch_many(&paths[..], &event))).await.into_iter().filter_map(|res| { + (match res { + Err(SendTimeoutError::Closed(_)) => Some(i), //If channel closed, hook is ded, remove it. + _ => None, + }, i+=1).0 + }).collect() + }; + if closed.len() > 0 { + let mut hooks = hooks.write().await; + for index in closed.into_iter() { + debug!("Closing dead hook {}", index); + hooks.remove(index); + } + } + } + }; + tokio::pin!(work); + info!("Watcher up"); + tokio::select!{ + _ = &mut shutdown_rx => { + info!("Received shutdown, closing gracefully"); + }, + _ = work => {debug!("Worker stopped on it's own, notify watcher has been dropped.")}, + }; + debug!("Watcher going down for shutdown now"); + }) + }; + + let output = Oneesan { + hooks: HookContainer(hooks), + path: std::fs::canonicalize(&path).unwrap_or(path), + shutdown: PhantomDrop::new(shutdown, |shutdown| { + if !shutdown.is_closed() + { + shutdown.send(()).expect("mpsc fatal"); + } + }), + handle, + }; + + + output } +pub mod filter +{ + use super::*; + + #[inline(always)] + const fn all(_: &Event) -> bool {true} + pub const ALL: Option bool> = None; +} diff --git a/src/log.rs b/src/log.rs index bcb7dce..07e03cd 100644 --- a/src/log.rs +++ b/src/log.rs @@ -9,6 +9,7 @@ use std::{ self, Write, }, + borrow::Borrow, }; use once_cell::sync::OnceCell; @@ -23,6 +24,22 @@ pub enum Level Debug, } +/// Append this trait to allow you to +pub trait AsLevel: fmt::Display + Borrow{ + /// Used for derived levels + fn prefix(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "{}/", Borrow::::borrow(self)) + } +} + +impl AsLevel for Level{ + #[inline] fn prefix(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result{Ok(())} +} +impl AsLevel for &Level{ + #[inline] fn prefix(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result{Ok(())} +} + impl Default for Level { #[inline] @@ -88,7 +105,10 @@ impl Logger Logger::global() } - pub fn println(&self, mut to: impl Write, level: Level, what: impl Display) -> io::Result<()> + pub fn println(&self, mut to: W, level: L, what: D) -> io::Result<()> + where W: Write, + L: AsLevel, + D: Display, { //lol enum Date { @@ -125,17 +145,33 @@ impl Logger } - if self.level >= level { + if &self.level >= level.borrow() { let now: Date = if self.use_local_time { chrono::offset::Local::now().into() } else { chrono::offset::Utc::now().into() }; - + + write!(to, "{} [", now)?; + + struct Prefix<'a,L: AsLevel>(&'a L); + + impl<'a,L: AsLevel> std::fmt::Display for Prefix<'a, L> + { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + self.0.prefix(f) + } + } + + write!(to, "{}", Prefix(&level))?; //what a mess... + write!(to, "{}", level)?; + write!(to, "]")?; if self.title.len() > 0 { - write!(to, "{} [{}] <{}>: ", now, level, self.title)?; + write!(to, " <{}>: ", self.title)?; } else { - write!(to, "{} [{}]: ", now, level)?; + write!(to, ": ")?; } writeln!(to, "{}", what)?; } @@ -148,11 +184,11 @@ impl Logger { let stdout = std::io::stdout(); let stdout = stdout.lock(); - $crate::log::Logger::global().println(stdout, $crate::log::Level::Debug, $obj).expect("i/o error") + $crate::log::Logger::global().println(stdout, $crate::log::Level::Debug, $obj).expect("i/o error"); } }; ($fmt:literal, $($args:expr),*) => { - debug!(lazy_format::lazy_format!($fmt, $($args,)*)) + debug!(format!($fmt, $($args,)*)); }; } @@ -161,11 +197,46 @@ impl Logger { let stdout = std::io::stdout(); let stdout = stdout.lock(); - $crate::log::Logger::global().println(stdout, $crate::log::Level::Info, $obj).expect("i/o error") + $crate::log::Logger::global().println(stdout, $crate::log::Level::Info, $obj).expect("i/o error"); } }; ($fmt:literal, $($args:expr),*) => { - info!(lazy_format::lazy_format!($fmt, $($args,)*)) + info!(format!($fmt, $($args,)*)); + }; +} + +#[macro_export] macro_rules! important { + ($obj:expr) => { + { + struct Important; + use std::{ + borrow::Borrow, + fmt, + }; + impl Borrow<$crate::log::Level> for Important { + fn borrow(&self) -> &$crate::log::Level + { + &$crate::log::Level::Info + } + } + impl fmt::Display for Important + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + use recolored::Colorize; + write!(f, "{}", "Important".bright_blue()) + } + } + + impl $crate::log::AsLevel for Important{} + + let stdout = std::io::stdout(); + let stdout = stdout.lock(); + $crate::log::Logger::global().println(stdout, Important, $obj).expect("i/o error"); + } + }; + ($fmt:literal, $($args:expr),*) => { + important!(format!($fmt, $($args,)*)); }; } @@ -175,11 +246,47 @@ impl Logger { let stderr = std::io::stderr(); let stderr = stderr.lock(); - $crate::log::Logger::global().println(stderr, $crate::log::Level::Warn, $obj).expect("i/o error") + $crate::log::Logger::global().println(stderr, $crate::log::Level::Warn, $obj).expect("i/o error"); + } + }; + ($fmt:literal, $($args:expr),*) => { + warn!(format!($fmt, $($args,)*)); + }; +} + + +#[macro_export] macro_rules! dangerous { + ($obj:expr) => { + { + struct Dangerous; + use std::{ + borrow::Borrow, + fmt, + }; + impl Borrow<$crate::log::Level> for Dangerous { + fn borrow(&self) -> &$crate::log::Level + { + &$crate::log::Level::Warn + } + } + impl fmt::Display for Dangerous + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + use recolored::Colorize; + write!(f, "{}", "Dangerous".bright_yellow()) + } + } + + impl $crate::log::AsLevel for Dangerous{} + + let stderr = std::io::stderr(); + let stderr = stderr.lock(); + $crate::log::Logger::global().println(stderr, Dangerous, $obj).expect("i/o error"); } }; ($fmt:literal, $($args:expr),*) => { - warn!(lazy_format::lazy_format!($fmt, $($args,)*)) + dangerous!(format!($fmt, $($args,)*)); }; } @@ -188,11 +295,11 @@ impl Logger { let stderr = std::io::stderr(); let stderr = stderr.lock(); - $crate::log::Logger::global().println(stderr, $crate::log::Level::Error, $obj).expect("i/o error") + $crate::log::Logger::global().println(stderr, $crate::log::Level::Error, $obj).expect("i/o error"); } }; ($fmt:literal, $($args:expr),*) => { - error!(lazy_format::lazy_format!($fmt, $($args,)*)) + error!(format!($fmt, $($args,)*)); }; } @@ -207,7 +314,7 @@ impl Logger } }; ($fmt:literal, $($args:expr),*) => { - error!(lazy_format::lazy_format!($fmt, $($args,)*)) + error!(format!($fmt, $($args,)*)); }; } diff --git a/src/main.rs b/src/main.rs index 2ddd9c6..73b9738 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,6 @@ #![cfg_attr(nightly, int_error_matching)] +#![cfg_attr(nightly, const_fn)] + #![allow(dead_code)] @@ -14,6 +16,9 @@ use tokio::{ task, }; +mod util; +use util::*; + mod log; mod interval; @@ -29,7 +34,7 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo let (tx, mut rx) = mpsc::channel(16); let handle = tokio::spawn(async move { - println!("starting?"); + println!("starting?"); loop { let mut tick = interval.tick(); @@ -58,18 +63,33 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo #[tokio::main] async fn main() -> Result<(), Box> { - log::init(Default::default()); + log::init(log::Level::Debug); debug!("Initialised"); + let oneesan = live::watch("."); + + { + let mut recv = oneesan.hook("src/main.rs", live::filter::ALL).await; + while let Some(event) = recv.recv().await + { + important!("Got ev {:?}", event); + break; + } + } + + oneesan.shutdown().await.expect("oneesan panic"); + println!("{:?}", config::parse_global_single("example.rori").await.expect("Waaaaaah")); + dangerous!("Wheeeee"); //let (mut tx, h) = do_thing_every().await?; - // loop { - // time::delay_for(Duration::from_secs(6)).await; - // tx.send(()).await?; - // } - // h.await?; + // loop { + // time::delay_for(Duration::from_secs(6)).await; + // tx.send(()).await?; + // } + // h.await?; + Ok(()) } diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..0764e23 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,85 @@ +use std::{ + ops::{ + Drop, + Deref, + DerefMut, + }, + mem::replace, + fmt, +}; + +/// Allow for running function on drop, to support moving out of a larger structure. +pub struct PhantomDrop(Option<(T,F)>) +where F: FnOnce(T); + + +impl fmt::Debug for PhantomDrop +where F:FnOnce(T), + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + self.deref().fmt(f) + } +} + +impl Drop for PhantomDrop +where F:FnOnce(T) +{ + fn drop(&mut self) { + if let Some((value, func)) = replace(&mut self.0, None) + { + func(value); + } + } +} + + +impl PhantomDrop +where F:FnOnce(T) +{ + /// Create a new `PhantomDrop` with a drop closure and a value. + #[cfg(nightly)] + pub const fn new(value: T, func: F) -> Self + { + Self(Some((value,func))) + } + + /// Create a new `PhantomDrop` with a drop closure and a value. + #[cfg(not(nightly))] + pub fn new(value: T, func: F) -> Self + { + Self(Some((value,func))) + } +} + + +impl Deref for PhantomDrop +where F:FnOnce(T) +{ + type Target = T; + fn deref(&self) -> &Self::Target + { + if let Some((t, _)) = &self.0 + { + t + } else { + panic!("Double drop?") + } + } +} + + +impl DerefMut for PhantomDrop +where F:FnOnce(T) +{ + fn deref_mut(&mut self) -> &mut ::Target + { + if let Some((t, _)) = &mut self.0 + { + t + } else { + panic!("Double drop?") + } + } +}