From cfe11a191605bf24f83dbe382b45ff0909537968 Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 26 Nov 2022 05:08:47 +0000 Subject: [PATCH] Basic perf testing. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for enumerate-ordered's current commit: Future blessing − 末吉 --- .gitignore | 2 ++ Cargo.toml | 6 ++++++ src/args.rs | 12 +++++++++++- src/main.rs | 7 ++++--- src/walk.rs | 29 +++++++++++++++++++++-------- 5 files changed, 44 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..3353926 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target +flamegraph.* +perf.* diff --git a/Cargo.toml b/Cargo.toml index 1fbd5c2..bb9d83a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[profile.release] +opt-level = 3 +lto = "fat" +codegen-units = 1 +strip = false + [dependencies] color-eyre = { version = "0.6.2", default-features = false } futures = "0.3.25" diff --git a/src/args.rs b/src/args.rs index 3c077a5..d868583 100644 --- a/src/args.rs +++ b/src/args.rs @@ -55,12 +55,21 @@ impl Args break; }, }; + trace!("paths: buffer: {:?}", &buf[..]); if n == 0 { - trace!("paths: Stdin exhausted. Stopping read."); + trace!("paths: Stdin exhausted. Exiting."); break; } let path_bytes = &buf[..n]; + let path_bytes = if path_bytes.len() == 1 { + trace!("paths: Ignoring empty line. Yielding then continuing."); + tokio::task::yield_now().await; + continue; + } else { + &path_bytes[.. (path_bytes.len()-1)] + }; let path = Path::new(OsStr::from_bytes(path_bytes)); + trace!("Read path {:?}", path); if path.exists() { if tx.send(path.to_owned()).await.is_err() { trace!("paths: Stream dropped, cancelling stdin read."); @@ -77,6 +86,7 @@ impl Args #[inline] pub fn parse_args() -> eyre::Result { + //return Ok(Args { paths: None }); todo!("parse(std::env::args().skip(1))") } diff --git a/src/main.rs b/src/main.rs index 6b77640..a41e211 100644 --- a/src/main.rs +++ b/src/main.rs @@ -113,7 +113,8 @@ async fn main() -> eyre::Result<()> { (w?, s?) }; info!("Walked {} files", walk); - + + // Write the output in a blocking task - There's not much benefit from async here. tokio::task::spawn_blocking(move || -> eyre::Result<()> { use std::io::Write; use std::os::unix::prelude::*; @@ -121,7 +122,7 @@ async fn main() -> eyre::Result<()> { let lock = std::io::stdout().lock(); std::io::BufWriter::new(lock) }; - trace!("Writing ordered results to stdout... (buffered)"); + trace!("Writing ordered results to stdout... (buffered, sync)"); for info in set.into_iter() { stdout.write_all(info.path().as_os_str().as_bytes()) @@ -132,7 +133,7 @@ async fn main() -> eyre::Result<()> { stdout.flush().wrap_err("Failed to flush buffered output to stdout")?; Ok(()) - }).await.wrap_err("Writer task panic")? + }).await.wrap_err("Writer (blocking) task panic")? .wrap_err("Failed to write results to stdout")?; trace!("Finished output task"); diff --git a/src/walk.rs b/src/walk.rs index 94933c1..9feb0ee 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -100,6 +100,7 @@ where F: FnMut(State, PathBuf) let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?; let sender = &state.current.output_sender; let file_info = state.create_file_info_generator(); + let mut n = 0; while let Some(fso) = read.next_entry().await.wrap_err("Failed to enumerate directory entry")? { let metadata = match fso.metadata().await { @@ -113,13 +114,16 @@ where F: FnMut(State, PathBuf) if let Err(_) = sender.send(file_info(fso.path(), metadata)).await { warn!("Worker shut down, stopping iteration"); break; + } else { + n += 1; } } else if metadata.is_dir() { let Some(state) = state.create_child() else { continue }; push_child(state, fso.path()); } // Ignore symlinks. } - todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks") + Ok(n) + //todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks") } /// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules. @@ -140,13 +144,22 @@ async fn walk_inner(state: State, whence: PathBuf) -> eyre::Result // XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found. let mut children = Vec::new(); // Number of *files* sent to tx from this iteration. - let counted = walk_directory(&state, |state, whence| { - fn walk_inner2(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result> - { - walk_inner(state, whence).boxed() + + let counted = if whence.is_dir() { + walk_directory(&state, |state, whence| { + fn walk_inner2(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result> + { + walk_inner(state, whence).boxed() + } + children.push(walk_inner2(state, whence)); + }, &whence).await? + } else { + let metadata = tokio::fs::metadata(&whence).await.wrap_err("Failed to stat top-level file")?; + if let Err(_) = state.current.output_sender.send(work::FileInfo::new(Arc::clone(&state.shared.worker_config), whence, metadata)).await { + return Err(eyre!("Failed to send top-level file to backing sorter: Sorter closed.")); } - children.push(walk_inner2(state, whence)); - }, &whence).await?; + 1 + }; Ok((counted, children)) }).await.expect("Panic in backing walker thread"); @@ -183,5 +196,5 @@ pub fn start_walk(cfg: Config, worker: Arc, whence: impl AsRef