//! Live config reloader. Handles hooks and filesystem wathcers
//!
//! # Notes
//! - The path passed to `watch()` MUST exist, or child will panic (TODO). But the path can be removed afterwards.
//! - When `Oneesan` is dropped, it signals child worker to gracefully shutdown
//! - When child worker panics, waiting on `Oneesan` can cause deadlock.
//! - The filename passed to `watch()` is canonicalised.
//! ## Hook format
//! - File paths are all relative to their root.
//! - Appending a directory path with `/` will cause events in file within the directory path being dispatched to also be dispatched to this hook. Otherwise, only events on the directory itself will dispatch.
//! - The empty string matches only the root directory
//! - A single `/` string will match everything.
//! # TODO
//! - Make child not panic if `watch()` is called on non-existant path
//! - Change hook from just string path prefix to pattern match with optional regex capability. (For finding and dispatching on global/local config files differently)
#![ allow(unused_imports) ] // For when hotreload feature disabled
use super ::* ;
use std ::{
path ::{
PathBuf ,
Path ,
} ,
sync ::Arc ,
fmt ,
marker ::{
Send , Sync ,
} ,
ops ::{
Deref ,
} ,
} ;
use tokio ::{
sync ::{
RwLock ,
Mutex ,
oneshot ,
mpsc ::{
self ,
error ::SendTimeoutError ,
} ,
} ,
task ::{
JoinHandle ,
} ,
time ,
} ;
use futures ::{
future ::{
join_all ,
} ,
} ;
use notify ::{
Watcher ,
RecursiveMode ,
RecommendedWatcher ,
event ,
} ;
/// An event to be passed to `Context`.
pub type Event = event ::EventKind ;
/// Max event backlog (not used if `unlimited_watcher` is set)
const GLOBAL_BACKLOG : usize = config ::build ::hot ::BACKLOG ; //100;
/// Timeout for hook dispatches in seconds. Only set if `watcher_timeout` feature is enabled.
const GLOBAL_TIMEOUT : u64 = config ::build ::hot ::TIMEOUT ; //5
/// Decontruct a notify event into event kind and full paths.
#[ inline ]
fn deconstruct_event < P : AsRef < Path > > ( root : P , event : event ::Event ) -> ( Vec < PathBuf > , Event )
{
let root = root . as_ref ( ) ;
( event . paths . into_iter ( ) . map ( | x | {
// Remove root
x . strip_prefix ( root ) . unwrap_or ( & x ) . to_owned ( )
} ) . collect ( ) , event . kind )
}
/// An event filter function
pub type EventFilterFn = Box < dyn Fn ( & Event ) -> bool + Send + Sync > ;
/// Represents one path hook
pub struct Hook {
path : PathBuf ,
sender : Mutex < mpsc ::Sender < Event > > ,
filter : Option < EventFilterFn > ,
}
impl std ::fmt ::Debug for Hook
{
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result
{
write! ( f , "(path: {:?}, sender: {:?}, filter: {})" , self . path , self . sender , self . filter . is_some ( ) )
}
}
/// Check if `two` is a partial match of `one`.
#[ inline ]
fn is_path_match < P , Q > ( one : P , two : Q ) -> bool
where P : AsRef < Path > ,
Q : AsRef < Path >
{
let ( one , two ) = ( one . as_ref ( ) , two . as_ref ( ) ) ;
if one . as_os_str ( ) = = "/" {
return true ; //match-all case
}
//debug!("Checking {:?} ?== {:?}", one,two);
//println!(" {:?} {:?} {} {}", one, two, one.to_str().map(|x| x.ends_with("/")).unwrap_or(false), two.starts_with(one));
one = = two | | {
one . to_str ( ) . map ( | x | x . ends_with ( "/" ) ) . unwrap_or ( false ) & & two . starts_with ( one )
}
}
impl Hook {
/// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly.
#[ cfg(feature= " watcher_timeout " ) ]
const DISPATCH_TIMEOUT : Option < time ::Duration > = Some ( time ::Duration ::from_secs ( GLOBAL_TIMEOUT ) ) ;
#[ cfg(not(feature= " watcher_timeout " )) ]
const DISPATCH_TIMEOUT : Option < time ::Duration > = None ;
const DISPATCH_MAX_BACKLOG : usize = 16 ;
/// Create a new `Hook` object, with a path, and an optional filter function.
/// Return the hook and the receiver for the hook.
pub fn new < F , P > ( path : P , filter : Option < F > ) -> ( Self , mpsc ::Receiver < Event > )
where F : Fn ( & Event ) -> bool + Send + Sync + ' static ,
P : AsRef < Path >
{
let ( tx , rx ) = mpsc ::channel ( Self ::DISPATCH_MAX_BACKLOG ) ;
( Self {
path : path . as_ref ( ) . to_owned ( ) , //std::fs::canonicalize(&path).unwrap_or(path.as_ref().to_owned()),
sender : Mutex ::new ( tx ) ,
filter : filter . map ( | f | Box ::new ( f ) as EventFilterFn ) ,
} , rx )
}
/// Force an event dispatch on this hook.
pub async fn dispatch ( & self , event : Event ) -> Result < ( ) , SendTimeoutError < Event > >
{
debug ! ( " dispatching on {:?}" , & self . path ) ;
let mut lock = self . sender . lock ( ) . await ;
match & Self ::DISPATCH_TIMEOUT {
Some ( timeout ) = > lock . send_timeout ( event , timeout . clone ( ) ) . await ? ,
None = > lock . send ( event ) . await . map_err ( | e | mpsc ::error ::SendTimeoutError ::Closed ( e . 0 ) ) ? ,
} ;
Ok ( ( ) )
}
/// Try to dispatch on this hook, if needed.
pub async fn try_dispatch ( & self , path : impl AsRef < Path > , event : & Event ) -> Result < bool , SendTimeoutError < Event > >
{
Ok ( if is_path_match ( & self . path , path ) {
self . dispatch ( event . to_owned ( ) ) . await ? ;
true
} else {
false
} )
}
/// Try to dispatch on one of many paths
pub async fn try_dispatch_many < T , P > ( & self , paths : T , event : & Event ) -> Result < bool , SendTimeoutError < Event > >
where T : IntoIterator < Item = P > ,
P : AsRef < Path >
{
for path in paths . into_iter ( ) {
if is_path_match ( & self . path , path ) {
self . dispatch ( event . to_owned ( ) ) . await ? ;
return Ok ( true ) ;
}
}
Ok ( false )
}
}
/// A hook container for `Oneesan`
#[ derive(Debug, Clone) ]
pub struct HookContainer ( Arc < RwLock < Vec < Hook > > > ) ; //TODO: Change to Arena
impl HookContainer
{
/// Add a hook to this container. The worker will be pushed the new hook.
///
/// Dropping the receiver will remove the hook.
/// # Note
/// This function does not check the path, and may add hooks that are never fired.
pub async fn hook < F , P > ( & self , path : P , filter : Option < F > ) -> mpsc ::Receiver < Event >
where F : Fn ( & Event ) -> bool + Send + Sync + ' static ,
P : AsRef < Path >
{
let mut hooks = self . 0. write ( ) . await ;
let ( hook , recv ) = Hook ::new ( path , filter ) ;
hooks . push ( hook ) ;
recv
}
}
/// A watcher context, we hook specific paths here, to be dispatched to on file change
#[ derive(Debug) ]
pub struct Oneesan {
path : PathBuf ,
hooks : HookContainer ,
shutdown : PhantomDrop < oneshot ::Sender < ( ) > , fn ( oneshot ::Sender < ( ) > ) -> ( ) > ,
handle : JoinHandle < ( ) > ,
}
impl Deref for Oneesan
{
type Target = HookContainer ;
fn deref ( & self ) -> & Self ::Target
{
& self . hooks
}
}
impl Oneesan
{
/// Add a hook to this container if `path` is inside this instance. The worker will be pushed the new hook if the check passes, if not, will return `None`.
///
/// Dropping the receiver will remove the hook.
pub async fn try_hook < F , P > ( & self , path : P , filter : Option < F > ) -> Option < mpsc ::Receiver < Event > >
where F : Fn ( & Event ) -> bool + Send + Sync + ' static ,
P : AsRef < Path >
{
let path = path . as_ref ( ) ;
if self . path . starts_with ( & path ) {
Some ( self . hook ( path , filter ) . await )
} else {
None
}
}
/// Join this instance
pub fn shutdown ( self ) -> JoinHandle < ( ) >
{
self . handle
}
}
#[ cfg(feature= " watcher " ) ]
/// Start watching this path for changes of files
pub fn watch ( path : impl AsRef < Path > ) -> Oneesan
{
let path = path . as_ref ( ) . to_owned ( ) ;
let hooks = Arc ::new ( RwLock ::new ( Vec ::new ( ) ) ) ;
let ( shutdown , mut shutdown_rx ) = oneshot ::channel ( ) ;
let handle = {
let path = std ::fs ::canonicalize ( & path ) . unwrap_or_else ( | _ | path . clone ( ) ) ;
let hooks = Arc ::clone ( & hooks ) ;
tokio ::spawn ( async move {
#[ cfg(feature= " watcher_unlimited " ) ]
let ( tx , mut rx ) = mpsc ::unbounded_channel ( ) ;
#[ cfg(not(feature= " watcher_unlimited " )) ]
let ( tx , mut rx ) = mpsc ::channel ( GLOBAL_BACKLOG ) ;
let mut watcher : RecommendedWatcher = Watcher ::new_immediate ( move | res | {
#[ cfg(not(feature= " watcher_unlimited " )) ]
let mut tx = tx . clone ( ) ;
match res {
Ok ( event ) = > if let Err ( err ) = {
#[ cfg(feature= " watcher_unlimited " ) ] { tx . send ( event ) }
#[ cfg(not(feature= " watcher_unlimited " )) ] { tx . try_send ( event ) }
}
{ warn ! ( "Watcher failed to pass event: {}" , err ) } ,
Err ( e ) = > error ! ( "Watcher returned error: {}" , e ) ,
}
} ) . expect ( "Failed to initialise watcher" ) ;
debug ! ( "Watcher initialised, starting for path {:?}" , & path ) ;
watcher . watch ( & path , RecursiveMode ::Recursive ) . expect ( "Failed to start watcher" ) ;
let work = async {
while let Some ( event ) = rx . recv ( ) . await {
//debug!(" -> {:?}", event);
let ( paths , event ) = deconstruct_event ( & path , event ) ;
let closed : Vec < usize > = { //XXX: Change to generational arena
let hooks = hooks . read ( ) . await ;
let mut i = 0 ;
join_all ( hooks . iter ( ) . map ( | hook : & Hook | hook . try_dispatch_many ( & paths [ .. ] , & event ) ) ) . await . into_iter ( ) . filter_map ( | res | {
( match res {
Err ( SendTimeoutError ::Closed ( _ ) ) = > Some ( i ) , //If channel closed, hook is ded, remove it.
_ = > None ,
} , i + = 1 ) . 0
} ) . collect ( )
} ;
if closed . len ( ) > 0 {
let mut hooks = hooks . write ( ) . await ;
for index in closed . into_iter ( ) {
debug ! ( "Closing dead hook {}" , index ) ;
hooks . remove ( index ) ;
}
}
}
} ;
tokio ::pin ! ( work ) ;
info ! ( "Watcher up" ) ;
tokio ::select ! {
_ = & mut shutdown_rx = > {
info ! ( "Received shutdown, closing gracefully" ) ;
} ,
_ = work = > { debug ! ( "Worker stopped on it's own, notify watcher has been dropped." ) } ,
} ;
debug ! ( "Watcher going down for shutdown now" ) ;
} )
} ;
let output = Oneesan {
hooks : HookContainer ( hooks ) ,
path : std ::fs ::canonicalize ( & path ) . unwrap_or ( path ) ,
shutdown : PhantomDrop ::new ( shutdown , | shutdown | {
if ! shutdown . is_closed ( )
{
shutdown . send ( ( ) ) . expect ( "mpsc fatal" ) ;
}
} ) ,
handle ,
} ;
output
}
pub mod filter
{
use super ::* ;
#[ inline(always) ]
const fn all ( _ : & Event ) -> bool { true }
pub const ALL : Option < fn ( & Event ) -> bool > = None ;
}