@ -117,10 +117,54 @@ where F: AsyncRead + Unpin + ?Sized,
Ok ( ( written , read ) )
Ok ( ( written , read ) )
}
}
async fn de_singleton_inner < T : DeserializeOwned , S > ( from : S , how : impl AsRef < RecvOpt > ) -> Result < T , TransformErrorKind >
async fn de_singleton_inner < T : DeserializeOwned > ( mut from : & [ u8 ] , how : & RecvOpt ) -> Result < T , TransformErrorKind >
where S : AsRef < [ u8 ] > //TODO: Should we use bytes::Buf or something instead?
{
{
todo! ( "Deserialise from `from` using `how`." )
// Decompressor
// The output is written to this (through writer)
let mut is_spec = false ; //TODO: Determine this before allocating `buf`.
let mut buf = Vec ::with_capacity ( from . len ( ) ) ; // The `spec` output buffer. Used if there are transformations that need to be done to the data before deserialisation
from = {
let mut b ;
let writer : & mut ( dyn AsyncWrite + Unpin ) =
if let Some ( comp ) = & how . comp {
is_spec = true ;
match comp {
CompressionKind ::Brotli = > {
b = async_compression ::tokio ::write ::BrotliDecoder ::new ( & mut buf ) ;
& mut b
} ,
_ = > unimplemented! ( ) ,
}
} else {
& mut buf
} ;
// Decrypt into `writer`.
if let Some ( dec ) = & how . handle_encrypted {
// There is decryption to be done, decrypt into `writer` (which will handle decompression if needed).
// Return its output buffer
match dec {
EncryptionKind ::Chacha20 ( ( k , iv ) ) = > {
self ::cha_copy ::< _ , _ , DEFAULT_BUFSIZE , true > ( & mut & from [ .. ] , writer , k , iv ) . await ? ;
} ,
}
& buf [ .. ]
} else if is_spec {
// There is decompression to be done through `writer`. Return its output buffer
writer . write_all ( from ) . await ? ;
& buf [ .. ]
} else {
// There is neither decompression nor decryption to be done, return the input reference itself
from
}
} ;
// Deserialise
let v = match how . format {
SerialFormat ::Text = > serde_json ::from_slice ( & from [ .. ] ) ? ,
SerialFormat ::Binary = > serde_cbor ::from_slice ( & from [ .. ] ) ? ,
} ;
Ok ( v )
}
}
async fn ser_singleton_inner < T : Serialize , V : AsyncWrite + Unpin , F > ( to : F , value : & T , how : impl AsRef < SendOpt > ) -> Result < ( V , usize ) , TransformErrorKind >
async fn ser_singleton_inner < T : Serialize , V : AsyncWrite + Unpin , F > ( to : F , value : & T , how : impl AsRef < SendOpt > ) -> Result < ( V , usize ) , TransformErrorKind >
@ -160,6 +204,13 @@ where F: FnOnce(&Vec<u8>) -> V
// inner(value, how).map(|res| res.map_err(|k| SendError(Box::new((k, how.clone())))))
// inner(value, how).map(|res| res.map_err(|k| SendError(Box::new((k, how.clone())))))
}
}
#[ inline(always) ] pub fn de_singleton < ' a , T : DeserializeOwned + ' a , B : ? Sized + AsRef < [ u8 ] > + ' a > ( from : & ' a B , how : & ' a RecvOpt ) -> impl Future < Output = Result < T , RecvError > > + ' a
{
use futures ::prelude ::* ;
de_singleton_inner ( from . as_ref ( ) , how )
. map_err ( | k | RecvError ( Box ::new ( ( k , how . clone ( ) ) ) ) )
}
#[ inline(always) ] pub fn ser_singleton < ' a , T : Serialize > ( value : & ' a T , how : & ' a SendOpt ) -> impl Future < Output = Result < Vec < u8 > , SendError > > + ' a
#[ inline(always) ] pub fn ser_singleton < ' a , T : Serialize > ( value : & ' a T , how : & ' a SendOpt ) -> impl Future < Output = Result < Vec < u8 > , SendError > > + ' a
{
{
use futures ::prelude ::* ;
use futures ::prelude ::* ;
@ -169,6 +220,25 @@ where F: FnOnce(&Vec<u8>) -> V
. map_err ( | k | SendError ( Box ::new ( ( k , how . clone ( ) ) ) ) )
. map_err ( | k | SendError ( Box ::new ( ( k , how . clone ( ) ) ) ) )
}
}
/// Deserialise a single object from a stream with the method described by `how`.
///
/// # Returns
/// The deserialised value and the number of bytes read from the stream.
pub async fn read_singleton < T : DeserializeOwned , S : ? Sized + AsyncRead + Unpin > ( from : & mut S , how : & RecvOpt ) -> Result < ( T , usize ) , RecvError >
{
let ( r , v ) = async move {
let mut ibuf = [ 0 u8 ; std ::mem ::size_of ::< u64 > ( ) ] ;
from . read_exact ( & mut ibuf [ .. ] ) . await ? ;
let n = u64 ::from_be_bytes ( ibuf ) ;
let mut v = Vec ::with_capacity ( n as usize ) ;
tokio ::io ::copy ( & mut from . take ( n ) , & mut v ) . await
. map ( move | _ | ( v . len ( ) + ibuf . len ( ) , v ) )
} . await
. map_err ( | err | RecvError ( Box ::new ( ( err . into ( ) , how . to_owned ( ) ) ) ) ) ? ;
let v = de_singleton ( & v [ .. ] , how ) . await ? ;
Ok ( ( v , r ) )
}
/// Serialise a single object to a stream with the method described by `how`.
/// Serialise a single object to a stream with the method described by `how`.
#[ inline ] pub async fn write_singleton < T : Serialize , S : ? Sized + AsyncWrite + Unpin > ( to : & mut S , value : & T , how : & SendOpt ) -> Result < usize , SendError >
#[ inline ] pub async fn write_singleton < T : Serialize , S : ? Sized + AsyncWrite + Unpin > ( to : & mut S , value : & T , how : & SendOpt ) -> Result < usize , SendError >
{
{