//! Live config reloader. Handles hooks and filesystem wathcers //! //! # Notes //! - The path passed to `watch()` MUST exist, or child will panic (TODO). But the path can be removed afterwards. //! - When `Oneesan` is dropped, it signals child worker to gracefully shutdown //! - When child worker panics, waiting on `Oneesan` can cause deadlock. //! - The filename passed to `watch()` is canonicalised. //! ## Hook format //! - File paths are all relative to their root. //! - Appending a directory path with `/` will cause events in file within the directory path being dispatched to also be dispatched to this hook. Otherwise, only events on the directory itself will dispatch. //! - The empty string matches only the root directory //! - A single `/` string will match everything. //! # TODO //! - Make child not panic if `watch()` is called on non-existant path //! - Change hook from just string path prefix to pattern match with optional regex capability. (For finding and dispatching on global/local config files differently) #![allow(unused_imports)] // For when hotreload feature disabled use super::*; use std::{ path::{ PathBuf, Path, }, sync::Arc, fmt, marker::{ Send,Sync, }, ops::{ Deref, }, }; use tokio::{ sync::{ RwLock, Mutex, oneshot, mpsc::{ self, error::SendTimeoutError, }, }, task::{ JoinHandle, }, time, }; use futures::{ future::{ join_all, }, }; use notify::{ Watcher, RecursiveMode, RecommendedWatcher, event, }; /// An event to be passed to `Context`. pub type Event = event::EventKind; /// Max event backlog (not used if `unlimited_watcher` is set) const GLOBAL_BACKLOG: usize = config::build::hot::BACKLOG; //100; /// Timeout for hook dispatches in seconds. Only set if `watcher_timeout` feature is enabled. const GLOBAL_TIMEOUT: u64 = config::build::hot::TIMEOUT; /// Decontruct a notify event into event kind and full paths. #[inline] fn deconstruct_event>(root: P,event: event::Event) -> (Vec, Event) { let root = root.as_ref(); (event.paths.into_iter().map(|x| { // Remove root x.strip_prefix(root).unwrap_or(&x).to_owned() }).collect(), event.kind) } /// An event filter function pub type EventFilterFn = Box bool + Send + Sync>; /// Represents one path hook pub struct Hook { path: PathBuf, sender: Mutex>, filter: Option, } impl std::fmt::Debug for Hook { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "(path: {:?}, sender: {:?}, filter: {})", self.path, self.sender, self.filter.is_some()) } } /// Check if `two` is a partial match of `one`. #[inline] fn is_path_match(one: P, two: Q) -> bool where P: AsRef, Q: AsRef { let (one, two) = (one.as_ref(), two.as_ref()); if one.as_os_str() == "/" { return true; //match-all case } //debug!("Checking {:?} ?== {:?}", one,two); //println!(" {:?} {:?} {} {}", one, two, one.to_str().map(|x| x.ends_with("/")).unwrap_or(false), two.starts_with(one)); one == two || { one.to_str().map(|x| x.ends_with("/")).unwrap_or(false) && two.starts_with(one) } } impl Hook { /// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly. #[cfg(feature="watcher_timeout")] const DISPATCH_TIMEOUT: Option = Some(time::Duration::from_secs(GLOBAL_TIMEOUT)); #[cfg(not(feature="watcher_timeout"))] const DISPATCH_TIMEOUT: Option = None; const DISPATCH_MAX_BACKLOG: usize = 16; /// Create a new `Hook` object, with a path, and an optional filter function. /// Return the hook and the receiver for the hook. pub fn new(path: P, filter: Option) -> (Self, mpsc::Receiver) where F: Fn(&Event) -> bool + Send + Sync + 'static, P: AsRef { let (tx, rx) = mpsc::channel(Self::DISPATCH_MAX_BACKLOG); (Self{ path: path.as_ref().to_owned(),//std::fs::canonicalize(&path).unwrap_or(path.as_ref().to_owned()), sender: Mutex::new(tx), filter: filter.map(|f| Box::new(f) as EventFilterFn), }, rx) } /// Force an event dispatch on this hook. pub async fn dispatch(&self, event: Event) -> Result<(), SendTimeoutError> { debug!(" dispatching on {:?}", &self.path); let mut lock =self.sender.lock().await; match &Self::DISPATCH_TIMEOUT { Some(timeout) => lock.send_timeout(event, timeout.clone()).await?, None => lock.send(event).await.map_err(|e| mpsc::error::SendTimeoutError::Closed(e.0))?, }; Ok(()) } /// Try to dispatch on this hook, if needed. pub async fn try_dispatch(&self, path: impl AsRef, event: &Event) -> Result> { Ok(if is_path_match(&self.path, path) { self.dispatch(event.to_owned()).await?; true } else { false }) } /// Try to dispatch on one of many paths pub async fn try_dispatch_many(&self, paths: T, event: &Event) -> Result> where T: IntoIterator, P: AsRef { for path in paths.into_iter() { if is_path_match(&self.path, path) { self.dispatch(event.to_owned()).await?; return Ok(true); } } Ok(false) } } /// A hook container for `Oneesan` #[derive(Debug, Clone)] pub struct HookContainer(Arc>>); impl HookContainer { /// Add a hook to this container. The worker will be pushed the new hook. /// /// Dropping the receiver will remove the hook. /// # Note /// This function does not check the path, and may add hooks that are never fired. pub async fn hook(&self, path: P, filter: Option) -> mpsc::Receiver where F: Fn(&Event) -> bool + Send + Sync + 'static, P: AsRef { let mut hooks = self.0.write().await; let (hook, recv) = Hook::new(path,filter); hooks.push(hook); recv } } /// A watcher context, we hook specific paths here, to be dispatched to on file change #[derive(Debug)] pub struct Oneesan { path: PathBuf, hooks: HookContainer, shutdown: PhantomDrop, fn (oneshot::Sender<()>) -> ()>, handle: JoinHandle<()>, } impl Deref for Oneesan { type Target = HookContainer; fn deref(&self) -> &Self::Target { &self.hooks } } impl Oneesan { /// Add a hook to this container if `path` is inside this instance. The worker will be pushed the new hook if the check passes, if not, will return `None`. /// /// Dropping the receiver will remove the hook. pub async fn try_hook(&self, path: P, filter: Option) -> Option> where F: Fn(&Event) -> bool + Send + Sync + 'static, P: AsRef { let path = path.as_ref(); if self.path.starts_with(&path) { Some(self.hook(path, filter).await) } else { None } } /// Join this instance pub fn shutdown(self) -> JoinHandle<()> { self.handle } } #[cfg(feature="watcher")] /// Start watching this path for changes of files pub fn watch(path: impl AsRef) -> Oneesan { let path = path.as_ref().to_owned(); let hooks = Arc::new(RwLock::new(Vec::new())); let (shutdown, mut shutdown_rx) = oneshot::channel(); let handle = { let path=std::fs::canonicalize(&path).unwrap_or_else(|_| path.clone()); let hooks = Arc::clone(&hooks); tokio::spawn(async move { #[cfg(feature="watcher_unlimited")] let (tx,mut rx) = mpsc::unbounded_channel(); #[cfg(not(feature="watcher_unlimited"))] let (tx,mut rx) = mpsc::channel(GLOBAL_BACKLOG); let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| { #[cfg(not(feature="watcher_unlimited"))] let mut tx = tx.clone(); match res { Ok(event) => if let Err(err) = { #[cfg(feature="watcher_unlimited")] {tx.send(event)} #[cfg(not(feature="watcher_unlimited"))] {tx.try_send(event)} } {warn!("Watcher failed to pass event: {}", err)}, Err(e) => error!("Watcher returned error: {}", e), } }).expect("Failed to initialise watcher"); debug!("Watcher initialised, starting for path {:?}", &path); watcher.watch(&path, RecursiveMode::Recursive).expect("Failed to start watcher"); let work = async { while let Some(event) = rx.recv().await { //debug!(" -> {:?}", event); let (paths, event) = deconstruct_event(&path, event); let closed: Vec = { //XXX: Change to generational arena let hooks = hooks.read().await; let mut i=0; join_all(hooks.iter().map(|hook: &Hook| hook.try_dispatch_many(&paths[..], &event))).await.into_iter().filter_map(|res| { (match res { Err(SendTimeoutError::Closed(_)) => Some(i), //If channel closed, hook is ded, remove it. _ => None, }, i+=1).0 }).collect() }; if closed.len() > 0 { let mut hooks = hooks.write().await; for index in closed.into_iter() { debug!("Closing dead hook {}", index); hooks.remove(index); } } } }; tokio::pin!(work); info!("Watcher up"); tokio::select!{ _ = &mut shutdown_rx => { info!("Received shutdown, closing gracefully"); }, _ = work => {debug!("Worker stopped on it's own, notify watcher has been dropped.")}, }; debug!("Watcher going down for shutdown now"); }) }; let output = Oneesan { hooks: HookContainer(hooks), path: std::fs::canonicalize(&path).unwrap_or(path), shutdown: PhantomDrop::new(shutdown, |shutdown| { if !shutdown.is_closed() { shutdown.send(()).expect("mpsc fatal"); } }), handle, }; output } pub mod filter { use super::*; #[inline(always)] const fn all(_: &Event) -> bool {true} pub const ALL: Option bool> = None; }