From 8d8f932081be08ba9ca7abd8a6221296259588d2 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 24 Mar 2025 17:38:34 +0000 Subject: [PATCH 1/5] Started impl of feature `progress-reactive`: Addded `.resize_bar()` (`CommandKind::Resize`) to update progress bar max size to fit recalculated terminal width. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Blessing − 吉 --- Cargo.lock | 25 +++++++++++++++---------- Cargo.toml | 10 +++++++--- src/main.rs | 11 +++++++---- src/progress.rs | 38 +++++++++++++++++++++++++++++++++++++- src/work.rs | 10 ++++++++-- 5 files changed, 74 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9114d5..818588a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "arc-swap" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" - [[package]] name = "atty" version = "0.2.14" @@ -184,7 +178,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "leanify-many" -version = "1.2.0+1" +version = "1.2.1+1" dependencies = [ "cfg-if", "futures", @@ -194,6 +188,8 @@ dependencies = [ "pin-project", "recolored", "rustc_version", + "signal-hook", + "terminal_size", "termprogress", "tokio", ] @@ -401,13 +397,22 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" -version = "1.2.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" dependencies = [ - "arc-swap", "libc", ] diff --git a/Cargo.toml b/Cargo.toml index 72d58e0..0603e57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "leanify-many" -version = "1.2.0+1" +version = "1.2.1+1" description = "spawn leanify subprocesses" authors = ["Avril "] edition = "2018" @@ -16,10 +16,10 @@ lto = "fat" codegen-units = 1 [features] -default = ["splash", "progress", "colour", "collect_err", "shutdown"] +default = ["splash", "progress-reactive", "colour", "collect_err", "shutdown"] # Enable progress bar -progress = ["termprogress", "pin-project"] +progress = ["termprogress", "pin-project", "terminal_size"] # Enable threaded scheduler # @@ -48,6 +48,8 @@ collect_err = [] # Without this feature enabled, `leanify` subprocesses will receive terminating `SIGINT`s as normal. shutdown = ["libc"] +# TODO: Implement this to: Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width. (XXX: Use a background thread (outside the thread-pool, as it's blocking) listening on `signal_hooks::Signals.forever()` for this that sends events through a shared Tokio `CondVar` notify_all() call.) +progress-reactive = ["progress", "tokio/signal", "signal-hook"] [dependencies] lazy_static = "1.4" @@ -59,6 +61,8 @@ recolored = { version = "1.9", optional = true } num_cpus = "1.13" pin-project = {version = "0.4", optional = true} libc = { version = "0.2.171", features = ["align"], optional = true } +terminal_size = { version = "^0.1.13", optional = true } +signal-hook = { version = "0.3.17", features = ["iterator"], optional = true } [build-dependencies] rustc_version = "0.2" diff --git a/src/main.rs b/src/main.rs index e09cbd2..093afe8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,9 +31,8 @@ mod maybe_single; #[cfg(feature="progress")] mod task_list; #[cfg(feature="progress")] mod progress; -async fn work() -> Result<(), Box> +async fn work() -> Result, Box> { - //println!("umm {}", colour::style(colour!(Color::Blue), "hiii")); let args = arg::parse_args().await.with_prefix("failed to parse args")?; let leanify = leanify::find_binary().with_prefix("Couldn't find leanify binary")?; @@ -85,13 +84,17 @@ async fn work() -> Result<(), Box> } }, _ => args.max_children, - }, get_shutdown_future(&args.flags).fuse()).await + }, get_shutdown_future(&args.flags).fuse()).await + .map(|_| None) //TODO: Can the shutdown future set an `is_interrupted` var we can use to return a non-zero exit code from a SIGINT graceful shutdown here (e.g. `is_interrupted.then_some(1)`)? Do something like this... } #[tokio::main] async fn main() { - prettify_expect(work().await.map_err(|e| e.to_string()), "exited with error"); + if let Some(code) = prettify_expect(work().await.map_err(|e| e.to_string()), "exited with error") { + // Exiting with error (non-zero) code + std::process::exit(code.get()); + } } #[inline] fn prettify_expect(res: Result, msg: S) -> T diff --git a/src/progress.rs b/src/progress.rs index 55967bc..d88f889 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -19,6 +19,7 @@ use std::{ self, Once, }, + num::NonZeroUsize, }; use tokio::{ sync::{ @@ -68,6 +69,10 @@ pub enum CommandKind Complete, + Resize { + to: Option, + }, + Many(Vec), } @@ -312,6 +317,15 @@ impl ProgressSender self.send_command(CommandKind::BumpLow(by)).await } + /// Resize the whole bar to either a specific size, or to query the terminal size. + /// + /// Currently, the terminal size is grabbed from `stdout` or `stderr` if possible. + /// If `None` is passed and the output of `stdout` *and* `stderr` are not TTYs, no resize will take place. + pub async fn resize_bar(&mut self, to: Option) -> Result + { + self.send_command(CommandKind::Resize { to }).await + } + /// Add a task to the worker's progress bar title line /// /// This function returns a [TaskWaiter]`TaskWaiter` future, upon successful `await`ing will yield the task's ID. @@ -382,7 +396,6 @@ pub fn create_progress { + use std::os::fd::{ + AsFd, + AsRawFd, + }; + + if let Some((w, _)) = terminal_size::terminal_size() + .or_else(|| terminal_size::terminal_size_using_fd(std::io::stderr().as_fd().as_raw_fd())) + { + // Blank the line, and tell to redraw after the dimension update. + progress.blank(); + has_blanked = true; + + progress.update_dimensions(w.0 as usize); + } + }, + CommandKind::Resize { to: Some(to) } => { + // Blank the line, and tell to redraw after the dimension update. + progress.blank(); + has_blanked = true; + + progress.update_dimensions(to.get()); + }, CommandKind::BumpHigh(high) => { let stat = stat.to_mut(); stat.high+=high; diff --git a/src/work.rs b/src/work.rs index 2a9519d..efb0bcd 100644 --- a/src/work.rs +++ b/src/work.rs @@ -143,6 +143,12 @@ where I: IntoIterator, progress::create_progress::(files.len(), iter::empty()) } }; + + #[cfg(feature="progress-reactive")] + let _size_handle = { + let mut progress = progress.clone(); + todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{None} to `progress` when an event comes in."); + }; let display = { #[cfg(feature="progress")] let mut progress = progress.clone(); @@ -191,7 +197,7 @@ where I: IntoIterator, let process = Arc::clone(&process); let mut tx = tx.clone(); - let flags = flags.clone(); + let flags = flags.clone(); // XXX: Can we remove this clone somehow? It's kinda big this structure and we only need to do this because of not being able to exclude 'cli from `+ use<>` in the `async fn do_work()` as it's an `async fn` and not a `fn() -> impl Future`... #[cfg(feature="progress")] let mut progress = progress.clone(); @@ -288,7 +294,7 @@ where I: IntoIterator, Ok(v) => Box::new(v), }; - for failed in results + for failed in results { #[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); From a4f288fd334a19f433cbadc16519f1d82df29c40 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 26 Mar 2025 18:25:53 +0000 Subject: [PATCH 2/5] Started `progress-reactive` internal API. (Re-starting design.) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Blessing − 吉 --- Cargo.toml | 4 +- src/progress.rs | 7 +++ src/progress/reactive.rs | 108 +++++++++++++++++++++++++++++++++++++++ src/work.rs | 2 +- 4 files changed, 118 insertions(+), 3 deletions(-) create mode 100644 src/progress/reactive.rs diff --git a/Cargo.toml b/Cargo.toml index 0603e57..bf3f1db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ codegen-units = 1 default = ["splash", "progress-reactive", "colour", "collect_err", "shutdown"] # Enable progress bar -progress = ["termprogress", "pin-project", "terminal_size"] +progress = ["termprogress", "pin-project"] # Enable threaded scheduler # @@ -49,7 +49,7 @@ collect_err = [] shutdown = ["libc"] # TODO: Implement this to: Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width. (XXX: Use a background thread (outside the thread-pool, as it's blocking) listening on `signal_hooks::Signals.forever()` for this that sends events through a shared Tokio `CondVar` notify_all() call.) -progress-reactive = ["progress", "tokio/signal", "signal-hook"] +progress-reactive = ["progress", "tokio/signal", "signal-hook", "terminal_size"] [dependencies] lazy_static = "1.4" diff --git a/src/progress.rs b/src/progress.rs index d88f889..7eff400 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -43,6 +43,9 @@ use termprogress::{ ProgressBar, }; +#[cfg(feature="progress-reactive")] +mod reactive; + #[derive(Debug)] pub struct Task { @@ -457,12 +460,14 @@ pub fn create_progress { use std::os::fd::{ AsFd, AsRawFd, }; + if let Some((w, _)) = terminal_size::terminal_size() .or_else(|| terminal_size::terminal_size_using_fd(std::io::stderr().as_fd().as_raw_fd())) { @@ -480,6 +485,8 @@ pub fn create_progress unimplemented!("`progress-reactive` feature not enabled! Cannot resize to terminal size."), CommandKind::BumpHigh(high) => { let stat = stat.to_mut(); stat.high+=high; diff --git a/src/progress/reactive.rs b/src/progress/reactive.rs new file mode 100644 index 0000000..fe3396c --- /dev/null +++ b/src/progress/reactive.rs @@ -0,0 +1,108 @@ +//! Implementation of `progress-reactive` signal hooking & passing features +use super::*; + +use std::{ + thread, +}; +use tokio::{ + sync::{ + Notify, + Mutex, + RwLock, + }, +}; +use futures::{ + future::{ + Aborted, Abortable, abortable, AbortHandle, + }, +}; + +/// An async condition variable that holds access to a potential value. +/// +/// Notification is *single* task only, for now there is no `notify_all()` implementation on this. +/// +/// # Ownership and value transference +/// To pass a value through the condvar, `Arc>` can be used to simplify the ownership value-awaiting model to prevent deadlocks caused by dropped possible communicators. +/// Generally however, if the `CondVar` is accessible via shared reference (non-exclusive ownership held) upon a waiting operation, it will assume it is `Sync`-accessible by shared reference so you must ensure that deadlocks cannot be reached when waiting. +/// +/// ## Note on API's potential to create ~asyncronous~ un-completable `Future`s +/// In this context *"deadlock"* is being used to refer to an *un-completable future*, so as long as the future can be cancelled or awaiting can be stopped it's not going to hang up the task obvs; there's no syncronous blocking in the API. +#[derive(Debug)] +pub struct CondVar +{ + /// The condition to wait on and notify to. + cond: Notify, + /// The (possibly empty) variable slot that is read after or written to before. + var: RwLock>>, // XXX: +} + +unsafe impl Sync for CondVar{} + +impl CondVar { + compile_error!("TODO: XXX: Re-do this, we don't want to just re-implement the sync condvar interface because there's no real need to. Hell, do we even need the condvar at all? Can we just use a `Arc` for the purposes of this module \ + (XXX: i.e. \"backing sync thread **notifies** a running async task holding a progress handle to send a `Resize` command when a `SIGWINCH` signal is received on it\" is **literally** what we want this module to do.)"); + + async fn wait_raw_while(&self, mut pred: P, cancel: Fut) -> bool + where P: AsyncFnMut(Option<&T>) -> bool, //TODO: How to impl this with the async mutex? + Fut: Future + { + use futures::FutureExt; + let (cancel, token) = { + let (handle, reg) = AbortHandle::new_pair(); + + (cancel.map(move |_| handle.abort()), reg) + }; + + let waiter = async { + loop { + // Check the predicate + let waiter = { + let value = self.var.read().await; + let waiter = self.cond.notified(); // NOTE: We want to check if there is a notification *after* acquiring (**and** releasing) the read lock has been successful; so that if there was a writer that yielded to us, we get their notification immediately (see below.) (XXX: I don't know if calling this function without polling it here instead of below (after the read lock has been released) changes anything; if it does, move this down below. It's only been defined here instead as a visual guide to the ordering.) + if !pred(value.as_deref()).await { + break false; + } else { + drop(value); // NOTE: That we release the `read` lock *before* checking if the notification comes through, so the notifier can notify us *before* dropping their `write` lock. + } + + waiter + }; + + // Check if we've been notified + // + // TODO: XXX: Would this become a spinning loop if there are no current nofification operations goin on since we're not actually `await`ing on the notification itself at all? Is there any real way to prevent this??? + if let Some(_) = waiter.now_or_never() { + break true; + } + + tokio::task::yield_now().await; // XXX: Do we actually want to yield this here...? + } + }; + + // We have waited for the validation through `pred`, and a notification has arrived to us. + tokio::select! { + cont = waiter => { + cont + } + _ = cancel => { + false + } + } + } + + async fn notify_raw_with(&self, mut setter: P) -> bool + where P: AsyncFnMut(&mut Option>) -> bool, + Fut: Future + { + let mut value = self.var.write().await; + if !setter(&mut value).await { + return false; + } + self.cond.notify(); + + drop(value); // NOTE: We do not release the write lock until *after* we have notified, so a currently blocking reader will immediately get the notification + true + } + //TODO: Implement CondVar's `async wait(pred, tok) -> Option + '_>` & `async notify(value: T)` + //TODO: & also `async wait_owned(Arc, ...)` & `async notify_owned(Arc)` too (XXX: also `&mut self` receiver funcs can *statically* guarantee that there is no other pending waiter/sender and can thus access the value directly; which is *dynamically* guaranteed with the `Arc` receiver funcs.) +} diff --git a/src/work.rs b/src/work.rs index efb0bcd..2f437ba 100644 --- a/src/work.rs +++ b/src/work.rs @@ -147,7 +147,7 @@ where I: IntoIterator, #[cfg(feature="progress-reactive")] let _size_handle = { let mut progress = progress.clone(); - todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{None} to `progress` when an event comes in."); + todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{{to: None}} to `progress` when an event comes in."); }; let display = { From 9412636a0fc3262e050aaa171e5ae04556e37089 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 26 Mar 2025 20:00:05 +0000 Subject: [PATCH 3/5] `progress-reactive`: Re-worked internal sync-to-async-signal barrier API to the much more simple and complete `reactive::Crosslink{,Sender,Receiver}` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Half curse − 半凶 --- src/progress/reactive.rs | 327 ++++++++++++++++++++++++++++++--------- 1 file changed, 255 insertions(+), 72 deletions(-) diff --git a/src/progress/reactive.rs b/src/progress/reactive.rs index fe3396c..384ece3 100644 --- a/src/progress/reactive.rs +++ b/src/progress/reactive.rs @@ -2,7 +2,12 @@ use super::*; use std::{ + ops, thread, + sync::{ + Arc, + Weak, + }, }; use tokio::{ sync::{ @@ -12,97 +17,275 @@ use tokio::{ }, }; use futures::{ + prelude::*, + stream::{ + self, + Stream, + }, future::{ Aborted, Abortable, abortable, AbortHandle, + + Shared, + WeakShared, + + Remote, RemoteHandle, }, }; -/// An async condition variable that holds access to a potential value. -/// -/// Notification is *single* task only, for now there is no `notify_all()` implementation on this. -/// -/// # Ownership and value transference -/// To pass a value through the condvar, `Arc>` can be used to simplify the ownership value-awaiting model to prevent deadlocks caused by dropped possible communicators. -/// Generally however, if the `CondVar` is accessible via shared reference (non-exclusive ownership held) upon a waiting operation, it will assume it is `Sync`-accessible by shared reference so you must ensure that deadlocks cannot be reached when waiting. -/// -/// ## Note on API's potential to create ~asyncronous~ un-completable `Future`s -/// In this context *"deadlock"* is being used to refer to an *un-completable future*, so as long as the future can be cancelled or awaiting can be stopped it's not going to hang up the task obvs; there's no syncronous blocking in the API. -#[derive(Debug)] -pub struct CondVar +/// Inner type for sending pulse signal from sync backing thread to async task. +#[derive(Debug, Default)] +pub(super) struct Crosslink { - /// The condition to wait on and notify to. - cond: Notify, - /// The (possibly empty) variable slot that is read after or written to before. - var: RwLock>>, // XXX: + pub notification: Notify, } -unsafe impl Sync for CondVar{} +/// Sends pulse signals synchonously to an async +#[derive(Debug, Clone)] +pub struct CrosslinkSender(Weak); -impl CondVar { - compile_error!("TODO: XXX: Re-do this, we don't want to just re-implement the sync condvar interface because there's no real need to. Hell, do we even need the condvar at all? Can we just use a `Arc` for the purposes of this module \ - (XXX: i.e. \"backing sync thread **notifies** a running async task holding a progress handle to send a `Resize` command when a `SIGWINCH` signal is received on it\" is **literally** what we want this module to do.)"); - - async fn wait_raw_while(&self, mut pred: P, cancel: Fut) -> bool - where P: AsyncFnMut(Option<&T>) -> bool, //TODO: How to impl this with the async mutex? - Fut: Future +/// Receives pulse signals in a way that can be `await`ed for. +#[derive(Debug)] // NOTE: Not `Clone`, dealing with multiple receivers is too much of a headache with no `notify_all()`. +pub struct CrosslinkReceiver(Arc); + +impl Crosslink +{ + /// Consumes this owned reference into a `Clone`able future that can be waited on by multiple tasks at once. + #[inline] + pub fn waiter_shared(self: Arc) -> impl Future + Clone + Send + Sync + Unpin + Sized + 'static { - use futures::FutureExt; - let (cancel, token) = { - let (handle, reg) = AbortHandle::new_pair(); - - (cancel.map(move |_| handle.abort()), reg) - }; - - let waiter = async { - loop { - // Check the predicate - let waiter = { - let value = self.var.read().await; - let waiter = self.cond.notified(); // NOTE: We want to check if there is a notification *after* acquiring (**and** releasing) the read lock has been successful; so that if there was a writer that yielded to us, we get their notification immediately (see below.) (XXX: I don't know if calling this function without polling it here instead of below (after the read lock has been released) changes anything; if it does, move this down below. It's only been defined here instead as a visual guide to the ordering.) - if !pred(value.as_deref()).await { - break false; - } else { - drop(value); // NOTE: That we release the `read` lock *before* checking if the notification comes through, so the notifier can notify us *before* dropping their `write` lock. - } - - waiter - }; + async move { + self.notification.notified().await + }.shared() + } - // Check if we've been notified - // - // TODO: XXX: Would this become a spinning loop if there are no current nofification operations goin on since we're not actually `await`ing on the notification itself at all? Is there any real way to prevent this??? - if let Some(_) = waiter.now_or_never() { - break true; - } + /// Create a `Clone`able future that can be waited on by multiple tasks at once, **but** is still lifetime-bound by-ref to the instance. + /// + /// # Outliving the owner + /// If a `'static` lifetime bound is required (e.g. due to spawning on a non-local task-set,) use `waiter_shared()` on `Arc`. + #[inline] + pub fn create_waiter_shared(&self) -> impl Future + Clone + Send + Sync + Unpin + Sized + use<'_> + { + self.notification.notified().shared() + } - tokio::task::yield_now().await; // XXX: Do we actually want to yield this here...? - } - }; + /// Split the link into `(tx, rx)` pair. + /// + /// # Example usage + /**``` + # use std::sync::Arc; + # use leanify_many::progress::reactive::*; + let (tx, rx) = Arc::new(Crosslink::default()).into_split(); + let rx = tokio::spawn!(async move { + let rx = rx.into_stream(); + let mut n = 0usize; + while let Some(_) = rx.next().await { + n+=1; + println!("Received notification {n} time(s)!"); +} + println!("Sender(s) all gone!"); +}); + + // Notify the backing task twice. + tx.notify(); + tx.notify(); - // We have waited for the validation through `pred`, and a notification has arrived to us. - tokio::select! { - cont = waiter => { - cont - } - _ = cancel => { - false + // Drop the sender, and wait for the backing task to exit. + drop(tx); + rx.await.unwrap(); + ```*/ + #[inline] + pub(super) fn into_split(self: Arc) -> (CrosslinkSender, CrosslinkReceiver) + { + let tx = Arc::downgrade(&self); + (CrosslinkSender(tx), CrosslinkReceiver(self)) + } +} + +impl CrosslinkReceiver +{ + fn has_senders(&self) -> bool + { + Arc::weak_count(&self.0) != 0 + } + /// Consume receiver into a `Stream` that yields each notification on `.next()`. + /// + /// The stream ends when it is determined that there can be no more signals sent. + pub fn into_stream(self) -> impl Stream + Send + Sync + 'static + { + stream::unfold(self, move |state| async move { + if state.has_senders() { // If there are more than 0 senders (weak references.) + state.0.notification.notified().await; // Wait for a notification. (XXX: This may not complete if all senders drop *while* it's being waited on.) + } else { + return None; } - } + + // If there are no senders left (i.e. we received a notification from the final sender `Drop`ing) we do not want to yield an element but end the stream. + state.has_senders().then(move || ((), state)) + }) } - async fn notify_raw_with(&self, mut setter: P) -> bool - where P: AsyncFnMut(&mut Option>) -> bool, - Fut: Future + /// Wait for a notification or for there to be no senders left. + /// + /// Note that this *will* complete spuriously if it is the final receiver and the final sender is dropped, however it **also** *may* complete spuriously before that. + /// + /// (This future is cancel-safe.) + #[inline] + #[must_use] + pub async fn try_wait(&self) -> bool { - let mut value = self.var.write().await; - if !setter(&mut value).await { + if self.has_senders() { return false; } - self.cond.notify(); + + self.0.notification.notified().await; + self.has_senders() + } - drop(value); // NOTE: We do not release the write lock until *after* we have notified, so a currently blocking reader will immediately get the notification - true + /// Wait for a notification to be sent. + /// + /// # Panics + /// If a signal is not received before the last sender is dropped. + pub fn wait(&self) -> impl Future + Send + Sync + '_ + { + #[inline(never)] + #[cold] + fn _panic_no_senders() -> ! + { + panic!("no senders left that can signal") + } + self.try_wait().map(|r| if !r { + + }) + } + + /// Wait for a notification to be sent or a final sender to be dropped without monitoring the number of senders. + /// + /// # Safety + /// This function will return a non-completable future if there are already no senders when it is called. + /// It may be preferable to use `try_wait_unsafe()` instead, (as that returns `ready()` if there are none instead of `pending()`.) + #[inline(always)] + fn wait_unsafe(&self) -> impl Future + Send + Sync + '_ + { + self.0.notification.notified() } - //TODO: Implement CondVar's `async wait(pred, tok) -> Option + '_>` & `async notify(value: T)` - //TODO: & also `async wait_owned(Arc, ...)` & `async notify_owned(Arc)` too (XXX: also `&mut self` receiver funcs can *statically* guarantee that there is no other pending waiter/sender and can thus access the value directly; which is *dynamically* guaranteed with the `Arc` receiver funcs.) + + + /// Wait for a notification to be sent or a final sender to be dropped without monitoring the number of senders. + /// + /// # Safety + /// This function will return an immediately completed function if there are no senders when it is called. + /// But when the returned future completes it cannot be differentiated between an actual intentional `Sender::notify()` call, or if it's from the final sender being dropped. + #[inline] + fn try_wait_unsafe(&self) -> impl Future + Send + Sync + '_ + { + if ! self.has_senders() { + future::ready(()).left_future() + } else { + self.0.notification.notified().right_future() + } + } +} + +impl ops::Drop for CrosslinkSender +{ + fn drop(&mut self) + { + // This is the last sender, dropping now. + if self.is_last_sender() { + let n = { + // Remove the last sender from the receiver's view. + let this = std::mem::replace(&mut self.0, Weak::new()); + + // So we will tell the receiver to wake up. + let Some(n) = this.upgrade() else { + // If there are any... + return + }; + + // Ensure there are no living senders it can see, before... + drop(this); + n + }; + // ...we wake it up, so it knows to die. + n.notification.notify(); + } + } +} + +impl CrosslinkSender +{ + #[inline(always)] + fn has_receivers(&self) -> bool + { + Weak::strong_count(&self.0) > 0 + } + + #[inline(always)] + pub fn is_last_sender(&self) -> bool + { + Weak::weak_count(&self.0) == 1 + } + + /// If there are receivers that can be notified. + /// + /// # **Non-atomic** operations + /// Note that it is still possible for `notify()` to panic if called after checking this, due to the lack of atomicity. + /// If atomicity is needed, either use `try_map_notify()` (or, if atomicity isn't needed, just ignore the result of `try_notify()` failing instead.) + #[inline] + pub fn can_notify(&self) -> bool + { + self.has_receivers() + } + + /// If there are any receivers, returns a thunk that when called will notify one. + /// + /// # Usage + /// It is **not** intended for the returned function be kept around long, it is entirely possible that by the time the function is invoked, there are no receivers left. + /// The function will attempt to notify, and if there was no receiver to notify, this will be ignored. + /// + /// The intended use is that if there is some work that needs to be done before sending the signal, but that can be skipped if there is no signal to send, the check can be made via a call to this method, and the signal can be sent by calling the returned thunk. + /** ``` + # use std::sync::Arc; + # use leanify_many::progress::reactive::*; + # let (tx, _rx) = Arc::new(Crosslink::default()).into_split(); + if let Some(notify) = tx.try_map_notify() { + /* ...do some work before sending the signal... */ + notify(); } + ```*/ + #[inline] + #[must_use] + fn try_map_notify(&self) -> Option + { + self.0.upgrade().map(|s| move || s.notification.notify()) + } + + /// Send a notification signal if possible. + /// + /// # Return value + /// If there was a receiver to notify. + #[must_use] + pub fn try_notify(&self) -> bool + { + self.0.upgrade().map(|s| s.notification.notify()) + .map(|_| true).unwrap_or(false) + } + + /// Send a notification signal + /// + /// # Panics + /// If there are no receivers to notify (See [try_notify()](try_notify).) + pub fn notify(&self) + { + #[inline(never)] + #[cold] + fn _panic_no_waiters() -> ! + { + panic!("attempted to notify no waiters") + } + if !self.try_notify() { + _panic_no_waiters() + } + } +} + From ad03ce86e81fd891c2d63234cfd77a0f2d64389c Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 27 Mar 2025 14:58:07 +0000 Subject: [PATCH 4/5] Added functional feature `progress-reactive`: A caught `SIGWINCH` signal will now re-draw progress bars. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Future blessing − 末吉 --- src/progress.rs | 2 +- src/progress/reactive.rs | 131 +++++++++++++++++++++++++++++++++++++++ src/work.rs | 45 +++++++++++++- 3 files changed, 175 insertions(+), 3 deletions(-) diff --git a/src/progress.rs b/src/progress.rs index 7eff400..31bb437 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -44,7 +44,7 @@ use termprogress::{ }; #[cfg(feature="progress-reactive")] -mod reactive; +pub mod reactive; #[derive(Debug)] pub struct Task diff --git a/src/progress/reactive.rs b/src/progress/reactive.rs index 384ece3..68fde7a 100644 --- a/src/progress/reactive.rs +++ b/src/progress/reactive.rs @@ -4,6 +4,7 @@ use super::*; use std::{ ops, thread, + io, sync::{ Arc, Weak, @@ -289,3 +290,133 @@ impl CrosslinkSender } } +/// Create a new shared `Crosslink` that can be split into a `(CrosslinkSender, CrosslinkReceiver)` pair. +/// +/// This convenience function is identical to `Arc::new(Crosslink::default())`. +#[inline(always)] +pub(super) fn crosslink_unsplit() -> Arc +{ + Arc::new(Default::default()) +} + +/// Create a new shared `(CrosslinkSender, CrosslinkReceiver)` pair. +/// +/// This convenience function is identical to `Arc::new(Crosslink::default()).into_split()`. +#[inline(always)] +pub(crate) fn crosslink() -> (CrosslinkSender, CrosslinkReceiver) +{ + Arc::new(Crosslink::default()).into_split() +} + +/// Handle to a backing signal-watcher thread (see `spawn_signal_watcher()`.) +#[derive(Debug)] +pub struct Handle +{ + joiner: thread::JoinHandle>, + handle: signal_hook::iterator::Handle, +} + +impl Handle { + /// Check if the backing thread is running. + pub fn is_running(&self) -> bool + { + ! self.joiner.is_finished() + } + /// Attempt to close the handle without waiting for it to respond *at all*. + #[inline] + pub fn signal_close(&self) + { + self.handle.close(); + } + /// Close the handle without joining the backing thread + /// + /// If the backing thread completes without having to `join()`, that result is returned; otherwise, it is ignored. + #[inline] + pub fn close(self) -> io::Result + { + self.handle.close(); + if self.joiner.is_finished() { + return self.joiner.join().unwrap(); + } + Ok(true) + } + /// Close the handle and wait for the backing thread to complete. + #[inline] + pub fn close_sync(self) -> io::Result + { + self.handle.close(); + self.joiner.join().expect("background signal_hook thread panicked!") + } + + /// Check if the signal watcher has been requested to close. + #[inline] + pub fn is_closed(&self) -> bool + { + self.handle.is_closed() + } + + /// Get a handle to the background signal watcher + #[inline(always)] + pub fn signal_handle(&self) -> &signal_hook::iterator::Handle + { + &self.handle + } +} + +fn spawn_signal_watcher_with_callback(signals: I, mut callback: F) -> io::Result<(CrosslinkReceiver, Handle)> +where I: IntoIterator, + S: std::borrow::Borrow, + F: FnMut(std::ffi::c_int, &CrosslinkSender) -> io::Result, +{ + use signal_hook::{ + consts::signal, + iterator::Signals, + }; + + let (tx, rx) = crosslink(); + let mut signals = Signals::new(signals)?; + let handle = signals.handle(); + let joiner = thread::spawn(move || { + let handle = signals.handle(); + let _exit = defer::Defer::new(move || handle.close()); + + for signal in signals.forever() { + match callback(signal, &tx) { + res @ Ok(false) | res @ Err(_) => return res, + _ => (), + }; + } + Ok(true) + }); + Ok((rx, Handle { + joiner, + handle, + })) +} + +//TODO: The API for spawning the `signal_hook` thread & returning a `CrosslinkReceiver` that it notifies on `SIGWINCH`s (with its join-handle) (that will break out of the loop and cleanly exit the thread if there are no receivers left.) + +/// Spawn a background thread to intercept specified `signals` to this process. +/// +/// When any of the specified signals are intercepted, a notification is sent to the returned [CrosslinkReceiver], which can be waited on in an async context. +/// +/// # Closing +/// The background thread can be explicitly communicated with through `Handle`, but will automatically close if a signal arrives *after* the returned receiver has been dropped. +/// However, as the dropping of the receiver will not auto-shutdown the background thread *until* another signal comes in and it notices it cannot send the notification, it is desireable that one of the explicit `close()` methods on `Handle` be registered to be called when either the receiver becomes unavailable to the caller's task-set or the task is over. +/// +/// Dropping the handle will not communicate a close, and since the lifetimes of the receiver and handle are seperated, they do not automatically interface with eachother. +pub fn spawn_signal_watcher(signals: I) -> io::Result<(CrosslinkReceiver, Handle)> +where I: IntoIterator, + S: std::borrow::Borrow, +{ + use std::collections::BTreeSet; + let signals: BTreeSet<_> = signals.into_iter().map(|x| *x.borrow()).collect(); + + spawn_signal_watcher_with_callback(signals.clone(), move |sig, sender| { + Ok(if signals.contains(&sig) { + sender.try_notify() + } else { + return Err(io::Error::new(io::ErrorKind::NotFound, format!("Signal {:?} was not found in the original signal list {:?}", sig, signals.iter().copied().collect::>()))); + }) + }) +} diff --git a/src/work.rs b/src/work.rs index 2f437ba..45726a2 100644 --- a/src/work.rs +++ b/src/work.rs @@ -144,10 +144,46 @@ where I: IntoIterator, } }; + // To close the backing handle and un-hook the signals: `close_resize_handle().await.expect("Failed to close reactive handle")` + // NOTE: This must be called *before* shutting down the progress-bar. #[cfg(feature="progress-reactive")] - let _size_handle = { + let close_resize_handle = { let mut progress = progress.clone(); - todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{{to: None}} to `progress` when an event comes in."); + + let (rx, handle) = progress::reactive::spawn_signal_watcher([signal_hook::consts::signal::SIGWINCH])?; + + let raw_handle = handle.signal_handle().clone(); + let rx = tokio::spawn(async move { + use futures::stream::StreamExt; + + let _on_exit = defer::Defer::new(move || raw_handle.close()); + + let rx = rx.into_stream(); + futures::pin_mut!(rx); + + while let Some(_) = rx.next().await { + match progress.resize_bar(None).await { + Ok(_) => (), // NOTE: Do not wait for the resize to complete before checking if we may need to do so again. + Err(e) if e.can_ignore_on_com_send_failure() => { + // The progress-bar has been requested to shut down, we should end the task here. + + // _on_exit.now(); + return; + }, + Err(e) => { + let _ = progress.eprintln(format!("[{}] {}: Failed to resize progress-bar: {:?}", colour::style(colour!(Color::BrightRed), "!"), colour::style(colour!(Color::BrightWhite), "Error"), e )).await; + } + } + } + }); + use std::io; + async move || -> io::Result<()> { + if handle.close()? == false { + return Err(io::Error::new(io::ErrorKind::BrokenPipe, "The `resize_handle` task has already exited before it was requested to (BUG: This is not a problem, but means the resize-handle has not been closed in the right order in the bringdown code.)")); + } + rx.await?; + Ok(()) + } }; let display = { @@ -299,6 +335,11 @@ where I: IntoIterator, #[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); } + #[cfg(feature="progress-reactive")] { + if let Err(e) = close_resize_handle().await { + let _ = progress.eprintln(format!("[{}] Warning! Failed to close progress-reactive resize handle: {:?}", colour::style(colour!(Color::Yellow),"!"), e)).await; + } + }; #[cfg(feature="progress")] progress.shutdown().await?; drop(tx); From aaaddd66ec9d5b3ca007a5b40003f6d9c33dbb4a Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 27 Mar 2025 15:51:04 +0000 Subject: [PATCH 5/5] Added fully-functional (default) feature `progress-reactive`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Blessing − 吉 --- Cargo.lock | 2 +- Cargo.toml | 4 ++-- src/arg.rs | 26 ++++++++++++++++++++++++++ src/ext.rs | 17 +++++++++++++++++ src/main.rs | 1 + src/work.rs | 49 +++++++++++++++++++++++++++++++++++++++++-------- 6 files changed, 88 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 818588a..083020b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,7 +178,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "leanify-many" -version = "1.2.1+1" +version = "1.2.1+2" dependencies = [ "cfg-if", "futures", diff --git a/Cargo.toml b/Cargo.toml index bf3f1db..3e69fa8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "leanify-many" -version = "1.2.1+1" +version = "1.2.1+2" description = "spawn leanify subprocesses" authors = ["Avril "] edition = "2018" @@ -48,7 +48,7 @@ collect_err = [] # Without this feature enabled, `leanify` subprocesses will receive terminating `SIGINT`s as normal. shutdown = ["libc"] -# TODO: Implement this to: Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width. (XXX: Use a background thread (outside the thread-pool, as it's blocking) listening on `signal_hooks::Signals.forever()` for this that sends events through a shared Tokio `CondVar` notify_all() call.) +# Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width when appropriate. progress-reactive = ["progress", "tokio/signal", "signal-hook", "terminal_size"] [dependencies] diff --git a/src/arg.rs b/src/arg.rs index 9091459..a67586b 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -28,6 +28,7 @@ mod extra { #[inline] fn extra_args(#[allow(unused_variables)] output: &mut W) -> fmt::Result { #[cfg(feature="progress")] writeln!(output, " --no-progress Do not display progress bar")?; + #[cfg(feature="progress-reactive")] writeln!(output, " --static-progress Do not dynamically resize the progress bar")?; #[cfg(feature="colour")] writeln!(output, " --no-colour Do not display terminal colours")?; #[cfg(feature="colour")] writeln!(output, " --colour Always display terminal colour, even if env flags tell us not to")?; #[cfg(feature="shutdown")] writeln!(output, " -n, --no-cancel Do not capture `SIGINT`s for graceful shutdown.")?; @@ -163,6 +164,9 @@ fn comp_flags() check!(on "splash", "Show splash-screen"); check!(on "colour", "Enable coloured output"); check!(on "progress", "Enable progress bar"); + if cfg!(feature="progress") { + check!(on "progress-reactive", "Enable progress bar to reactively-resize to terminal width when changed."); + } check!(on "collect_err", "Collect the output of children's stderr instead of printing immediately"); check!(off "threads", "Enable threaded scheduler (usually not needed)"); check!(off "checked_pass", "Check the arguments passed with `--passthrough` to leanify. By default they are passed as is"); @@ -258,6 +262,8 @@ pub struct Flags { /// Display the progress bar #[cfg(feature="progress")] pub progress: bool, + /// Allow the dynamic reactive resizing of the progress bar + #[cfg(feature="progress-reactive")] pub progress_reactive: bool, /// Force use of colour #[cfg(feature="colour")] pub coloured: Option, /// Limit max children to this number @@ -268,6 +274,20 @@ pub struct Flags #[cfg(feature="shutdown")] pub graceful_shutdown: bool, } +impl Flags +{ + /// Should we watch for `SIGWINCH` to dynamically resize the progress bar? + /// + /// This will return false if there is either: not an output window that can be resized, no rendered progress bar (specified by user,) no dynamic-resize of progress bar (specified by user,) or the `progress-reactive` feature was not enabled at build-time. + #[inline] + pub fn watch_sigwinch(&self) -> bool + { + #![allow(unreachable_code)] + #[cfg(feature="progress-reactive")] return self.progress && self.progress_reactive && (util::is_terminal(&std::io::stdout()) || util::is_terminal(&std::io::stderr())); + false + } +} + impl Default for Flags { #[inline] @@ -279,6 +299,7 @@ impl Default for Flags hard_limit: None, leanify_flags: Default::default(), #[cfg(feature="shutdown")] graceful_shutdown: true, + #[cfg(feature="progress-reactive")] progress_reactive: true, } } } @@ -364,6 +385,11 @@ where I: IntoIterator, continue; } }, + #[cfg(feature="progress-reactive")] + "--static-progress" => { + cfg.flags.progress_reactive = false; + continue; + }, #[cfg(feature="progress")] "--no-progress" => { cfg.flags.progress = false; diff --git a/src/ext.rs b/src/ext.rs index 78bc335..9188cb6 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -24,3 +24,20 @@ where I: Iterator, output } } + +/// Explicit utilities +pub mod util { + use crate::*; + use std::os::fd::*; + + /// Check if `stream` is open to a tty. + pub fn is_terminal(stream: &T) -> bool + { + use std::ffi::c_int; + unsafe extern "C" { + safe fn isatty(fd: c_int) -> c_int; + } + + isatty(stream.as_fd().as_raw_fd()) == 1 + } +} diff --git a/src/main.rs b/src/main.rs index 093afe8..5b994f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use cfg_if::cfg_if; mod defer; mod ext; pub use ext::JoinStrsExt as _; +pub use ext::util; #[cfg(feature="splash")] mod splash; diff --git a/src/work.rs b/src/work.rs index 45726a2..9633944 100644 --- a/src/work.rs +++ b/src/work.rs @@ -144,12 +144,43 @@ where I: IntoIterator, } }; - // To close the backing handle and un-hook the signals: `close_resize_handle().await.expect("Failed to close reactive handle")` + /// Coerce a potentially-`None` thunk's return type into an `Option`, so that `Option T>` becomes `Fn() -> Option`. + /// + /// # `Option` mapping + /// Example usage would be `maybe_func!(if flag { Some(some_thunk_expr) } else { None })` will map the return type to `Option`, creating a thunk with the same signature as `some_thunk_expr` but an `Option`-wrapped return type. + /// + /// ## Internal type-coercion + /// The return-type can also be coerced itself via `maybe_func!(U: option_thunk_expression)` (where `U: From>`,) creating `for>> Option T> -> Fn() -> Option`. + macro_rules! maybe_func { + ($type:ty: $func:expr) => {{ + let func = $func; + move || -> $type { + if let Some(func) = func { + Some(func()).into() + } else { + None.into() + } + } + }}; + ($func:expr) => {{ + let func = $func; + move || { + if let Some(func) = func { + Some(func()) + } else { + None + } + } + }} + } + + // To close the backing handle and un-hook the signals: `close_resize_handle().await[.expect("Failed to close reactive handle")]` // NOTE: This must be called *before* shutting down the progress-bar. + // The return type is coerced to `Option>` (`None` is for if `flags.sigwinch()` is false.) #[cfg(feature="progress-reactive")] - let close_resize_handle = { + let close_resize_handle = maybe_func!(futures::future::OptionFuture<_>: if flags.watch_sigwinch() { let mut progress = progress.clone(); - + let (rx, handle) = progress::reactive::spawn_signal_watcher([signal_hook::consts::signal::SIGWINCH])?; let raw_handle = handle.signal_handle().clone(); @@ -157,7 +188,7 @@ where I: IntoIterator, use futures::stream::StreamExt; let _on_exit = defer::Defer::new(move || raw_handle.close()); - + let rx = rx.into_stream(); futures::pin_mut!(rx); @@ -177,14 +208,16 @@ where I: IntoIterator, } }); use std::io; - async move || -> io::Result<()> { + Some(async move || -> io::Result<()> { if handle.close()? == false { return Err(io::Error::new(io::ErrorKind::BrokenPipe, "The `resize_handle` task has already exited before it was requested to (BUG: This is not a problem, but means the resize-handle has not been closed in the right order in the bringdown code.)")); } rx.await?; Ok(()) - } - }; + }) + } else { + None + }); let display = { #[cfg(feature="progress")] let mut progress = progress.clone(); @@ -336,7 +369,7 @@ where I: IntoIterator, #[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); } #[cfg(feature="progress-reactive")] { - if let Err(e) = close_resize_handle().await { + if let Some(Err(e)) = close_resize_handle().await { let _ = progress.eprintln(format!("[{}] Warning! Failed to close progress-reactive resize handle: {:?}", colour::style(colour!(Color::Yellow),"!"), e)).await; } };