|
|
|
@ -32,16 +32,24 @@ async fn process_single(state: Arc<state::State>, path: impl AsRef<Path>) -> eyr
|
|
|
|
|
panic!("process_single() expected a file, but {:?} is not one.", path);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let _g = state.lock().await;
|
|
|
|
|
debug!("{:?} Processing", path);
|
|
|
|
|
//TODO: Actual processing
|
|
|
|
|
if path.as_os_str().len() > 25 {
|
|
|
|
|
panic!("FUCK WHY ISN'T THIS SHIT RAN!??!??!?!??!??!");
|
|
|
|
|
return Err(eyre!("Test termination"));
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn walk_dir<'a, T, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result<Vec<T>>>
|
|
|
|
|
where F: FnMut(fs::DirEntry) -> Fut + Clone + 'static + Send + Sync,
|
|
|
|
|
fn walk_dir<'a, T, E, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result<Vec<T>>>
|
|
|
|
|
where F: FnMut(&fs::DirEntry) -> Fut + Clone + 'static + Send + Sync,
|
|
|
|
|
P: AsRef<Path> + Send + Sync + 'static,
|
|
|
|
|
T: Send + 'static,
|
|
|
|
|
Fut: Future<Output= T> + Send + Sync,
|
|
|
|
|
E: Send + 'static + Sync,
|
|
|
|
|
Fut: Future<Output= Result<T, E>> + Send + Sync,
|
|
|
|
|
eyre::Report: From<E>
|
|
|
|
|
{
|
|
|
|
|
#[cfg(debug_assertions)] {
|
|
|
|
|
let path = path.as_ref();
|
|
|
|
@ -57,23 +65,31 @@ where F: FnMut(fs::DirEntry) -> Fut + Clone + 'static + Send + Sync,
|
|
|
|
|
.with_section(se_path)?;
|
|
|
|
|
let mut voutput: Vec<T> = util::alloc_stream_hint(&dir);
|
|
|
|
|
let mut workers: Vec<_> = util::alloc_stream_hint(&dir);
|
|
|
|
|
while let Some(entry) = dir.next_entry().await
|
|
|
|
|
.wrap_err(eyre!("Failed to read entry from directory contents"))
|
|
|
|
|
.with_section(se_path)?
|
|
|
|
|
{
|
|
|
|
|
let path = entry.path();
|
|
|
|
|
if path.is_dir() {
|
|
|
|
|
|
|
|
|
|
workers.push(tokio::spawn(walk_dir(path, output.clone())));
|
|
|
|
|
} else if path.is_file() {
|
|
|
|
|
voutput.push(output(entry).await);
|
|
|
|
|
let work = async {
|
|
|
|
|
while let Some(entry) = dir.next_entry().await
|
|
|
|
|
.wrap_err(eyre!("Failed to read entry from directory contents"))
|
|
|
|
|
.with_section(se_path)?
|
|
|
|
|
{
|
|
|
|
|
let path = entry.path();
|
|
|
|
|
if path.is_dir() {
|
|
|
|
|
let output = output.clone();
|
|
|
|
|
workers.push(tokio::spawn(walk_dir(path.clone(), output)));
|
|
|
|
|
} else if path.is_file() {
|
|
|
|
|
voutput.push(output(&entry).await
|
|
|
|
|
.map_err(eyre::Report::from)
|
|
|
|
|
.wrap_err(eyre!("Failed processing file"))
|
|
|
|
|
.with_section(move || format!("{:?}", entry.path()).header("Path was"))?);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok::<_, eyre::Report>(())
|
|
|
|
|
};
|
|
|
|
|
let resw = work.await;
|
|
|
|
|
for res in future::try_join_all(workers)
|
|
|
|
|
.await.wrap_err(eyre!("One or more child workers exited abnormally"))?.into_iter()
|
|
|
|
|
{
|
|
|
|
|
voutput.extend(res?);
|
|
|
|
|
}
|
|
|
|
|
voutput.extend(future::join_all(workers)
|
|
|
|
|
.map(|x| x.into_iter()
|
|
|
|
|
.filter_map(Result::ok).flatten())
|
|
|
|
|
.await.into_iter()
|
|
|
|
|
.flatten());
|
|
|
|
|
resw?;
|
|
|
|
|
Ok(voutput)
|
|
|
|
|
}.boxed()
|
|
|
|
|
}
|
|
|
|
@ -86,15 +102,10 @@ pub async fn process(state: Arc<state::State>, path: impl AsRef<Path>) -> eyre::
|
|
|
|
|
let path = path.as_ref();
|
|
|
|
|
let se_path = || format!("{:?}", path).header("Path was");
|
|
|
|
|
if path.is_dir() {
|
|
|
|
|
for res in walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await?
|
|
|
|
|
.into_iter()
|
|
|
|
|
.filter_map(Result::err)
|
|
|
|
|
{
|
|
|
|
|
error!("{:?} Failed to process child: {}", path, res);
|
|
|
|
|
return Err(res)
|
|
|
|
|
.wrap_err(eyre!("Failed to process child"))
|
|
|
|
|
.with_section(se_path);
|
|
|
|
|
}
|
|
|
|
|
debug!("Processed {} children", walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await
|
|
|
|
|
.map(|x| x.len())
|
|
|
|
|
.wrap_err(eyre!("Processing dir failed"))
|
|
|
|
|
.with_section(se_path)?);
|
|
|
|
|
Ok(())
|
|
|
|
|
} else if path.is_file() {
|
|
|
|
|
process_single(state, path).await
|
|
|
|
@ -102,7 +113,7 @@ pub async fn process(state: Arc<state::State>, path: impl AsRef<Path>) -> eyre::
|
|
|
|
|
.with_section(se_path)
|
|
|
|
|
} else {
|
|
|
|
|
error!("{:?} is not a recognised FS object", path);
|
|
|
|
|
return Err(eyre!("Unsupported FS object"))
|
|
|
|
|
Err(eyre!("Unsupported FS object"))
|
|
|
|
|
.with_section(se_path)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|