@ -18,9 +18,190 @@ use crypt::{
RsaPrivateKey ,
} ;
/// A type that implements both `AsyncWrite` and `AsyncRead`
pub trait AsyncStream : AsyncRead + AsyncWrite { }
impl < T : AsyncRead + AsyncWrite + ? Sized > AsyncStream for T { }
mod traits ;
pub use traits ::* ;
mod exchange ;
/// Combined Read + Write encryptable async stream.
///
/// The `AsyncRead` and `AsyncWrite` impls of this type forward to the backing impls for `S`.
///
/// # Exchange
/// A combined stream is the only way to exchange pubkeys and enabling the creation of encrypted read/write wrappers on the combined stream or splits.
#[ pin_project ]
#[ derive(Debug) ]
pub struct Stream < S >
{
meta : EncryptedStreamMeta ,
#[ pin ] stream : S ,
}
/// `Stream` with enabled encryption.
pub struct EncryptedStream < ' a , S >
{
read_cipher : Crypter ,
write_cipher : Crypter ,
write_crypt_buf_ptr : SliceMeta < u8 > ,
write_crypt_buffer : Vec < u8 > ,
backing : & ' a mut Stream < S > ,
}
impl < Tx , Rx > Stream < Merge < Tx , Rx > >
where Tx : AsyncWrite ,
Rx : AsyncRead
{
/// Exchange RSA keys through this stream.
pub async fn exchange ( & mut self ) -> io ::Result < ( ) >
{
todo! ( )
}
/// Merge an `AsyncWrite`, and `AsyncRead` stream into `Stream`.
pub fn merged ( tx : Tx , rx : Rx ) -> Self
{
Self {
meta : EncryptedStreamMeta ::new ( ) ,
stream : Merge ( tx , rx ) ,
}
}
}
/*
impl < S > Stream < S >
where S : Split ,
S ::First : AsyncWrite ,
S ::Second : AsyncRead
{
/// Create a new `Stream` from two streams, one implemetor of `AsyncWrite`, and one of `AsyncRead`.
pub fn new ( tx : S ::First , rx : S ::Second ) -> Self
{
Self {
meta : EncryptedStreamMeta {
them : None ,
us : crypt ::generate ( ) ,
} ,
stream : S ::unsplit ( tx , rx ) ,
}
}
}
impl < S : AsyncStream > Stream < S >
{
/// Create a new `Stream` from an implementor of both `AsyncRead` and `AsyncWrite`.
pub fn new_single ( stream : S ) -> Self
{
Self {
meta : EncryptedStreamMeta {
them : None ,
us : crypt ::generate ( ) ,
} ,
stream ,
}
}
/// Create a split by cloning `S`.
pub fn split_clone ( self ) -> ( WriteHalf < S > , ReadHalf < S > )
where S : Clone
{
Stream {
stream : ( self . stream . clone ( ) , self . stream ) ,
meta : self . meta
} . split ( )
}
} * /
impl < S > Split for Stream < S >
where S : Split ,
S ::First : AsyncWrite ,
S ::Second : AsyncRead
{
type First = WriteHalf < S ::First > ;
type Second = ReadHalf < S ::Second > ;
#[ inline ] fn split ( self ) -> ( Self ::First , Self ::Second ) {
self . split ( )
}
#[ inline ] fn unsplit ( a : Self ::First , b : Self ::Second ) -> Self {
Self ::unsplit ( a , b )
}
}
impl < S > Stream < S >
where S : Split ,
S ::First : AsyncWrite ,
S ::Second : AsyncRead
{
/// Combine a previously split `EncryptedStream`'s halves back into a single type.
///
/// # Panics
/// If the two halves didn't originally come from the same `EncryptedStream`.
pub fn unsplit ( tx : WriteHalf < S ::First > , rx : ReadHalf < S ::Second > ) -> Self
{
#[ inline(never) ] fn panic_not_ptr_eq ( ) -> !
{
panic! ( "Cannot join halves from different splits" )
}
if ! Arc ::ptr_eq ( & tx . meta , & rx . meta ) {
panic_not_ptr_eq ( ) ;
}
let WriteHalf { meta : _meta , backing_write : tx } = tx ;
drop ( _meta ) ;
let ReadHalf { meta , backing_read : rx } = rx ;
let meta = Arc ::try_unwrap ( meta ) . unwrap ( ) ;
Self {
meta ,
stream : S ::unsplit ( tx , rx ) ,
}
}
/// Split this `EncryptedStream` into a read and a write half.
pub fn split ( self ) -> ( WriteHalf < S ::First > , ReadHalf < S ::Second > )
{
let meta = Arc ::new ( self . meta ) ;
let ( tx , rx ) = self . stream . split ( ) ;
( WriteHalf {
meta : meta . clone ( ) ,
backing_write : tx ,
} , ReadHalf {
meta ,
backing_read : rx ,
} )
}
}
impl < S : AsyncRead > AsyncRead for Stream < S >
{
#[ inline ] fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut [ u8 ] ) -> Poll < io ::Result < usize > > {
self . project ( ) . stream . poll_read ( cx , buf )
}
#[ inline ] fn poll_read_buf < B : BufMut > ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut B ) -> Poll < io ::Result < usize > >
where
Self : Sized , {
self . project ( ) . stream . poll_read_buf ( cx , buf )
}
}
impl < S : AsyncWrite > AsyncWrite for Stream < S >
{
#[ inline ] fn poll_write ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & [ u8 ] ) -> Poll < Result < usize , io ::Error > > {
self . project ( ) . stream . poll_write ( cx , buf )
}
#[ inline ] fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io ::Error > > {
self . project ( ) . stream . poll_flush ( cx )
}
#[ inline ] fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io ::Error > > {
self . project ( ) . stream . poll_shutdown ( cx )
}
#[ inline ] fn poll_write_buf < B : Buf > ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut B ) -> Poll < Result < usize , io ::Error > >
where
Self : Sized , {
self . project ( ) . stream . poll_write_buf ( cx , buf )
}
}
/// Inner rsa data for encrypted stream read+write halves
///
@ -30,14 +211,28 @@ impl<T: AsyncRead + AsyncWrite + ?Sized> AsyncStream for T{}
/// Therefore exchange should happen before the original stream is split at all.
///
/// Only the combined stream can mutate this structure. The halves hold it behind an immutable shared reference.
#[ derive(Debug) ]
struct EncryptedStreamMeta
{
us : RsaPrivateKey ,
them : Option < RsaPublicKey > ,
}
impl EncryptedStreamMeta
{
/// Create a new meta with a newly generated private key.
#[ inline(always) ] pub fn new ( ) -> Self
{
Self {
them : None ,
us : crypt ::generate ( ) ,
}
}
}
/// Writable half of `EncryptedStream`.
#[ pin_project ]
#[ derive(Debug) ]
pub struct WriteHalf < S >
where S : AsyncWrite
{
@ -160,6 +355,7 @@ impl<'a, S: AsyncWrite> AsyncWrite for EncryptedWriteHalf<'a, S>
/// Readable half of `EncryptedStream`.
#[ pin_project ]
#[ derive(Debug) ]
pub struct ReadHalf < S >
where S : AsyncRead
{
@ -232,114 +428,3 @@ impl<S: AsyncWrite> AsyncWrite for WriteHalf<S>
self . project ( ) . backing_write . poll_write_buf ( cx , buf )
}
}
//TODO: Rework everything past this point:
/*
struct ReadWriteCombined < R , W >
{
/// Since chacha20stream has no AsyncRead counterpart, we have to do it ourselves.
cipher_read : Option < Crypter > ,
backing_read : R ,
backing_write : dual ::DualStream < W > ,
}
/// RSA/chacha20 encrypted stream
pub struct EncryptedStream < R , W >
where R : AsyncRead ,
W : AsyncWrite ,
{
meta : EncryptedStreamMeta ,
// Keep the streams on the heap to keep this type not hueg.
backing : Box < ReadWriteCombined < R , W > > ,
}
//TODO: How do we use this with a single AsyncStream instead of requiring 2? Will we need to make our own Arc wrapper?? Ugh,, for now let's ignore this I guess... Most read+write thingies have a Read/WriteHalf split mechanism.
//
// Note that this does actually work fine with things like tokio's `duplex()` (i think)
impl < R : AsyncRead , W : AsyncWrite > EncryptedStream < R , W >
{
/// Has this stream done its RSA key exchange?
pub fn has_exchanged ( & self ) -> bool
{
self . meta . them . is_some ( )
}
/// Split this stream into a read and writeable half.
pub fn split ( self ) -> ( WriteHalf < W > , ReadHalf < R > )
{
let meta = Arc ::new ( self . meta ) ;
let ( read , write ) = {
let ReadWriteCombined { cipher_read , backing_read , backing_write } = * self . backing ;
( ( cipher_read , backing_read ) , backing_write )
} ;
( WriteHalf {
meta : Arc ::clone ( & meta ) ,
backing_write : Box ::new ( write ) ,
} , ReadHalf {
meta ,
cipher : read . 0 ,
backing_read : Box ::new ( read . 1 ) ,
} )
}
/// Join a split `EncryptedStream` from halves.
///
/// # Panics
/// If the read and write half are not from the same split.
pub fn from_split ( ( write , read ) : ( WriteHalf < W > , ReadHalf < R > ) ) -> Self
{
if ! Arc ::ptr_eq ( & write . meta , & read . meta ) {
panic! ( "Read and Write halves are not from the same split" ) ;
}
todo! ( "Drop write's `meta`, consume read's `meta`. Move the streams into `ReadWriteCombined`" )
}
}
/*
impl < S : AsyncWrite > AsyncWrite for WriteHalf < S >
{
#[ inline ] fn poll_write ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & [ u8 ] ) -> Poll < Result < usize , io ::Error > > {
unsafe { self . map_unchecked_mut ( | this | this . backing_write . as_mut ( ) ) } . poll_write ( cx , buf )
}
#[ inline ] fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io ::Error > > {
unsafe { self . map_unchecked_mut ( | this | this . backing_write . as_mut ( ) ) } . poll_flush ( cx )
}
#[ inline ] fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io ::Error > > {
unsafe { self . map_unchecked_mut ( | this | this . backing_write . as_mut ( ) ) } . poll_shutdown ( cx )
}
}
impl < S : AsyncRead > AsyncRead for ReadHalf < S >
{
fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut [ u8 ] ) -> Poll < io ::Result < usize > > {
let this = self . project ( ) ;
let cipher = this . cipher . as_mut ( ) ;
let stream = unsafe { this . backing_read . map_unchecked_mut ( | f | f . as_mut ( ) ) } ;
let res = stream . poll_read ( cx , buf ) ;
if let Some ( cipher ) = cipher {
// Decrypt the buffer if the read succeeded
res . map ( move | res | res . and_then ( move | sz | {
alloca_limit ( sz , move | obuf | -> io ::Result < usize > {
// This `sz` and old `sz` should always be the same.
let sz = cipher . update ( & buf [ .. sz ] , & mut obuf [ .. ] ) ? ;
let _f = cipher . finalize ( & mut obuf [ .. sz ] ) ? ;
debug_assert_eq! ( _f , 0 ) ;
// Copy decrypted buffer into output buffer
buf . copy_from_slice ( & obuf [ .. sz ] ) ;
Ok ( sz )
} )
} ) )
} else {
res
}
}
}
* /
* /