You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dirstat/src/work.rs

150 lines
3.6 KiB

use super::*;
use std::path::PathBuf;
use std::collections::HashMap;
use std::io;
use futures::prelude::*;
use futures::future::join_all;
use futures::future::BoxFuture;
use tokio::task::JoinHandle;
use tokio::fs;
use data::INode;
use data::FsInfo;
use state::State;
use data::INodeInfoGraph;
use config::Config;
async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result<FsInfo>
{
let meta = entry.metadata().await?;
if meta.is_dir()
{
Ok(FsInfo::Directory(parent))
} else if meta.is_file()
{
Ok(FsInfo::File(meta.len(), parent))
} else
{
Err(io::Error::new(io::ErrorKind::Other, "Unknown file type"))
}
}
/// Walk on all paths in this state, then return a joined map of all
///
/// # Panics
/// If there are any more held references to `state`.
pub async fn work_on_all(state: State) -> (INodeInfoGraph, Config)
{
let comp_children = join_all(state.config().paths.iter().map(|path| {
let path = path.clone();
async {
match tokio::fs::symlink_metadata(&path).await {
Ok(meta) => {
let inode = meta.inode();
tokio::spawn(walk(state.clone(), path, inode)).await
.ok()
},
Err(err) => {
cfg_eprintln!(state.config(), "Failed to stat root {:?}: {}", path, err);
None
},
}
}
})).await;
// All children have completed here. Unwrap cache
let (ino_map, cfg) = {
let (cache, cfg) = state.try_into_cache().unwrap();
(cache.try_complete().await.unwrap(), cfg)
};
let mut output = HashMap::with_capacity(ino_map.len());
for path_comp in comp_children
{
if let Some(res) = path_comp
{
output.extend(res);
}
}
(INodeInfoGraph::new(
ino_map,
output,
), cfg)
}
/// Walk this directory.
///
/// # Returns
/// A *unjoined* map of relative paths and `INode`s inserted into the state's `Cache`.
/// The caller must join its `root` with these paths to provide a normalized map.
/// Recusrive calls to `walk` handle this automatically.
fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, HashMap<PathBuf, INode>>
{
let mut output = HashMap::new();
let mut children: Vec<JoinHandle<HashMap<PathBuf, INode>>> = Vec::new();
async move {
{
let _guard = state.lock().enter().await;
cfg_println!(state.config(), " -> {:?}", root); //TODO: Flag to disable this?
match fs::read_dir(&root).await
{
Ok(mut dir) => {
while let Some(entry) = dir.next().await
{
match entry
{
Ok(entry) => {
let ino = entry.inode();
// Check cache for this
if state.cache().get(&ino).await.is_none() {
// Not added, process.
match process_entry(&entry, root_ino).await {
Ok(fsinfo) => {
let path = entry.path();
if fsinfo.is_dir()
{
if let Some(next) = state.deeper()
{
children.push(tokio::spawn(
walk(next, path.clone(), ino)
));
}
}
output.insert(path, ino);
let mut cache = state.cache_sub();
cache.insert(ino, fsinfo).await;
},
Err(err) => cfg_eprintln!(state.config(), "Failed to stat {:?}: {}", entry.path(), err),
}
}
},
Err(err) => cfg_eprintln!(state.config(), "Walking {:?} failed: {}", root, err),
}
}
},
Err(err) => cfg_eprintln!(state.config(), "Failed to walk {:?}: {}", root, err),
}
// drop work guard here
}
// Join all children
for child in join_all(children.into_iter()).await
{
if let Ok(map) = child
{
output.extend(map);
} else {
cfg_eprintln!(state.config(), "Child panic");
}
}
output
}.boxed()
}