`progress-reactive`: Re-worked internal sync-to-async-signal barrier API to the much more simple and complete `reactive::Crosslink{,Sender,Receiver}`

Fortune for leanify-many's current commit: Half curse − 半凶
safe-cancel-interrupt
Avril 3 weeks ago
parent a4f288fd33
commit 9412636a0f
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -2,7 +2,12 @@
use super::*; use super::*;
use std::{ use std::{
ops,
thread, thread,
sync::{
Arc,
Weak,
},
}; };
use tokio::{ use tokio::{
sync::{ sync::{
@ -12,97 +17,275 @@ use tokio::{
}, },
}; };
use futures::{ use futures::{
prelude::*,
stream::{
self,
Stream,
},
future::{ future::{
Aborted, Abortable, abortable, AbortHandle, Aborted, Abortable, abortable, AbortHandle,
Shared,
WeakShared,
Remote, RemoteHandle,
}, },
}; };
/// An async condition variable that holds access to a potential value. /// Inner type for sending pulse signal from sync backing thread to async task.
/// #[derive(Debug, Default)]
/// Notification is *single* task only, for now there is no `notify_all()` implementation on this. pub(super) struct Crosslink
///
/// # Ownership and value transference
/// To pass a value through the condvar, `Arc<CondVar<T>>` can be used to simplify the ownership value-awaiting model to prevent deadlocks caused by dropped possible communicators.
/// Generally however, if the `CondVar<T>` is accessible via shared reference (non-exclusive ownership held) upon a waiting operation, it will assume it is `Sync`-accessible by shared reference so you must ensure that deadlocks cannot be reached when waiting.
///
/// ## Note on API's potential to create ~asyncronous~ un-completable `Future`s
/// In this context *"deadlock"* is being used to refer to an *un-completable future*, so as long as the future can be cancelled or awaiting can be stopped it's not going to hang up the task obvs; there's no syncronous blocking in the API.
#[derive(Debug)]
pub struct CondVar<T: ?Sized>
{ {
/// The condition to wait on and notify to. pub notification: Notify,
cond: Notify,
/// The (possibly empty) variable slot that is read after or written to before.
var: RwLock<Option<Box<T>>>, // XXX:
} }
unsafe impl<T: ?Sized + Send> Sync for CondVar<T>{} /// Sends pulse signals synchonously to an async
#[derive(Debug, Clone)]
pub struct CrosslinkSender(Weak<Crosslink>);
impl<T: ?Sized + Send> CondVar<T> { /// Receives pulse signals in a way that can be `await`ed for.
compile_error!("TODO: XXX: Re-do this, we don't want to just re-implement the sync condvar interface because there's no real need to. Hell, do we even need the condvar at all? Can we just use a `Arc<Notify>` for the purposes of this module \ #[derive(Debug)] // NOTE: Not `Clone`, dealing with multiple receivers is too much of a headache with no `notify_all()`.
(XXX: i.e. \"backing sync thread **notifies** a running async task holding a progress handle to send a `Resize` command when a `SIGWINCH` signal is received on it\" is **literally** what we want this module to do.)"); pub struct CrosslinkReceiver(Arc<Crosslink>);
async fn wait_raw_while<P, Fut>(&self, mut pred: P, cancel: Fut) -> bool impl Crosslink
where P: AsyncFnMut(Option<&T>) -> bool, //TODO: How to impl this with the async mutex? {
Fut: Future /// Consumes this owned reference into a `Clone`able future that can be waited on by multiple tasks at once.
#[inline]
pub fn waiter_shared(self: Arc<Self>) -> impl Future<Output = ()> + Clone + Send + Sync + Unpin + Sized + 'static
{ {
use futures::FutureExt; async move {
let (cancel, token) = { self.notification.notified().await
let (handle, reg) = AbortHandle::new_pair(); }.shared()
}
(cancel.map(move |_| handle.abort()), reg) /// Create a `Clone`able future that can be waited on by multiple tasks at once, **but** is still lifetime-bound by-ref to the instance.
}; ///
/// # Outliving the owner
/// If a `'static` lifetime bound is required (e.g. due to spawning on a non-local task-set,) use `waiter_shared()` on `Arc<Self>`.
#[inline]
pub fn create_waiter_shared(&self) -> impl Future<Output = ()> + Clone + Send + Sync + Unpin + Sized + use<'_>
{
self.notification.notified().shared()
}
/// Split the link into `(tx, rx)` pair.
///
/// # Example usage
/**```
# use std::sync::Arc;
# use leanify_many::progress::reactive::*;
let (tx, rx) = Arc::new(Crosslink::default()).into_split();
let rx = tokio::spawn!(async move {
let rx = rx.into_stream();
let mut n = 0usize;
while let Some(_) = rx.next().await {
n+=1;
println!("Received notification {n} time(s)!");
}
println!("Sender(s) all gone!");
});
// Notify the backing task twice.
tx.notify();
tx.notify();
// Drop the sender, and wait for the backing task to exit.
drop(tx);
rx.await.unwrap();
```*/
#[inline]
pub(super) fn into_split(self: Arc<Self>) -> (CrosslinkSender, CrosslinkReceiver)
{
let tx = Arc::downgrade(&self);
(CrosslinkSender(tx), CrosslinkReceiver(self))
}
}
let waiter = async { impl CrosslinkReceiver
loop { {
// Check the predicate fn has_senders(&self) -> bool
let waiter = { {
let value = self.var.read().await; Arc::weak_count(&self.0) != 0
let waiter = self.cond.notified(); // NOTE: We want to check if there is a notification *after* acquiring (**and** releasing) the read lock has been successful; so that if there was a writer that yielded to us, we get their notification immediately (see below.) (XXX: I don't know if calling this function without polling it here instead of below (after the read lock has been released) changes anything; if it does, move this down below. It's only been defined here instead as a visual guide to the ordering.) }
if !pred(value.as_deref()).await { /// Consume receiver into a `Stream` that yields each notification on `.next()`.
break false; ///
/// The stream ends when it is determined that there can be no more signals sent.
pub fn into_stream(self) -> impl Stream<Item = ()> + Send + Sync + 'static
{
stream::unfold(self, move |state| async move {
if state.has_senders() { // If there are more than 0 senders (weak references.)
state.0.notification.notified().await; // Wait for a notification. (XXX: This may not complete if all senders drop *while* it's being waited on.)
} else { } else {
drop(value); // NOTE: That we release the `read` lock *before* checking if the notification comes through, so the notifier can notify us *before* dropping their `write` lock. return None;
} }
waiter // If there are no senders left (i.e. we received a notification from the final sender `Drop`ing) we do not want to yield an element but end the stream.
}; state.has_senders().then(move || ((), state))
})
}
/// Wait for a notification or for there to be no senders left.
///
/// Note that this *will* complete spuriously if it is the final receiver and the final sender is dropped, however it **also** *may* complete spuriously before that.
///
/// (This future is cancel-safe.)
#[inline]
#[must_use]
pub async fn try_wait(&self) -> bool
{
if self.has_senders() {
return false;
}
self.0.notification.notified().await;
self.has_senders()
}
// Check if we've been notified /// Wait for a notification to be sent.
// ///
// TODO: XXX: Would this become a spinning loop if there are no current nofification operations goin on since we're not actually `await`ing on the notification itself at all? Is there any real way to prevent this??? /// # Panics
if let Some(_) = waiter.now_or_never() { /// If a signal is not received before the last sender is dropped.
break true; pub fn wait(&self) -> impl Future<Output = ()> + Send + Sync + '_
{
#[inline(never)]
#[cold]
fn _panic_no_senders() -> !
{
panic!("no senders left that can signal")
} }
self.try_wait().map(|r| if !r {
tokio::task::yield_now().await; // XXX: Do we actually want to yield this here...? })
} }
/// Wait for a notification to be sent or a final sender to be dropped without monitoring the number of senders.
///
/// # Safety
/// This function will return a non-completable future if there are already no senders when it is called.
/// It may be preferable to use `try_wait_unsafe()` instead, (as that returns `ready()` if there are none instead of `pending()`.)
#[inline(always)]
fn wait_unsafe(&self) -> impl Future + Send + Sync + '_
{
self.0.notification.notified()
}
/// Wait for a notification to be sent or a final sender to be dropped without monitoring the number of senders.
///
/// # Safety
/// This function will return an immediately completed function if there are no senders when it is called.
/// But when the returned future completes it cannot be differentiated between an actual intentional `Sender::notify()` call, or if it's from the final sender being dropped.
#[inline]
fn try_wait_unsafe(&self) -> impl Future + Send + Sync + '_
{
if ! self.has_senders() {
future::ready(()).left_future()
} else {
self.0.notification.notified().right_future()
}
}
}
impl ops::Drop for CrosslinkSender
{
fn drop(&mut self)
{
// This is the last sender, dropping now.
if self.is_last_sender() {
let n = {
// Remove the last sender from the receiver's view.
let this = std::mem::replace(&mut self.0, Weak::new());
// So we will tell the receiver to wake up.
let Some(n) = this.upgrade() else {
// If there are any...
return
}; };
// We have waited for the validation through `pred`, and a notification has arrived to us. // Ensure there are no living senders it can see, before...
tokio::select! { drop(this);
cont = waiter => { n
cont };
// ...we wake it up, so it knows to die.
n.notification.notify();
}
}
}
impl CrosslinkSender
{
#[inline(always)]
fn has_receivers(&self) -> bool
{
Weak::strong_count(&self.0) > 0
} }
_ = cancel => {
false #[inline(always)]
pub fn is_last_sender(&self) -> bool
{
Weak::weak_count(&self.0) == 1
} }
/// If there are receivers that can be notified.
///
/// # **Non-atomic** operations
/// Note that it is still possible for `notify()` to panic if called after checking this, due to the lack of atomicity.
/// If atomicity is needed, either use `try_map_notify()` (or, if atomicity isn't needed, just ignore the result of `try_notify()` failing instead.)
#[inline]
pub fn can_notify(&self) -> bool
{
self.has_receivers()
} }
/// If there are any receivers, returns a thunk that when called will notify one.
///
/// # Usage
/// It is **not** intended for the returned function be kept around long, it is entirely possible that by the time the function is invoked, there are no receivers left.
/// The function will attempt to notify, and if there was no receiver to notify, this will be ignored.
///
/// The intended use is that if there is some work that needs to be done before sending the signal, but that can be skipped if there is no signal to send, the check can be made via a call to this method, and the signal can be sent by calling the returned thunk.
/** ```
# use std::sync::Arc;
# use leanify_many::progress::reactive::*;
# let (tx, _rx) = Arc::new(Crosslink::default()).into_split();
if let Some(notify) = tx.try_map_notify() {
/* ...do some work before sending the signal... */
notify();
}
```*/
#[inline]
#[must_use]
fn try_map_notify(&self) -> Option<impl FnOnce() + Unpin + Send + '_>
{
self.0.upgrade().map(|s| move || s.notification.notify())
} }
async fn notify_raw_with<P, Fut>(&self, mut setter: P) -> bool /// Send a notification signal if possible.
where P: AsyncFnMut(&mut Option<Box<T>>) -> bool, ///
Fut: Future /// # Return value
/// If there was a receiver to notify.
#[must_use]
pub fn try_notify(&self) -> bool
{ {
let mut value = self.var.write().await; self.0.upgrade().map(|s| s.notification.notify())
if !setter(&mut value).await { .map(|_| true).unwrap_or(false)
return false;
} }
self.cond.notify();
drop(value); // NOTE: We do not release the write lock until *after* we have notified, so a currently blocking reader will immediately get the notification /// Send a notification signal
true ///
/// # Panics
/// If there are no receivers to notify (See [try_notify()](try_notify).)
pub fn notify(&self)
{
#[inline(never)]
#[cold]
fn _panic_no_waiters() -> !
{
panic!("attempted to notify no waiters")
}
if !self.try_notify() {
_panic_no_waiters()
}
} }
//TODO: Implement CondVar's `async wait(pred, tok) -> Option<impl AsRef<T> + '_>` & `async notify(value: T)`
//TODO: & also `async wait_owned(Arc<Self>, ...)` & `async notify_owned(Arc<Self>)` too (XXX: also `&mut self` receiver funcs can *statically* guarantee that there is no other pending waiter/sender and can thus access the value directly; which is *dynamically* guaranteed with the `Arc<Self>` receiver funcs.)
} }

Loading…
Cancel
Save