diff --git a/src/delete.rs b/src/delete.rs index 73b82cb..99d3622 100644 --- a/src/delete.rs +++ b/src/delete.rs @@ -32,16 +32,24 @@ async fn process_single(state: Arc, path: impl AsRef) -> eyr panic!("process_single() expected a file, but {:?} is not one.", path); } } + let _g = state.lock().await; debug!("{:?} Processing", path); + //TODO: Actual processing + if path.as_os_str().len() > 25 { + panic!("FUCK WHY ISN'T THIS SHIT RAN!??!??!?!??!??!"); + return Err(eyre!("Test termination")); + } Ok(()) } -fn walk_dir<'a, T, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result>> -where F: FnMut(fs::DirEntry) -> Fut + Clone + 'static + Send + Sync, +fn walk_dir<'a, T, E, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result>> +where F: FnMut(&fs::DirEntry) -> Fut + Clone + 'static + Send + Sync, P: AsRef + Send + Sync + 'static, T: Send + 'static, - Fut: Future + Send + Sync, + E: Send + 'static + Sync, + Fut: Future> + Send + Sync, + eyre::Report: From { #[cfg(debug_assertions)] { let path = path.as_ref(); @@ -57,23 +65,31 @@ where F: FnMut(fs::DirEntry) -> Fut + Clone + 'static + Send + Sync, .with_section(se_path)?; let mut voutput: Vec = util::alloc_stream_hint(&dir); let mut workers: Vec<_> = util::alloc_stream_hint(&dir); - while let Some(entry) = dir.next_entry().await - .wrap_err(eyre!("Failed to read entry from directory contents")) - .with_section(se_path)? - { - let path = entry.path(); - if path.is_dir() { - - workers.push(tokio::spawn(walk_dir(path, output.clone()))); - } else if path.is_file() { - voutput.push(output(entry).await); + let work = async { + while let Some(entry) = dir.next_entry().await + .wrap_err(eyre!("Failed to read entry from directory contents")) + .with_section(se_path)? + { + let path = entry.path(); + if path.is_dir() { + let output = output.clone(); + workers.push(tokio::spawn(walk_dir(path.clone(), output))); + } else if path.is_file() { + voutput.push(output(&entry).await + .map_err(eyre::Report::from) + .wrap_err(eyre!("Failed processing file")) + .with_section(move || format!("{:?}", entry.path()).header("Path was"))?); + } } + Ok::<_, eyre::Report>(()) + }; + let resw = work.await; + for res in future::try_join_all(workers) + .await.wrap_err(eyre!("One or more child workers exited abnormally"))?.into_iter() + { + voutput.extend(res?); } - voutput.extend(future::join_all(workers) - .map(|x| x.into_iter() - .filter_map(Result::ok).flatten()) - .await.into_iter() - .flatten()); + resw?; Ok(voutput) }.boxed() } @@ -86,15 +102,10 @@ pub async fn process(state: Arc, path: impl AsRef) -> eyre:: let path = path.as_ref(); let se_path = || format!("{:?}", path).header("Path was"); if path.is_dir() { - for res in walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await? - .into_iter() - .filter_map(Result::err) - { - error!("{:?} Failed to process child: {}", path, res); - return Err(res) - .wrap_err(eyre!("Failed to process child")) - .with_section(se_path); - } + debug!("Processed {} children", walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await + .map(|x| x.len()) + .wrap_err(eyre!("Processing dir failed")) + .with_section(se_path)?); Ok(()) } else if path.is_file() { process_single(state, path).await @@ -102,7 +113,7 @@ pub async fn process(state: Arc, path: impl AsRef) -> eyre:: .with_section(se_path) } else { error!("{:?} is not a recognised FS object", path); - return Err(eyre!("Unsupported FS object")) + Err(eyre!("Unsupported FS object")) .with_section(se_path) } } diff --git a/src/main.rs b/src/main.rs index dd4e91b..2cccd86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,7 +78,7 @@ async fn process(state: Arc, file: String) -> eyre::Result<()> { delete::process(state, &file).await .wrap_err(eyre!("Processing failed")) - .with_section(move || file.header("Path was"))?; + .with_section(move || file.header("Root path was"))?; Ok(()) } @@ -100,7 +100,8 @@ async fn begin() -> eyre::Result info!("Validated config OK"); if args::process(|file| { let state = Arc::clone(&state); - process(state, file) + use futures::future::TryFutureExt; + process(state, file).inspect_err(|err| eprintln!("{:?}", err)) }).await .wrap_err(eyre!("One or more child workers failed to complete successfully"))? .len() == 0