diff --git a/src/cancel.rs b/src/cancel.rs index 08d0130..c96e9b0 100644 --- a/src/cancel.rs +++ b/src/cancel.rs @@ -10,9 +10,13 @@ use std::{ Context, } }; -use std::marker::PhantomData; +use std::marker::{ + PhantomData, + Send, +}; +use std::{fmt, error}; -macro_rules! with_cancel { +#[macro_export] macro_rules! with_cancel { ($block:expr, $tok:expr) => { { ::tokio::select!{ @@ -39,6 +43,17 @@ where T: CancelFuture #[derive(Debug)] pub struct TaskCancelledError; +impl error::Error for TaskCancelledError{} + +impl fmt::Display for TaskCancelledError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "async task was explicitly cancelled") + } +} + + /// A future used for cancelling an operation pub trait CancelFuture: Future{} @@ -77,7 +92,7 @@ impl CancellationTokenSource /// Instruct all tokens to cancel, then drop this source. /// /// When a source is dropped without cancelling, its tokens never complete. - #[inline] pub fn cancel_consume(mut self) + #[inline] pub fn into_cancel(mut self) { self.cancel(); } @@ -185,3 +200,16 @@ impl Future for NeverFuture Poll::Pending } } + +/// Spawn a new task with a cancellation future +pub fn spawn_with_cancel(fut: F, tok: C) -> tokio::task::JoinHandle> +where F: Future + Send + Unpin + 'static, + C: CancelFuture + Unpin + Send + 'static, + T: Send + 'static, +{ + use futures::prelude::*; + tokio::spawn(futures::future::select(fut, tok).map(|either| match either { + futures::future::Either::Left((complete, _)) => Ok(complete), + _ => Err(TaskCancelledError), + })) +} diff --git a/src/sock.rs b/src/sock.rs index 706049c..fae8891 100644 --- a/src/sock.rs +++ b/src/sock.rs @@ -5,7 +5,24 @@ use tokio::io::{ AsyncWrite, AsyncRead }; +use tokio::task::JoinHandle; use futures::Future; use cancel::*; -//pub fn handle_socket() -> impl Future<> +/// Handles a raw socket +pub fn handle_socket_with_shutdown(sock_read: R, sock_write: W, shutdown: C) -> JoinHandle> +where R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static +{ + tokio::spawn(async move { + match { + with_cancel!(async move { + //TODO: How to handle reads+writes? + Ok(()) + }, shutdown) + } { + Ok(v) => v, + Err(x) => Err(eyre::Report::from(x)), + } + }) +}