walking tree

work
Avril 4 years ago
parent cc998b42a4
commit edc40681b9
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -13,34 +13,84 @@ use std::{
Path,
},
};
use tokio::{
fs,
};
use futures::{
prelude::*,
future::BoxFuture,
};
/// Process a single file.
///
/// `path` is known to be a file at this point.
async fn process_single(state: Arc<state::State>, path: &Path) -> eyre::Result<()>
async fn process_single(state: Arc<state::State>, path: impl AsRef<Path>) -> eyre::Result<()>
{
let path = path.as_ref();
let _g = state.lock().await;
debug!("{:?} Processing", path);
Ok(())
}
fn walk_dir<'a, T, F, P, Fut>(path: P, mut output: F) -> BoxFuture<'static, eyre::Result<Vec<T>>>
where F: FnMut(fs::DirEntry) -> Fut + Clone + 'static + Send + Sync,
P: AsRef<Path> + Send + Sync + 'static,
T: Send + 'static,
Fut: Future<Output= T> + Send + Sync,
{
async move {
let se_path = || format!("{:?}", path.as_ref()).header("Path was");
let mut dir = fs::read_dir(&path).await
.wrap_err(eyre!("Failed to read directory contents"))
.with_section(se_path)?;
let mut voutput: Vec<T> = util::alloc_stream_hint(&dir);
let mut workers: Vec<_> = util::alloc_stream_hint(&dir);
while let Some(entry) = dir.next_entry().await
.wrap_err(eyre!("Failed to read entry from directory contents"))
.with_section(se_path)?
{
let path = entry.path();
if path.is_dir() {
workers.push(tokio::spawn(walk_dir(path, output.clone())));
} else if path.is_file() {
voutput.push(output(entry).await);
}
}
voutput.extend(future::join_all(workers)
.map(|x| x.into_iter().filter_map(Result::ok).flatten())
.await
.into_iter()
.flatten());
Ok(voutput)
}.boxed()
}
/// Process this path
///
/// This will not return until all its children finish too (if any)
pub async fn process(state: Arc<state::State>, path: impl AsRef<Path>) -> eyre::Result<()>
{
let path = path.as_ref();
let se_path = || format!("{:?}", path).header("Path was");
if path.is_dir() {
trace!("{:?} Spawning children", path);
//TODO! Walk dir tree
todo!()
for res in walk_dir(path.to_owned(), move |file| process_single(Arc::clone(&state), file.path())).await?
.into_iter()
.filter_map(Result::err)
{
error!("{:?} Failed to process child: {}", path, res);
return Err(res)
.wrap_err(eyre!("Failed to process child"))
.with_section(se_path);
}
Ok(())
} else if path.is_file() {
process_single(state, path).await
.wrap_err(eyre!("Processing file failed"))
.with_section(|| format!("{:?}", path).header("Path was"))
.with_section(se_path)
} else {
error!("{:?} is not a recognised FS object", path);
return Err(eyre!("Unsupported FS object"))
.with_section(|| format!("{:?}", path).header("Path was"))
.with_section(se_path)
}
}

@ -70,3 +70,54 @@ mod tests
assert_eq!(from, &to[0..37]);
}
}
pub trait NewWithCap: Sized
{
fn new() -> Self;
fn with_capacity(cap: usize) -> Self;
}
impl<T> NewWithCap for Vec<T>
{
#[inline(always)] fn new() -> Self
{
Self::new()
}
#[inline(always)] fn with_capacity(cap: usize) -> Self
{
Self::with_capacity(cap)
}
}
impl NewWithCap for String
{
#[inline(always)] fn new() -> Self
{
Self::new()
}
#[inline(always)] fn with_capacity(cap: usize) -> Self
{
Self::with_capacity(cap)
}
}
/// Allocate with capacity based on iterator `size_hint`
#[inline] pub fn alloc_iterator_hint<T: NewWithCap, I: Iterator+?Sized>(hint: &I) -> T
{
match hint.size_hint() {
(0, None) | (_, Some(0)) => T::new(),
(_, Some(x)) | (x, None) => T::with_capacity(x),
}
}
/// Allocate with capacity based on stream `size_hint`
#[inline] pub fn alloc_stream_hint<T: NewWithCap, I: futures::stream::Stream+?Sized>(hint: &I) -> T
{
match hint.size_hint() {
(0, None) | (_, Some(0)) => T::new(),
(_, Some(x)) | (x, None) => T::with_capacity(x),
}
}

Loading…
Cancel
Save