//! Live config reloader. Handles hooks and filesystem watchers.
//!
//! # Notes
//! - The path passed to `watch()` MUST exist, or the child worker will panic (TODO). The path may be removed afterwards, however.
//! - When `Oneesan` is dropped, it signals the child worker to shut down gracefully.
//! - If the child worker panics, waiting on `Oneesan` can deadlock.
//! - The filename passed to `watch()` is canonicalised.
//! ## Hook format
//! - File paths are all relative to their root.
//! - Appending `/` to a directory path causes events for files within that directory to also be dispatched to this hook. Otherwise, only events on the directory itself are dispatched.
//! - The empty string matches only the root directory itself.
//! - A single `/` matches everything.
//! # TODO
//! - Make the child worker not panic if `watch()` is called on a non-existent path.
//! - Change hooks from plain string path prefixes to pattern matches with optional regex support (for dispatching differently on global vs. local config files).
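//! # Example
//! A minimal usage sketch (assuming the `watcher` feature is enabled, an async context, and that `./config` exists):
//! ```ignore
//! let oneesan = watch("./config");
//! // Hook everything under `./config/sub/` (note the trailing `/`).
//! let mut events = oneesan.hook("sub/", filter::ALL).await;
//! while let Some(kind) = events.recv().await {
//!     // Inspect `kind` and reload the relevant config here.
//! }
//! ```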
#![allow(unused_imports)] // For when hotreload feature disabled
use super::*;
use std::{
path::{
PathBuf,
Path,
},
sync::Arc,
fmt,
marker::{
Send,Sync,
},
ops::{
Deref,
},
};
use tokio::{
sync::{
RwLock,
Mutex,
oneshot,
mpsc::{
self,
error::SendTimeoutError,
},
},
task::{
JoinHandle,
},
time,
};
use futures::{
future::{
join_all,
},
};
use notify::{
Watcher,
RecursiveMode,
RecommendedWatcher,
event,
};
/// An event to be passed to `Context`.
pub type Event = event::EventKind;
/// Max event backlog (not used if the `watcher_unlimited` feature is set)
const GLOBAL_BACKLOG: usize = config::build::hot::BACKLOG; //100;
/// Timeout for hook dispatches, in seconds. Only used if the `watcher_timeout` feature is enabled.
const GLOBAL_TIMEOUT: u64 = config::build::hot::TIMEOUT; //5
/// Deconstruct a notify event into its paths (relative to `root`) and event kind.
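/// For example, with root `/etc/app`, an event touching `/etc/app/conf/main.toml` yields the relative path `conf/main.toml` alongside the event kind (paths illustrative).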
#[inline]
fn deconstruct_event<P: AsRef<Path>>(root: P,event: event::Event) -> (Vec<PathBuf>, Event)
{
let root = root.as_ref();
(event.paths.into_iter().map(|x| {
// Remove root
x.strip_prefix(root).unwrap_or(&x).to_owned()
}).collect(), event.kind)
}
/// An event filter function
pub type EventFilterFn = Box<dyn Fn(&Event) -> bool + Send + Sync>;
/// Represents one path hook
pub struct Hook {
path: PathBuf,
sender: Mutex<mpsc::Sender<Event>>,
filter: Option<EventFilterFn>,
}
impl std::fmt::Debug for Hook
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "(path: {:?}, sender: {:?}, filter: {})", self.path, self.sender, self.filter.is_some())
}
}
/// Check if path `two` matches the hook path `one`: an exact match, a prefix match when `one` ends with `/`, or the match-all `/`.
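/// For example, per the rules above:
/// - `is_path_match("/", "any/thing")` is `true` (match-all).
/// - `is_path_match("dir/", "dir/file.txt")` is `true` (trailing `/` matches contents).
/// - `is_path_match("dir", "dir/file.txt")` is `false` (no trailing `/`: only the entry itself).
/// - `is_path_match("dir", "dir")` is `true` (exact match).
/// - `is_path_match("", "file.txt")` is `false` (the empty string only matches the root itself).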
#[inline]
fn is_path_match<P,Q>(one: P, two: Q) -> bool
where P: AsRef<Path>,
Q: AsRef<Path>
{
let (one, two) = (one.as_ref(), two.as_ref());
if one.as_os_str() == "/" {
return true; //match-all case
}
//debug!("Checking {:?} ?== {:?}", one,two);
//println!(" {:?} {:?} {} {}", one, two, one.to_str().map(|x| x.ends_with("/")).unwrap_or(false), two.starts_with(one));
one == two || {
one.to_str().map(|x| x.ends_with("/")).unwrap_or(false) && two.starts_with(one)
}
}
impl Hook {
/// Optional timeout for dispatches; `None` means wait forever. A high or `None` value can cause the event backlog to overflow when receivers do not drain events promptly.
#[cfg(feature="watcher_timeout")]
const DISPATCH_TIMEOUT: Option<time::Duration> = Some(time::Duration::from_secs(GLOBAL_TIMEOUT));
#[cfg(not(feature="watcher_timeout"))]
const DISPATCH_TIMEOUT: Option<time::Duration> = None;
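/// Maximum number of undispatched events buffered per hook before `dispatch` blocks (or times out).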
const DISPATCH_MAX_BACKLOG: usize = 16;
/// Create a new `Hook` with a path and an optional filter function.
/// Returns the hook and its event receiver.
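/// # Example
/// A minimal sketch; hooks are normally registered through `HookContainer::hook` instead:
/// ```ignore
/// let (hook, mut events) = Hook::new("config.toml", filter::ALL);
/// ```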
pub fn new<F,P>(path: P, filter: Option<F>) -> (Self, mpsc::Receiver<Event>)
where F: Fn(&Event) -> bool + Send + Sync + 'static,
P: AsRef<Path>
{
let (tx, rx) = mpsc::channel(Self::DISPATCH_MAX_BACKLOG);
(Self{
path: path.as_ref().to_owned(),//std::fs::canonicalize(&path).unwrap_or(path.as_ref().to_owned()),
sender: Mutex::new(tx),
filter: filter.map(|f| Box::new(f) as EventFilterFn),
}, rx)
}
/// Force an event dispatch on this hook.
pub async fn dispatch(&self, event: Event) -> Result<(), SendTimeoutError<Event>>
{
debug!(" dispatching on {:?}", &self.path);
let mut lock = self.sender.lock().await;
match &Self::DISPATCH_TIMEOUT {
Some(timeout) => lock.send_timeout(event, timeout.clone()).await?,
None => lock.send(event).await.map_err(|e| mpsc::error::SendTimeoutError::Closed(e.0))?,
};
Ok(())
}
/// Try to dispatch on this hook if `path` matches the hook's path. Returns whether a dispatch occurred.
pub async fn try_dispatch(&self, path: impl AsRef<Path>, event: &Event) -> Result<bool, SendTimeoutError<Event>>
{
Ok(if is_path_match(&self.path, path) {
self.dispatch(event.to_owned()).await?;
true
} else {
false
})
}
/// Try to dispatch on this hook if any of the given paths match. Returns whether a dispatch occurred.
pub async fn try_dispatch_many<T,P>(&self, paths: T, event: &Event) -> Result<bool, SendTimeoutError<Event>>
where T: IntoIterator<Item=P>,
P: AsRef<Path>
{
for path in paths.into_iter() {
if is_path_match(&self.path, path) {
self.dispatch(event.to_owned()).await?;
return Ok(true);
}
}
Ok(false)
}
}
/// A hook container for `Oneesan`
#[derive(Debug, Clone)]
pub struct HookContainer(Arc<RwLock<Vec<Hook>>>); //TODO: Change to Arena
impl HookContainer
{
/// Add a hook to this container. The new hook is pushed to the worker.
///
/// Dropping the receiver will remove the hook.
/// # Note
/// This function does not check the path, and may add hooks that are never fired.
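/// # Example
/// A sketch of hooking only modification events under `conf/` (assumes a `container: HookContainer` and a running async context):
/// ```ignore
/// let mut events = container.hook("conf/", Some(|kind: &Event| matches!(kind, Event::Modify(_)))).await;
/// ```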
pub async fn hook<F,P>(&self, path: P, filter: Option<F>) -> mpsc::Receiver<Event>
where F: Fn(&Event) -> bool + Send + Sync + 'static,
P: AsRef<Path>
{
let mut hooks = self.0.write().await;
let (hook, recv) = Hook::new(path,filter);
hooks.push(hook);
recv
}
}
/// A watcher context, we hook specific paths here, to be dispatched to on file change
#[derive(Debug)]
pub struct Oneesan {
path: PathBuf,
hooks: HookContainer,
shutdown: PhantomDrop<oneshot::Sender<()>, fn (oneshot::Sender<()>) -> ()>,
handle: JoinHandle<()>,
}
impl Deref for Oneesan
{
type Target = HookContainer;
fn deref(&self) -> &Self::Target
{
&self.hooks
}
}
impl Oneesan
{
/// Add a hook to this container if `path` is inside this instance. If the check passes, the new hook is pushed to the worker; otherwise `None` is returned.
///
/// Dropping the receiver will remove the hook.
pub async fn try_hook<F,P>(&self, path: P, filter: Option<F>) -> Option<mpsc::Receiver<Event>>
where F: Fn(&Event) -> bool + Send + Sync + 'static,
P: AsRef<Path>
{
let path = path.as_ref();
if self.path.starts_with(&path) {
Some(self.hook(path, filter).await)
} else {
None
}
}
/// Consume this instance, signalling the child worker to shut down, and return its `JoinHandle` so it can be awaited.
pub fn shutdown(self) -> JoinHandle<()>
{
self.handle
}
}
#[cfg(feature="watcher")]
/// Start watching this path for file changes.
pub fn watch(path: impl AsRef<Path>) -> Oneesan
{
let path = path.as_ref().to_owned();
let hooks = Arc::new(RwLock::new(Vec::new()));
let (shutdown, mut shutdown_rx) = oneshot::channel();
let handle = {
let path=std::fs::canonicalize(&path).unwrap_or_else(|_| path.clone());
let hooks = Arc::clone(&hooks);
tokio::spawn(async move {
#[cfg(feature="watcher_unlimited")]
let (tx,mut rx) = mpsc::unbounded_channel();
#[cfg(not(feature="watcher_unlimited"))]
let (tx,mut rx) = mpsc::channel(GLOBAL_BACKLOG);
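// Bridge the synchronous notify callback into the async channel; the worker loop below drains it.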
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| {
#[cfg(not(feature="watcher_unlimited"))]
let mut tx = tx.clone();
match res {
Ok(event) => if let Err(err) = {
#[cfg(feature="watcher_unlimited")] {tx.send(event)}
#[cfg(not(feature="watcher_unlimited"))] {tx.try_send(event)}
} {
warn!("Watcher failed to pass event: {}", err);
},
Err(e) => error!("Watcher returned error: {}", e),
}
}).expect("Failed to initialise watcher");
debug!("Watcher initialised, starting for path {:?}", &path);
watcher.watch(&path, RecursiveMode::Recursive).expect("Failed to start watcher");
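// Worker loop: convert each filesystem event into root-relative paths and dispatch it to every matching hook, collecting hooks whose receivers have been dropped for removal.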
let work = async {
while let Some(event) = rx.recv().await {
//debug!(" -> {:?}", event);
let (paths, event) = deconstruct_event(&path, event);
let closed: Vec<usize> = { //XXX: Change to generational arena
let hooks = hooks.read().await;
join_all(hooks.iter().map(|hook: &Hook| hook.try_dispatch_many(&paths[..], &event))).await.into_iter()
.enumerate()
.filter_map(|(i, res)| match res {
Err(SendTimeoutError::Closed(_)) => Some(i), // If the channel is closed, the hook is dead; remove it.
_ => None,
}).collect()
};
if !closed.is_empty() {
let mut hooks = hooks.write().await;
// Remove in reverse so earlier removals don't shift the indices of later ones.
for index in closed.into_iter().rev() {
debug!("Closing dead hook {}", index);
hooks.remove(index);
}
}
}
};
tokio::pin!(work);
info!("Watcher up");
tokio::select!{
_ = &mut shutdown_rx => {
info!("Received shutdown, closing gracefully");
},
_ = work => {debug!("Worker stopped on its own; the notify watcher has been dropped.")},
};
debug!("Watcher going down for shutdown now");
})
};
let output = Oneesan {
hooks: HookContainer(hooks),
path: std::fs::canonicalize(&path).unwrap_or(path),
shutdown: PhantomDrop::new(shutdown, |shutdown| {
if !shutdown.is_closed()
{
shutdown.send(()).expect("oneshot fatal");
}
}),
handle,
};
output
}
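/// Commonly used event filters for hooks.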
pub mod filter
{
use super::*;
#[inline(always)]
const fn all(_: &Event) -> bool {true}
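/// No filter: all events are passed through.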
pub const ALL: Option<fn (&Event) -> bool> = None;
}