diff --git a/src/progress.rs b/src/progress.rs index 7eff400..31bb437 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -44,7 +44,7 @@ use termprogress::{ }; #[cfg(feature="progress-reactive")] -mod reactive; +pub mod reactive; #[derive(Debug)] pub struct Task diff --git a/src/progress/reactive.rs b/src/progress/reactive.rs index 384ece3..68fde7a 100644 --- a/src/progress/reactive.rs +++ b/src/progress/reactive.rs @@ -4,6 +4,7 @@ use super::*; use std::{ ops, thread, + io, sync::{ Arc, Weak, @@ -289,3 +290,133 @@ impl CrosslinkSender } } +/// Create a new shared `Crosslink` that can be split into a `(CrosslinkSender, CrosslinkReceiver)` pair. +/// +/// This convenience function is identical to `Arc::new(Crosslink::default())`. +#[inline(always)] +pub(super) fn crosslink_unsplit() -> Arc +{ + Arc::new(Default::default()) +} + +/// Create a new shared `(CrosslinkSender, CrosslinkReceiver)` pair. +/// +/// This convenience function is identical to `Arc::new(Crosslink::default()).into_split()`. +#[inline(always)] +pub(crate) fn crosslink() -> (CrosslinkSender, CrosslinkReceiver) +{ + Arc::new(Crosslink::default()).into_split() +} + +/// Handle to a backing signal-watcher thread (see `spawn_signal_watcher()`.) +#[derive(Debug)] +pub struct Handle +{ + joiner: thread::JoinHandle>, + handle: signal_hook::iterator::Handle, +} + +impl Handle { + /// Check if the backing thread is running. + pub fn is_running(&self) -> bool + { + ! self.joiner.is_finished() + } + /// Attempt to close the handle without waiting for it to respond *at all*. + #[inline] + pub fn signal_close(&self) + { + self.handle.close(); + } + /// Close the handle without joining the backing thread + /// + /// If the backing thread completes without having to `join()`, that result is returned; otherwise, it is ignored. + #[inline] + pub fn close(self) -> io::Result + { + self.handle.close(); + if self.joiner.is_finished() { + return self.joiner.join().unwrap(); + } + Ok(true) + } + /// Close the handle and wait for the backing thread to complete. + #[inline] + pub fn close_sync(self) -> io::Result + { + self.handle.close(); + self.joiner.join().expect("background signal_hook thread panicked!") + } + + /// Check if the signal watcher has been requested to close. + #[inline] + pub fn is_closed(&self) -> bool + { + self.handle.is_closed() + } + + /// Get a handle to the background signal watcher + #[inline(always)] + pub fn signal_handle(&self) -> &signal_hook::iterator::Handle + { + &self.handle + } +} + +fn spawn_signal_watcher_with_callback(signals: I, mut callback: F) -> io::Result<(CrosslinkReceiver, Handle)> +where I: IntoIterator, + S: std::borrow::Borrow, + F: FnMut(std::ffi::c_int, &CrosslinkSender) -> io::Result, +{ + use signal_hook::{ + consts::signal, + iterator::Signals, + }; + + let (tx, rx) = crosslink(); + let mut signals = Signals::new(signals)?; + let handle = signals.handle(); + let joiner = thread::spawn(move || { + let handle = signals.handle(); + let _exit = defer::Defer::new(move || handle.close()); + + for signal in signals.forever() { + match callback(signal, &tx) { + res @ Ok(false) | res @ Err(_) => return res, + _ => (), + }; + } + Ok(true) + }); + Ok((rx, Handle { + joiner, + handle, + })) +} + +//TODO: The API for spawning the `signal_hook` thread & returning a `CrosslinkReceiver` that it notifies on `SIGWINCH`s (with its join-handle) (that will break out of the loop and cleanly exit the thread if there are no receivers left.) + +/// Spawn a background thread to intercept specified `signals` to this process. +/// +/// When any of the specified signals are intercepted, a notification is sent to the returned [CrosslinkReceiver], which can be waited on in an async context. +/// +/// # Closing +/// The background thread can be explicitly communicated with through `Handle`, but will automatically close if a signal arrives *after* the returned receiver has been dropped. +/// However, as the dropping of the receiver will not auto-shutdown the background thread *until* another signal comes in and it notices it cannot send the notification, it is desireable that one of the explicit `close()` methods on `Handle` be registered to be called when either the receiver becomes unavailable to the caller's task-set or the task is over. +/// +/// Dropping the handle will not communicate a close, and since the lifetimes of the receiver and handle are seperated, they do not automatically interface with eachother. +pub fn spawn_signal_watcher(signals: I) -> io::Result<(CrosslinkReceiver, Handle)> +where I: IntoIterator, + S: std::borrow::Borrow, +{ + use std::collections::BTreeSet; + let signals: BTreeSet<_> = signals.into_iter().map(|x| *x.borrow()).collect(); + + spawn_signal_watcher_with_callback(signals.clone(), move |sig, sender| { + Ok(if signals.contains(&sig) { + sender.try_notify() + } else { + return Err(io::Error::new(io::ErrorKind::NotFound, format!("Signal {:?} was not found in the original signal list {:?}", sig, signals.iter().copied().collect::>()))); + }) + }) +} diff --git a/src/work.rs b/src/work.rs index 2f437ba..45726a2 100644 --- a/src/work.rs +++ b/src/work.rs @@ -144,10 +144,46 @@ where I: IntoIterator, } }; + // To close the backing handle and un-hook the signals: `close_resize_handle().await.expect("Failed to close reactive handle")` + // NOTE: This must be called *before* shutting down the progress-bar. #[cfg(feature="progress-reactive")] - let _size_handle = { + let close_resize_handle = { let mut progress = progress.clone(); - todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{{to: None}} to `progress` when an event comes in."); + + let (rx, handle) = progress::reactive::spawn_signal_watcher([signal_hook::consts::signal::SIGWINCH])?; + + let raw_handle = handle.signal_handle().clone(); + let rx = tokio::spawn(async move { + use futures::stream::StreamExt; + + let _on_exit = defer::Defer::new(move || raw_handle.close()); + + let rx = rx.into_stream(); + futures::pin_mut!(rx); + + while let Some(_) = rx.next().await { + match progress.resize_bar(None).await { + Ok(_) => (), // NOTE: Do not wait for the resize to complete before checking if we may need to do so again. + Err(e) if e.can_ignore_on_com_send_failure() => { + // The progress-bar has been requested to shut down, we should end the task here. + + // _on_exit.now(); + return; + }, + Err(e) => { + let _ = progress.eprintln(format!("[{}] {}: Failed to resize progress-bar: {:?}", colour::style(colour!(Color::BrightRed), "!"), colour::style(colour!(Color::BrightWhite), "Error"), e )).await; + } + } + } + }); + use std::io; + async move || -> io::Result<()> { + if handle.close()? == false { + return Err(io::Error::new(io::ErrorKind::BrokenPipe, "The `resize_handle` task has already exited before it was requested to (BUG: This is not a problem, but means the resize-handle has not been closed in the right order in the bringdown code.)")); + } + rx.await?; + Ok(()) + } }; let display = { @@ -299,6 +335,11 @@ where I: IntoIterator, #[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); } + #[cfg(feature="progress-reactive")] { + if let Err(e) = close_resize_handle().await { + let _ = progress.eprintln(format!("[{}] Warning! Failed to close progress-reactive resize handle: {:?}", colour::style(colour!(Color::Yellow),"!"), e)).await; + } + }; #[cfg(feature="progress")] progress.shutdown().await?; drop(tx);