You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rsh/src/cancel.rs

216 lines
4.7 KiB

//! Cancelling async operations
use tokio::sync::{
watch,
};
use futures::Future;
use std::{
pin::Pin,
task::{
Poll,
Context,
}
};
use std::marker::{
PhantomData,
Send,
};
use std::{fmt, error};
#[macro_export] macro_rules! with_cancel {
($block:expr, $tok:expr) => {
{
::tokio::select!{
_ = $tok => {
Err($crate::cancel::TaskCancelledError)
}
a = $block => {
Ok(a)
}
}
}
};
}
async fn _test<T>(tok: T)
where T: CancelFuture
{
with_cancel!(async move {
123
}, tok).unwrap();
}
/// Error returned for cancelled `with_cancel!` blocks.
#[derive(Debug)]
pub struct TaskCancelledError;
impl error::Error for TaskCancelledError{}
impl fmt::Display for TaskCancelledError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "async task was explicitly cancelled")
}
}
/// A future used for cancelling an operation
pub trait CancelFuture: Future{}
impl<F: ?Sized> CancelFuture for F
where F: Future<Output = ()>{}
/// A token that can be awaited.
#[derive(Debug, Clone)]
pub struct CancellationToken(watch::Receiver<bool>);
/// A source of `CancellationToken`s.
#[derive(Debug)]
pub struct CancellationTokenSource(watch::Sender<bool>, watch::Receiver<bool>);
impl CancellationTokenSource
{
/// Create a new cancellation token source
pub fn new() -> Self
{
let (tx, rx) = watch::channel(false);
Self(tx, rx)
}
/// Create a new token
pub fn create_token(&self) -> CancellationToken
{
CancellationToken(self.1.clone())
}
/// Instruct all tokens to cancel.
pub fn cancel(&mut self)
{
let _ = self.0.broadcast(true);
}
/// Instruct all tokens to cancel, then drop this source.
///
/// When a source is dropped without cancelling, its tokens never complete.
#[inline] pub fn into_cancel(mut self)
{
self.cancel();
}
/// Has this source been signalled?
pub fn is_cancelled(&self) -> bool
{
*self.1.borrow()
}
}
impl CancellationToken
{
/// Try to run this future, cancel it an return error if this token is signalled, consuming the token in the process.
pub fn try_run_owned<'a, F: 'a, T>(self, fut: F) -> impl Future<Output=Result<T, TaskCancelledError>> + 'a
where F: Future<Output=T>
{
async move {
with_cancel!(fut, self.into_wait_on())
}
}
/// Try to run this future, cancel it an return error if this token is signalled.
pub async fn try_run<F, T>(&mut self, fut: F) -> Result<T, TaskCancelledError>
where F: Future<Output=T>
{
with_cancel!(fut, self.wait_on())
}
/// Has this token been signalled?
pub fn is_cancelled(&self) -> bool
{
*self.0.borrow()
}
/// Panic the current task if this token has signalled cancelled.
#[inline(always)] pub fn panic_if_cancelled(&self)
{
#[inline(never)]
#[cold]
fn inner_panic() -> !
{
panic!("Task cancelled")
}
if self.is_cancelled() {
inner_panic();
}
}
/// Wait for cancel to be
pub async fn wait_on(&mut self)
{
if !*self.0.borrow() {
while !match self.0.recv().await {
Some(v) => v,
None => return NeverFuture::never().await,
} {
tokio::task::yield_now().await;
}
}
}
/// Consume into waiter.
pub fn into_wait_on(mut self) -> impl CancelFuture + 'static
{
async move {
if !*self.0.borrow() {
while !match self.0.recv().await {
Some(v) => v,
None => return NeverFuture::never().await,
} {
tokio::task::yield_now().await;
}
}
}
}
}
impl Future for CancellationToken
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let fut = self.get_mut().wait_on();
tokio::pin!(fut);
fut.poll(cx)
}
}
/// A future that never completes, and never queues itself for awakening.
#[derive(Debug, Clone)]
pub struct NeverFuture<T=futures::never::Never>(PhantomData<T>);
impl<T> NeverFuture<T>
{
/// Create a new never-completing future.
#[inline] pub const fn never() -> Self
{
Self(PhantomData)
}
}
impl<T> Future for NeverFuture<T>
{
type Output = T;
#[inline(always)] fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
/// Spawn a new task with a cancellation future
pub fn spawn_with_cancel<F, C, T>(fut: F, tok: C) -> tokio::task::JoinHandle<Result<T, TaskCancelledError>>
where F: Future<Output=T> + Send + Unpin + 'static,
C: CancelFuture + Unpin + Send + 'static,
T: Send + 'static,
{
use futures::prelude::*;
tokio::spawn(futures::future::select(fut, tok).map(|either| match either {
futures::future::Either::Left((complete, _)) => Ok(complete),
_ => Err(TaskCancelledError),
}))
}