diff --git a/src/delete.rs b/src/delete.rs index 5fb54dc..6419066 100644 --- a/src/delete.rs +++ b/src/delete.rs @@ -13,34 +13,84 @@ use std::{ Path, }, }; +use tokio::{ + fs, +}; +use futures::{ + prelude::*, + future::BoxFuture, +}; /// Process a single file. /// /// `path` is known to be a file at this point. -async fn process_single(state: Arc, path: &Path) -> eyre::Result<()> +async fn process_single(state: Arc, path: impl AsRef) -> eyre::Result<()> { + let path = path.as_ref(); let _g = state.lock().await; debug!("{:?} Processing", path); Ok(()) } +fn walk_dir<'a, T, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result>> +where F: FnMut(fs::DirEntry) -> Fut + Clone + 'static + Send + Sync, + P: AsRef + Send + Sync + 'static, + T: Send + 'static, + Fut: Future + Send + Sync, +{ + async move { + let se_path = || format!("{:?}", path.as_ref()).header("Path was"); + let mut dir = fs::read_dir(&path).await + .wrap_err(eyre!("Failed to read directory contents")) + .with_section(se_path)?; + let mut voutput: Vec = util::alloc_stream_hint(&dir); + let mut workers: Vec<_> = util::alloc_stream_hint(&dir); + while let Some(entry) = dir.next_entry().await + .wrap_err(eyre!("Failed to read entry from directory contents")) + .with_section(se_path)? + { + let path = entry.path(); + if path.is_dir() { + workers.push(tokio::spawn(walk_dir(path, output.clone()))); + } else if path.is_file() { + voutput.push(output(entry).await); + } + } + voutput.extend(future::join_all(workers) + .map(|x| x.into_iter().filter_map(Result::ok).flatten()) + .await + .into_iter() + .flatten()); + Ok(voutput) + }.boxed() +} + /// Process this path /// /// This will not return until all its children finish too (if any) pub async fn process(state: Arc, path: impl AsRef) -> eyre::Result<()> { let path = path.as_ref(); + let se_path = || format!("{:?}", path).header("Path was"); if path.is_dir() { trace!("{:?} Spawning children", path); - //TODO! Walk dir tree - todo!() + for res in walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await? + .into_iter() + .filter_map(Result::err) + { + error!("{:?} Failed to process child: {}", path, res); + return Err(res) + .wrap_err(eyre!("Failed to process child")) + .with_section(se_path); + } + Ok(()) } else if path.is_file() { process_single(state, path).await .wrap_err(eyre!("Processing file failed")) - .with_section(|| format!("{:?}", path).header("Path was")) + .with_section(se_path) } else { error!("{:?} is not a recognised FS object", path); return Err(eyre!("Unsupported FS object")) - .with_section(|| format!("{:?}", path).header("Path was")) + .with_section(se_path) } } diff --git a/src/util.rs b/src/util.rs index 698f2bd..a847e28 100644 --- a/src/util.rs +++ b/src/util.rs @@ -70,3 +70,54 @@ mod tests assert_eq!(from, &to[0..37]); } } + +pub trait NewWithCap: Sized +{ + fn new() -> Self; + fn with_capacity(cap: usize) -> Self; +} + +impl NewWithCap for Vec +{ + #[inline(always)] fn new() -> Self + { + Self::new() + } + + #[inline(always)] fn with_capacity(cap: usize) -> Self + { + Self::with_capacity(cap) + } +} + + +impl NewWithCap for String +{ + #[inline(always)] fn new() -> Self + { + Self::new() + } + + #[inline(always)] fn with_capacity(cap: usize) -> Self + { + Self::with_capacity(cap) + } +} + +/// Allocate with capacity based on iterator `size_hint` +#[inline] pub fn alloc_iterator_hint(hint: &I) -> T +{ + match hint.size_hint() { + (0, None) | (_, Some(0)) => T::new(), + (_, Some(x)) | (x, None) => T::with_capacity(x), + } +} + +/// Allocate with capacity based on stream `size_hint` +#[inline] pub fn alloc_stream_hint(hint: &I) -> T +{ + match hint.size_hint() { + (0, None) | (_, Some(0)) => T::new(), + (_, Some(x)) | (x, None) => T::with_capacity(x), + } +}