Added functional feature `progress-reactive`: A caught `SIGWINCH` signal will now re-draw progress bars.

Fortune for leanify-many's current commit: Future blessing − 末吉
safe-cancel-interrupt
Avril 5 days ago
parent 9412636a0f
commit ad03ce86e8
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -44,7 +44,7 @@ use termprogress::{
};
#[cfg(feature="progress-reactive")]
mod reactive;
pub mod reactive;
#[derive(Debug)]
pub struct Task

@ -4,6 +4,7 @@ use super::*;
use std::{
ops,
thread,
io,
sync::{
Arc,
Weak,
@ -289,3 +290,133 @@ impl CrosslinkSender
}
}
/// Create a new shared `Crosslink` that can be split into a `(CrosslinkSender, CrosslinkReceiver)` pair.
///
/// This convenience function is identical to `Arc::new(Crosslink::default())`.
#[inline(always)]
pub(super) fn crosslink_unsplit() -> Arc<Crosslink>
{
Arc::new(Default::default())
}
/// Create a new shared `(CrosslinkSender, CrosslinkReceiver)` pair.
///
/// This convenience function is identical to `Arc::new(Crosslink::default()).into_split()`.
#[inline(always)]
pub(crate) fn crosslink() -> (CrosslinkSender, CrosslinkReceiver)
{
Arc::new(Crosslink::default()).into_split()
}
/// Handle to a backing signal-watcher thread (see `spawn_signal_watcher()`.)
#[derive(Debug)]
pub struct Handle
{
joiner: thread::JoinHandle<io::Result<bool>>,
handle: signal_hook::iterator::Handle,
}
impl Handle {
/// Check if the backing thread is running.
pub fn is_running(&self) -> bool
{
! self.joiner.is_finished()
}
/// Attempt to close the handle without waiting for it to respond *at all*.
#[inline]
pub fn signal_close(&self)
{
self.handle.close();
}
/// Close the handle without joining the backing thread
///
/// If the backing thread completes without having to `join()`, that result is returned; otherwise, it is ignored.
#[inline]
pub fn close(self) -> io::Result<bool>
{
self.handle.close();
if self.joiner.is_finished() {
return self.joiner.join().unwrap();
}
Ok(true)
}
/// Close the handle and wait for the backing thread to complete.
#[inline]
pub fn close_sync(self) -> io::Result<bool>
{
self.handle.close();
self.joiner.join().expect("background signal_hook thread panicked!")
}
/// Check if the signal watcher has been requested to close.
#[inline]
pub fn is_closed(&self) -> bool
{
self.handle.is_closed()
}
/// Get a handle to the background signal watcher
#[inline(always)]
pub fn signal_handle(&self) -> &signal_hook::iterator::Handle
{
&self.handle
}
}
fn spawn_signal_watcher_with_callback<I, S, F: Send + 'static>(signals: I, mut callback: F) -> io::Result<(CrosslinkReceiver, Handle)>
where I: IntoIterator<Item = S>,
S: std::borrow::Borrow<std::ffi::c_int>,
F: FnMut(std::ffi::c_int, &CrosslinkSender) -> io::Result<bool>,
{
use signal_hook::{
consts::signal,
iterator::Signals,
};
let (tx, rx) = crosslink();
let mut signals = Signals::new(signals)?;
let handle = signals.handle();
let joiner = thread::spawn(move || {
let handle = signals.handle();
let _exit = defer::Defer::new(move || handle.close());
for signal in signals.forever() {
match callback(signal, &tx) {
res @ Ok(false) | res @ Err(_) => return res,
_ => (),
};
}
Ok(true)
});
Ok((rx, Handle {
joiner,
handle,
}))
}
//TODO: The API for spawning the `signal_hook` thread & returning a `CrosslinkReceiver` that it notifies on `SIGWINCH`s (with its join-handle) (that will break out of the loop and cleanly exit the thread if there are no receivers left.)
/// Spawn a background thread to intercept specified `signals` to this process.
///
/// When any of the specified signals are intercepted, a notification is sent to the returned [CrosslinkReceiver], which can be waited on in an async context.
///
/// # Closing
/// The background thread can be explicitly communicated with through `Handle`, but will automatically close if a signal arrives *after* the returned receiver has been dropped.
/// However, as the dropping of the receiver will not auto-shutdown the background thread *until* another signal comes in and it notices it cannot send the notification, it is desireable that one of the explicit `close()` methods on `Handle` be registered to be called when either the receiver becomes unavailable to the caller's task-set or the task is over.
///
/// Dropping the handle will not communicate a close, and since the lifetimes of the receiver and handle are seperated, they do not automatically interface with eachother.
pub fn spawn_signal_watcher<I, S>(signals: I) -> io::Result<(CrosslinkReceiver, Handle)>
where I: IntoIterator<Item = S>,
S: std::borrow::Borrow<std::ffi::c_int>,
{
use std::collections::BTreeSet;
let signals: BTreeSet<_> = signals.into_iter().map(|x| *x.borrow()).collect();
spawn_signal_watcher_with_callback(signals.clone(), move |sig, sender| {
Ok(if signals.contains(&sig) {
sender.try_notify()
} else {
return Err(io::Error::new(io::ErrorKind::NotFound, format!("Signal {:?} was not found in the original signal list {:?}", sig, signals.iter().copied().collect::<Box<[_]>>())));
})
})
}

@ -144,10 +144,46 @@ where I: IntoIterator<Item=T>,
}
};
// To close the backing handle and un-hook the signals: `close_resize_handle().await.expect("Failed to close reactive handle")`
// NOTE: This must be called *before* shutting down the progress-bar.
#[cfg(feature="progress-reactive")]
let _size_handle = {
let close_resize_handle = {
let mut progress = progress.clone();
todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{{to: None}} to `progress` when an event comes in.");
let (rx, handle) = progress::reactive::spawn_signal_watcher([signal_hook::consts::signal::SIGWINCH])?;
let raw_handle = handle.signal_handle().clone();
let rx = tokio::spawn(async move {
use futures::stream::StreamExt;
let _on_exit = defer::Defer::new(move || raw_handle.close());
let rx = rx.into_stream();
futures::pin_mut!(rx);
while let Some(_) = rx.next().await {
match progress.resize_bar(None).await {
Ok(_) => (), // NOTE: Do not wait for the resize to complete before checking if we may need to do so again.
Err(e) if e.can_ignore_on_com_send_failure() => {
// The progress-bar has been requested to shut down, we should end the task here.
// _on_exit.now();
return;
},
Err(e) => {
let _ = progress.eprintln(format!("[{}] {}: Failed to resize progress-bar: {:?}", colour::style(colour!(Color::BrightRed), "!"), colour::style(colour!(Color::BrightWhite), "Error"), e )).await;
}
}
}
});
use std::io;
async move || -> io::Result<()> {
if handle.close()? == false {
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "The `resize_handle` task has already exited before it was requested to (BUG: This is not a problem, but means the resize-handle has not been closed in the right order in the bringdown code.)"));
}
rx.await?;
Ok(())
}
};
let display = {
@ -299,6 +335,11 @@ where I: IntoIterator<Item=T>,
#[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed);
}
#[cfg(feature="progress-reactive")] {
if let Err(e) = close_resize_handle().await {
let _ = progress.eprintln(format!("[{}] Warning! Failed to close progress-reactive resize handle: {:?}", colour::style(colour!(Color::Yellow),"!"), e)).await;
}
};
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx);

Loading…
Cancel
Save