diff --git a/src/walk.rs b/src/walk.rs index 38631b9..29a2818 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -16,6 +16,7 @@ use tokio::{ use futures::future::{ Future, OptionFuture, + BoxFuture, }; #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Default)] @@ -27,18 +28,19 @@ pub struct Config pub max_walkers: Option, } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Clone)] struct UniqueState { // Decrease with each new depth until 0 is hit then do not proceed. recursion_depth: Option, + output_sender: mpsc::Sender, } #[derive(Debug)] struct SharedState { + worker_config: Arc, config: Config, - output_sender: mpsc::Sender, new_worker_semaphore: Option, } @@ -50,15 +52,79 @@ struct State { current: UniqueState, } +impl State +{ + #[inline] + pub fn create_file_info_generator(&self) -> impl Fn(PathBuf, std::fs::Metadata) -> work::FileInfo + '_ + { + work::FileInfo::factory(Arc::clone(&self.shared.worker_config)) //XXX: ANOTHER unneeded Arc clone.. + } + #[inline(always)] + pub fn shared(&self) -> &SharedState + { + &self.shared + } + #[inline(always)] + pub fn unique(&self) -> &UniqueState + { + &self.current + } + + #[inline(always)] + pub fn unique_mut(&mut self) -> &mut UniqueState + { + &mut self.current + } + + /// Create a `State` for a recursed child walk. + #[inline] + pub fn create_child(&self) -> Option + { + let recursion_depth = match self.current.recursion_depth.map(|depth| depth.saturating_sub(1)) { + // Prevent child creation if recursion bound hits 0. + Some(0) => return None, + x => x, + }; + Some(Self { + shared: Arc::clone(&self.shared), + current: UniqueState { recursion_depth, output_sender: self.current.output_sender.clone() }, + }) + } +} + /// Walk a single directory in the current task. Dispatch async results to `walk -async fn walk_directory(state: &State, whence: impl AsRef) -> eyre::Result +async fn walk_directory(state: &State, mut push_child: F, whence: impl AsRef) -> eyre::Result + where F: FnMut(State, PathBuf) { + //use futures::future::FutureExt; + let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?; + let sender = &state.current.output_sender; + let file_info = state.create_file_info_generator(); + while let Some(fso) = read.next_entry().await.wrap_err("Failed to enumerate directory entry")? + { + let metadata = match fso.metadata().await { + Ok(metadata) => metadata, + Err(e) => { + error!("Failed to stat {:?}: {}", fso.file_name(), e); + continue; + } + }; + if metadata.is_file() { + if let Err(_) = sender.send(file_info(fso.path(), metadata)).await { + warn!("Worker shut down, stopping iteration"); + break; + } + } else if metadata.is_dir() { + let Some(state) = state.create_child() else { continue }; + push_child(state, fso.path()); + } // Ignore symlinks. + } todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks") } /// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules. /// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily. -async fn walk_inner(state: State, whence: PathBuf) -> impl Future> + 'static +async fn walk_inner(state: State, whence: PathBuf) -> impl Future> + Send + Sync + 'static { // Acquire permit *before* spawning. let _permit = { @@ -85,6 +151,9 @@ async fn walk_inner(state: State, whence: PathBuf) -> impl Future>`s to recursion children // XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found. let mut children = Vec::new(); + let todo_res = walk_directory(&state, |state, whence| { + children.push(async move { walk_inner(state, whence).await }.boxed()); + }, &whence).await; todo!("counted += walk_directory(&state, &whence).await, etc..."); Ok((counted, children)) @@ -100,17 +169,18 @@ async fn walk_inner(state: State, whence: PathBuf) -> impl Future, whereto: mpsc::Sender) -> impl Future> + 'static +pub fn start_walk(cfg: Config, worker: Arc, whence: impl AsRef, whereto: mpsc::Sender) -> impl Future> + 'static { use futures::prelude::*; walk_inner(State { current: UniqueState { recursion_depth: cfg.recursion_depth, + output_sender: whereto, }, shared: Arc::new(SharedState { - output_sender: whereto, new_worker_semaphore: cfg.max_walkers.as_ref().map(|max| Semaphore::new(max.get())), - config: cfg + config: cfg, + worker_config: worker, }), }, whence.as_ref().to_owned()) .flatten()