//! Directory walking
use super::*;
use std::num::NonZeroUsize;
use std::{
    sync::Arc,
    path::{
        Path,
        PathBuf,
    },
};
use tokio::{
    sync::{
        RwLock,
        Semaphore,
    },
};
use futures::future::{
    Future,
    OptionFuture,
    BoxFuture,
};

lazy_static! {
    pub(super) static ref NUM_CPUS: usize = num_cpus::get();
}

/// Default maximum number of walkers allowed to work at once on the thread pool.
/// See `Config`.
#[inline]
pub fn default_max_walkers() -> Option<NonZeroUsize> {
    NonZeroUsize::new(*NUM_CPUS)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Config {
    // `None`: unlimited. `Some(0)`: no recursion.
    pub recursion_depth: Option<usize>,
    // `None`: unlimited concurrent walker tasks.
    pub max_walkers: Option<NonZeroUsize>,
}

impl Default for Config {
    #[inline]
    fn default() -> Self {
        Self {
            recursion_depth: Some(0),
            max_walkers: default_max_walkers(),
        }
    }
}

#[derive(Debug, Clone)]
struct UniqueState {
    // Decreased at each new depth; once it hits 0, do not recurse further.
    recursion_depth: Option<usize>,
    output_sender: mpsc::Sender<work::FileInfo>,
}

#[derive(Debug)]
struct SharedState {
    // NOTE: `work::Config` is assumed for the elided type parameter here; it is only ever
    // handed to `work::FileInfo::factory()` / `work::FileInfo::new()`.
    worker_config: Arc<work::Config>,
    config: Config,
    new_worker_semaphore: Option<Semaphore>,
}

#[derive(Debug, Clone)]
struct State {
    /// Shared state for all iterations
    shared: Arc<SharedState>, //TODO: XXX: *another* Arc slowdown...
    /// State for current iteration
    current: UniqueState,
}

impl State {
    #[inline]
    pub fn create_file_info_generator(&self) -> impl Fn(PathBuf, std::fs::Metadata) -> work::FileInfo + '_ {
        work::FileInfo::factory(Arc::clone(&self.shared.worker_config)) //XXX: ANOTHER unneeded Arc clone..
    }

    #[inline(always)]
    pub fn shared(&self) -> &SharedState {
        &self.shared
    }

    #[inline(always)]
    pub fn unique(&self) -> &UniqueState {
        &self.current
    }

    #[inline(always)]
    pub fn unique_mut(&mut self) -> &mut UniqueState {
        &mut self.current
    }

    /// Create a `State` for a recursed child walk.
    #[inline]
    pub fn create_child(&self) -> Option<Self> {
        let recursion_depth = match self.current.recursion_depth.map(|depth| depth.saturating_sub(1)) {
            // Prevent child creation once the recursion bound hits 0.
            Some(0) => return None,
            x => x,
        };
        Some(Self {
            shared: Arc::clone(&self.shared),
            current: UniqueState {
                recursion_depth,
                output_sender: self.current.output_sender.clone(),
            },
        })
    }
}

/// Walk a single directory in the current task. Dispatch async recursion into
/// subdirectories to `walk_inner()` via `push_child`.
async fn walk_directory<F>(state: &State, mut push_child: F, whence: impl AsRef<Path>) -> eyre::Result<usize>
where
    F: FnMut(State, PathBuf),
{
    //use futures::future::FutureExt;
    let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?;
    let sender = &state.current.output_sender;
    let file_info = state.create_file_info_generator();
    let mut n = 0;
    while let Some(fso) = read.next_entry().await.wrap_err("Failed to enumerate directory entry")? {
        let metadata = match fso.metadata().await {
            Ok(metadata) => metadata,
            Err(e) => {
                error!("Failed to stat {:?}: {}", fso.file_name(), e);
                continue;
            }
        };
        if metadata.is_file() {
            if let Err(_) = sender.send(file_info(fso.path(), metadata)).await {
                warn!("Worker shut down, stopping iteration");
                break;
            } else {
                n += 1;
            }
        } else if metadata.is_dir() {
            let Some(state) = state.create_child() else { continue };
            push_child(state, fso.path());
        }
        // Ignore symlinks.
    }
    Ok(n)
    //todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks")
}
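// A minimal illustration of the depth rule `State::create_child` applies: `None` never blocks
// recursion, while a bound of 0 or 1 already forbids creating a child walk. The helper
// `may_recurse` below is purely illustrative (it is not used by the walker itself); it exists
// so the rule can be unit-tested without constructing a full `State`, which would require the
// worker config from `work`.
#[cfg(test)]
mod recursion_depth_rule {
    #[test]
    fn matches_create_child_logic() {
        // Mirrors the `match` at the top of `State::create_child()`.
        fn may_recurse(depth: Option<usize>) -> bool {
            !matches!(depth.map(|d| d.saturating_sub(1)), Some(0))
        }
        assert!(may_recurse(None));     // unlimited depth
        assert!(!may_recurse(Some(0))); // no recursion allowed at all
        assert!(!may_recurse(Some(1))); // decrements (saturating) to 0, so still no child
        assert!(may_recurse(Some(2)));  // one more level of children is permitted
    }
}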
/// This function is called recursively for each subdirectory in `whence`, subject to the recursion rules.
/// The function should await all its spawned children *after* finishing its own work on files, **and** after
/// dropping its semaphore `_permit`. Otherwise a deadlock could easily occur.
async fn walk_inner(state: State, whence: PathBuf) -> eyre::Result<usize> {
    use futures::prelude::*;

    let backing_res = tokio::spawn(async move {
        // Take a walker permit if the number of concurrent walkers is bounded.
        let _permit = match &state.shared.new_worker_semaphore {
            Some(sem) => Some(sem.acquire().await.expect("Failed to acquire permit")),
            None => None,
        };
        // `BoxFuture<'static, eyre::Result<usize>>`s for the recursion children.
        // XXX: Maybe use mpsc for this instead: we send the children to an rx being looped on in a
        // `join!` in the outer scope at the same time as this one. When the sender is dropped, the
        // channel will close. We can join each child of the stream concurrently with `futures`
        // (probably?) and bubble up panics when they are found.
        let mut children = Vec::new();
        // Number of *files* sent to tx from this iteration.
        let counted = if whence.is_dir() {
            walk_directory(&state, |state, whence| {
                fn walk_inner2(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result<usize>> {
                    walk_inner(state, whence).boxed()
                }
                children.push(walk_inner2(state, whence));
            }, &whence).await?
        } else {
            let metadata = tokio::fs::metadata(&whence).await.wrap_err("Failed to stat top-level file")?;
            if let Err(_) = state.current.output_sender.send(work::FileInfo::new(Arc::clone(&state.shared.worker_config), whence, metadata)).await {
                return Err(eyre!("Failed to send top-level file to backing sorter: Sorter closed."));
            }
            1
        };
        Ok((counted, children))
    }).await.expect("Panic in backing walker thread");

    trace!("Spawning and collecting child workers");
    match backing_res {
        Ok((counted, children)) => {
            Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().filter_map(|res| match res {
                Ok(n) => Some(n),
                Err(e) => {
                    error!("Child failed to walk: {}", e);
                    None
                },
            }).sum::<usize>())
        },
        Err(e) => Err(e),
    }
}

#[inline]
pub fn start_walk(cfg: Config, worker: Arc<work::Config>, whence: impl AsRef<Path>, whereto: mpsc::Sender<work::FileInfo>) -> impl Future<Output = eyre::Result<usize>> + 'static {
    // NOTE: `Arc<work::Config>` is assumed for `worker`'s elided type parameter, matching `SharedState::worker_config`.
    use futures::prelude::*;
    walk_inner(State {
        current: UniqueState {
            recursion_depth: cfg.recursion_depth,
            output_sender: whereto,
        },
        shared: Arc::new(SharedState {
            new_worker_semaphore: cfg.max_walkers.as_ref().map(|max| Semaphore::new(max.get())),
            config: cfg,
            worker_config: worker,
        }),
    }, whence.as_ref().to_owned())
    //.flatten()
}
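// A sketch of how a caller is expected to drive `start_walk`, kept as a comment because the
// worker config type and the sorter side of the channel live outside this module (the
// `worker_config` value and the channel capacity below are assumptions, not verified API):
//
//     let (tx, mut rx) = mpsc::channel(128);
//     let walk = tokio::spawn(start_walk(Config::default(), worker_config, "some/dir", tx));
//     while let Some(file) = rx.recv().await { /* hand `file` to the sorter/workers */ }
//     let walked = walk.await.expect("walker panicked")?;
//
// The defaults themselves only depend on this module, so they can be checked directly.
#[cfg(test)]
mod config_defaults {
    use super::*;

    #[test]
    fn default_bounds() {
        let cfg = Config::default();
        // `Some(0)` means "do not recurse into subdirectories"; `None` would be unlimited.
        assert_eq!(cfg.recursion_depth, Some(0));
        // One walker permit per logical CPU by default.
        assert_eq!(cfg.max_walkers, NonZeroUsize::new(*NUM_CPUS));
    }
}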