From 1bfa6bbd4ea913d7cd6d933ea034ce76214ccfd7 Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 22 Aug 2021 15:15:18 +0100 Subject: [PATCH] bsock_reader(): Buffers and marshals bytes from network to fake socket. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TODO: Propagate socket read errors to the fake socket. Fortune for rsh's current commit: Half curse − 半凶 --- src/ext/sync.rs | 3 +-- src/sock/pipe.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/ext/sync.rs b/src/ext/sync.rs index 4b508fd..148ef38 100644 --- a/src/ext/sync.rs +++ b/src/ext/sync.rs @@ -57,8 +57,7 @@ pub trait TimeSyncExt<'a>: Future + Sized + 'a #[cfg(nightly)] impl<'a, F> TimeSyncExt<'a> for F where F: Future + 'a -{ - +{ //#[cfg(nightly)] type OutputFuture = impl Future + 'a; //#[cfg(not(nightly))] diff --git a/src/sock/pipe.rs b/src/sock/pipe.rs index 6c78173..d168803 100644 --- a/src/sock/pipe.rs +++ b/src/sock/pipe.rs @@ -28,6 +28,7 @@ use enc::{ pub const DEFAULT_BUFFER_SIZE: usize = 32; /// Task-based buffered piping to/from encrypted sockets. +#[derive(Debug)] pub struct BufferedESock { @@ -35,12 +36,19 @@ pub struct BufferedESock _backing: PhantomData>, } +/// Info for bsock tasks +#[derive(Debug, Clone, PartialEq, Eq)] +struct BSockTaskInfo +{ + bufsz: usize, +} + /// `tx`: ESock-wrapped network output socket. /// `rx`: Reading half of the user's fake stream //TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be written to, so I think making it opaque with a generic is fine. -fn bsock_writer<'a, Raw, Fake>(mut tx: ESockWriteHalf, mut rx: Fake) -> impl Future + 'a +fn bsock_writer<'a, Raw, Fake>(info: BSockTaskInfo, mut tx: ESockWriteHalf, mut rx: Fake) -> impl Future + 'a where Raw: AsyncWrite + Unpin + 'a, -Fake: AsyncRead + Unpin + 'a, + Fake: AsyncRead + Unpin + 'a, { async move { @@ -50,12 +58,49 @@ Fake: AsyncRead + Unpin + 'a, /// `rx`: Raw, unencrypted network input socket /// `tx`: Writing half of the user's `ESockReadHalf`'s rx stream, which wraps the receiver from this fake sender stream. //TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be read from, so I think making it opaque with a generic is fine. -fn bsock_reader<'a, Raw, Fake>(mut rx: Raw, mut tx: Fake) -> impl Future + 'a +fn bsock_reader<'a, Raw, Fake>(info: BSockTaskInfo, mut rx: Raw, mut tx: Fake) -> impl Future + 'a where Raw: AsyncRead + Unpin + 'a, Fake: AsyncWrite + Unpin + 'a { + use tokio::prelude::*; async move { - + let mut last_read_error = None; + let mut buffer = vec![0u8; info.bufsz]; + loop { + // Read into buffer. + let mut read; + let mut done=0; + while { + match rx.read(&mut buffer[done..]).await { + Ok(n) => {read = n; n > 0}, + Err(err) => {read = 0; last_read_error = Some(err); false}, + } + } { + done += read; + if done == info.bufsz { + // Buffer is full. + break; + } + } + + // Write the buffer + if done > 0 { + if let Err(write_err) = tx.write_all(&buffer[..done]).await { + //We don't want to propagate this error. If writing to the fake socket fails, then it has been dropped or closed some other way, and that is fine. Just report then exit the task. + eprintln!("bsock_reader: Failed to write to fake stream: {}", write_err); + break; + } + } + // Check if there was an error mid-way through reading the buffer + // XXX: Should we instead propagate read errors immediately, ignoring the partially-filled buffer? I think not, as we may get a valid partial buffer which the remote socket then disconnects (exits process) from afterwards. + if let Some(read_err) = last_read_error.take() + { + //TODO: How to propagate errors to `tx`? + eprintln!("bsock_reader: Failed to read from real stream: {}", read_err); + let _ = tx.shutdown().await; + break; + } + } } }