Compare commits

...

6 Commits

Author SHA1 Message Date
Avril d8f217d7cb
Version bump to 1.2.2: Added `progress-reactive` feature (default.)
5 days ago
Avril aaaddd66ec
Added fully-functional (default) feature `progress-reactive`.
5 days ago
Avril ad03ce86e8
Added functional feature `progress-reactive`: A caught `SIGWINCH` signal will now re-draw progress bars.
5 days ago
Avril 9412636a0f
`progress-reactive`: Re-worked internal sync-to-async-signal barrier API to the much more simple and complete `reactive::Crosslink{,Sender,Receiver}`
6 days ago
Avril a4f288fd33
Started `progress-reactive` internal API. (Re-starting design.)
6 days ago
Avril 8d8f932081
Started impl of feature `progress-reactive`: Addded `.resize_bar()` (`CommandKind::Resize`) to update progress bar max size to fit recalculated terminal width.
1 week ago

25
Cargo.lock generated

@ -2,12 +2,6 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "arc-swap"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "atty"
version = "0.2.14"
@ -184,7 +178,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "leanify-many"
version = "1.2.1"
version = "1.2.2"
dependencies = [
"cfg-if",
"futures",
@ -194,6 +188,8 @@ dependencies = [
"pin-project",
"recolored",
"rustc_version",
"signal-hook",
"terminal_size",
"termprogress",
"tokio",
]
@ -401,13 +397,22 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "signal-hook"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-registry"
version = "1.2.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"arc-swap",
"libc",
]

@ -1,6 +1,6 @@
[package]
name = "leanify-many"
version = "1.2.1"
version = "1.2.2"
description = "spawn leanify subprocesses"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
@ -16,7 +16,7 @@ lto = "fat"
codegen-units = 1
[features]
default = ["splash", "progress", "colour", "collect_err", "shutdown"]
default = ["splash", "progress-reactive", "colour", "collect_err", "shutdown"]
# Enable progress bar
progress = ["termprogress", "pin-project"]
@ -48,6 +48,8 @@ collect_err = []
# Without this feature enabled, `leanify` subprocesses will receive terminating `SIGINT`s as normal.
shutdown = ["libc"]
# Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width when appropriate.
progress-reactive = ["progress", "tokio/signal", "signal-hook", "terminal_size"]
[dependencies]
lazy_static = "1.4"
@ -59,6 +61,8 @@ recolored = { version = "1.9", optional = true }
num_cpus = "1.13"
pin-project = {version = "0.4", optional = true}
libc = { version = "0.2.171", features = ["align"], optional = true }
terminal_size = { version = "^0.1.13", optional = true }
signal-hook = { version = "0.3.17", features = ["iterator"], optional = true }
[build-dependencies]
rustc_version = "0.2"

