From e156e2cc1e13ed40785dc3c3798eb71ee374149d Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 26 Nov 2022 04:13:04 +0000 Subject: [PATCH] Added async path walker (input can come from stdin at any time, or from args immediately.) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Started mod `args` for parsing arguments. Fortune for enumerate-ordered's current commit: Future small blessing − 末小吉 --- Cargo.lock | 28 ++++++++++++++++++ Cargo.toml | 1 + src/args.rs | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 70 +++++++++++++++++++++++++++++++++++++++----- src/walk.rs | 56 +++++++++++++++++------------------ src/work.rs | 2 +- 6 files changed, 204 insertions(+), 37 deletions(-) create mode 100644 src/args.rs diff --git a/Cargo.lock b/Cargo.lock index 207f25c..207b240 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,7 @@ version = "0.1.0" dependencies = [ "color-eyre", "futures", + "jemallocator", "log", "pretty_env_logger", "tokio", @@ -130,6 +131,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "futures" version = "0.3.25" @@ -249,6 +256,27 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" +[[package]] +name = "jemalloc-sys" +version = "0.5.2+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134163979b6eed9564c98637b710b40979939ba351f59952708234ea11b5f3f8" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "libc" version = "0.2.137" diff --git a/Cargo.toml b/Cargo.toml index 96fd40b..1fbd5c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] color-eyre = { version = "0.6.2", default-features = false } futures = "0.3.25" +jemallocator = "0.5.0" log = "0.4.17" pretty_env_logger = "0.4.0" tokio = { version = "1.22.0", features = ["full"] } diff --git a/src/args.rs b/src/args.rs new file mode 100644 index 0000000..3c077a5 --- /dev/null +++ b/src/args.rs @@ -0,0 +1,84 @@ +//! Arg parsing +use super::*; +use std::{ + ffi::OsStr, + path::{ + Path, PathBuf, + }, + borrow::Cow, +}; +use tokio::{ + sync::{ + mpsc, + }, +}; +use futures::{ + stream::{self, Stream, BoxStream, StreamExt,}, +}; + +/// Parsed command-line args +#[derive(Debug)] +pub struct Args +{ + paths: Option>, +} + +impl Args +{ + /// Paths as an async stream + /// + /// # Non-immediate + /// When input paths come from `stdin`, the output stream will be non-immediate. + pub fn paths(&self) -> BoxStream<'_, Cow<'_, Path>> + { + if let Some(paths) = self.paths.as_ref() { + stream::iter(paths.iter().map(|x| Cow::Borrowed(Path::new(x)))).boxed() + } else { + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + use tokio::io::{ + self, + AsyncReadExt, AsyncBufReadExt + }; + let mut stdin = { + tokio::io::BufReader::new(io::stdin()) + }; + let mut buf = Vec::with_capacity(1024); + loop + { + buf.clear(); + use std::os::unix::prelude::*; + let n = match stdin.read_until(b'\n', &mut buf).await { + Ok(n) => n, + Err(e) => { + error!("paths: Failed to read input line: {}", e); + break; + }, + }; + if n == 0 { + trace!("paths: Stdin exhausted. Stopping read."); + break; + } + let path_bytes = &buf[..n]; + let path = Path::new(OsStr::from_bytes(path_bytes)); + if path.exists() { + if tx.send(path.to_owned()).await.is_err() { + trace!("paths: Stream dropped, cancelling stdin read."); + break; + } + } + } + }); + tokio_stream::wrappers::ReceiverStream::new(rx).map(|x| Cow::Owned(x)).boxed() + } + } +} + +#[inline] +pub fn parse_args() -> eyre::Result +{ + todo!("parse(std::env::args().skip(1))") +} + +//TODO: fn parse(args: impl IntoIterator) -> eyre::Result + diff --git a/src/main.rs b/src/main.rs index 32ac3e1..6b77640 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,12 @@ use tokio::{ }, }; +use jemallocator::Jemalloc; + +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +mod args; mod order; mod work; mod walk; @@ -40,6 +46,16 @@ async fn work_on(cfg: work::Config, mut into: mpsc::Receiver) -> Ok(map) } +async fn walk_paths(paths: I, cfg: walk::Config, worker_cfg: &std::sync::Arc, to: mpsc::Sender) -> eyre::Result +where I: futures::stream::Stream, + P: AsRef +{ + use futures::prelude::*; + let children: Vec = + paths.map(|path| futures::stream::once(walk::start_walk(cfg.clone(), std::sync::Arc::clone(worker_cfg), path, to.clone())).boxed_local()).flatten_unordered(None).try_collect().await?; + + Ok(children.into_iter().sum()) +} #[tokio::main] async fn main() -> eyre::Result<()> { @@ -47,18 +63,56 @@ async fn main() -> eyre::Result<()> { let (tx, rx) = mpsc::channel(4096); + //TODO: Read main config from args + + let args = args::parse_args() + .wrap_err("Failed to parse command line args") + .with_suggestion(|| "Try `--help`")?; + + let worker_cfg = { + //TODO: Read worker config from main config + std::sync::Arc::new(work::Config::default()) + }; + let walker_cfg = { + //TODO: Read walker config from main config + walk::Config::default() + }; + // Spin up ordering task. - let ordering = tokio::spawn(async move { - trace!("spun up ordering backing thread"); - work_on(Default::default(), rx).await //TODO: Parse config from args - }); + let ordering = { + let cfg = (*worker_cfg).clone(); + tokio::spawn(async move { + trace!("spun up ordering backing thread"); + work_on(cfg, rx).await //TODO: Parse config from args + }) + }; trace!("Starting recursive walk of input locations"); //TODO: Trace directory trees from paths in `args` and/or `stdin` and pass results to `tx` + let walk = walk_paths(args.paths(), walker_cfg, &worker_cfg, tx); + tokio::pin!(walk); - - let set = ordering.await.wrap_err("Ordering task panic")? - .wrap_err(eyre!("Failed to collect ordered files"))?; + let set = async move { + ordering.await.wrap_err("Ordering task panic")? + .wrap_err(eyre!("Failed to collect ordered files")) + }; + tokio::pin!(set); + + /*let set = tokio::select! { + n = walk => { + let n =n.wrap_err("Walker failed")?; + info!("Walked {} files", n); +}, + res = set => { + let set = res.wrap_err("Ordering task failed before walk completed")?; + return Err(eyre!("Ordering task exited before walker task")); +} +};*/ + let (walk, set) = { + let (w, s) = tokio::join!(walk, set); + (w?, s?) + }; + info!("Walked {} files", walk); tokio::task::spawn_blocking(move || -> eyre::Result<()> { use std::io::Write; @@ -67,7 +121,7 @@ async fn main() -> eyre::Result<()> { let lock = std::io::stdout().lock(); std::io::BufWriter::new(lock) }; - + trace!("Writing ordered results to stdout... (buffered)"); for info in set.into_iter() { stdout.write_all(info.path().as_os_str().as_bytes()) diff --git a/src/walk.rs b/src/walk.rs index 29a2818..94933c1 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -94,7 +94,7 @@ impl State /// Walk a single directory in the current task. Dispatch async results to `walk async fn walk_directory(state: &State, mut push_child: F, whence: impl AsRef) -> eyre::Result - where F: FnMut(State, PathBuf) +where F: FnMut(State, PathBuf) { //use futures::future::FutureExt; let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?; @@ -124,44 +124,44 @@ async fn walk_directory(state: &State, mut push_child: F, whence: impl AsRef< /// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules. /// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily. -async fn walk_inner(state: State, whence: PathBuf) -> impl Future> + Send + Sync + 'static +async fn walk_inner(state: State, whence: PathBuf) -> eyre::Result { - // Acquire permit *before* spawning. - let _permit = { - OptionFuture::from(state.shared.new_worker_semaphore.as_ref().map(|sem| sem.acquire())).map(|opt| match opt { - Some(Err(e)) => { - #[cold] - #[inline(never)] - fn _panic_permit(e: impl std::fmt::Display) -> ! { - panic!("Failed to attempt to acquire walker permit: {}", e) - } - _panic_permit(e) - }, - Some(Ok(p)) => Some(p), - None => None, - }) - }.await; + use futures::prelude::*; - async move { + //async move + { let backing_res = tokio::spawn(async move { - // Move permit into task once acquires. - let _permit = _permit; - // Number of *files* sent to tx from this iteration. - let mut counted = 0usize; + + let _permit = match &state.shared.new_worker_semaphore { + Some(sem) => Some(sem.acquire().await.expect("Failed to acquire permit")), + None => None, + }; // `JoinHandle>`s to recursion children // XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found. let mut children = Vec::new(); - let todo_res = walk_directory(&state, |state, whence| { - children.push(async move { walk_inner(state, whence).await }.boxed()); - }, &whence).await; + // Number of *files* sent to tx from this iteration. + let counted = walk_directory(&state, |state, whence| { + fn walk_inner2(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result> + { + walk_inner(state, whence).boxed() + } + children.push(walk_inner2(state, whence)); + }, &whence).await?; - todo!("counted += walk_directory(&state, &whence).await, etc..."); Ok((counted, children)) }).await.expect("Panic in backing walker thread"); + trace!("Spawning and collecting child workers"); match backing_res { Ok((counted, children)) => { - Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().sum::()) + use futures::prelude::*; + Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().filter_map(|res| match res { + Ok(n) => Some(n), + Err(e) => { + error!("Child failed to walk: {}", e); + None + }, + }).sum::()) }, Err(e) => Err(e), } @@ -183,5 +183,5 @@ pub fn start_walk(cfg: Config, worker: Arc, whence: impl AsRef