You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
4.7 KiB
216 lines
4.7 KiB
//! Cancelling async operations
|
|
use tokio::sync::{
|
|
watch,
|
|
};
|
|
use futures::Future;
|
|
use std::{
|
|
pin::Pin,
|
|
task::{
|
|
Poll,
|
|
Context,
|
|
}
|
|
};
|
|
use std::marker::{
|
|
PhantomData,
|
|
Send,
|
|
};
|
|
use std::{fmt, error};
|
|
|
|
/// Race a future against a cancellation future.
///
/// Expands to a `tokio::select!` over the two arguments:
///
/// * if `$tok` finishes, the expression evaluates to `Err(TaskCancelledError)`;
/// * if `$block` finishes, the expression evaluates to `Ok(output)`.
///
/// Because it expands to `select!`, it must be used inside an async context.
#[macro_export]
macro_rules! with_cancel {
    ($block:expr, $tok:expr) => {{
        ::tokio::select! {
            _ = $tok => Err($crate::cancel::TaskCancelledError),
            output = $block => Ok(output),
        }
    }};
}
|
|
|
|
async fn _test<T>(tok: T)
|
|
where T: CancelFuture
|
|
{
|
|
with_cancel!(async move {
|
|
123
|
|
}, tok).unwrap();
|
|
}
|
|
|
|
/// Error returned for cancelled `with_cancel!` blocks.
///
/// This is a zero-sized marker: it carries no information beyond the fact that
/// the cancellation future completed. It is `Copy` and comparable so callers
/// can write `assert_eq!(result, Err(TaskCancelledError))`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaskCancelledError;

impl error::Error for TaskCancelledError {}

impl fmt::Display for TaskCancelledError {
    /// Writes a fixed, human-readable description of the cancellation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No formatting arguments are involved, so write the literal directly.
        f.write_str("async task was explicitly cancelled")
    }
}
|
|
|
|
|
|
/// Marker trait for futures that are used to signal cancellation.
///
/// There is nothing to implement by hand: the blanket impl below covers every
/// future (sized or not) whose output is `()`.
pub trait CancelFuture: Future {}

impl<F> CancelFuture for F where F: ?Sized + Future<Output = ()> {}
|
|
|
|
/// A token that can be awaited.
|
|
#[derive(Debug, Clone)]
|
|
pub struct CancellationToken(watch::Receiver<bool>);
|
|
|
|
/// A source of `CancellationToken`s.
|
|
#[derive(Debug)]
|
|
pub struct CancellationTokenSource(watch::Sender<bool>, watch::Receiver<bool>);
|
|
|
|
impl CancellationTokenSource
|
|
{
|
|
/// Create a new cancellation token source
|
|
pub fn new() -> Self
|
|
{
|
|
let (tx, rx) = watch::channel(false);
|
|
Self(tx, rx)
|
|
}
|
|
|
|
/// Create a new token
|
|
pub fn create_token(&self) -> CancellationToken
|
|
{
|
|
CancellationToken(self.1.clone())
|
|
}
|
|
|
|
/// Instruct all tokens to cancel.
|
|
pub fn cancel(&mut self)
|
|
{
|
|
let _ = self.0.broadcast(true);
|
|
}
|
|
|
|
/// Instruct all tokens to cancel, then drop this source.
|
|
///
|
|
/// When a source is dropped without cancelling, its tokens never complete.
|
|
#[inline] pub fn into_cancel(mut self)
|
|
{
|
|
self.cancel();
|
|
}
|
|
|
|
/// Has this source been signalled?
|
|
pub fn is_cancelled(&self) -> bool
|
|
{
|
|
*self.1.borrow()
|
|
}
|
|
}
|
|
|
|
impl CancellationToken
|
|
{
|
|
/// Try to run this future, cancel it an return error if this token is signalled, consuming the token in the process.
|
|
pub fn try_run_owned<'a, F: 'a, T>(self, fut: F) -> impl Future<Output=Result<T, TaskCancelledError>> + 'a
|
|
where F: Future<Output=T>
|
|
{
|
|
async move {
|
|
with_cancel!(fut, self.into_wait_on())
|
|
}
|
|
}
|
|
/// Try to run this future, cancel it an return error if this token is signalled.
|
|
pub async fn try_run<F, T>(&mut self, fut: F) -> Result<T, TaskCancelledError>
|
|
where F: Future<Output=T>
|
|
{
|
|
with_cancel!(fut, self.wait_on())
|
|
}
|
|
/// Has this token been signalled?
|
|
pub fn is_cancelled(&self) -> bool
|
|
{
|
|
*self.0.borrow()
|
|
}
|
|
|
|
/// Panic the current task if this token has signalled cancelled.
|
|
#[inline(always)] pub fn panic_if_cancelled(&self)
|
|
{
|
|
#[inline(never)]
|
|
#[cold]
|
|
fn inner_panic() -> !
|
|
{
|
|
panic!("Task cancelled")
|
|
|
|
}
|
|
if self.is_cancelled() {
|
|
inner_panic();
|
|
}
|
|
}
|
|
|
|
/// Wait for cancel to be
|
|
pub async fn wait_on(&mut self)
|
|
{
|
|
if !*self.0.borrow() {
|
|
while !match self.0.recv().await {
|
|
Some(v) => v,
|
|
None => return NeverFuture::never().await,
|
|
} {
|
|
tokio::task::yield_now().await;
|
|
}
|
|
}
|
|
}
|
|
/// Consume into waiter.
|
|
pub fn into_wait_on(mut self) -> impl CancelFuture + 'static
|
|
{
|
|
async move {
|
|
if !*self.0.borrow() {
|
|
while !match self.0.recv().await {
|
|
Some(v) => v,
|
|
None => return NeverFuture::never().await,
|
|
} {
|
|
tokio::task::yield_now().await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Future for CancellationToken
|
|
{
|
|
type Output = ();
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
let fut = self.get_mut().wait_on();
|
|
tokio::pin!(fut);
|
|
fut.poll(cx)
|
|
}
|
|
}
|
|
|
|
/// A future that never completes, and never queues itself for awakening.
|
|
#[derive(Debug, Clone)]
|
|
pub struct NeverFuture<T=futures::never::Never>(PhantomData<T>);
|
|
|
|
impl<T> NeverFuture<T>
|
|
{
|
|
/// Create a new never-completing future.
|
|
#[inline] pub const fn never() -> Self
|
|
{
|
|
Self(PhantomData)
|
|
}
|
|
}
|
|
|
|
impl<T> Future for NeverFuture<T>
|
|
{
|
|
type Output = T;
|
|
|
|
#[inline(always)] fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
Poll::Pending
|
|
}
|
|
}
|
|
|
|
/// Spawn a new task with a cancellation future
|
|
pub fn spawn_with_cancel<F, C, T>(fut: F, tok: C) -> tokio::task::JoinHandle<Result<T, TaskCancelledError>>
|
|
where F: Future<Output=T> + Send + Unpin + 'static,
|
|
C: CancelFuture + Unpin + Send + 'static,
|
|
T: Send + 'static,
|
|
{
|
|
use futures::prelude::*;
|
|
tokio::spawn(futures::future::select(fut, tok).map(|either| match either {
|
|
futures::future::Either::Left((complete, _)) => Ok(complete),
|
|
_ => Err(TaskCancelledError),
|
|
}))
|
|
}
|