parent
7c86c6afda
commit
6ae2afa8c3
@ -1,28 +1,283 @@
|
|||||||
//! Live config reloader
|
//! Live config reloader. Handles hooks and filesystem wathcers
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::{
|
use std::{
|
||||||
path::{
|
path::{
|
||||||
PathBuf,
|
PathBuf,
|
||||||
|
Path,
|
||||||
},
|
},
|
||||||
|
sync::Arc,
|
||||||
|
fmt,
|
||||||
|
marker::{
|
||||||
|
Send,Sync,
|
||||||
|
},
|
||||||
|
ops::{
|
||||||
|
Deref,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use tokio::{
|
||||||
|
sync::{
|
||||||
|
RwLock,
|
||||||
|
Mutex,
|
||||||
|
oneshot,
|
||||||
|
mpsc::{
|
||||||
|
self,
|
||||||
|
error::SendTimeoutError,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
task::{
|
||||||
|
JoinHandle,
|
||||||
|
self,
|
||||||
|
},
|
||||||
|
time,
|
||||||
|
};
|
||||||
|
use futures::{
|
||||||
|
future::{
|
||||||
|
join_all,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use notify::{
|
||||||
|
Watcher,
|
||||||
|
RecursiveMode,
|
||||||
|
RecommendedWatcher,
|
||||||
|
event,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// An event to be passed to `Context`.
|
/// An event to be passed to `Context`.
|
||||||
#[derive(Debug)]
|
pub type Event = event::EventKind;
|
||||||
pub enum Event
|
|
||||||
|
/// Decontruct a notify event into event kind and full paths.
|
||||||
|
#[inline]
|
||||||
|
fn deconstruct_event(event: event::Event) -> (Vec<PathBuf>, Event)
|
||||||
{
|
{
|
||||||
|
// Dunno if this is needed
|
||||||
|
(event.paths.into_iter().map(|x| std::fs::canonicalize(&x).unwrap_or(x)).collect(), event.kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An event filter function
|
||||||
|
pub type EventFilterFn = Box<dyn Fn(&Event) -> bool + Send + Sync>;
|
||||||
|
|
||||||
|
/// Represents one path hook
|
||||||
|
pub struct Hook {
|
||||||
|
path: PathBuf,
|
||||||
|
sender: Mutex<mpsc::Sender<Event>>,
|
||||||
|
filter: Option<EventFilterFn>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for Hook
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
write!(f, "(path: {:?}, sender: {:?}, filter: {})", self.path, self.sender, self.filter.is_some())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl Hook {
|
||||||
|
/// Optional timeout for dispatches. None for infinite wait. A high or `None` value can cause event backlogs to overflow when receivers are not taking events properly.
|
||||||
|
const DISPATCH_TIMEOUT: Option<time::Duration> = Some(time::Duration::from_secs(5));
|
||||||
|
const DISPATCH_MAX_BACKLOG: usize = 16;
|
||||||
|
|
||||||
|
/// Create a new `Hook` object, with a path, and an optional filter function.
|
||||||
|
/// Return the hook and the receiver for the hook.
|
||||||
|
pub fn new<F,P>(path: P, filter: Option<F>) -> (Self, mpsc::Receiver<Event>)
|
||||||
|
where F: Fn(&Event) -> bool + Send + Sync + 'static,
|
||||||
|
P: AsRef<Path>
|
||||||
|
{
|
||||||
|
let (tx, rx) = mpsc::channel(Self::DISPATCH_MAX_BACKLOG);
|
||||||
|
|
||||||
|
(Self{
|
||||||
|
path: std::fs::canonicalize(&path).unwrap_or(path.as_ref().to_owned()),
|
||||||
|
sender: Mutex::new(tx),
|
||||||
|
filter: filter.map(|f| Box::new(f) as EventFilterFn),
|
||||||
|
}, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Force an event dispatch on this hook.
|
||||||
|
pub async fn dispatch(&self, event: Event) -> Result<(), SendTimeoutError<Event>>
|
||||||
|
{
|
||||||
|
debug!(" dispatching on {:?}", &self.path);
|
||||||
|
let mut lock =self.sender.lock().await;
|
||||||
|
|
||||||
|
match &Self::DISPATCH_TIMEOUT {
|
||||||
|
Some(timeout) => lock.send_timeout(event, timeout.clone()).await?,
|
||||||
|
None => lock.send(event).await.map_err(|e| mpsc::error::SendTimeoutError::Closed(e.0))?,
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to dispatch on this hook, if needed.
|
||||||
|
pub async fn try_dispatch(&self, path: impl AsRef<Path>, event: &Event) -> Result<bool, SendTimeoutError<Event>>
|
||||||
|
{
|
||||||
|
Ok(if self.path == path.as_ref() {
|
||||||
|
self.dispatch(event.to_owned()).await?;
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to dispatch on one of many paths
|
||||||
|
pub async fn try_dispatch_many<T,P>(&self, paths: T, event: &Event) -> Result<bool, SendTimeoutError<Event>>
|
||||||
|
where T: AsRef<[P]>,
|
||||||
|
P: AsRef<Path>
|
||||||
|
{
|
||||||
|
for path in paths.as_ref().iter() {
|
||||||
|
if self.path == path.as_ref() {
|
||||||
|
self.dispatch(event.to_owned()).await?;
|
||||||
|
return Ok(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A hook container for `Oneesan`
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct HookContainer(Arc<RwLock<Vec<Hook>>>);
|
||||||
|
|
||||||
|
impl HookContainer
|
||||||
|
{
|
||||||
|
/// Add a hook to this container. The worker will be pushed the new hook.
|
||||||
|
///
|
||||||
|
/// Dropping the receiver will remove the hook.
|
||||||
|
/// # Note
|
||||||
|
/// This function does not check the path, and may add hooks that are never fired.
|
||||||
|
pub async fn hook<F,P>(&self, path: P, filter: Option<F>) -> mpsc::Receiver<Event>
|
||||||
|
where F: Fn(&Event) -> bool + Send + Sync + 'static,
|
||||||
|
P: AsRef<Path>
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut hooks = self.0.write().await;
|
||||||
|
let (hook, recv) = Hook::new(path,filter);
|
||||||
|
hooks.push(hook);
|
||||||
|
recv
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A watcher context, we hook specific `Context`s here, to be dispatched to on file change
|
/// A watcher context, we hook specific paths here, to be dispatched to on file change
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Oneesan {
|
pub struct Oneesan {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
//TODO: Hooks
|
|
||||||
|
hooks: HookContainer,
|
||||||
|
shutdown: PhantomDrop<oneshot::Sender<()>, fn (oneshot::Sender<()>) -> ()>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for Oneesan
|
||||||
|
{
|
||||||
|
type Target = HookContainer;
|
||||||
|
fn deref(&self) -> &Self::Target
|
||||||
|
{
|
||||||
|
&self.hooks
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Oneesan
|
||||||
|
{
|
||||||
|
/// Add a hook to this container if `path` is inside this instance. The worker will be pushed the new hook if the check passes, if not, will return `None`.
|
||||||
|
///
|
||||||
|
/// Dropping the receiver will remove the hook.
|
||||||
|
pub async fn try_hook<F,P>(&self, path: P, filter: Option<F>) -> Option<mpsc::Receiver<Event>>
|
||||||
|
where F: Fn(&Event) -> bool + Send + Sync + 'static,
|
||||||
|
P: AsRef<Path>
|
||||||
|
{
|
||||||
|
let path = path.as_ref();
|
||||||
|
if self.path.starts_with(&path) {
|
||||||
|
Some(self.hook(path, filter).await)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Join this instance
|
||||||
|
pub fn shutdown(self) -> JoinHandle<()>
|
||||||
|
{
|
||||||
|
self.handle
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start watching this path for changes of files
|
/// Start watching this path for changes of files
|
||||||
pub fn watch(path: PathBuf) -> Oneesan
|
pub fn watch(path: impl AsRef<Path>) -> Oneesan
|
||||||
{
|
{
|
||||||
todo!()
|
let path = path.as_ref().to_owned();
|
||||||
|
let hooks = Arc::new(RwLock::new(Vec::new()));
|
||||||
|
let (shutdown, mut shutdown_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let handle = {
|
||||||
|
let path=path.clone();
|
||||||
|
let hooks = Arc::clone(&hooks);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (tx,mut rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| {
|
||||||
|
match res {
|
||||||
|
Ok(event) => if let Err(err) = tx.send(event) {warn!("Watcher failed to pass event: {}", err)},
|
||||||
|
Err(e) => error!("Watcher returned error: {}", e),
|
||||||
|
}
|
||||||
|
}).expect("Failed to initialise watcher");
|
||||||
|
|
||||||
|
debug!("Watcher initialised, starting for path {:?}", &path);
|
||||||
|
watcher.watch(path, RecursiveMode::Recursive).expect("Failed to start watcher");
|
||||||
|
|
||||||
|
let work = async {
|
||||||
|
while let Some(event) = rx.recv().await {
|
||||||
|
debug!(" -> {:?}", event);
|
||||||
|
let (paths, event) = deconstruct_event(event);
|
||||||
|
|
||||||
|
let closed: Vec<usize> = { //XXX: Change to generational arena
|
||||||
|
let hooks = hooks.read().await;
|
||||||
|
|
||||||
|
let mut i=0;
|
||||||
|
join_all(hooks.iter().map(|hook: &Hook| hook.try_dispatch_many(&paths[..], &event))).await.into_iter().filter_map(|res| {
|
||||||
|
(match res {
|
||||||
|
Err(SendTimeoutError::Closed(_)) => Some(i), //If channel closed, hook is ded, remove it.
|
||||||
|
_ => None,
|
||||||
|
}, i+=1).0
|
||||||
|
}).collect()
|
||||||
|
};
|
||||||
|
if closed.len() > 0 {
|
||||||
|
let mut hooks = hooks.write().await;
|
||||||
|
for index in closed.into_iter() {
|
||||||
|
debug!("Closing dead hook {}", index);
|
||||||
|
hooks.remove(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tokio::pin!(work);
|
||||||
|
info!("Watcher up");
|
||||||
|
tokio::select!{
|
||||||
|
_ = &mut shutdown_rx => {
|
||||||
|
info!("Received shutdown, closing gracefully");
|
||||||
|
},
|
||||||
|
_ = work => {debug!("Worker stopped on it's own, notify watcher has been dropped.")},
|
||||||
|
};
|
||||||
|
debug!("Watcher going down for shutdown now");
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let output = Oneesan {
|
||||||
|
hooks: HookContainer(hooks),
|
||||||
|
path: std::fs::canonicalize(&path).unwrap_or(path),
|
||||||
|
shutdown: PhantomDrop::new(shutdown, |shutdown| {
|
||||||
|
if !shutdown.is_closed()
|
||||||
|
{
|
||||||
|
shutdown.send(()).expect("mpsc fatal");
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
handle,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
output
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod filter
|
||||||
|
{
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
const fn all(_: &Event) -> bool {true}
|
||||||
|
pub const ALL: Option<fn (&Event) -> bool> = None;
|
||||||
|
}
|
||||||
|
@ -0,0 +1,85 @@
|
|||||||
|
use std::{
|
||||||
|
ops::{
|
||||||
|
Drop,
|
||||||
|
Deref,
|
||||||
|
DerefMut,
|
||||||
|
},
|
||||||
|
mem::replace,
|
||||||
|
fmt,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Allow for running function on drop, to support moving out of a larger structure.
|
||||||
|
pub struct PhantomDrop<T,F>(Option<(T,F)>)
|
||||||
|
where F: FnOnce(T);
|
||||||
|
|
||||||
|
|
||||||
|
impl<T,F> fmt::Debug for PhantomDrop<T,F>
|
||||||
|
where F:FnOnce(T),
|
||||||
|
T: fmt::Debug,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
self.deref().fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T,F> Drop for PhantomDrop<T,F>
|
||||||
|
where F:FnOnce(T)
|
||||||
|
{
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some((value, func)) = replace(&mut self.0, None)
|
||||||
|
{
|
||||||
|
func(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<T,F> PhantomDrop<T,F>
|
||||||
|
where F:FnOnce(T)
|
||||||
|
{
|
||||||
|
/// Create a new `PhantomDrop` with a drop closure and a value.
|
||||||
|
#[cfg(nightly)]
|
||||||
|
pub const fn new(value: T, func: F) -> Self
|
||||||
|
{
|
||||||
|
Self(Some((value,func)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new `PhantomDrop` with a drop closure and a value.
|
||||||
|
#[cfg(not(nightly))]
|
||||||
|
pub fn new(value: T, func: F) -> Self
|
||||||
|
{
|
||||||
|
Self(Some((value,func)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<T,F> Deref for PhantomDrop<T,F>
|
||||||
|
where F:FnOnce(T)
|
||||||
|
{
|
||||||
|
type Target = T;
|
||||||
|
fn deref(&self) -> &Self::Target
|
||||||
|
{
|
||||||
|
if let Some((t, _)) = &self.0
|
||||||
|
{
|
||||||
|
t
|
||||||
|
} else {
|
||||||
|
panic!("Double drop?")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<T,F> DerefMut for PhantomDrop<T,F>
|
||||||
|
where F:FnOnce(T)
|
||||||
|
{
|
||||||
|
fn deref_mut(&mut self) -> &mut <Self as Deref>::Target
|
||||||
|
{
|
||||||
|
if let Some((t, _)) = &mut self.0
|
||||||
|
{
|
||||||
|
t
|
||||||
|
} else {
|
||||||
|
panic!("Double drop?")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in new issue