From d158a3d3478b47bf22b5c1cd5b6d9b5fbb5bbd3f Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 1 Nov 2020 18:31:57 +0000 Subject: [PATCH] fixed walking dir --- src/delete.rs | 90 ++++++++++++++------------------------------------- 1 file changed, 24 insertions(+), 66 deletions(-) diff --git a/src/delete.rs b/src/delete.rs index 99d3622..e2e891c 100644 --- a/src/delete.rs +++ b/src/delete.rs @@ -15,6 +15,7 @@ use std::{ }; use tokio::{ fs, + task::JoinHandle, }; use futures::{ prelude::*, @@ -36,84 +37,41 @@ async fn process_single(state: Arc, path: impl AsRef) -> eyr let _g = state.lock().await; debug!("{:?} Processing", path); //TODO: Actual processing - if path.as_os_str().len() > 25 { - panic!("FUCK WHY ISN'T THIS SHIT RAN!??!??!?!??!??!"); + if path.as_os_str().len() > 100 { + //panic!("FUCK WHY ISN'T THIS SHIT RAN!??!??!?!??!??!"); return Err(eyre!("Test termination")); } Ok(()) } -fn walk_dir<'a, T, E, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result>> -where F: FnMut(&fs::DirEntry) -> Fut + Clone + 'static + Send + Sync, - P: AsRef + Send + Sync + 'static, - T: Send + 'static, - E: Send + 'static + Sync, - Fut: Future> + Send + Sync, - eyre::Report: From -{ - #[cfg(debug_assertions)] { - let path = path.as_ref(); - if !path.is_dir() { - panic!("walk_dir() expected a directory, but {:?} is not one.", path); - } - } - trace!("{:?} Spawning children", path.as_ref()); - async move { - let se_path = || format!("{:?}", path.as_ref()).header("Path was"); - let mut dir = fs::read_dir(&path).await - .wrap_err(eyre!("Failed to read directory contents")) - .with_section(se_path)?; - let mut voutput: Vec = util::alloc_stream_hint(&dir); - let mut workers: Vec<_> = util::alloc_stream_hint(&dir); - let work = async { - while let Some(entry) = dir.next_entry().await - .wrap_err(eyre!("Failed to read entry from directory contents")) - .with_section(se_path)? - { - let path = entry.path(); - if path.is_dir() { - let output = output.clone(); - workers.push(tokio::spawn(walk_dir(path.clone(), output))); - } else if path.is_file() { - voutput.push(output(&entry).await - .map_err(eyre::Report::from) - .wrap_err(eyre!("Failed processing file")) - .with_section(move || format!("{:?}", entry.path()).header("Path was"))?); - } - } - Ok::<_, eyre::Report>(()) - }; - let resw = work.await; - for res in future::try_join_all(workers) - .await.wrap_err(eyre!("One or more child workers exited abnormally"))?.into_iter() - { - voutput.extend(res?); - } - resw?; - Ok(voutput) - }.boxed() -} - /// Process this path /// /// This will not return until all its children finish too (if any) -pub async fn process(state: Arc, path: impl AsRef) -> eyre::Result<()> +pub async fn process<'a, P>(state: Arc, path: P) -> eyre::Result<()> +where P: 'a + Send + AsRef { let path = path.as_ref(); - let se_path = || format!("{:?}", path).header("Path was"); if path.is_dir() { - debug!("Processed {} children", walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await - .map(|x| x.len()) - .wrap_err(eyre!("Processing dir failed")) - .with_section(se_path)?); - Ok(()) + let read = fs::read_dir(path).await?; + fn proc_dir(state: Arc, mut read: fs::ReadDir) -> BoxFuture<'static, JoinHandle>> + { + async move { + tokio::spawn(async move { + while let Some(entry) = read.next_entry().await? + { + process(Arc::clone(&state), entry.path()).await?; + } + Ok::<_, eyre::Report>(()) + }) + }.boxed() + } + let handle = proc_dir(state, read).await; + let res = handle.await + .wrap_err(eyre!("Child exited abnormally"))?; + res.wrap_err(eyre!("Failed to process children")) } else if path.is_file() { process_single(state, path).await - .wrap_err(eyre!("Processing file failed")) - .with_section(se_path) } else { - error!("{:?} is not a recognised FS object", path); - Err(eyre!("Unsupported FS object")) - .with_section(se_path) - } + Err(eyre!("Invalid/unsupported FSO")) + }.with_section(|| format!("{:?}", path).header("Path was")) }