use super::*;
use std::path::PathBuf;
use std::collections::HashMap;
use std::io;

use futures::prelude::*;
use futures::future::join_all;
use futures::future::BoxFuture;
use tokio::task::JoinHandle;
use tokio::fs;

use data::INode;
use data::FsInfo;
use state::State;
use data::INodeInfoGraph;
use config::Config;

/// Classify a directory entry as a directory or a regular file.
///
/// `parent` is stored inside the returned `FsInfo` variant. For regular files
/// the length reported by the entry's metadata is stored alongside it.
///
/// # Errors
/// Returns the I/O error produced while reading the entry's metadata, or an
/// `ErrorKind::Other` error when the entry is neither a directory nor a
/// regular file.
async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result<FsInfo> {
    let meta = entry.metadata().await?;
    if meta.is_dir() {
        Ok(FsInfo::Directory(parent))
    } else if meta.is_file() {
        Ok(FsInfo::File(meta.len(), parent))
    } else {
        Err(io::Error::new(io::ErrorKind::Other, "Unknown file type"))
    }
}

/// Walk on all paths in this state's config, then return a joined map of all
/// paths found by the walkers together with the completed inode cache.
///
/// Each configured root is stat'ed with `symlink_metadata` and walked in its
/// own spawned task. A root that cannot be stat'ed, or whose walker task does
/// not complete, is reported through `cfg_eprintln!` and contributes nothing
/// to the result.
///
/// # Panics
/// If there are any more held references to `state`, or if completing the
/// cache fails.
pub async fn work_on_all(state: State) -> (INodeInfoGraph, Config) {
    let comp_children = join_all(state.config().paths.iter().map(|path| {
        let path = path.clone();
        async {
            match tokio::fs::symlink_metadata(&path).await {
                Ok(meta) => {
                    let inode = meta.inode();
                    // Report a failed walker task instead of dropping it silently,
                    // mirroring how `walk` reports its own failed children.
                    match tokio::spawn(walk(state.clone(), path, inode)).await {
                        Ok(map) => Some(map),
                        Err(err) => {
                            cfg_eprintln!(state.config(), "Root walker did not complete: {}", err);
                            None
                        },
                    }
                },
                Err(err) => {
                    cfg_eprintln!(state.config(), "Failed to stat root {:?}: {}", path, err);
                    None
                },
            }
        }
    })).await;

    // All children have completed here. Unwrap cache
    let (ino_map, cfg) = {
        let (cache, cfg) = state
            .try_into_cache()
            .expect("no other references to `state` may be held once all walkers have finished");
        (cache.try_complete().await.unwrap(), cfg)
    };

    let mut output = HashMap::with_capacity(ino_map.len());
    // Roots that failed yielded `None`; `flatten` skips them.
    for res in comp_children.into_iter().flatten() {
        output.extend(res);
    }

    (INodeInfoGraph::new(ino_map, output), cfg)
}

/// Walk this directory.
///
/// # Returns
/// An *unjoined* map of relative paths and `INode`s inserted into the state's `Cache`.
/// The caller must join its `root` with these paths to provide a normalized map.
/// Recursive calls to `walk` handle this automatically.
fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, HashMap> { let mut output = HashMap::new(); let mut children: Vec>> = Vec::new(); async move { { let _guard = state.lock().enter().await; cfg_println!(state.config(), " -> {:?}", root); //TODO: Flag to disable this? match fs::read_dir(&root).await { Ok(mut dir) => { while let Some(entry) = dir.next().await { match entry { Ok(entry) => { let ino = entry.inode(); // Check cache for this if state.cache().get(&ino).await.is_none() { // Not added, process. match process_entry(&entry, root_ino).await { Ok(fsinfo) => { let path = entry.path(); if fsinfo.is_dir() { if let Some(next) = state.deeper() { children.push(tokio::spawn( walk(next, path.clone(), ino) )); } } output.insert(path, ino); let mut cache = state.cache_sub(); cache.insert(ino, fsinfo).await; }, Err(err) => cfg_eprintln!(state.config(), "Failed to stat {:?}: {}", entry.path(), err), } } }, Err(err) => cfg_eprintln!(state.config(), "Walking {:?} failed: {}", root, err), } } }, Err(err) => cfg_eprintln!(state.config(), "Failed to walk {:?}: {}", root, err), } // drop work guard here } // Join all children for child in join_all(children.into_iter()).await { if let Ok(map) = child { output.extend(map); } else { cfg_eprintln!(state.config(), "Child panic"); } } output }.boxed() }