@ -28,6 +28,7 @@ mod extra {
#[inline] fn extra_args<W: Write + ?Sized>(#[allow(unused_variables)] output: &mut W) -> fmt::Result
{
#[cfg(feature="progress")] writeln!(output, " --no-progress Do not display progress bar")?;
#[cfg(feature="progress-reactive")] writeln!(output, " --static-progress Do not dynamically resize the progress bar")?;
#[cfg(feature="colour")] writeln!(output, " --no-colour Do not display terminal colours")?;
#[cfg(feature="colour")] writeln!(output, " --colour Always display terminal colour, even if env flags tell us not to")?;
#[cfg(feature="shutdown")] writeln!(output, " -n, --no-cancel Do not capture `SIGINT`s for graceful shutdown.")?;
@ -163,6 +164,9 @@ fn comp_flags()
check!(on "splash", "Show splash-screen");
check!(on "colour", "Enable coloured output");
check!(on "progress", "Enable progress bar");
if cfg!(feature="progress") {
check!(on "progress-reactive", "Enable progress bar to reactively-resize to terminal width when changed.");
}
check!(on "collect_err", "Collect the output of children's stderr instead of printing immediately");
check!(off "threads", "Enable threaded scheduler (usually not needed)");
check!(off "checked_pass", "Check the arguments passed with `--passthrough` to leanify. By default they are passed as is");
@ -258,6 +262,8 @@ pub struct Flags
{
/// Display the progress bar
#[cfg(feature="progress")] pub progress: bool,
/// Allow the dynamic reactive resizing of the progress bar
#[cfg(feature="progress-reactive")] pub progress_reactive: bool,
/// Force use of colour
#[cfg(feature="colour")] pub coloured: Option<bool>,
/// Limit max children to this number
@ -268,6 +274,20 @@ pub struct Flags
#[cfg(feature="shutdown")] pub graceful_shutdown: bool,
}
impl Flags
{
/// Should we watch for `SIGWINCH` to dynamically resize the progress bar?
///
/// This will return false if there is either: not an output window that can be resized, no rendered progress bar (specified by user,) no dynamic-resize of progress bar (specified by user,) or the `progress-reactive` feature was not enabled at build-time.
#[inline]
pub fn watch_sigwinch(&self) -> bool
{
#![allow(unreachable_code)]
#[cfg(feature="progress-reactive")] return self.progress && self.progress_reactive && (util::is_terminal(&std::io::stdout()) || util::is_terminal(&std::io::stderr()));
false
}
}
impl Default for Flags
{
#[inline]
@ -279,6 +299,7 @@ impl Default for Flags
hard_limit: None,
leanify_flags: Default::default(),
#[cfg(feature="shutdown")] graceful_shutdown: true,
#[cfg(feature="progress-reactive")] progress_reactive: true,
}
}
}
@ -364,6 +385,11 @@ where I: IntoIterator<Item=T>,
continue;
}
},
#[cfg(feature="progress-reactive")]
"--static-progress" => {
cfg.flags.progress_reactive = false;
continue;
},
#[cfg(feature="progress")]
"--no-progress" => {
cfg.flags.progress = false;

@ -24,3 +24,20 @@ where I: Iterator<Item=T>,
output
}
}
/// Explicit utilities
pub mod util {
use crate::*;
use std::os::fd::*;
/// Check if `stream` is open to a tty.
pub fn is_terminal<T: ?Sized + AsFd>(stream: &T) -> bool
{
use std::ffi::c_int;
unsafe extern "C" {
safe fn isatty(fd: c_int) -> c_int;
}
isatty(stream.as_fd().as_raw_fd()) == 1
}
}

@ -9,6 +9,7 @@ use cfg_if::cfg_if;
mod defer;
mod ext;
pub use ext::JoinStrsExt as _;
pub use ext::util;
#[cfg(feature="splash")]
mod splash;
@ -31,9 +32,8 @@ mod maybe_single;
#[cfg(feature="progress")] mod task_list;
#[cfg(feature="progress")] mod progress;
async fn work() -> Result<(), Box<dyn std::error::Error>>
async fn work() -> Result<Option<std::num::NonZeroI32>, Box<dyn std::error::Error>>
{
//println!("umm {}", colour::style(colour!(Color::Blue), "hiii"));
let args = arg::parse_args().await.with_prefix("failed to parse args")?;
let leanify = leanify::find_binary().with_prefix("Couldn't find leanify binary")?;
@ -85,13 +85,17 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
},
_ => args.max_children,
}, get_shutdown_future(&args.flags).fuse()).await
}, get_shutdown_future(&args.flags).fuse()).await
.map(|_| None) //TODO: Can the shutdown future set an `is_interrupted` var we can use to return a non-zero exit code from a SIGINT graceful shutdown here (e.g. `is_interrupted.then_some(1)`)? Do something like this...
}
#[tokio::main]
async fn main() {
prettify_expect(work().await.map_err(|e| e.to_string()), "exited with error");
if let Some(code) = prettify_expect(work().await.map_err(|e| e.to_string()), "exited with error") {
// Exiting with error (non-zero) code
std::process::exit(code.get());
}
}
#[inline] fn prettify_expect<T,E,S>(res: Result<T,E>, msg: S) -> T

