From f2868d11fff32557da80685921596cc1fa65e613 Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 20 Apr 2021 01:52:14 +0100 Subject: [PATCH] added Stream and Stream::split() --- src/lib.rs | 7 ++- src/stream.rs | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 7772d35..b13b3e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,12 @@ mod stream; pub use stream::{ AsyncStream, -// EncryptedStream, + Stream, + // EncryptedStream, + WriteHalf, + EncryptedWriteHalf, + ReadHalf, + EncryptedReadHalf, }; diff --git a/src/stream.rs b/src/stream.rs index e910a59..5d73571 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -22,6 +22,142 @@ use crypt::{ pub trait AsyncStream: AsyncRead + AsyncWrite{} impl AsyncStream for T{} +/// A type that can split itself into other types, and combine back from those types. +pub trait Split: Sized +{ + /// First half of the split + type First; + /// Second half of the split + type Second; + + fn split(self) -> (Self::First, Self::Second); + fn unsplit(a: Self::First, b: Self::Second) -> Self; + + #[inline(always)] fn split_reverse(self) -> (Self::Second, Self::First) + { + let (tx, rx) = self.split(); + (rx, tx) + } + #[inline(always)] fn unsplit_reverse(b: Self::Second, a: Self::First) -> Self + { + Self::unsplit(a, b) + } +} +impl Split for (T, U) +{ + type First = T; + type Second = U; + #[inline] fn split(self) -> (Self::First, Self::Second) { + self + } + #[inline] fn unsplit(a: Self::First, b: Self::Second) -> Self { + (a, b) + } +} + +/// Combined Read + Write encryptable async stream. +/// +/// # Exchange +/// A combined stream is the only way to exchange pubkeys and enabling the creation of encrypted read/write wrappers on the combined stream or splits. +#[pin_project] +#[derive(Debug)] +pub struct Stream +{ + meta: EncryptedStreamMeta, + #[pin] stream: S, +} + +impl Split for Stream +where S: Split, + S::First: AsyncWrite, + S::Second: AsyncRead +{ + type First = WriteHalf; + type Second = ReadHalf; + + #[inline] fn split(self) -> (Self::First, Self::Second) { + self.split() + } + #[inline] fn unsplit(a: Self::First, b: Self::Second) -> Self { + Self::unsplit(a, b) + } +} + +impl Stream +where S: Split, + S::First: AsyncWrite, + S::Second: AsyncRead +{ + /// Combine a previously split `EncryptedStream`'s halves back into a single type. + /// + /// # Panics + /// If the two halves didn't originally come from the same `EncryptedStream`. + pub fn unsplit(tx: WriteHalf, rx: ReadHalf) -> Self + { + #[inline(never)] fn panic_not_ptr_eq() -> ! + { + panic!("Cannot join halves from different splits") + } + if !Arc::ptr_eq(&tx.meta, &rx.meta) { + panic_not_ptr_eq(); + } + + let WriteHalf { meta: _meta, backing_write: tx } = tx; + drop(_meta); + let ReadHalf { meta, backing_read: rx } = rx; + + let meta = Arc::try_unwrap(meta).unwrap(); + Self { + meta, + stream: S::unsplit(tx, rx), + } + } + /// Split this `EncryptedStream` into a read and a write half. + pub fn split(self) -> (WriteHalf, ReadHalf) + { + let meta = Arc::new(self.meta); + let (tx, rx) = self.stream.split(); + + (WriteHalf { + meta: meta.clone(), + backing_write: tx, + }, ReadHalf { + meta, + backing_read: rx, + }) + } +} + +impl AsyncRead for Stream +{ + #[inline] fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + self.project().stream.poll_read(cx, buf) + } + #[inline] fn poll_read_buf(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll> + where + Self: Sized, { + self.project().stream.poll_read_buf(cx, buf) + } +} + +impl AsyncWrite for Stream +{ + #[inline] fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + self.project().stream.poll_write(cx, buf) + } + #[inline] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_flush(cx) + } + #[inline] fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_shutdown(cx) + } + #[inline] fn poll_write_buf(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll> + where + Self: Sized, { + self.project().stream.poll_write_buf(cx, buf) + } +} + /// Inner rsa data for encrypted stream read+write halves /// /// # Exchange / mutation @@ -30,6 +166,7 @@ impl AsyncStream for T{} /// Therefore exchange should happen before the original stream is split at all. /// /// Only the combined stream can mutate this structure. The halves hold it behind an immutable shared reference. +#[derive(Debug)] struct EncryptedStreamMeta { us: RsaPrivateKey, @@ -38,6 +175,7 @@ struct EncryptedStreamMeta /// Writable half of `EncryptedStream`. #[pin_project] +#[derive(Debug)] pub struct WriteHalf where S: AsyncWrite { @@ -160,6 +298,7 @@ impl<'a, S: AsyncWrite> AsyncWrite for EncryptedWriteHalf<'a, S> /// Readable half of `EncryptedStream`. #[pin_project] +#[derive(Debug)] pub struct ReadHalf where S: AsyncRead {