From b8fcdb094fed97901c4009e9c4f2235cd44d8443 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 21 Apr 2021 22:38:15 +0100 Subject: [PATCH] added Merge --- src/stream.rs | 67 +++++++++++++--------------- src/stream/traits.rs | 101 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 38 deletions(-) create mode 100644 src/stream/traits.rs diff --git a/src/stream.rs b/src/stream.rs index 2bd3379..85ec774 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -18,43 +18,8 @@ use crypt::{ RsaPrivateKey, }; -/// A type that implements both `AsyncWrite` and `AsyncRead` -pub trait AsyncStream: AsyncRead + AsyncWrite{} -impl AsyncStream for T{} - -/// A type that can split itself into other types, and combine back from those types. -pub trait Split: Sized -{ - /// First half of the split - type First; - /// Second half of the split - type Second; - - fn split(self) -> (Self::First, Self::Second); - fn unsplit(a: Self::First, b: Self::Second) -> Self; - - #[inline(always)] fn split_reverse(self) -> (Self::Second, Self::First) - { - let (tx, rx) = self.split(); - (rx, tx) - } - #[inline(always)] fn unsplit_reverse(b: Self::Second, a: Self::First) -> Self - { - Self::unsplit(a, b) - } -} -impl Split for (T, U) -{ - type First = T; - type Second = U; - #[inline] fn split(self) -> (Self::First, Self::Second) { - self - } - #[inline] fn unsplit(a: Self::First, b: Self::Second) -> Self { - (a, b) - } -} -//TODO: Add trait `SplitRef` for exchange, I guess? +mod traits; +pub use traits::*; /// Combined Read + Write encryptable async stream. /// @@ -70,6 +35,20 @@ pub struct Stream #[pin] stream: S, } +impl Stream> +where Tx: AsyncWrite, + Rx: AsyncRead +{ + /// Merge an `AsyncWrite`, and `AsyncRead` stream into `Stream`. + pub fn merged(tx: Tx, rx: Rx) -> Self + { + Self { + meta: EncryptedStreamMeta::new(), + stream: Merge(tx, rx), + } + } +} + impl Stream where S: Split, S::First: AsyncWrite, @@ -104,7 +83,7 @@ impl Stream /// Create a split by cloning `S`. pub fn split_clone(self) -> (WriteHalf, ReadHalf) - where S: Clone + where S: Clone { Stream { stream: (self.stream.clone(), self.stream), @@ -219,6 +198,18 @@ struct EncryptedStreamMeta them: Option, } +impl EncryptedStreamMeta +{ + /// Create a new meta with a newly generated private key. + #[inline(always)] pub fn new() -> Self + { + Self { + them: None, + us: crypt::generate(), + } + } +} + /// Writable half of `EncryptedStream`. #[pin_project] #[derive(Debug)] diff --git a/src/stream/traits.rs b/src/stream/traits.rs new file mode 100644 index 0000000..f2a9e0b --- /dev/null +++ b/src/stream/traits.rs @@ -0,0 +1,101 @@ +//! Stream traits +use super::*; + +/// A type that implements both `AsyncWrite` and `AsyncRead` +pub trait AsyncStream: AsyncRead + AsyncWrite{} +impl AsyncStream for T{} + +/// A type that can split itself into other types, and combine back from those types. +pub trait Split: Sized +{ + /// First half of the split + type First; + /// Second half of the split + type Second; + + fn split(self) -> (Self::First, Self::Second); + fn unsplit(a: Self::First, b: Self::Second) -> Self; + + #[inline(always)] fn split_reverse(self) -> (Self::Second, Self::First) + { + let (tx, rx) = self.split(); + (rx, tx) + } + #[inline(always)] fn unsplit_reverse(b: Self::Second, a: Self::First) -> Self + { + Self::unsplit(a, b) + } +} +impl Split for (T, U) +{ + type First = T; + type Second = U; + #[inline(always)] fn split(self) -> (Self::First, Self::Second) { + self + } + #[inline(always)] fn unsplit(a: Self::First, b: Self::Second) -> Self { + (a, b) + } +} +//TODO: Add trait `SplitRef` for exchange, I guess? + +/// Merges a Read and Write stream in an implementor of `Split`, `AsyncRead`, and `Asyncwrite`. +/// +/// Used for internal of `Stream`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(super) struct Merge(pub Tx, pub Rx); + +impl Merge +{ + fn rx(self: Pin<&mut Self>) -> Pin<&mut Rx> + { + unsafe {self.map_unchecked_mut(|this| &mut this.1)} + } + fn tx(self: Pin<&mut Self>) -> Pin<&mut Tx> + { + unsafe {self.map_unchecked_mut(|this| &mut this.0)} + } +} + +impl Split for Merge +{ + type First = Tx; + type Second = Rx; + + #[inline] fn split(self) -> (Self::First, Self::Second) { + (self.0, self.1) + } + #[inline] fn unsplit(a: Self::First, b: Self::Second) -> Self { + Self(a, b) + } +} +impl AsyncWrite for Merge +where Tx: AsyncWrite +{ + #[inline] fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + self.tx().poll_write(cx, buf) + } + #[inline] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.tx().poll_flush(cx) + } + #[inline] fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.tx().poll_flush(cx) + } + #[inline] fn poll_write_buf(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll> + where + Self: Sized, { + self.tx().poll_write_buf(cx, buf) + } +} +impl AsyncRead for Merge +where Rx: AsyncRead +{ + #[inline] fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + self.rx().poll_read(cx, buf) + } + #[inline] fn poll_read_buf(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll> + where + Self: Sized, { + self.rx().poll_read_buf(cx, buf) + } +}