Started `progress-reactive` internal API. (Re-starting design.)

Fortune for leanify-many's current commit: Blessing − 吉
safe-cancel-interrupt
Avril 6 days ago
parent 8d8f932081
commit a4f288fd33
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -19,7 +19,7 @@ codegen-units = 1
default = ["splash", "progress-reactive", "colour", "collect_err", "shutdown"]
# Enable progress bar
progress = ["termprogress", "pin-project", "terminal_size"]
progress = ["termprogress", "pin-project"]
# Enable threaded scheduler
#
@ -49,7 +49,7 @@ collect_err = []
shutdown = ["libc"]
# TODO: Implement this to: Capture `SIGWINCH` events and re-size + re-render the progress bar to the new terminal width. (XXX: Use a background thread (outside the thread-pool, as it's blocking) listening on `signal_hooks::Signals.forever()` for this that sends events through a shared Tokio `CondVar<TerminalWidth>` notify_all() call.)
progress-reactive = ["progress", "tokio/signal", "signal-hook"]
progress-reactive = ["progress", "tokio/signal", "signal-hook", "terminal_size"]
[dependencies]
lazy_static = "1.4"

@ -43,6 +43,9 @@ use termprogress::{
ProgressBar,
};
#[cfg(feature="progress-reactive")]
mod reactive;
#[derive(Debug)]
pub struct Task
{
@ -457,12 +460,14 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
let mut has_blanked = false;
while let Some(command) = commands.next().await {
match command {
#[cfg(feature="progress-reactive")]
CommandKind::Resize { to: None } => {
use std::os::fd::{
AsFd,
AsRawFd,
};
if let Some((w, _)) = terminal_size::terminal_size()
.or_else(|| terminal_size::terminal_size_using_fd(std::io::stderr().as_fd().as_raw_fd()))
{
@ -480,6 +485,8 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.update_dimensions(to.get());
},
#[allow(unreachable_patterns)]
CommandKind::Resize { .. } => unimplemented!("`progress-reactive` feature not enabled! Cannot resize to terminal size."),
CommandKind::BumpHigh(high) => {
let stat = stat.to_mut();
stat.high+=high;

@ -0,0 +1,108 @@
//! Implementation of `progress-reactive` signal hooking & passing features
use super::*;
use std::{
thread,
};
use tokio::{
sync::{
Notify,
Mutex,
RwLock,
},
};
use futures::{
future::{
Aborted, Abortable, abortable, AbortHandle,
},
};
/// An async condition variable that holds access to a potential value.
///
/// Notification is *single* task only, for now there is no `notify_all()` implementation on this.
///
/// # Ownership and value transference
/// To pass a value through the condvar, `Arc<CondVar<T>>` can be used to simplify the ownership value-awaiting model to prevent deadlocks caused by dropped possible communicators.
/// Generally however, if the `CondVar<T>` is accessible via shared reference (non-exclusive ownership held) upon a waiting operation, it will assume it is `Sync`-accessible by shared reference so you must ensure that deadlocks cannot be reached when waiting.
///
/// ## Note on API's potential to create ~asyncronous~ un-completable `Future`s
/// In this context *"deadlock"* is being used to refer to an *un-completable future*, so as long as the future can be cancelled or awaiting can be stopped it's not going to hang up the task obvs; there's no syncronous blocking in the API.
#[derive(Debug)]
pub struct CondVar<T: ?Sized>
{
/// The condition to wait on and notify to.
cond: Notify,
/// The (possibly empty) variable slot that is read after or written to before.
var: RwLock<Option<Box<T>>>, // XXX:
}
unsafe impl<T: ?Sized + Send> Sync for CondVar<T>{}
impl<T: ?Sized + Send> CondVar<T> {
compile_error!("TODO: XXX: Re-do this, we don't want to just re-implement the sync condvar interface because there's no real need to. Hell, do we even need the condvar at all? Can we just use a `Arc<Notify>` for the purposes of this module \
(XXX: i.e. \"backing sync thread **notifies** a running async task holding a progress handle to send a `Resize` command when a `SIGWINCH` signal is received on it\" is **literally** what we want this module to do.)");
async fn wait_raw_while<P, Fut>(&self, mut pred: P, cancel: Fut) -> bool
where P: AsyncFnMut(Option<&T>) -> bool, //TODO: How to impl this with the async mutex?
Fut: Future
{
use futures::FutureExt;
let (cancel, token) = {
let (handle, reg) = AbortHandle::new_pair();
(cancel.map(move |_| handle.abort()), reg)
};
let waiter = async {
loop {
// Check the predicate
let waiter = {
let value = self.var.read().await;
let waiter = self.cond.notified(); // NOTE: We want to check if there is a notification *after* acquiring (**and** releasing) the read lock has been successful; so that if there was a writer that yielded to us, we get their notification immediately (see below.) (XXX: I don't know if calling this function without polling it here instead of below (after the read lock has been released) changes anything; if it does, move this down below. It's only been defined here instead as a visual guide to the ordering.)
if !pred(value.as_deref()).await {
break false;
} else {
drop(value); // NOTE: That we release the `read` lock *before* checking if the notification comes through, so the notifier can notify us *before* dropping their `write` lock.
}
waiter
};
// Check if we've been notified
//
// TODO: XXX: Would this become a spinning loop if there are no current nofification operations goin on since we're not actually `await`ing on the notification itself at all? Is there any real way to prevent this???
if let Some(_) = waiter.now_or_never() {
break true;
}
tokio::task::yield_now().await; // XXX: Do we actually want to yield this here...?
}
};
// We have waited for the validation through `pred`, and a notification has arrived to us.
tokio::select! {
cont = waiter => {
cont
}
_ = cancel => {
false
}
}
}
async fn notify_raw_with<P, Fut>(&self, mut setter: P) -> bool
where P: AsyncFnMut(&mut Option<Box<T>>) -> bool,
Fut: Future
{
let mut value = self.var.write().await;
if !setter(&mut value).await {
return false;
}
self.cond.notify();
drop(value); // NOTE: We do not release the write lock until *after* we have notified, so a currently blocking reader will immediately get the notification
true
}
//TODO: Implement CondVar's `async wait(pred, tok) -> Option<impl AsRef<T> + '_>` & `async notify(value: T)`
//TODO: & also `async wait_owned(Arc<Self>, ...)` & `async notify_owned(Arc<Self>)` too (XXX: also `&mut self` receiver funcs can *statically* guarantee that there is no other pending waiter/sender and can thus access the value directly; which is *dynamically* guaranteed with the `Arc<Self>` receiver funcs.)
}

@ -147,7 +147,7 @@ where I: IntoIterator<Item=T>,
#[cfg(feature="progress-reactive")]
let _size_handle = {
let mut progress = progress.clone();
todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{None} to `progress` when an event comes in.");
todo!("progress-reactive: Spawn the `signal_hooks` thread and set up the Tokio `CondVar` to wait on in a background task, sending `Resize{{to: None}} to `progress` when an event comes in.");
};
let display = {

Loading…
Cancel
Save