//! Cancelling async operations use tokio::sync::{ watch, }; use futures::Future; use std::{ pin::Pin, task::{ Poll, Context, } }; use std::marker::{ PhantomData, Send, }; use std::{fmt, error}; #[macro_export] macro_rules! with_cancel { ($block:expr, $tok:expr) => { { ::tokio::select!{ _ = $tok => { Err($crate::cancel::TaskCancelledError) } a = $block => { Ok(a) } } } }; } async fn _test(tok: T) where T: CancelFuture { with_cancel!(async move { 123 }, tok).unwrap(); } /// Error returned for cancelled `with_cancel!` blocks. #[derive(Debug)] pub struct TaskCancelledError; impl error::Error for TaskCancelledError{} impl fmt::Display for TaskCancelledError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "async task was explicitly cancelled") } } /// A future used for cancelling an operation pub trait CancelFuture: Future{} impl CancelFuture for F where F: Future{} /// A token that can be awaited. #[derive(Debug, Clone)] pub struct CancellationToken(watch::Receiver); /// A source of `CancellationToken`s. #[derive(Debug)] pub struct CancellationTokenSource(watch::Sender, watch::Receiver); impl CancellationTokenSource { /// Create a new cancellation token source pub fn new() -> Self { let (tx, rx) = watch::channel(false); Self(tx, rx) } /// Create a new token pub fn create_token(&self) -> CancellationToken { CancellationToken(self.1.clone()) } /// Instruct all tokens to cancel. pub fn cancel(&mut self) { let _ = self.0.broadcast(true); } /// Instruct all tokens to cancel, then drop this source. /// /// When a source is dropped without cancelling, its tokens never complete. #[inline] pub fn into_cancel(mut self) { self.cancel(); } /// Has this source been signalled? pub fn is_cancelled(&self) -> bool { *self.1.borrow() } } impl CancellationToken { /// Try to run this future, cancel it an return error if this token is signalled, consuming the token in the process. pub fn try_run_owned<'a, F: 'a, T>(self, fut: F) -> impl Future> + 'a where F: Future { async move { with_cancel!(fut, self.into_wait_on()) } } /// Try to run this future, cancel it an return error if this token is signalled. pub async fn try_run(&mut self, fut: F) -> Result where F: Future { with_cancel!(fut, self.wait_on()) } /// Has this token been signalled? pub fn is_cancelled(&self) -> bool { *self.0.borrow() } /// Panic the current task if this token has signalled cancelled. #[inline(always)] pub fn panic_if_cancelled(&self) { #[inline(never)] #[cold] fn inner_panic() -> ! { panic!("Task cancelled") } if self.is_cancelled() { inner_panic(); } } /// Wait for cancel to be pub async fn wait_on(&mut self) { if !*self.0.borrow() { while !match self.0.recv().await { Some(v) => v, None => return NeverFuture::never().await, } { tokio::task::yield_now().await; } } } /// Consume into waiter. pub fn into_wait_on(mut self) -> impl CancelFuture + 'static { async move { if !*self.0.borrow() { while !match self.0.recv().await { Some(v) => v, None => return NeverFuture::never().await, } { tokio::task::yield_now().await; } } } } } impl Future for CancellationToken { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let fut = self.get_mut().wait_on(); tokio::pin!(fut); fut.poll(cx) } } /// A future that never completes, and never queues itself for awakening. #[derive(Debug, Clone)] pub struct NeverFuture(PhantomData); impl NeverFuture { /// Create a new never-completing future. #[inline] pub const fn never() -> Self { Self(PhantomData) } } impl Future for NeverFuture { type Output = T; #[inline(always)] fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { Poll::Pending } } /// Spawn a new task with a cancellation future pub fn spawn_with_cancel(fut: F, tok: C) -> tokio::task::JoinHandle> where F: Future + Send + Unpin + 'static, C: CancelFuture + Unpin + Send + 'static, T: Send + 'static, { use futures::prelude::*; tokio::spawn(futures::future::select(fut, tok).map(|either| match either { futures::future::Either::Left((complete, _)) => Ok(complete), _ => Err(TaskCancelledError), })) }