|
|
|
@ -15,6 +15,7 @@ use std::{
|
|
|
|
|
};
|
|
|
|
|
use tokio::{
|
|
|
|
|
fs,
|
|
|
|
|
task::JoinHandle,
|
|
|
|
|
};
|
|
|
|
|
use futures::{
|
|
|
|
|
prelude::*,
|
|
|
|
@ -36,84 +37,41 @@ async fn process_single(state: Arc<state::State>, path: impl AsRef<Path>) -> eyr
|
|
|
|
|
let _g = state.lock().await;
|
|
|
|
|
debug!("{:?} Processing", path);
|
|
|
|
|
//TODO: Actual processing
|
|
|
|
|
if path.as_os_str().len() > 25 {
|
|
|
|
|
panic!("FUCK WHY ISN'T THIS SHIT RAN!??!??!?!??!??!");
|
|
|
|
|
if path.as_os_str().len() > 100 {
|
|
|
|
|
//panic!("FUCK WHY ISN'T THIS SHIT RAN!??!??!?!??!??!");
|
|
|
|
|
return Err(eyre!("Test termination"));
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn walk_dir<'a, T, E, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result<Vec<T>>>
|
|
|
|
|
where F: FnMut(&fs::DirEntry) -> Fut + Clone + 'static + Send + Sync,
|
|
|
|
|
P: AsRef<Path> + Send + Sync + 'static,
|
|
|
|
|
T: Send + 'static,
|
|
|
|
|
E: Send + 'static + Sync,
|
|
|
|
|
Fut: Future<Output= Result<T, E>> + Send + Sync,
|
|
|
|
|
eyre::Report: From<E>
|
|
|
|
|
{
|
|
|
|
|
#[cfg(debug_assertions)] {
|
|
|
|
|
let path = path.as_ref();
|
|
|
|
|
if !path.is_dir() {
|
|
|
|
|
panic!("walk_dir() expected a directory, but {:?} is not one.", path);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
trace!("{:?} Spawning children", path.as_ref());
|
|
|
|
|
async move {
|
|
|
|
|
let se_path = || format!("{:?}", path.as_ref()).header("Path was");
|
|
|
|
|
let mut dir = fs::read_dir(&path).await
|
|
|
|
|
.wrap_err(eyre!("Failed to read directory contents"))
|
|
|
|
|
.with_section(se_path)?;
|
|
|
|
|
let mut voutput: Vec<T> = util::alloc_stream_hint(&dir);
|
|
|
|
|
let mut workers: Vec<_> = util::alloc_stream_hint(&dir);
|
|
|
|
|
let work = async {
|
|
|
|
|
while let Some(entry) = dir.next_entry().await
|
|
|
|
|
.wrap_err(eyre!("Failed to read entry from directory contents"))
|
|
|
|
|
.with_section(se_path)?
|
|
|
|
|
{
|
|
|
|
|
let path = entry.path();
|
|
|
|
|
if path.is_dir() {
|
|
|
|
|
let output = output.clone();
|
|
|
|
|
workers.push(tokio::spawn(walk_dir(path.clone(), output)));
|
|
|
|
|
} else if path.is_file() {
|
|
|
|
|
voutput.push(output(&entry).await
|
|
|
|
|
.map_err(eyre::Report::from)
|
|
|
|
|
.wrap_err(eyre!("Failed processing file"))
|
|
|
|
|
.with_section(move || format!("{:?}", entry.path()).header("Path was"))?);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok::<_, eyre::Report>(())
|
|
|
|
|
};
|
|
|
|
|
let resw = work.await;
|
|
|
|
|
for res in future::try_join_all(workers)
|
|
|
|
|
.await.wrap_err(eyre!("One or more child workers exited abnormally"))?.into_iter()
|
|
|
|
|
{
|
|
|
|
|
voutput.extend(res?);
|
|
|
|
|
}
|
|
|
|
|
resw?;
|
|
|
|
|
Ok(voutput)
|
|
|
|
|
}.boxed()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Process this path
|
|
|
|
|
///
|
|
|
|
|
/// This will not return until all its children finish too (if any)
|
|
|
|
|
pub async fn process(state: Arc<state::State>, path: impl AsRef<Path>) -> eyre::Result<()>
|
|
|
|
|
pub async fn process<'a, P>(state: Arc<state::State>, path: P) -> eyre::Result<()>
|
|
|
|
|
where P: 'a + Send + AsRef<Path>
|
|
|
|
|
{
|
|
|
|
|
let path = path.as_ref();
|
|
|
|
|
let se_path = || format!("{:?}", path).header("Path was");
|
|
|
|
|
if path.is_dir() {
|
|
|
|
|
debug!("Processed {} children", walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await
|
|
|
|
|
.map(|x| x.len())
|
|
|
|
|
.wrap_err(eyre!("Processing dir failed"))
|
|
|
|
|
.with_section(se_path)?);
|
|
|
|
|
Ok(())
|
|
|
|
|
let read = fs::read_dir(path).await?;
|
|
|
|
|
fn proc_dir(state: Arc<state::State>, mut read: fs::ReadDir) -> BoxFuture<'static, JoinHandle<Result<(), eyre::Report>>>
|
|
|
|
|
{
|
|
|
|
|
async move {
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
while let Some(entry) = read.next_entry().await?
|
|
|
|
|
{
|
|
|
|
|
process(Arc::clone(&state), entry.path()).await?;
|
|
|
|
|
}
|
|
|
|
|
Ok::<_, eyre::Report>(())
|
|
|
|
|
})
|
|
|
|
|
}.boxed()
|
|
|
|
|
}
|
|
|
|
|
let handle = proc_dir(state, read).await;
|
|
|
|
|
let res = handle.await
|
|
|
|
|
.wrap_err(eyre!("Child exited abnormally"))?;
|
|
|
|
|
res.wrap_err(eyre!("Failed to process children"))
|
|
|
|
|
} else if path.is_file() {
|
|
|
|
|
process_single(state, path).await
|
|
|
|
|
.wrap_err(eyre!("Processing file failed"))
|
|
|
|
|
.with_section(se_path)
|
|
|
|
|
} else {
|
|
|
|
|
error!("{:?} is not a recognised FS object", path);
|
|
|
|
|
Err(eyre!("Unsupported FS object"))
|
|
|
|
|
.with_section(se_path)
|
|
|
|
|
}
|
|
|
|
|
Err(eyre!("Invalid/unsupported FSO"))
|
|
|
|
|
}.with_section(|| format!("{:?}", path).header("Path was"))
|
|
|
|
|
}
|
|
|
|
|