From 4459d58df97d7c391b20d39dffe8b26f738ca74b Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 7 Oct 2020 00:31:15 +0100 Subject: [PATCH] async ok --- Cargo.toml | 5 +++- src/main.rs | 19 +++++++++--- src/parallel.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++++-- src/recurse.rs | 33 ++++++++++++++++++++ src/splash.rs | 5 +++- test/test2/hello | 0 6 files changed, 132 insertions(+), 8 deletions(-) create mode 100644 src/recurse.rs create mode 100644 test/test2/hello diff --git a/Cargo.toml b/Cargo.toml index 3a52ace..922f99c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["parallel", "limit-concurrency", "splash"] +default = ["parallel", "limit-concurrency", "splash", "limit-recursion"] # Limit the concurrent operations of parallel mode to 4096 limit-concurrency = ["parallel"] +# Handle directories recursively +recursive = [] +limit-recursion = ["recursive"] splash = [] parallel = ["tokio", "futures"] threads = ["parallel", "tokio/rt-threaded"] diff --git a/src/main.rs b/src/main.rs index 7650469..bbc1ea4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,6 +28,8 @@ use map::*; mod temp; mod error; mod arg; +#[cfg(feature="recursive")] +mod recurse; cfg_if!{ if #[cfg(feature="splash")] { @@ -57,11 +59,20 @@ fn args_or_out(i: T, low: usize) -> T #[cfg(feature="parallel")] #[cfg_attr(feature="parallel", tokio::main)] async fn main() -> eyre::Result<()> { - reyre!(init(), "Failed to initialise")?; + use futures::{ + stream, + prelude::*, + }; - reyre!(parallel::main(futures::stream::iter(args_or_out(std::env::args(), 2) - .skip(1) - .dedup())).await, + reyre!(init(), "Failed to initialise")?; + reyre!(parallel::main(stream::iter(args_or_out(std::env::args(), 2) + .skip(1) + .dedup()) + .filter_map(|file| { + async move { + Some(parallel::expand_dir(file).await) + } + }).flatten()).await, "Jobs failed") } diff --git a/src/parallel.rs b/src/parallel.rs index 04405b5..66de235 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -12,9 +12,11 @@ use futures::{ Future, OptionFuture, FutureExt, + BoxFuture, join_all, }, stream::{ + self, Stream, StreamExt, }, @@ -22,6 +24,7 @@ use futures::{ use tokio::{ sync::{ Semaphore, + mpsc, }, fs::{ OpenOptions, @@ -39,6 +42,8 @@ cfg_if!{ } } +#[cfg(feature="recursive")] use recurse::{MAX_DEPTH, Recursion}; + fn gensem() -> Option> { trace!("Limiting concurrency to {:?}", MAX_WORKERS); @@ -90,7 +95,7 @@ async fn work>(apath: P, sem: Option>) -> Result<( } async fn join_stream(stream: I) -> impl Iterator::Output> + ExactSizeIterator - where I::Item: Future +where I::Item: Future { //gotta be a better way than heap allocating here, right? stream.then(|x| async move { x.await }).collect::>().await.into_iter() @@ -108,7 +113,6 @@ pub async fn main>(list: I) -> eyre::Result<()> for (i, res) in (0usize..).zip(join_stream(list.map(|file| tokio::spawn(work(file, sem.clone())))) .map(|x| {trace!("--- {} Finished ---", x.len()); x}).await) { - //trace!("Done on {:?}", res); match res { Ok(Ok((path, true))) => info!("<{:?}> OK (processed)", path), Ok(Ok((path, false))) => info!("<{:?}> OK (skipped)", path), @@ -156,3 +160,73 @@ pub async fn main>(list: I) -> eyre::Result<()> Ok(()) } + +#[cfg(feature="recursive")] +fn push_dir<'a>(path: &'a Path, depth: usize, to: mpsc::Sender) -> BoxFuture<'a, tokio::io::Result<()>> +{ + async move { + let mut dir = fs::read_dir(path).await?; + let mut workers = match dir.size_hint() { + (0, Some(0)) | (0, None) => Vec::new(), + (x, None) | (_, Some(x)) => Vec::with_capacity(x), + }; + let can_recurse = match MAX_DEPTH { + Recursion::All => true, + Recursion::N(n) if depth < usize::from(n) => true, + _ => false, + }; + while let Some(item) = dir.next_entry().await? { + let mut to = to.clone(); + workers.push(async move { + match path.join(item.file_name()).into_os_string().into_string() { + Ok(name) => { + if item.file_type().await?.is_dir() { + if can_recurse { + if let Err(e) = push_dir(name.as_ref(), depth+1, to).await { + error!("Walking dir {:?} failed: {}", item.file_name(), e); + } + } + } else { + to.send(name).await.unwrap(); + } + }, + Err(err) => { + error!("Couldn't process file {:?} because it contains invalid UTF-8", err); + }, + } + Ok::<_, std::io::Error>(()) + }); + } + join_all(workers).await; + Ok(()) + }.boxed() +} + +pub async fn expand_dir(p: String) -> impl Stream +{ + cfg_if!{ + if #[cfg(feature="recursive")] { + let (mut tx, rx) = mpsc::channel(16); + tokio::spawn(async move { + let path = Path::new(&p); + if path.is_dir() { + if let Err(err) = push_dir(path, 0, tx).await { + error!("Walking dir {:?} failed: {}", path, err); + } + } else { + tx.send(p).await.unwrap(); + } + }); + rx + } else { + stream::iter(iter::once(p).filter_map(|p| { + if Path::new(&p).is_dir() { + warn!("{:?} is a directory, skipping", p); + None + } else { + Some(p) + } + })) + } + } +} diff --git a/src/recurse.rs b/src/recurse.rs new file mode 100644 index 0000000..daee62e --- /dev/null +++ b/src/recurse.rs @@ -0,0 +1,33 @@ +//! Recursion stuffs +use super::*; +use std::{ + num::NonZeroUsize, + fmt, +}; + +#[derive(Debug)] +pub enum Recursion +{ + All, + N(NonZeroUsize), +} + +cfg_if!{ + if #[cfg(feature="limit-recursion")] { + pub const MAX_DEPTH: Recursion = Recursion::N(unsafe{NonZeroUsize::new_unchecked(256)}); + } else { + pub const MAX_DEPTH: Recursion = Recursion::All; + } +} + +impl fmt::Display for Recursion +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::N(n) => write!(f, "{}", n), + Self::All => write!(f, "unlimited"), + _ => write!(f, "no"), + } + } +} diff --git a/src/splash.rs b/src/splash.rs index 8f02c91..493fdce 100644 --- a/src/splash.rs +++ b/src/splash.rs @@ -39,9 +39,12 @@ For verbose output, set `RUST_LOG` env var to one of the following: Made by {} with <3 (Licensed GPL 3.0 or later)"#, arg::program_name(), env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_AUTHORS")); println!("\nEnabled extensions: "); feature!(in nightly, "\tCompiled with Rust nightly extensions"); - println!(); + println!("Features:"); feature!("parallel", "\tWill run up to {} operations in parallel", parallel::MAX_WORKERS.map(|x| Cow::Owned(x.to_string())).unwrap_or(Cow::Borrowed("unlimited"))); feature!("limit-concurrency", "Concurrency is capped"); feature!("threads", "\tUsing thread-pool"); + feature!("recursive", "\tRecursivly process files up to {} directories deep", recurse::MAX_DEPTH); + feature!("limit-recursion", "Concurrency is capped"); + std::process::exit(1) } diff --git a/test/test2/hello b/test/test2/hello new file mode 100644 index 0000000..e69de29