diff --git a/src/data.rs b/src/data.rs
index 524aa6e..88b4fd3 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -1,5 +1,13 @@
 //! Datatypes for the program
 use super::*;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::{
+    RwLock,
+    mpsc,
+    watch,
+};
+use std::fs::Metadata;
 
 /// A raw file or directory inode number
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
@@ -14,9 +22,127 @@ impl INode
         Self(ino)
     }
 
+    /// Create an `INode` from the `ino` of an `fs::Metadata` object.
+    #[inline] pub fn new(meta: &Metadata) -> Self
+    {
+        use std::os::unix::fs::MetadataExt as _;
+        Self(meta.ino())
+    }
+
     /// Convert into raw `u64` inode number.
     #[inline] pub const fn into_inner(self) -> u64
     {
         self.0
     }
 }
+
+/// Info about a valid file system object
+///
+/// # Note
+/// This does not contain the `INode` of the fs object itself; that is considered the key to the table.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum FsInfo
+{
+    File(u64, INode), // size, parent dir inode
+    Directory,
+}
+
+#[derive(Debug)]
+struct INodeCache
+{
+    cache: RwLock<HashMap<INode, FsInfo>>,
+}
+
+/// Cache of `INode`s and their associated info
+#[derive(Debug, Clone)]
+pub struct Cache(Arc<INodeCache>, mpsc::Sender<(INode, FsInfo)>, watch::Receiver<bool>);
+
+impl Cache
+{
+    /// Lookup this `INode` in the cache.
+    ///
+    /// # Usage
+    /// Should be used to check if directory `INode`s have already been added; if so, they don't need to be walked.
+    pub async fn get(&self, node: &INode) -> Option<FsInfo>
+    {
+        self.0.cache.read().await.get(node).map(Clone::clone)
+    }
+
+    /// Insert an `INode`'s information into the cache.
+    pub async fn insert(&mut self, node: INode, val: FsInfo)
+    {
+        self.1.send((node, val)).await.expect("Failed to send to caching task: Receiver dropped or closed");
+    }
+
+    /// Create a new, empty cache.
+    pub fn new() -> Self
+    {
+        let inner = Arc::new(INodeCache {
+            cache: RwLock::new(HashMap::new()),
+        });
+        let (tx, rx) = mpsc::channel(24);
+        let (e_tx, e_rx) = watch::channel(true);
+
+        tokio::spawn({
+            let cache = Arc::clone(&inner);
+            async move {
+                use futures::prelude::*;
+                {
+                    // Batch incoming inserts so the write lock is only taken in bursts.
+                    let mut rx = rx
+                        .gate_with_timeout(60, duration!(100 ms))
+                        .lag(duration!(10 ms));
+                    while let Some(elements) = rx.next().await
+                    {
+                        let mut writer = cache.cache.write().await;
+                        writer.extend(elements.into_iter());
+                    }
+                    drop(cache); // drop the Arc before broadcasting we're done
+                }
+                let _ = e_tx.broadcast(false);
+            }
+        });
+
+        Self(inner, tx, e_rx)
+    }
+
+    /// Clone the current cache table.
+    ///
+    /// # Notes
+    /// This table may be out of date. Use `try_complete` to wait for a completed version of the table and return it without cloning.
+    pub async fn clone_current_table(&self) -> HashMap<INode, FsInfo>
+    {
+        self.0.cache.read().await.clone()
+    }
+
+    /// Consume into a `Future` that returns the completely filled `HashMap` once the background task has finished inserting elements.
+    ///
+    /// # Note
+    /// If there are shared references to the `Cache` other than this one, the future will return this object as `Err`.
+    pub fn try_complete(self) -> impl futures::Future<Output = Result<HashMap<INode, FsInfo>, Self>> + 'static
+    {
+        async move {
+            if Arc::strong_count(&self.0) > 2 {
+                return Err(self); // something other than the background task is holding a shared cache reference
+            }
+            let Cache(arc, sender, mut watch) = self;
+            drop(sender); // drop sender, allowing the background task to finish
+            while Arc::strong_count(&arc) > 1 && *watch.borrow() {
+                match watch.recv().await {
+                    Some(false) | None => {
+                        // background task has closed
+                        drop(watch);
+                        break;
+                    },
+                    Some(true) => continue,
+                }
+            }
+            match Arc::try_unwrap(arc)
+            {
+                Ok(inner) => {
+                    Ok(inner.cache.into_inner())
+                },
+                #[cold] Err(_) => unreachable!("Arc should have only 1 strong reference now. This should never happen"),
+            }
+        }
+    }
+}
diff --git a/src/ext.rs b/src/ext.rs
index 40df186..70de0a3 100644
--- a/src/ext.rs
+++ b/src/ext.rs
@@ -18,6 +18,18 @@ pub mod prelude
     pub use super::StreamLagExt as _;
 }
 
+pub trait INodeExt
+{
+    /// Get the `INode` of this fs object's metadata.
+    fn inode(&self) -> data::INode;
+}
+
+impl INodeExt for std::fs::Metadata
+{
+    #[inline] fn inode(&self) -> data::INode {
+        data::INode::new(self)
+    }
+}
 
 /// A gated stream that releases every N items from the backing stream.
 #[pin_project]
@@ -116,7 +128,7 @@ where S: Stream
 impl<S> Stream for GatedStream<S>
 where S: Stream
 {
-    type Item = Box<[S::Item]>;
+    type Item = Vec<S::Item>; //Box<[S::Item]>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
     {
        while self.buffer.len() < self.release_at
@@ -162,7 +174,7 @@ where S: Stream
 impl<S> Stream for TimedGatedStream<S>
 where S: Stream
 {
-    type Item = Box<[S::Item]>;
+    type Item = Vec<S::Item>; //Box<[S::Item]>;
     #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
     {
        let this = self.project();
        match this.interval.poll_next_unpin(cx)
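Not part of the patch: a minimal usage sketch of the new `Cache` API for reviewers. The module paths (`crate::data`, `crate::ext`) and the `index_root` function are assumptions for illustration; only `Cache`, `FsInfo`, and `INodeExt` come from the diff above.

// Usage sketch (illustrative only): drive the new `Cache` from a walker task.
use crate::data::{Cache, FsInfo};
use crate::ext::INodeExt as _;

async fn index_root(root: &std::path::Path) -> std::io::Result<()>
{
    let mut cache = Cache::new();

    // Each visited fs object is sent to the background task, which batches
    // the inserts into the shared map.
    let meta = tokio::fs::symlink_metadata(root).await?;
    cache.insert(meta.inode(), FsInfo::Directory).await;

    // Dropping the last sender lets the background task finish; `try_complete`
    // then yields the fully populated table (or `Err(cache)` if still shared).
    match cache.try_complete().await {
        Ok(table) => println!("cached {} inodes", table.len()),
        Err(_still_shared) => eprintln!("cache is still shared; cannot complete"),
    }
    Ok(())
}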