Added task cancelling module

Fortune for rsh's current commit: Future blessing − 末吉
specialisation
Avril 3 years ago
parent c98f1c5c6c
commit d3c43b323b
Signed by: flanchan
GPG Key ID: 284488987C31F630

1
Cargo.lock generated

@ -777,6 +777,7 @@ dependencies = [
"chacha20stream", "chacha20stream",
"color-eyre", "color-eyre",
"cryptohelpers", "cryptohelpers",
"futures",
"pin-project", "pin-project",
"rustc_version", "rustc_version",
"serde", "serde",

@ -9,6 +9,7 @@ edition = "2018"
chacha20stream = { version = "1.0.3", features = ["async"] } chacha20stream = { version = "1.0.3", features = ["async"] }
color-eyre = "0.5.11" color-eyre = "0.5.11"
cryptohelpers = { version = "1.8.1" , features = ["serialise", "full"] } cryptohelpers = { version = "1.8.1" , features = ["serialise", "full"] }
futures = "0.3.16"
pin-project = "1.0.8" pin-project = "1.0.8"
serde = { version = "1.0.126", features = ["derive"] } serde = { version = "1.0.126", features = ["derive"] }
serde_cbor = "0.11.1" serde_cbor = "0.11.1"

@ -0,0 +1,187 @@
//! Cancelling async operations
use tokio::sync::{
watch,
};
use futures::Future;
use std::{
pin::Pin,
task::{
Poll,
Context,
}
};
use std::marker::PhantomData;
macro_rules! with_cancel {
($block:expr, $tok:expr) => {
{
::tokio::select!{
_ = $tok => {
Err($crate::cancel::TaskCancelledError)
}
a = $block => {
Ok(a)
}
}
}
};
}
async fn _test<T>(tok: T)
where T: CancelFuture
{
with_cancel!(async move {
123
}, tok).unwrap();
}
/// Error returned for cancelled `with_cancel!` blocks.
#[derive(Debug)]
pub struct TaskCancelledError;
/// A future used for cancelling an operation
pub trait CancelFuture: Future{}
impl<F: ?Sized> CancelFuture for F
where F: Future<Output = ()>{}
/// A token that can be awaited.
#[derive(Debug, Clone)]
pub struct CancellationToken(watch::Receiver<bool>);
/// A source of `CancellationToken`s.
#[derive(Debug)]
pub struct CancellationTokenSource(watch::Sender<bool>, watch::Receiver<bool>);
impl CancellationTokenSource
{
/// Create a new cancellation token source
pub fn new() -> Self
{
let (tx, rx) = watch::channel(false);
Self(tx, rx)
}
/// Create a new token
pub fn create_token(&self) -> CancellationToken
{
CancellationToken(self.1.clone())
}
/// Instruct all tokens to cancel.
pub fn cancel(&mut self)
{
let _ = self.0.broadcast(true);
}
/// Instruct all tokens to cancel, then drop this source.
///
/// When a source is dropped without cancelling, its tokens never complete.
#[inline] pub fn cancel_consume(mut self)
{
self.cancel();
}
/// Has this source been signalled?
pub fn is_cancelled(&self) -> bool
{
*self.1.borrow()
}
}
impl CancellationToken
{
/// Try to run this future, cancel it an return error if this token is signalled, consuming the token in the process.
pub fn try_run_owned<'a, F: 'a, T>(self, fut: F) -> impl Future<Output=Result<T, TaskCancelledError>> + 'a
where F: Future<Output=T>
{
async move {
with_cancel!(fut, self.into_wait_on())
}
}
/// Try to run this future, cancel it an return error if this token is signalled.
pub async fn try_run<F, T>(&mut self, fut: F) -> Result<T, TaskCancelledError>
where F: Future<Output=T>
{
with_cancel!(fut, self.wait_on())
}
/// Has this token been signalled?
pub fn is_cancelled(&self) -> bool
{
*self.0.borrow()
}
/// Panic the current task if this token has signalled cancelled.
#[inline(always)] pub fn panic_if_cancelled(&self)
{
#[inline(never)]
#[cold]
fn inner_panic() -> !
{
panic!("Task cancelled")
}
if self.is_cancelled() {
inner_panic();
}
}
/// Wait for cancel to be
pub async fn wait_on(&mut self)
{
if !*self.0.borrow() {
while !match self.0.recv().await {
Some(v) => v,
None => return NeverFuture::never().await,
} {
tokio::task::yield_now().await;
}
}
}
/// Consume into waiter.
pub fn into_wait_on(mut self) -> impl CancelFuture + 'static
{
async move {
if !*self.0.borrow() {
while !match self.0.recv().await {
Some(v) => v,
None => return NeverFuture::never().await,
} {
tokio::task::yield_now().await;
}
}
}
}
}
impl Future for CancellationToken
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let fut = self.get_mut().wait_on();
tokio::pin!(fut);
fut.poll(cx)
}
}
/// A future that never completes, and never queues itself for awakening.
#[derive(Debug, Clone)]
pub struct NeverFuture<T=futures::never::Never>(PhantomData<T>);
impl<T> NeverFuture<T>
{
/// Create a new never-completing future.
#[inline] pub const fn never() -> Self
{
Self(PhantomData)
}
}
impl<T> Future for NeverFuture<T>
{
type Output = T;
#[inline(always)] fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}

@ -23,6 +23,10 @@ use std::convert::{
mod ext; use ext::*; mod ext; use ext::*;
mod message; mod message;
mod cancel;
mod sock;
//mod pipeline;
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {
println!("Hello, world!"); println!("Hello, world!");

@ -69,7 +69,6 @@ macro_rules! accessor {
} }
}; };
($name:ident, $type:ty $(; $comment:literal)?) => (accessor!($name, $name, $type $(; $comment)?);); ($name:ident, $type:ty $(; $comment:literal)?) => (accessor!($name, $name, $type $(; $comment)?););
} }
impl<'a, V: ?Sized> MessageHeader<'a, V> impl<'a, V: ?Sized> MessageHeader<'a, V>
{ {
@ -238,6 +237,11 @@ impl<V: ?Sized + MessageValue> Message<V>
impl<V: ?Sized + MessageValue> SerializedMessage<V> impl<V: ?Sized + MessageValue> SerializedMessage<V>
{ {
/// Get the message header
#[inline(always)] pub fn header(&self) -> MessageHeader<'_, V>
{
MessageHeader(&self.header, PhantomData)
}
/// Consume into an async writer /// Consume into an async writer
pub async fn into_writer_async<W:AsyncWrite+Unpin>(self, mut writer: W) -> eyre::Result<usize> pub async fn into_writer_async<W:AsyncWrite+Unpin>(self, mut writer: W) -> eyre::Result<usize>
{ {

@ -0,0 +1,7 @@
//! Piping messages around
use super::*;
use futures::Future;
pub mod pubsub;
pub fn pass_messages()

@ -0,0 +1,22 @@
//! Pub-sub traits
use super::*;
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{
RwLock,
mpsc,
};
use message::Message;
#[derive(Debug)]
struct SubListInner
{
//TODO: how tf do we do this?
//subs: RwLock<HashMap<TypeID, RwLock<Vec<mpsc::Sender<message::SerializedMessage<dyn message::MessageValue>>>>>>,
}
/// List of subscibers
#[derive(Debug, Clone)]
pub struct SubscriberListRef(Arc<SubListInner>);

@ -0,0 +1,11 @@
//! Socket handlers
use super::*;
use tokio::io::{
AsyncWrite,
AsyncRead
};
use futures::Future;
use cancel::*;
//pub fn handle_socket() -> impl Future<>
Loading…
Cancel
Save