From d3c43b323bb51e68086745ce9a56fddb6c9c8a1b Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 1 Aug 2021 16:27:43 +0100 Subject: [PATCH] Added task cancelling module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for rsh's current commit: Future blessing − 末吉 --- Cargo.lock | 1 + Cargo.toml | 1 + src/cancel.rs | 187 +++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 + src/message.rs | 6 +- src/pipeline.rs | 7 ++ src/pipeline/pubsub.rs | 22 +++++ src/sock.rs | 11 +++ 8 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 src/cancel.rs create mode 100644 src/pipeline.rs create mode 100644 src/pipeline/pubsub.rs create mode 100644 src/sock.rs diff --git a/Cargo.lock b/Cargo.lock index a9fd4a9..7e78deb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -777,6 +777,7 @@ dependencies = [ "chacha20stream", "color-eyre", "cryptohelpers", + "futures", "pin-project", "rustc_version", "serde", diff --git a/Cargo.toml b/Cargo.toml index 833c252..96c733c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" chacha20stream = { version = "1.0.3", features = ["async"] } color-eyre = "0.5.11" cryptohelpers = { version = "1.8.1" , features = ["serialise", "full"] } +futures = "0.3.16" pin-project = "1.0.8" serde = { version = "1.0.126", features = ["derive"] } serde_cbor = "0.11.1" diff --git a/src/cancel.rs b/src/cancel.rs new file mode 100644 index 0000000..08d0130 --- /dev/null +++ b/src/cancel.rs @@ -0,0 +1,187 @@ +//! Cancelling async operations +use tokio::sync::{ + watch, +}; +use futures::Future; +use std::{ + pin::Pin, + task::{ + Poll, + Context, + } +}; +use std::marker::PhantomData; + +macro_rules! with_cancel { + ($block:expr, $tok:expr) => { + { + ::tokio::select!{ + _ = $tok => { + Err($crate::cancel::TaskCancelledError) + } + a = $block => { + Ok(a) + } + } + } + }; +} + +async fn _test(tok: T) +where T: CancelFuture +{ + with_cancel!(async move { + 123 + }, tok).unwrap(); +} + +/// Error returned for cancelled `with_cancel!` blocks. +#[derive(Debug)] +pub struct TaskCancelledError; + +/// A future used for cancelling an operation +pub trait CancelFuture: Future{} + +impl CancelFuture for F +where F: Future{} + +/// A token that can be awaited. +#[derive(Debug, Clone)] +pub struct CancellationToken(watch::Receiver); + +/// A source of `CancellationToken`s. +#[derive(Debug)] +pub struct CancellationTokenSource(watch::Sender, watch::Receiver); + +impl CancellationTokenSource +{ + /// Create a new cancellation token source + pub fn new() -> Self + { + let (tx, rx) = watch::channel(false); + Self(tx, rx) + } + + /// Create a new token + pub fn create_token(&self) -> CancellationToken + { + CancellationToken(self.1.clone()) + } + + /// Instruct all tokens to cancel. + pub fn cancel(&mut self) + { + let _ = self.0.broadcast(true); + } + + /// Instruct all tokens to cancel, then drop this source. + /// + /// When a source is dropped without cancelling, its tokens never complete. + #[inline] pub fn cancel_consume(mut self) + { + self.cancel(); + } + + /// Has this source been signalled? + pub fn is_cancelled(&self) -> bool + { + *self.1.borrow() + } +} + +impl CancellationToken +{ + /// Try to run this future, cancel it an return error if this token is signalled, consuming the token in the process. + pub fn try_run_owned<'a, F: 'a, T>(self, fut: F) -> impl Future> + 'a + where F: Future + { + async move { + with_cancel!(fut, self.into_wait_on()) + } + } + /// Try to run this future, cancel it an return error if this token is signalled. + pub async fn try_run(&mut self, fut: F) -> Result + where F: Future + { + with_cancel!(fut, self.wait_on()) + } + /// Has this token been signalled? + pub fn is_cancelled(&self) -> bool + { + *self.0.borrow() + } + + /// Panic the current task if this token has signalled cancelled. + #[inline(always)] pub fn panic_if_cancelled(&self) + { + #[inline(never)] + #[cold] + fn inner_panic() -> ! + { + panic!("Task cancelled") + + } + if self.is_cancelled() { + inner_panic(); + } + } + + /// Wait for cancel to be + pub async fn wait_on(&mut self) + { + if !*self.0.borrow() { + while !match self.0.recv().await { + Some(v) => v, + None => return NeverFuture::never().await, + } { + tokio::task::yield_now().await; + } + } + } + /// Consume into waiter. + pub fn into_wait_on(mut self) -> impl CancelFuture + 'static + { + async move { + if !*self.0.borrow() { + while !match self.0.recv().await { + Some(v) => v, + None => return NeverFuture::never().await, + } { + tokio::task::yield_now().await; + } + } + } + } +} + +impl Future for CancellationToken +{ + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let fut = self.get_mut().wait_on(); + tokio::pin!(fut); + fut.poll(cx) + } +} + +/// A future that never completes, and never queues itself for awakening. +#[derive(Debug, Clone)] +pub struct NeverFuture(PhantomData); + +impl NeverFuture +{ + /// Create a new never-completing future. + #[inline] pub const fn never() -> Self + { + Self(PhantomData) + } +} + +impl Future for NeverFuture +{ + type Output = T; + + #[inline(always)] fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } +} diff --git a/src/main.rs b/src/main.rs index 797ad9d..fe60f0a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,10 @@ use std::convert::{ mod ext; use ext::*; mod message; +mod cancel; +mod sock; +//mod pipeline; + #[tokio::main] async fn main() -> eyre::Result<()> { println!("Hello, world!"); diff --git a/src/message.rs b/src/message.rs index fb1326e..a2b5f52 100644 --- a/src/message.rs +++ b/src/message.rs @@ -69,7 +69,6 @@ macro_rules! accessor { } }; ($name:ident, $type:ty $(; $comment:literal)?) => (accessor!($name, $name, $type $(; $comment)?);); - } impl<'a, V: ?Sized> MessageHeader<'a, V> { @@ -238,6 +237,11 @@ impl Message impl SerializedMessage { + /// Get the message header + #[inline(always)] pub fn header(&self) -> MessageHeader<'_, V> + { + MessageHeader(&self.header, PhantomData) + } /// Consume into an async writer pub async fn into_writer_async(self, mut writer: W) -> eyre::Result { diff --git a/src/pipeline.rs b/src/pipeline.rs new file mode 100644 index 0000000..3289d3c --- /dev/null +++ b/src/pipeline.rs @@ -0,0 +1,7 @@ +//! Piping messages around +use super::*; +use futures::Future; + +pub mod pubsub; + +pub fn pass_messages() diff --git a/src/pipeline/pubsub.rs b/src/pipeline/pubsub.rs new file mode 100644 index 0000000..e9ddc45 --- /dev/null +++ b/src/pipeline/pubsub.rs @@ -0,0 +1,22 @@ +//! Pub-sub traits +use super::*; +use std::any::TypeId; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{ + RwLock, + mpsc, +}; + +use message::Message; + +#[derive(Debug)] +struct SubListInner +{ + //TODO: how tf do we do this? + //subs: RwLock>>>>>, +} + +/// List of subscibers +#[derive(Debug, Clone)] +pub struct SubscriberListRef(Arc); diff --git a/src/sock.rs b/src/sock.rs new file mode 100644 index 0000000..706049c --- /dev/null +++ b/src/sock.rs @@ -0,0 +1,11 @@ +//! Socket handlers +use super::*; + +use tokio::io::{ + AsyncWrite, + AsyncRead +}; +use futures::Future; +use cancel::*; + +//pub fn handle_socket() -> impl Future<>