You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
159 lines
3.7 KiB
159 lines
3.7 KiB
//! File-watching
//!
//! When serving a directory outside of oneshot mode, this can be used to keep the listings up to date.
|
|
use super::*;
|
|
|
|
use std::{path::PathBuf, time::Duration};
|
|
use std::sync::Arc;
|
|
use std::ops::Deref;
|
|
use notify::{
|
|
Watcher,
|
|
RecursiveMode,
|
|
watcher,
|
|
//TODO: set up a wrapper around the notify callback thread that puts events into an async tokio::mpsc (or broadcast?) sender.
|
|
};
|
|
use tokio::sync::{
|
|
broadcast,
|
|
mpsc,
|
|
};
|
|
//use tokio_uring // Don't do this here; have a separate thread using this (if we end up using it — we probably should, since we probably don't need multiple threads reading/writing files at once).
|
|
|
|
/// Receiving half of a channel that carries values of type `T`.
///
/// This abstracts over the concrete channel flavours used by this module, so callers can choose
/// which kind of channel events are delivered through.
pub trait Receiver<T> {
    /// Error produced when no value could be taken from the channel.
    type Error;

    /// Tries to take the next value out of the channel.
    ///
    /// The implementations in this file all delegate to the underlying channel's `try_recv`.
    fn recv(&mut self) -> Result<T, Self::Error>;
}
|
|
|
|
/// Sending half of a channel that carries values of type `T`.
///
/// This abstracts over the concrete channel flavours used by this module, so the code producing
/// events does not need to know which kind of channel it is feeding.
pub trait Sender<T> {
    /// Error produced when a value could not be handed to the channel.
    type Error;

    /// Hands `val` to the channel.
    ///
    /// Takes `&self`, so a sender can be used without exclusive access.
    fn send(&self, val: T) -> Result<(), Self::Error>;
}
|
|
|
|
pub trait Channel<T>: Sized
|
|
{
|
|
type Sender: Sender<T>;
|
|
type Receiver: Receiver<T>;
|
|
|
|
fn split(self) -> (Self::Sender, Self::Receiver);
|
|
}
|
|
|
|
impl<T, S,R> Channel<T> for (S, R)
|
|
where S: Sender<T>,
|
|
R: Receiver<T>
|
|
{
|
|
type Sender = S;
|
|
type Receiver = R;
|
|
#[inline(always)] fn split(self) -> (Self::Sender, Self::Receiver) {
|
|
self
|
|
}
|
|
}
|
|
|
|
impl<T> Sender<T> for broadcast::Sender<T>
|
|
{
|
|
type Error = broadcast::error::SendError<T>;
|
|
fn send(&self, val: T) -> Result<(), Self::Error> {
|
|
self.send(val)?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl<T> Receiver<T> for broadcast::Receiver<T>
|
|
where T: Clone
|
|
{
|
|
type Error = broadcast::error::TryRecvError;
|
|
fn recv(&mut self) -> Result<T, Self::Error> {
|
|
broadcast::Receiver::try_recv(self)
|
|
}
|
|
}
|
|
|
|
impl<T> Sender<T> for mpsc::Sender<T>
|
|
{
|
|
type Error = mpsc::error::TrySendError<T>;
|
|
fn send(&self, val: T) -> Result<(), Self::Error> {
|
|
self.try_send(val)
|
|
}
|
|
}
|
|
|
|
impl<T> Receiver<T> for mpsc::Receiver<T>
|
|
{
|
|
type Error = mpsc::error::TryRecvError;
|
|
fn recv(&mut self) -> Result<T, Self::Error> {
|
|
self.try_recv()
|
|
}
|
|
}
|
|
|
|
impl<T> Sender<T> for mpsc::UnboundedSender<T>
|
|
{
|
|
type Error = mpsc::error::SendError<T>;
|
|
fn send(&self, val: T) -> Result<(), Self::Error> {
|
|
self.send(val)
|
|
}
|
|
}
|
|
|
|
impl<T> Receiver<T> for mpsc::UnboundedReceiver<T>
|
|
{
|
|
type Error = mpsc::error::TryRecvError;
|
|
fn recv(&mut self) -> Result<T, Self::Error> {
|
|
self.try_recv()
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
pub struct WatchEvent(Arc<notify::DebouncedEvent>);
|
|
|
|
impl WatchEvent
|
|
{
|
|
#[inline(always)] pub fn debounced(&self) -> ¬ify::DebouncedEvent
|
|
{
|
|
&self.0
|
|
}
|
|
}
|
|
|
|
impl AsRef<notify::DebouncedEvent> for WatchEvent
|
|
{
|
|
#[inline] fn as_ref(&self) -> ¬ify::DebouncedEvent
|
|
{
|
|
self.debounced()
|
|
}
|
|
}
|
|
|
|
impl Deref for WatchEvent
|
|
{
|
|
type Target = notify::DebouncedEvent;
|
|
#[inline] fn deref(&self) -> &Self::Target {
|
|
self.debounced()
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct Mode
|
|
{
|
|
pub recurse: RecursiveMode,
|
|
pub delay: Duration,
|
|
}
|
|
|
|
/// Start a new watcher thread.
|
|
///
|
|
/// # Returns
|
|
/// * A receiver that gets the events from the watcher
|
|
/// * A future that completes when the thread exits
|
|
pub fn watch<'a, C>(path: PathBuf, mode: Mode, chan: impl FnOnce() -> C + 'a) -> (C::Receiver, impl Future<Output = ()> + Send + Sync + 'static)
|
|
where C: Channel<WatchEvent>,
|
|
C::Sender: Send + 'static,
|
|
{
|
|
let (otx, orx) = chan().split();
|
|
let (stx, trx) = std::sync::mpsc::channel();
|
|
let mut watcher = watcher(stx, mode.delay).unwrap();
|
|
let passing = tokio::spawn(async move {
|
|
match trx.try_recv() {
|
|
Ok(ev) => otx.send(WatchEvent(Arc::new(ev))).map_err(|_| ()).unwrap(),
|
|
Err(_) => (),//tokio::time::sleep(mode.delay).await, FUCK, WHY can't we await here... ALL of this bullshit above with the traits is useless. just return the damn sync `Receiver`.
|
|
}
|
|
});
|
|
{
|
|
use futures::prelude::*;
|
|
(orx, passing.map(|_| ()))
|
|
}
|
|
}
|