redo-gragh
Avril 4 years ago
parent 6a105ea154
commit 5592bc5d1b
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -0,0 +1,41 @@
use std::path::PathBuf;
use std::num::NonZeroUsize;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Recursion
{
None,
Limited(usize),
Unlimited,
}
impl Default for Recursion
{
#[inline]
fn default() -> Self
{
Self::Unlimited
}
}
impl Recursion
{
/// Can we run at this depth?
pub fn can_run(&self, depth: usize) -> bool
{
match self {
Self::None => depth == 1,
Self::Limited(limit) => depth <= *limit,
Self::Unlimited => true
}
}
}
/// Configuration for this run
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Config
{
pub paths: Vec<PathBuf>,
pub recursive: Recursion,
pub max_tasks: Option<NonZeroUsize>,
}

@ -71,6 +71,22 @@ pub enum FsInfo
Directory, Directory,
} }
impl FsInfo
{
/// Is this entry a directory
#[inline] pub fn is_dir(&self) -> bool
{
if let Self::Directory = self
{
true
}
else {
false
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests mod tests
{ {

@ -32,6 +32,15 @@ impl INodeExt for std::fs::Metadata
} }
} }
impl INodeExt for tokio::fs::DirEntry
{
#[inline] fn inode(&self) -> data::INode {
use std::os::unix::fs::DirEntryExt as _;
unsafe { data::INode::new_unchecked(self.ino()) }
}
}
/// A gated stream that releases every N items from the backing stream. /// A gated stream that releases every N items from the backing stream.
#[pin_project] #[pin_project]
#[derive(Debug)] #[derive(Debug)]

@ -7,6 +7,9 @@
pub use ext::prelude::*; pub use ext::prelude::*;
mod data; mod data;
mod config;
mod state;
mod work;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {

@ -0,0 +1,76 @@
use super::*;
use std::sync::Arc;
use data::Cache;
use config::Config;
/// Program state
#[derive(Debug, Clone)]
pub struct State
{
config: Arc<Config>,
cache: Cache,
depth: usize,
}
impl State
{
/// Create a new state
pub fn new(cfg: Config) -> Self
{
Self {
config: Arc::new(cfg),
cache: Cache::new(),
depth: 1,
}
}
/// Current depth of this tree
pub fn depth(&self) -> usize
{
self.depth
}
/// Clone into increased depth, if config allows a deeper run.
pub fn deeper(&self) -> Option<Self>
{
if self.config.recursive.can_run(self.depth+1) {
Some(Self{
depth: self.depth + 1,
..self.clone()
})
} else {
None
}
}
/// The configuration for this run
pub fn config(&self) -> &Config
{
&self.config
}
/// A reference to the cache of this run
pub fn cache(&self) -> &Cache
{
&self.cache
}
/// Subscribe to this state's cache
pub fn cache_sub(&self) -> Cache
{
self.cache.clone()
}
/// Try to consume the state into the cache.
///
/// Fails if there are other references of this state alive.
pub fn try_into_cache(self) -> Result<Cache, Self>
{
match Arc::try_unwrap(self.config)
{
Ok(_) => Ok(self.cache),
Err(config) => Err(Self{config, ..self}),
}
}
}

@ -0,0 +1,166 @@
use super::*;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::io;
use futures::prelude::*;
use futures::future::join_all;
use futures::future::BoxFuture;
use tokio::task::JoinHandle;
use tokio::fs;
use data::INode;
use data::FsInfo;
use state::State;
/// Join a root path onto this hashmap.
fn join_root<'a>(root: impl AsRef<Path> + 'a, map: HashMap<PathBuf, INode>) -> impl Iterator<Item=(PathBuf, INode)> + 'a
{
map.into_iter().map(move |(k, v)| (root.as_ref().join(k), v))
}
async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result<FsInfo>
{
let meta = entry.metadata().await?;
if meta.is_dir()
{
Ok(FsInfo::Directory)
} else if meta.is_file()
{
Ok(FsInfo::File(meta.len(), parent))
} else
{
Err(io::Error::new(io::ErrorKind::Other, "Unknown file type"))
}
}
/// Contains a graph of all paths and inodes that were successfully stat
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct INodeInfoGraph
{
inodes: HashMap<INode, FsInfo>, // FsInfo `file` contains parent INode that can be used to look up again in this table
paths: HashMap<PathBuf, INode>, // map absolute paths to INodes to be looked up in `inodes` table.
}
impl INodeInfoGraph
{
//TODO: Order by largest size
}
/// Walk on all paths in this state, then return a joined map of all
///
/// # Panics
/// If there are any more held references to `state`.
pub async fn work_on_all(state: State) -> INodeInfoGraph
{
let comp_children = join_all(state.config().paths.iter().map(|path| {
let path = path.clone();
async {
match tokio::fs::symlink_metadata(&path).await {
Ok(meta) => {
let inode = meta.inode();
tokio::spawn(walk(state.clone(), path.clone(), inode)).await
.ok()
.map(move |res| (res, path))
},
Err(err) => {
eprintln!("Failed to stat root {:?}: {}", path, err);
None
},
}
}
})).await;
// All children have completed here. Unwrap cache
let ino_map = {
let cache = state.try_into_cache().unwrap();
cache.try_complete().await.unwrap()
};
let mut output = HashMap::with_capacity(ino_map.len());
for path_comp in comp_children
{
if let Some((res, root)) = path_comp
{
for (path, ino) in join_root(&root, res)
{
if let Some(_) = ino_map.get(&ino) {
output.insert(path, ino);
} else {
eprintln!("No ino entry for {:?} ({:?})", path, ino);
}
}
}
}
INodeInfoGraph {
inodes: ino_map,
paths: output,
}
}
/// Walk this directory.
///
/// # Returns
/// A *unjoined* map of relative paths and `INode`s inserted into the state's `Cache`.
/// The caller must join its `root` with these paths to provide a normalized map.
/// Recusrive calls to `walk` handle this automatically.
fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, HashMap<PathBuf, INode>>
{
let mut output = HashMap::new();
let mut children: Vec<JoinHandle<HashMap<PathBuf, INode>>> = Vec::new();
async move {
match fs::read_dir(&root).await
{
Ok(mut dir) => {
while let Some(entry) = dir.next().await
{
match entry
{
Ok(entry) => {
let ino = entry.inode();
// Check cache for this
if state.cache().get(&ino).await.is_none() {
// Not added, process.
match process_entry(&entry, root_ino).await {
Ok(fsinfo) => {
if fsinfo.is_dir()
{
if let Some(next) = state.deeper()
{
children.push(tokio::spawn(
walk(next, entry.path(), ino)
));
}
}
let mut cache = state.cache_sub();
cache.insert(ino, fsinfo).await;
},
Err(err) => eprintln!("Failed to stat {:?}: {}", entry.path(), err),
}
}
},
Err(err) => eprintln!("Walking {:?} failed: {}", root, err),
}
}
},
Err(err) => eprintln!("Failed to walk {:?}: {}", root, err),
}
// Join all children
for child in join_all(children.into_iter()).await
{
if let Ok(map) = child
{
output.extend(join_root(&root, map));
} else {
eprintln!("Child panic");
}
}
output
}.boxed()
}
Loading…
Cancel
Save