added Merge

master
Avril 3 years ago
parent ee89469b3f
commit b8fcdb094f
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -18,43 +18,8 @@ use crypt::{
RsaPrivateKey,
};
/// A type that implements both `AsyncWrite` and `AsyncRead`
pub trait AsyncStream: AsyncRead + AsyncWrite{}
impl<T: AsyncRead + AsyncWrite + ?Sized> AsyncStream for T{}
/// A type that can split itself into other types, and combine back from those types.
pub trait Split: Sized
{
/// First half of the split
type First;
/// Second half of the split
type Second;
fn split(self) -> (Self::First, Self::Second);
fn unsplit(a: Self::First, b: Self::Second) -> Self;
#[inline(always)] fn split_reverse(self) -> (Self::Second, Self::First)
{
let (tx, rx) = self.split();
(rx, tx)
}
#[inline(always)] fn unsplit_reverse(b: Self::Second, a: Self::First) -> Self
{
Self::unsplit(a, b)
}
}
impl<T, U> Split for (T, U)
{
type First = T;
type Second = U;
#[inline] fn split(self) -> (Self::First, Self::Second) {
self
}
#[inline] fn unsplit(a: Self::First, b: Self::Second) -> Self {
(a, b)
}
}
//TODO: Add trait `SplitRef` for exchange, I guess?
mod traits;
pub use traits::*;
/// Combined Read + Write encryptable async stream.
///
@ -70,6 +35,20 @@ pub struct Stream<S>
#[pin] stream: S,
}
impl<Tx, Rx> Stream<Merge<Tx, Rx>>
where Tx: AsyncWrite,
Rx: AsyncRead
{
/// Merge an `AsyncWrite`, and `AsyncRead` stream into `Stream`.
pub fn merged(tx: Tx, rx: Rx) -> Self
{
Self {
meta: EncryptedStreamMeta::new(),
stream: Merge(tx, rx),
}
}
}
impl<S> Stream<S>
where S: Split,
S::First: AsyncWrite,
@ -104,7 +83,7 @@ impl<S: AsyncStream> Stream<S>
/// Create a split by cloning `S`.
pub fn split_clone(self) -> (WriteHalf<S>, ReadHalf<S>)
where S: Clone
where S: Clone
{
Stream {
stream: (self.stream.clone(), self.stream),
@ -219,6 +198,18 @@ struct EncryptedStreamMeta
them: Option<RsaPublicKey>,
}
impl EncryptedStreamMeta
{
/// Create a new meta with a newly generated private key.
#[inline(always)] pub fn new() -> Self
{
Self {
them: None,
us: crypt::generate(),
}
}
}
/// Writable half of `EncryptedStream`.
#[pin_project]
#[derive(Debug)]

@ -0,0 +1,101 @@
//! Stream traits
use super::*;
/// A type that implements both `AsyncWrite` and `AsyncRead`
pub trait AsyncStream: AsyncRead + AsyncWrite{}
impl<T: AsyncRead + AsyncWrite + ?Sized> AsyncStream for T{}
/// A type that can split itself into other types, and combine back from those types.
pub trait Split: Sized
{
/// First half of the split
type First;
/// Second half of the split
type Second;
fn split(self) -> (Self::First, Self::Second);
fn unsplit(a: Self::First, b: Self::Second) -> Self;
#[inline(always)] fn split_reverse(self) -> (Self::Second, Self::First)
{
let (tx, rx) = self.split();
(rx, tx)
}
#[inline(always)] fn unsplit_reverse(b: Self::Second, a: Self::First) -> Self
{
Self::unsplit(a, b)
}
}
impl<T, U> Split for (T, U)
{
type First = T;
type Second = U;
#[inline(always)] fn split(self) -> (Self::First, Self::Second) {
self
}
#[inline(always)] fn unsplit(a: Self::First, b: Self::Second) -> Self {
(a, b)
}
}
//TODO: Add trait `SplitRef` for exchange, I guess?
/// Merges a Read and Write stream in an implementor of `Split`, `AsyncRead`, and `Asyncwrite`.
///
/// Used for internal of `Stream`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(super) struct Merge<Tx, Rx>(pub Tx, pub Rx);
impl<Tx, Rx> Merge<Tx, Rx>
{
fn rx(self: Pin<&mut Self>) -> Pin<&mut Rx>
{
unsafe {self.map_unchecked_mut(|this| &mut this.1)}
}
fn tx(self: Pin<&mut Self>) -> Pin<&mut Tx>
{
unsafe {self.map_unchecked_mut(|this| &mut this.0)}
}
}
impl<Tx, Rx> Split for Merge<Tx, Rx>
{
type First = Tx;
type Second = Rx;
#[inline] fn split(self) -> (Self::First, Self::Second) {
(self.0, self.1)
}
#[inline] fn unsplit(a: Self::First, b: Self::Second) -> Self {
Self(a, b)
}
}
impl<Tx, Rx> AsyncWrite for Merge<Tx, Rx>
where Tx: AsyncWrite
{
#[inline] fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
self.tx().poll_write(cx, buf)
}
#[inline] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.tx().poll_flush(cx)
}
#[inline] fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.tx().poll_flush(cx)
}
#[inline] fn poll_write_buf<B: Buf>(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll<Result<usize, io::Error>>
where
Self: Sized, {
self.tx().poll_write_buf(cx, buf)
}
}
impl<Tx, Rx> AsyncRead for Merge<Tx, Rx>
where Rx: AsyncRead
{
#[inline] fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.rx().poll_read(cx, buf)
}
#[inline] fn poll_read_buf<B: BufMut>(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll<io::Result<usize>>
where
Self: Sized, {
self.rx().poll_read_buf(cx, buf)
}
}
Loading…
Cancel
Save