From 5592bc5d1b10a26a2acc72b2dd7948e9ed1ece7f Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 9 Feb 2021 18:58:20 +0000 Subject: [PATCH] walking --- src/config.rs | 41 ++++++++++++ src/data/mod.rs | 16 +++++ src/ext.rs | 9 +++ src/main.rs | 3 + src/state.rs | 76 ++++++++++++++++++++++ src/work.rs | 166 ++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 311 insertions(+) create mode 100644 src/config.rs create mode 100644 src/state.rs create mode 100644 src/work.rs diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..8050da0 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,41 @@ +use std::path::PathBuf; +use std::num::NonZeroUsize; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Recursion +{ + None, + Limited(usize), + Unlimited, +} + +impl Default for Recursion +{ + #[inline] + fn default() -> Self + { + Self::Unlimited + } +} + +impl Recursion +{ + /// Can we run at this depth? + pub fn can_run(&self, depth: usize) -> bool + { + match self { + Self::None => depth == 1, + Self::Limited(limit) => depth <= *limit, + Self::Unlimited => true + } + } +} + +/// Configuration for this run +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct Config +{ + pub paths: Vec, + pub recursive: Recursion, + pub max_tasks: Option, +} diff --git a/src/data/mod.rs b/src/data/mod.rs index 8585b43..301cc6c 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -71,6 +71,22 @@ pub enum FsInfo Directory, } +impl FsInfo +{ + /// Is this entry a directory + #[inline] pub fn is_dir(&self) -> bool + { + if let Self::Directory = self + { + true + } + else { + false + } + } +} + + #[cfg(test)] mod tests { diff --git a/src/ext.rs b/src/ext.rs index e9d0387..4ecdd30 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -32,6 +32,15 @@ impl INodeExt for std::fs::Metadata } } +impl INodeExt for tokio::fs::DirEntry +{ + #[inline] fn inode(&self) -> data::INode { + use std::os::unix::fs::DirEntryExt as _; + + unsafe { data::INode::new_unchecked(self.ino()) } + } 
+} + /// A gated stream that releases every N items from the backing stream. #[pin_project] #[derive(Debug)] diff --git a/src/main.rs b/src/main.rs index 947b354..2c9100d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,9 @@ pub use ext::prelude::*; mod data; +mod config; +mod state; +mod work; #[tokio::main] async fn main() { diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..bc802e7 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,76 @@ +use super::*; +use std::sync::Arc; + +use data::Cache; +use config::Config; + +/// Program state +#[derive(Debug, Clone)] +pub struct State +{ + config: Arc, + cache: Cache, + depth: usize, +} + +impl State +{ + /// Create a new state + pub fn new(cfg: Config) -> Self + { + Self { + config: Arc::new(cfg), + cache: Cache::new(), + depth: 1, + } + } + + /// Current depth of this tree + pub fn depth(&self) -> usize + { + self.depth + } + + /// Clone into increased depth, if config allows a deeper run. + pub fn deeper(&self) -> Option + { + if self.config.recursive.can_run(self.depth+1) { + Some(Self{ + depth: self.depth + 1, + ..self.clone() + }) + } else { + None + } + } + + /// The configuration for this run + pub fn config(&self) -> &Config + { + &self.config + } + + /// A reference to the cache of this run + pub fn cache(&self) -> &Cache + { + &self.cache + } + + /// Subscribe to this state's cache + pub fn cache_sub(&self) -> Cache + { + self.cache.clone() + } + + /// Try to consume the state into the cache. + /// + /// Fails if there are other references of this state alive. 
+ pub fn try_into_cache(self) -> Result + { + match Arc::try_unwrap(self.config) + { + Ok(_) => Ok(self.cache), + Err(config) => Err(Self{config, ..self}), + } + } +} diff --git a/src/work.rs b/src/work.rs new file mode 100644 index 0000000..a40ae43 --- /dev/null +++ b/src/work.rs @@ -0,0 +1,166 @@ +use super::*; +use std::path::{Path, PathBuf}; +use std::collections::HashMap; +use std::io; + +use futures::prelude::*; +use futures::future::join_all; +use futures::future::BoxFuture; + +use tokio::task::JoinHandle; +use tokio::fs; + +use data::INode; +use data::FsInfo; +use state::State; + +/// Join a root path onto this hashmap. +fn join_root<'a>(root: impl AsRef + 'a, map: HashMap) -> impl Iterator + 'a +{ + map.into_iter().map(move |(k, v)| (root.as_ref().join(k), v)) +} + +async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result +{ + let meta = entry.metadata().await?; + if meta.is_dir() + { + Ok(FsInfo::Directory) + } else if meta.is_file() + { + Ok(FsInfo::File(meta.len(), parent)) + } else + { + Err(io::Error::new(io::ErrorKind::Other, "Unknown file type")) + } +} + +/// Contains a graph of all paths and inodes that were successfully stat'd +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct INodeInfoGraph +{ + inodes: HashMap, // FsInfo `file` contains parent INode that can be used to look up again in this table + paths: HashMap, // map absolute paths to INodes to be looked up in `inodes` table. +} + +impl INodeInfoGraph +{ + //TODO: Order by largest size +} + +/// Walk on all paths in this state, then return a joined map of all paths +/// +/// # Panics +/// If there are any more held references to `state`.
+pub async fn work_on_all(state: State) -> INodeInfoGraph +{ + let comp_children = join_all(state.config().paths.iter().map(|path| { + let path = path.clone(); + async { + match tokio::fs::symlink_metadata(&path).await { + Ok(meta) => { + let inode = meta.inode(); + tokio::spawn(walk(state.clone(), path.clone(), inode)).await + .ok() + .map(move |res| (res, path)) + }, + Err(err) => { + eprintln!("Failed to stat root {:?}: {}", path, err); + None + }, + } + } + })).await; + + // All children have completed here. Unwrap cache + let ino_map = { + let cache = state.try_into_cache().unwrap(); + cache.try_complete().await.unwrap() + }; + let mut output = HashMap::with_capacity(ino_map.len()); + + for path_comp in comp_children + { + if let Some((res, root)) = path_comp + { + for (path, ino) in join_root(&root, res) + { + if let Some(_) = ino_map.get(&ino) { + output.insert(path, ino); + } else { + eprintln!("No ino entry for {:?} ({:?})", path, ino); + } + } + } + } + + INodeInfoGraph { + inodes: ino_map, + paths: output, + } +} + +/// Walk this directory. +/// +/// # Returns +/// An *unjoined* map of relative paths and `INode`s inserted into the state's `Cache`. +/// The caller must join its `root` with these paths to provide a normalized map. +/// Recursive calls to `walk` handle this automatically. +fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, HashMap> +{ + let mut output = HashMap::new(); + let mut children: Vec>> = Vec::new(); + async move { + match fs::read_dir(&root).await + { + Ok(mut dir) => { + while let Some(entry) = dir.next().await + { + match entry + { + Ok(entry) => { + let ino = entry.inode(); + // Check cache for this + if state.cache().get(&ino).await.is_none() { + // Not added, process.
+ match process_entry(&entry, root_ino).await { + Ok(fsinfo) => { + if fsinfo.is_dir() + { + if let Some(next) = state.deeper() + { + children.push(tokio::spawn( + walk(next, entry.path(), ino) + )); + } + } + let mut cache = state.cache_sub(); + cache.insert(ino, fsinfo).await; + }, + Err(err) => eprintln!("Failed to stat {:?}: {}", entry.path(), err), + } + } + }, + Err(err) => eprintln!("Walking {:?} failed: {}", root, err), + } + } + }, + Err(err) => eprintln!("Failed to walk {:?}: {}", root, err), + } + + // Join all children + for child in join_all(children.into_iter()).await + { + if let Ok(map) = child + { + output.extend(join_root(&root, map)); + } else { + eprintln!("Child panic"); + } + } + + output + }.boxed() +} + +