Start: Raw socket handling function

Fortune for rsh's current commit: Middle blessing − 中吉
specialisation
Avril 3 years ago
parent d3c43b323b
commit 2308b61b58

@ -10,9 +10,13 @@ use std::{
Context, Context,
} }
}; };
use std::marker::PhantomData; use std::marker::{
PhantomData,
Send,
};
use std::{fmt, error};
macro_rules! with_cancel { #[macro_export] macro_rules! with_cancel {
($block:expr, $tok:expr) => { ($block:expr, $tok:expr) => {
{ {
::tokio::select!{ ::tokio::select!{
@ -39,6 +43,17 @@ where T: CancelFuture
#[derive(Debug)] #[derive(Debug)]
pub struct TaskCancelledError; pub struct TaskCancelledError;
impl error::Error for TaskCancelledError{}
impl fmt::Display for TaskCancelledError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "async task was explicitly cancelled")
}
}
/// A future used for cancelling an operation /// A future used for cancelling an operation
pub trait CancelFuture: Future{} pub trait CancelFuture: Future{}
@ -77,7 +92,7 @@ impl CancellationTokenSource
/// Instruct all tokens to cancel, then drop this source. /// Instruct all tokens to cancel, then drop this source.
/// ///
/// When a source is dropped without cancelling, its tokens never complete. /// When a source is dropped without cancelling, its tokens never complete.
#[inline] pub fn cancel_consume(mut self) #[inline] pub fn into_cancel(mut self)
{ {
self.cancel(); self.cancel();
} }
@ -185,3 +200,16 @@ impl<T> Future for NeverFuture<T>
Poll::Pending Poll::Pending
} }
} }
/// Spawn a new task with a cancellation future
pub fn spawn_with_cancel<F, C, T>(fut: F, tok: C) -> tokio::task::JoinHandle<Result<T, TaskCancelledError>>
where F: Future<Output=T> + Send + Unpin + 'static,
C: CancelFuture + Unpin + Send + 'static,
T: Send + 'static,
{
use futures::prelude::*;
tokio::spawn(futures::future::select(fut, tok).map(|either| match either {
futures::future::Either::Left((complete, _)) => Ok(complete),
_ => Err(TaskCancelledError),
}))
}

@ -5,7 +5,24 @@ use tokio::io::{
AsyncWrite, AsyncWrite,
AsyncRead AsyncRead
}; };
use tokio::task::JoinHandle;
use futures::Future; use futures::Future;
use cancel::*; use cancel::*;
//pub fn handle_socket() -> impl Future<> /// Handles a raw socket
pub fn handle_socket_with_shutdown<R, W, C: cancel::CancelFuture + 'static + Send>(sock_read: R, sock_write: W, shutdown: C) -> JoinHandle<eyre::Result<()>>
where R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static
{
tokio::spawn(async move {
match {
with_cancel!(async move {
//TODO: How to handle reads+writes?
Ok(())
}, shutdown)
} {
Ok(v) => v,
Err(x) => Err(eyre::Report::from(x)),
}
})
}

Loading…
Cancel
Save