//! Datatypes for the program use super::*; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{ RwLock, mpsc, watch, }; use std::fs::Metadata; /// A raw file or directory inode number /// /// Ususally created from the `.inode()` extension method on `fs::Metadata` found in prelude. /// Can also be created with `new()` from a `fs::Metadata` reference, or created unsafely from an arbitrary `u64` with `new_unchecked`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)] #[repr(transparent)] pub struct INode(u64); impl INode { /// Create a new `INode` wrapper from any `u64` without checking if it is a real inode. #[inline] pub const unsafe fn new_unchecked(ino: u64) -> Self { Self(ino) } /// Get `ino` from an `fs::Metadata` object. #[inline] pub fn new(meta: &Metadata) -> Self { use std::os::unix::fs::MetadataExt as _; Self(meta.ino()) } /// Convert into raw `u64` inode number. #[inline] pub const fn into_inner(self) -> u64 { self.0 } } impl<'a> From<&'a Metadata> for INode { #[inline] fn from(from: &'a Metadata) -> Self { from.inode() } } impl From for u64 { #[inline] fn from(from: INode) -> Self { from.0 } } /// A valid file system info /// /// # Note /// This does not contains the INode of the fs object itself, that is considered that key to the table. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum FsInfo { File(u64, INode), //Size, parent dir inode Directory, } #[derive(Debug)] struct INodeCache { cache: RwLock>, } /// Cache of `INode`s and their associated infos #[derive(Debug, Clone)] pub struct Cache(Arc, mpsc::Sender<(INode, FsInfo)>, watch::Receiver); impl Cache { /// Lookup this `INode` in the cache. /// /// # Usage /// Should be used to check if directory `INode`s have already been added, if so, they don't need to be walked. pub async fn get(&self, node: &INode) -> Option { self.0.cache.read().await.get(node).map(Clone::clone) } /// Insert an `INode`'s information into the cache. pub async fn insert(&mut self, node: INode, val: FsInfo) { self.1.send((node, val)).await.expect("Failed to send to caching task: Receiver dropped or closed"); } /// Create a new, empty, cache. pub fn new() -> Self { let inner = Arc::new(INodeCache { cache: RwLock::new(HashMap::new()), }); let (tx, rx) = mpsc::channel(24); let (e_tx, e_rx) = watch::channel(true); tokio::spawn({ let cache = Arc::clone(&inner); async move { use futures::prelude::*; { let mut rx = rx .gate_with_timeout(60, duration!(100 ms)) .lag(duration!(10 ms)); while let Some(elements) = rx.next().await { let mut writer = cache.cache.write().await; writer.extend(elements.into_iter()); } drop(cache); // drop the Arc before broadcasting we're done } let _ = e_tx.broadcast(false); } }); Self(inner, tx, e_rx) } /// Clone the current cache table. /// /// # Notes /// This table may be out of date. Use `try_complete` to wait for a completed version of the table and return that without cloning. pub async fn clone_current_table(&self) -> HashMap { self.0.cache.read().await.clone() } /// Consume into a `Future` that returns the completely filled `HashMap` when background task has finished inserting elements. /// /// # Note /// If there are shared references to the `Cache` other than this one, the future will return this object as `Err`. pub fn try_complete(self) -> impl futures::Future, Self>> + 'static { async move { if Arc::strong_count(&self.0) > 2 { return Err(self); // there is another other than the background task holding the shared cache reference } let Cache(arc, sender, mut watch) = self; drop(sender); // drop sender, allowing background task to finish. while Arc::strong_count(&arc) > 1 && *watch.borrow() { match watch.recv().await { Some(false) | None => { // background task has closed drop(watch); break; }, Some(true) => continue, } } match Arc::try_unwrap(arc) { Ok(inner) => { Ok(inner.cache.into_inner()) }, #[cold] Err(_) => unreachable!("Arc should have only 1 strong reference now. This should never happend"), } } } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn cache_insert_and_consume() { let mut cache = Cache::new(); for x in 0..500 { cache.insert(unsafe { INode::new_unchecked(x) }, FsInfo::Directory).await; } let output = cache.try_complete().await.unwrap(); assert_eq!(output.len(), 500); for x in 0..500 { assert_eq!(output.get(&unsafe { INode::new_unchecked(x) }).unwrap(), &FsInfo::Directory); } } }