@ -19,6 +19,7 @@ use std::{
self,
Once,
},
num::NonZeroUsize,
};
use tokio::{
sync::{
@ -42,6 +43,9 @@ use termprogress::{
ProgressBar,
};
#[cfg(feature="progress-reactive")]
pub mod reactive;
#[derive(Debug)]
pub struct Task
{
@ -68,6 +72,10 @@ pub enum CommandKind
Complete,
Resize {
to: Option<NonZeroUsize>,
},
Many(Vec<CommandKind>),
}
@ -312,6 +320,15 @@ impl ProgressSender
self.send_command(CommandKind::BumpLow(by)).await
}
/// Resize the whole bar to either a specific size, or to query the terminal size.
///
/// Currently, the terminal size is grabbed from `stdout` or `stderr` if possible.
/// If `None` is passed and the output of `stdout` *and* `stderr` are not TTYs, no resize will take place.
pub async fn resize_bar(&mut self, to: Option<NonZeroUsize>) -> Result<CommandWaiter, Error>
{
self.send_command(CommandKind::Resize { to }).await
}
/// Add a task to the worker's progress bar title line
///
/// This function returns a [TaskWaiter]`TaskWaiter` future, upon successful `await`ing will yield the task's ID.
@ -382,7 +399,6 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
low: 0,
};
let (stat_tx, stat_rx) = watch::channel(stat.clone());
let handle = {
let handle = task::spawn(async move {
@ -444,6 +460,33 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
let mut has_blanked = false;
while let Some(command) = commands.next().await {
match command {
#[cfg(feature="progress-reactive")]
CommandKind::Resize { to: None } => {
use std::os::fd::{
AsFd,
AsRawFd,
};
if let Some((w, _)) = terminal_size::terminal_size()
.or_else(|| terminal_size::terminal_size_using_fd(std::io::stderr().as_fd().as_raw_fd()))
{
// Blank the line, and tell to redraw after the dimension update.
progress.blank();
has_blanked = true;
progress.update_dimensions(w.0 as usize);
}
},
CommandKind::Resize { to: Some(to) } => {
// Blank the line, and tell to redraw after the dimension update.
progress.blank();
has_blanked = true;
progress.update_dimensions(to.get());
},
#[allow(unreachable_patterns)]
CommandKind::Resize { .. } => unimplemented!("`progress-reactive` feature not enabled! Cannot resize to terminal size."),
CommandKind::BumpHigh(high) => {
let stat = stat.to_mut();
stat.high+=high;

@ -0,0 +1,422 @@
//! Implementation of `progress-reactive` signal hooking & passing features
use super::*;
use std::{
ops,
thread,
io,
sync::{
Arc,
Weak,
},
};
use tokio::{
sync::{
Notify,
Mutex,
RwLock,
},
};
use futures::{
prelude::*,
stream::{
self,
Stream,
},
future::{
Aborted, Abortable, abortable, AbortHandle,
Shared,
WeakShared,
Remote, RemoteHandle,
},
};
/// Inner type for sending pulse signal from sync backing thread to async task.
#[derive(Debug, Default)]
pub(super) struct Crosslink
{
pub notification: Notify,
}
/// Sends pulse signals synchonously to an async
#[derive(Debug, Clone)]
pub struct CrosslinkSender(Weak<Crosslink>);
/// Receives pulse signals in a way that can be `await`ed for.
#[derive(Debug)] // NOTE: Not `Clone`, dealing with multiple receivers is too much of a headache with no `notify_all()`.
pub struct CrosslinkReceiver(Arc<Crosslink>);
impl Crosslink
{
/// Consumes this owned reference into a `Clone`able future that can be waited on by multiple tasks at once.
#[inline]
pub fn waiter_shared(self: Arc<Self>) -> impl Future<Output = ()> + Clone + Send + Sync + Unpin + Sized + 'static
{
async move {
self.notification.notified().await
}.shared()
}
/// Create a `Clone`able future that can be waited on by multiple tasks at once, **but** is still lifetime-bound by-ref to the instance.
///
/// # Outliving the owner
/// If a `'static` lifetime bound is required (e.g. due to spawning on a non-local task-set,) use `waiter_shared()` on `Arc<Self>`.
#[inline]
pub fn create_waiter_shared(&self) -> impl Future<Output = ()> + Clone + Send + Sync + Unpin + Sized + use<'_>
{
self.notification.notified().shared()
}
/// Split the link into `(tx, rx)` pair.
///
/// # Example usage
/**```
# use std::sync::Arc;
# use leanify_many::progress::reactive::*;
let (tx, rx) = Arc::new(Crosslink::default()).into_split();
let rx = tokio::spawn!(async move {
let rx = rx.into_stream();
let mut n = 0usize;
while let Some(_) = rx.next().await {
n+=1;
println!("Received notification {n} time(s)!");
}
println!("Sender(s) all gone!");
});
// Notify the backing task twice.
tx.notify();
tx.notify();
// Drop the sender, and wait for the backing task to exit.
drop(tx);
rx.await.unwrap();
```*/
#[inline]
pub(super) fn into_split(self: Arc<Self>) -> (CrosslinkSender, CrosslinkReceiver)
{
let tx = Arc::downgrade(&self);
(CrosslinkSender(tx), CrosslinkReceiver(self))
}
}
impl CrosslinkReceiver
{
fn has_senders(&self) -> bool
{
Arc::weak_count(&self.0) != 0
}
/// Consume receiver into a `Stream` that yields each notification on `.next()`.
///
/// The stream ends when it is determined that there can be no more signals sent.
pub fn into_stream(self) -> impl Stream<Item = ()> + Send + Sync + 'static
{
stream::unfold(self, move |state| async move {
if state.has_senders() { // If there are more than 0 senders (weak references.)
state.0.notification.notified().await; // Wait for a notification. (XXX: This may not complete if all senders drop *while* it's being waited on.)
} else {
return None;
}
// If there are no senders left (i.e. we received a notification from the final sender `Drop`ing) we do not want to yield an element but end the stream.
state.has_senders().then(move || ((), state))
})
}
/// Wait for a notification or for there to be no senders left.
///
/// Note that this *will* complete spuriously if it is the final receiver and the final sender is dropped, however it **also** *may* complete spuriously before that.
///
/// (This future is cancel-safe.)
#[inline]
#[must_use]
pub async fn try_wait(&self) -> bool
{
if self.has_senders() {
return false;
}
self.0.notification.notified().await;
self.has_senders()
}
/// Wait for a notification to be sent.
///
/// # Panics
/// If a signal is not received before the last sender is dropped.
pub fn wait(&self) -> impl Future<Output = ()> + Send + Sync + '_
{
#[inline(never)]
#[cold]
fn _panic_no_senders() -> !
{
panic!("no senders left that can signal")
}
self.try_wait().map(|r| if !r {
})
}
/// Wait for a notification to be sent or a final sender to be dropped without monitoring the number of senders.
///
/// # Safety
/// This function will return a non-completable future if there are already no senders when it is called.
/// It may be preferable to use `try_wait_unsafe()` instead, (as that returns `ready()` if there are none instead of `pending()`.)
#[inline(always)]
fn wait_unsafe(&self) -> impl Future + Send + Sync + '_
{
self.0.notification.notified()
}
/// Wait for a notification to be sent or a final sender to be dropped without monitoring the number of senders.
///
/// # Safety
/// This function will return an immediately completed function if there are no senders when it is called.
/// But when the returned future completes it cannot be differentiated between an actual intentional `Sender::notify()` call, or if it's from the final sender being dropped.
#[inline]
fn try_wait_unsafe(&self) -> impl Future + Send + Sync + '_
{
if ! self.has_senders() {
future::ready(()).left_future()
} else {
self.0.notification.notified().right_future()
}
}
}
impl ops::Drop for CrosslinkSender
{
fn drop(&mut self)
{
// This is the last sender, dropping now.
if self.is_last_sender() {
let n = {
// Remove the last sender from the receiver's view.
let this = std::mem::replace(&mut self.0, Weak::new());
// So we will tell the receiver to wake up.
let Some(n) = this.upgrade() else {
// If there are any...
return
};
// Ensure there are no living senders it can see, before...
drop(this);
n
};
// ...we wake it up, so it knows to die.
n.notification.notify();
}
}
}
impl CrosslinkSender
{
#[inline(always)]
fn has_receivers(&self) -> bool
{
Weak::strong_count(&self.0) > 0
}
#[inline(always)]
pub fn is_last_sender(&self) -> bool
{
Weak::weak_count(&self.0) == 1
}
/// If there are receivers that can be notified.
///
/// # **Non-atomic** operations
/// Note that it is still possible for `notify()` to panic if called after checking this, due to the lack of atomicity.
/// If atomicity is needed, either use `try_map_notify()` (or, if atomicity isn't needed, just ignore the result of `try_notify()` failing instead.)
#[inline]
pub fn can_notify(&self) -> bool
{
self.has_receivers()
}
/// If there are any receivers, returns a thunk that when called will notify one.
///
/// # Usage
/// It is **not** intended for the returned function be kept around long, it is entirely possible that by the time the function is invoked, there are no receivers left.
/// The function will attempt to notify, and if there was no receiver to notify, this will be ignored.
///
/// The intended use is that if there is some work that needs to be done before sending the signal, but that can be skipped if there is no signal to send, the check can be made via a call to this method, and the signal can be sent by calling the returned thunk.
/** ```
# use std::sync::Arc;
# use leanify_many::progress::reactive::*;
# let (tx, _rx) = Arc::new(Crosslink::default()).into_split();
if let Some(notify) = tx.try_map_notify() {
/* ...do some work before sending the signal... */
notify();
}
```*/
#[inline]
#[must_use]
fn try_map_notify(&self) -> Option<impl FnOnce() + Unpin + Send + '_>
{
self.0.upgrade().map(|s| move || s.notification.notify())
}
/// Send a notification signal if possible.
///
/// # Return value
/// If there was a receiver to notify.
#[must_use]
pub fn try_notify(&self) -> bool
{
self.0.upgrade().map(|s| s.notification.notify())
.map(|_| true).unwrap_or(false)
}
/// Send a notification signal
///
/// # Panics
/// If there are no receivers to notify (See [try_notify()](try_notify).)
pub fn notify(&self)
{
#[inline(never)]
#[cold]
fn _panic_no_waiters() -> !
{
panic!("attempted to notify no waiters")
}
if !self.try_notify() {
_panic_no_waiters()
}
}
}
/// Create a new shared `Crosslink` that can be split into a `(CrosslinkSender, CrosslinkReceiver)` pair.
///
/// This convenience function is identical to `Arc::new(Crosslink::default())`.
#[inline(always)]
pub(super) fn crosslink_unsplit() -> Arc<Crosslink>
{
Arc::new(Default::default())
}
/// Create a new shared `(CrosslinkSender, CrosslinkReceiver)` pair.
///
/// This convenience function is identical to `Arc::new(Crosslink::default()).into_split()`.
#[inline(always)]
pub(crate) fn crosslink() -> (CrosslinkSender, CrosslinkReceiver)
{
Arc::new(Crosslink::default()).into_split()
}
/// Handle to a backing signal-watcher thread (see `spawn_signal_watcher()`.)
#[derive(Debug)]
pub struct Handle
{
joiner: thread::JoinHandle<io::Result<bool>>,
handle: signal_hook::iterator::Handle,
}
impl Handle {
/// Check if the backing thread is running.
pub fn is_running(&self) -> bool
{
! self.joiner.is_finished()
}
/// Attempt to close the handle without waiting for it to respond *at all*.
#[inline]
pub fn signal_close(&self)
{
self.handle.close();
}
/// Close the handle without joining the backing thread
///
/// If the backing thread completes without having to `join()`, that result is returned; otherwise, it is ignored.
#[inline]
pub fn close(self) -> io::Result<bool>
{
self.handle.close();
if self.joiner.is_finished() {
return self.joiner.join().unwrap();
}
Ok(true)
}
/// Close the handle and wait for the backing thread to complete.
#[inline]
pub fn close_sync(self) -> io::Result<bool>
{
self.handle.close();
self.joiner.join().expect("background signal_hook thread panicked!")
}
/// Check if the signal watcher has been requested to close.
#[inline]
pub fn is_closed(&self) -> bool
{
self.handle.is_closed()
}
/// Get a handle to the background signal watcher
#[inline(always)]
pub fn signal_handle(&self) -> &signal_hook::iterator::Handle
{
&self.handle
}
}
fn spawn_signal_watcher_with_callback<I, S, F: Send + 'static>(signals: I, mut callback: F) -> io::Result<(CrosslinkReceiver, Handle)>
where I: IntoIterator<Item = S>,
S: std::borrow::Borrow<std::ffi::c_int>,
F: FnMut(std::ffi::c_int, &CrosslinkSender) -> io::Result<bool>,
{
use signal_hook::{
consts::signal,
iterator::Signals,
};
let (tx, rx) = crosslink();
let mut signals = Signals::new(signals)?;
let handle = signals.handle();
let joiner = thread::spawn(move || {
let handle = signals.handle();
let _exit = defer::Defer::new(move || handle.close());
for signal in signals.forever() {
match callback(signal, &tx) {
res @ Ok(false) | res @ Err(_) => return res,
_ => (),
};
}
Ok(true)
});
Ok((rx, Handle {
joiner,
handle,
}))
}
//TODO: The API for spawning the `signal_hook` thread & returning a `CrosslinkReceiver` that it notifies on `SIGWINCH`s (with its join-handle) (that will break out of the loop and cleanly exit the thread if there are no receivers left.)
/// Spawn a background thread to intercept specified `signals` to this process.
///
/// When any of the specified signals are intercepted, a notification is sent to the returned [CrosslinkReceiver], which can be waited on in an async context.
///
/// # Closing
/// The background thread can be explicitly communicated with through `Handle`, but will automatically close if a signal arrives *after* the returned receiver has been dropped.
/// However, as the dropping of the receiver will not auto-shutdown the background thread *until* another signal comes in and it notices it cannot send the notification, it is desireable that one of the explicit `close()` methods on `Handle` be registered to be called when either the receiver becomes unavailable to the caller's task-set or the task is over.
///
/// Dropping the handle will not communicate a close, and since the lifetimes of the receiver and handle are seperated, they do not automatically interface with eachother.
pub fn spawn_signal_watcher<I, S>(signals: I) -> io::Result<(CrosslinkReceiver, Handle)>
where I: IntoIterator<Item = S>,
S: std::borrow::Borrow<std::ffi::c_int>,
{
use std::collections::BTreeSet;
let signals: BTreeSet<_> = signals.into_iter().map(|x| *x.borrow()).collect();
spawn_signal_watcher_with_callback(signals.clone(), move |sig, sender| {
Ok(if signals.contains(&sig) {
sender.try_notify()
} else {
return Err(io::Error::new(io::ErrorKind::NotFound, format!("Signal {:?} was not found in the original signal list {:?}", sig, signals.iter().copied().collect::<Box<[_]>>())));
})
})
}

@ -143,6 +143,81 @@ where I: IntoIterator<Item=T>,
progress::create_progress::<termprogress::silent::Silent,_>(files.len(), iter::empty())
}
};
/// Coerce a potentially-`None` thunk's return type into an `Option`, so that `Option<Fn() -> T>` becomes `Fn() -> Option<T>`.
///
/// # `Option` mapping
/// Example usage would be `maybe_func!(if flag { Some(some_thunk_expr) } else { None })` will map the return type to `Option`, creating a thunk with the same signature as `some_thunk_expr` but an `Option`-wrapped return type.
///
/// ## Internal type-coercion
/// The return-type can also be coerced itself via `maybe_func!(U: option_thunk_expression)` (where `U: From<Option<T>>`,) creating `for<U: From<Option<T>>> Option<Fn() -> T> -> Fn() -> Option<U>`.
macro_rules! maybe_func {
($type:ty: $func:expr) => {{
let func = $func;
move || -> $type {
if let Some(func) = func {
Some(func()).into()
} else {
None.into()
}
}
}};
($func:expr) => {{
let func = $func;
move || {
if let Some(func) = func {
Some(func())
} else {
None
}
}
}}
}
// To close the backing handle and un-hook the signals: `close_resize_handle().await[.expect("Failed to close reactive handle")]`
// NOTE: This must be called *before* shutting down the progress-bar.
// The return type is coerced to `Option<io::Result<()>>` (`None` is for if `flags.sigwinch()` is false.)
#[cfg(feature="progress-reactive")]
let close_resize_handle = maybe_func!(futures::future::OptionFuture<_>: if flags.watch_sigwinch() {
let mut progress = progress.clone();
let (rx, handle) = progress::reactive::spawn_signal_watcher([signal_hook::consts::signal::SIGWINCH])?;
let raw_handle = handle.signal_handle().clone();
let rx = tokio::spawn(async move {
use futures::stream::StreamExt;
let _on_exit = defer::Defer::new(move || raw_handle.close());
let rx = rx.into_stream();
futures::pin_mut!(rx);
while let Some(_) = rx.next().await {
match progress.resize_bar(None).await {
Ok(_) => (), // NOTE: Do not wait for the resize to complete before checking if we may need to do so again.
Err(e) if e.can_ignore_on_com_send_failure() => {
// The progress-bar has been requested to shut down, we should end the task here.
// _on_exit.now();
return;
},
Err(e) => {
let _ = progress.eprintln(format!("[{}] {}: Failed to resize progress-bar: {:?}", colour::style(colour!(Color::BrightRed), "!"), colour::style(colour!(Color::BrightWhite), "Error"), e )).await;
}
}
}
});
use std::io;
Some(async move || -> io::Result<()> {
if handle.close()? == false {
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "The `resize_handle` task has already exited before it was requested to (BUG: This is not a problem, but means the resize-handle has not been closed in the right order in the bringdown code.)"));
}
rx.await?;
Ok(())
})
} else {
None
});
let display = {
#[cfg(feature="progress")] let mut progress = progress.clone();
@ -191,7 +266,7 @@ where I: IntoIterator<Item=T>,
let process = Arc::clone(&process);
let mut tx = tx.clone();
let flags = flags.clone();
let flags = flags.clone(); // XXX: Can we remove this clone somehow? It's kinda big this structure and we only need to do this because of not being able to exclude 'cli from `+ use<>` in the `async fn do_work()` as it's an `async fn` and not a `fn() -> impl Future`...
#[cfg(feature="progress")] let mut progress = progress.clone();
@ -288,11 +363,16 @@ where I: IntoIterator<Item=T>,
Ok(v) => Box::new(v),
};
for failed in results
for failed in results
{
#[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed);
}
#[cfg(feature="progress-reactive")] {
if let Some(Err(e)) = close_resize_handle().await {
let _ = progress.eprintln(format!("[{}] Warning! Failed to close progress-reactive resize handle: {:?}", colour::style(colour!(Color::Yellow),"!"), e)).await;
}
};
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx);

Loading…
Cancel
Save