From 32e094d593f8a2303155b44418720912b6ad32e4 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 12 Aug 2020 18:32:19 +0100 Subject: [PATCH] added collect_err --- Cargo.lock | 3 +- Cargo.toml | 14 +++++---- TODO | 2 +- src/arg.rs | 1 + src/main.rs | 3 +- src/process.rs | 6 ++-- src/progress.rs | 35 +++++++++------------- src/timeout.rs | 46 ++++++++++++++++++++++++++++ src/work.rs | 79 +++++++++++++++++++++++++++++++++++++------------ 9 files changed, 137 insertions(+), 52 deletions(-) create mode 100644 src/timeout.rs diff --git a/Cargo.lock b/Cargo.lock index 7d56d73..507b17d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -188,12 +188,13 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "leanify-many" -version = "1.0.2" +version = "1.1.0" dependencies = [ "cfg-if", "futures", "lazy_static", "num_cpus", + "pin-project", "recolored", "rustc_version", "termprogress", diff --git a/Cargo.toml b/Cargo.toml index 9478b21..555a1a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "leanify-many" -version = "1.0.2" +version = "1.1.0" description = "spawn leanify subprocesses" authors = ["Avril "] edition = "2018" @@ -11,22 +11,24 @@ homepage = "https://git.flanchan.moe/flanchan/leanify-many" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["splash", "progress", "colour"] +default = ["splash", "progress", "colour", "collect_err"] -progress = ["termprogress"] +progress = ["termprogress", "pin-project"] threads = ["tokio/rt-threaded"] splash = [] colour = ["recolored"] checked_pass = [] +collect_err = [] [dependencies] lazy_static = "1.4" tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} futures = "0.3" termprogress = {version="0.3", optional=true} -cfg-if = "0.1.10" -recolored = { version = "1.9.3", optional = true } -num_cpus = "1.13.0" +cfg-if = "0.1" +recolored = { version = "1.9", optional = true } +num_cpus = "1.13" +pin-project = {version = "0.4", optional = true} [build-dependencies] rustc_version = "0.2" diff --git a/TODO b/TODO index 4c33494..4bdc524 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,3 @@ -Add `collect_stderr` feature, to not immediately print stderr, but collect it like stdout. +Why is there a lockup sometimes when compiled without `progress` and `collect_err`? Add `--install` mode with feature flag to enable using it diff --git a/src/arg.rs b/src/arg.rs index 5221627..975e860 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -162,6 +162,7 @@ fn comp_flags() check!(on "splash", "Show splash-screen"); check!(on "colour", "Enable coloured output"); check!(on "progress", "Enable progress bar"); + check!(on "collect_err", "Collect the output of children's stderr instead of printing immediately"); check!(off "threads", "Enable threaded scheduler (usually not needed)"); check!(off "checked_pass", "Check the arguments passed with `--passthrough` to leanify. By default they are passed as is"); } diff --git a/src/main.rs b/src/main.rs index 593bb3b..f3e42ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,7 +28,8 @@ mod fixed_stack; mod process; mod work; -#[cfg(feature="progress")] mod maybe_single; +//mod timeout; +mod maybe_single; #[cfg(feature="progress")] mod task_list; #[cfg(feature="progress")] mod progress; diff --git a/src/process.rs b/src/process.rs index fbb55fe..d648ffe 100644 --- a/src/process.rs +++ b/src/process.rs @@ -63,7 +63,7 @@ where U: IntoIterator, args: process_args } = process.as_ref(); cfg_if!{ - if #[cfg(feature="progress")] { + if #[cfg(any(feature="progress",feature="collect_err"))] { let stderr = std::process::Stdio::piped(); } else { let stderr = std::process::Stdio::inherit(); @@ -84,7 +84,7 @@ where U: IntoIterator, }; let stdout = child.stdout.take().unwrap(); - #[cfg(feature="progress")] let stderr_sender = { + #[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = { let stderr = child.stderr.take().unwrap(); let mut output = output_to.clone(); tokio::spawn(async move { @@ -111,7 +111,7 @@ where U: IntoIterator, Ok(exit) => { if exit.success() { cfg_if!{ - if #[cfg(feature="progress")] { + if #[cfg(any(feature="collect_err",feature="progress"))] { let (o1, o2) = futures::future::join( sender, stderr_sender diff --git a/src/progress.rs b/src/progress.rs index 582a119..4a711b4 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -1,4 +1,5 @@ use super::*; +use pin_project::pin_project; use std::{ iter::FromIterator as _, borrow::{ @@ -93,23 +94,19 @@ fn create_command(kind: CommandKind) -> (Command, oneshot::Receiver<()>) /// /// # Panics /// Awaiting on this multiple times will cause it to panic +#[pin_project] #[derive(Debug)] -pub struct CommandWaiter(Option>); +pub struct CommandWaiter(#[pin] Option>); impl Future for CommandWaiter { type Output = Result<(), Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - if let Some(value) = self.0.take() { - let future = async move { - value.await.map_err(|_| Error::WorkerDropped) - }; - tokio::pin!(future); - future.poll(cx) - } else { - Poll::Ready(Err(Error::WorkerDropped)) + match self.project().0.as_pin_mut() { + Some(x) => x.poll(cx).map_err(|_| Error::WorkerDropped), + None => Poll::Ready(Err(Error::WorkerDropped)), } } } @@ -120,24 +117,20 @@ impl Future for CommandWaiter /// /// # Panics /// Awaiting on this multiple times will cause it to panic +#[pin_project] #[derive(Debug)] -pub struct TaskWaiter(Option>,CommandWaiter); +pub struct TaskWaiter(#[pin] Option>,CommandWaiter); impl Future for TaskWaiter { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let value = self.0.take().unwrap(); - let one = &mut self.1; - let future = async { - let val = value.await.map_err(|_| Error::WorkerDropped)?; - one.await.map_err(|_| Error::WorkerDropped)?; - Ok(val) - }; - tokio::pin!(future); - future.poll(cx) + match self.project().0.as_pin_mut() { + Some(x) => x.poll(cx).map_err(|_| Error::WorkerDropped), + None => Poll::Ready(Err(Error::WorkerDropped)), + } } } diff --git a/src/timeout.rs b/src/timeout.rs new file mode 100644 index 0000000..453ea65 --- /dev/null +++ b/src/timeout.rs @@ -0,0 +1,46 @@ + +#[macro_export] macro_rules! timeout { + ($fut:expr, $dur:expr) => { + { + let dur = $dur; + tokio::select! { + output = $fut => { + Ok(output) + } + _ = tokio::time::delay_for(dur) => { + Err($crate::timeout::TimeoutError::from(dur)) + } + } + } + } +} + + +/// Returned from timeout macro +#[derive(Debug)] +pub struct TimeoutError(tokio::time::Duration); +impl std::error::Error for TimeoutError{} +impl std::fmt::Display for TimeoutError +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result + { + write!(f, "timeout of {} ms reached", self.0.as_millis()) + } +} +impl From for TimeoutError +{ + fn from(from: tokio::time::Duration) -> Self + { + TimeoutError(from) + } +} +impl TimeoutError +{ + /// Get the timeout that this error lapsed on + #[inline] pub fn timeout(&self) -> &tokio::time::Duration + { + &self.0 + } +} + + diff --git a/src/work.rs b/src/work.rs index b918c2f..f3f53a5 100644 --- a/src/work.rs +++ b/src/work.rs @@ -41,9 +41,18 @@ type ProgressSender = progress::ProgressSender; #[cfg(not(feature="progress"))] type ProgressSender = (); +#[cfg(feature="collect_err")] +struct Error +{ + pub internal: process::Error, + pub stack: fixed_stack::FixedStack<(bool, String)>, +} +#[cfg(not(feature="collect_err"))] +type Error = process::Error; + #[allow(unused_mut)] #[allow(unused_variables)] -async fn do_work(process: impl AsRef, file: impl AsRef, mut prog: ProgressSender) -> Result, process::Error> +async fn do_work(process: impl AsRef, file: impl AsRef, mut prog: ProgressSender) -> Result, Error> { let file = file.as_ref(); @@ -56,19 +65,25 @@ async fn do_work(process: impl AsRef, file: impl AsRef, mut prog tokio::spawn(async move { let mut stack = fixed_stack::FixedStack::new(100); while let Some((err, value)) = rx.recv().await { - if err { - cfg_if!{ - if #[cfg(feature="progress")] { - let value = format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); - if let Err(_) = prog.eprintln(&value[..]).await { - eprintln!("\n{}", value); + cfg_if! { + if #[cfg(feature="collect_err")] { + stack.push((err, value)); + } else { + if err { + cfg_if!{ + if #[cfg(feature="progress")] { + let value = format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); + if let Err(_) = prog.eprintln(&value[..]).await { + eprintln!("\n{}", value); + } + } else { + eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); + } } } else { - eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); + stack.push((false, value)); } } - } else { - stack.push(value); } } stack @@ -78,7 +93,18 @@ async fn do_work(process: impl AsRef, file: impl AsRef, mut prog //let _ = opt_await.await; match process::contained_spawn(process, std::iter::once(file), tx).await { Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), - Err(error) => Err(error), + Err(error) => { + cfg_if! { + if #[cfg(feature="collect_err")] { + Err(Error{ + internal: error, + stack: collector.await.expect("Child panic"), + }) + } else { + Err(error) + } + } + }, } } @@ -88,7 +114,7 @@ where I: IntoIterator, T: AsRef + Send + Sync + 'static + Clone, U: Into { - let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); + let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); @@ -109,16 +135,24 @@ where I: IntoIterator, cfg_if!{ if #[cfg(feature="progress")] { let mut builder =progress.builder(); - for line in values.into_iter() + for (err, line) in values.into_iter() { - let line = format!(" -> ({}) {:?}: {}", i, file.as_ref(), line); - builder.println(line); + if err { + builder.eprintln(format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line))); + } else { + let line = format!(" -> ({}) {:?}: {}", i, file.as_ref(), line); + builder.println(line); + } } let _ = builder.send().await; } else { - for line in values.into_iter() + for (err, line) in values.into_iter() { - println!(" -> ({}) {:?}: {}", i, file.as_ref(), line); + if err { + eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line)); + } else { + println!(" -> ({}) {:?}: {}", i, file.as_ref(), line); + } } } } @@ -152,7 +186,7 @@ where I: IntoIterator, } else { #[cfg(nightly)] type NoReturn = !; #[cfg(not(nightly))] type NoReturn = (); - (Option::::None, do_work(process.as_ref(), &filename, ()).await) + (Option::::None, do_work(&process, &filename, ()).await) } } }; @@ -160,6 +194,13 @@ where I: IntoIterator, match worker.1 { Ok(strings) => tx.send((filename, strings, i)).await.map_err(|_| "Child panic").unwrap(), Err(error) => { + #[cfg(feature="collect_err")] let error = { + let Error{internal, stack} = error; + + tx.send((filename.clone(), stack.into_iter(), i)).await.map_err(|_| "Child panic").unwrap(); + + internal + }; #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await .or_else(|e| { @@ -200,7 +241,7 @@ where I: IntoIterator, { #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed); - } + } drop(tx); display.await?;