diff --git a/Cargo.toml b/Cargo.toml index 0603e57..bf3f1db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ codegen-units = 1 default = ["splash", "progress-reactive", "colour", "collect_err", "shutdown"] # Enable progress bar -progress = ["termprogress", "pin-project", "terminal_size"] +progress = ["termprogress", "pin-project"] # Enable threaded scheduler # @@ -49,7 +49,7 @@ collect_err = [] shutdown = ["libc"] # TODO: Implement this to: Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width. (XXX: Use a background thread (outside the thread-pool, as it's blocking) listening on `signal_hooks::Signals.forever()` for this that sends events through a shared Tokio `CondVar` notify_all() call.) -progress-reactive = ["progress", "tokio/signal", "signal-hook"] +progress-reactive = ["progress", "tokio/signal", "signal-hook", "terminal_size"] [dependencies] lazy_static = "1.4" diff --git a/src/progress.rs b/src/progress.rs index d88f889..7eff400 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -43,6 +43,9 @@ use termprogress::{ ProgressBar, }; +#[cfg(feature="progress-reactive")] +mod reactive; + #[derive(Debug)] pub struct Task { @@ -457,12 +460,14 @@ pub fn create_progress { use std::os::fd::{ AsFd, AsRawFd, }; + if let Some((w, _)) = terminal_size::terminal_size() .or_else(|| terminal_size::terminal_size_using_fd(std::io::stderr().as_fd().as_raw_fd())) { @@ -480,6 +485,8 @@ pub fn create_progress unimplemented!("`progress-reactive` feature not enabled! Cannot resize to terminal size."), CommandKind::BumpHigh(high) => { let stat = stat.to_mut(); stat.high+=high; diff --git a/src/progress/reactive.rs b/src/progress/reactive.rs new file mode 100644 index 0000000..fe3396c --- /dev/null +++ b/src/progress/reactive.rs @@ -0,0 +1,108 @@ +//! Implementation of `progress-reactive` signal hooking & passing features +use super::*; + +use std::{ + thread, +}; +use tokio::{ + sync::{ + Notify, + Mutex, + RwLock, + }, +}; +use futures::{ + future::{ + Aborted, Abortable, abortable, AbortHandle, + }, +}; + +/// An async condition variable that holds access to a potential value. +/// +/// Notification is *single* task only, for now there is no `notify_all()` implementation on this. +/// +/// # Ownership and value transference +/// To pass a value through the condvar, `Arc>` can be used to simplify the ownership value-awaiting model to prevent deadlocks caused by dropped possible communicators. +/// Generally however, if the `CondVar` is accessible via shared reference (non-exclusive ownership held) upon a waiting operation, it will assume it is `Sync`-accessible by shared reference so you must ensure that deadlocks cannot be reached when waiting. +/// +/// ## Note on API's potential to create ~asyncronous~ un-completable `Future`s +/// In this context *"deadlock"* is being used to refer to an *un-completable future*, so as long as the future can be cancelled or awaiting can be stopped it's not going to hang up the task obvs; there's no syncronous blocking in the API. +#[derive(Debug)] +pub struct CondVar +{ + /// The condition to wait on and notify to. + cond: Notify, + /// The (possibly empty) variable slot that is read after or written to before. + var: RwLock>>, // XXX: +} + +unsafe impl Sync for CondVar{} + +impl CondVar { + compile_error!("TODO: XXX: Re-do this, we don't want to just re-implement the sync condvar interface because there's no real need to. Hell, do we even need the condvar at all? Can we just use a `Arc` for the purposes of this module \ + (XXX: i.e. \"backing sync thread **notifies** a running async task holding a progress handle to send a `Resize` command when a `SIGWINCH` signal is received on it\" is **literally** what we want this module to do.)"); + + async fn wait_raw_while(&self, mut pred: P, cancel: Fut) -> bool + where P: AsyncFnMut(Option<&T>) -> bool, //TODO: How to impl this with the async mutex? + Fut: Future + { + use futures::FutureExt; + let (cancel, token) = { + let (handle, reg) = AbortHandle::new_pair(); + + (cancel.map(move |_| handle.abort()), reg) + }; + + let waiter = async { + loop { + // Check the predicate + let waiter = { + let value = self.var.read().await; + let waiter = self.cond.notified(); // NOTE: We want to check if there is a notification *after* acquiring (**and** releasing) the read lock has been successful; so that if there was a writer that yielded to us, we get their notification immediately (see below.) (XXX: I don't know if calling this function without polling it here instead of below (after the read lock has been released) changes anything; if it does, move this down below. It's only been defined here instead as a visual guide to the ordering.) + if !pred(value.as_deref()).await { + break false; + } else { + drop(value); // NOTE: That we release the `read` lock *before* checking if the notification comes through, so the notifier can notify us *before* dropping their `write` lock. + } + + waiter + }; + + // Check if we've been notified + // + // TODO: XXX: Would this become a spinning loop if there are no current nofification operations goin on since we're not actually `await`ing on the notification itself at all? Is there any real way to prevent this??? + if let Some(_) = waiter.now_or_never() { + break true; + } + + tokio::task::yield_now().await; // XXX: Do we actually want to yield this here...? + } + }; + + // We have waited for the validation through `pred`, and a notification has arrived to us. + tokio::select! { + cont = waiter => { + cont + } + _ = cancel => { + false + } + } + } + + async fn notify_raw_with(&self, mut setter: P) -> bool + where P: AsyncFnMut(&mut Option>) -> bool, + Fut: Future + { + let mut value = self.var.write().await; + if !setter(&mut value).await { + return false; + } + self.cond.notify(); + + drop(value); // NOTE: We do not release the write lock until *after* we have notified, so a currently blocking reader will immediately get the notification + true + } + //TODO: Implement CondVar's `async wait(pred, tok) -> Option + '_>` & `async notify(value: T)` + //TODO: & also `async wait_owned(Arc, ...)` & `async notify_owned(Arc)` too (XXX: also `&mut self` receiver funcs can *statically* guarantee that there is no other pending waiter/sender and can thus access the value directly; which is *dynamically* guaranteed with the `Arc` receiver funcs.) +} diff --git a/src/work.rs b/src/work.rs index efb0bcd..2f437ba 100644 --- a/src/work.rs +++ b/src/work.rs @@ -147,7 +147,7 @@ where I: IntoIterator, #[cfg(feature="progress-reactive")] let _size_handle = { let mut progress = progress.clone(); - todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{None} to `progress` when an event comes in."); + todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{{to: None}} to `progress` when an event comes in."); }; let display = {