diff --git a/src/enc.rs b/src/enc.rs index f78a0ff..864094d 100644 --- a/src/enc.rs +++ b/src/enc.rs @@ -117,10 +117,54 @@ where F: AsyncRead + Unpin + ?Sized, Ok((written, read)) } -async fn de_singleton_inner(from: S, how: impl AsRef) -> Result -where S: AsRef<[u8]> //TODO: Should we use bytes::Buf or something instead? +async fn de_singleton_inner(mut from: &[u8], how: &RecvOpt) -> Result { - todo!("Deserialise from `from` using `how`.") + // Decompressor + // The output is written to this (through writer) + let mut is_spec = false; //TODO: Determine this before allocating `buf`. + let mut buf = Vec::with_capacity(from.len()); // The `spec` output buffer. Used if there are transformations that need to be done to the data before deserialisation + from = { + let mut b; + let writer: &mut (dyn AsyncWrite + Unpin) = + if let Some(comp) = &how.comp { + is_spec = true; + match comp { + CompressionKind::Brotli => { + b = async_compression::tokio::write::BrotliDecoder::new(&mut buf); + &mut b + }, + _ => unimplemented!(), + } + } else { + &mut buf + }; + // Decrypt into `writer`. + if let Some(dec) = &how.handle_encrypted { + // There is decryption to be done, decrypt into `writer` (which will handle decompression if needed). + // Return its output buffer + match dec { + EncryptionKind::Chacha20((k, iv)) => { + self::cha_copy::<_, _, DEFAULT_BUFSIZE, true>(&mut &from[..], writer, k, iv).await?; + }, + } + &buf[..] + } else if is_spec { + // There is decompression to be done through `writer`. Return its output buffer + writer.write_all(from).await?; + &buf[..] + } else { + // There is neither decompression nor decryption to be done, return the input reference itself + from + } + }; + + // Deserialise + let v = match how.format { + SerialFormat::Text => serde_json::from_slice(&from[..])?, + SerialFormat::Binary => serde_cbor::from_slice(&from[..])?, + }; + + Ok(v) } async fn ser_singleton_inner(to: F, value: &T, how: impl AsRef) -> Result<(V, usize), TransformErrorKind> @@ -160,6 +204,13 @@ where F: FnOnce(&Vec) -> V // inner(value, how).map(|res| res.map_err(|k| SendError(Box::new((k, how.clone()))))) } +#[inline(always)] pub fn de_singleton<'a, T: DeserializeOwned + 'a, B: ?Sized + AsRef<[u8]> + 'a>(from: &'a B, how: &'a RecvOpt) -> impl Future> + 'a +{ + use futures::prelude::*; + de_singleton_inner(from.as_ref(), how) + .map_err(|k| RecvError(Box::new((k, how.clone())))) +} + #[inline(always)] pub fn ser_singleton<'a, T: Serialize>(value: &'a T, how: &'a SendOpt) -> impl Future, SendError>> + 'a { use futures::prelude::*; @@ -169,6 +220,25 @@ where F: FnOnce(&Vec) -> V .map_err(|k| SendError(Box::new((k, how.clone())))) } +/// Deserialise a single object from a stream with the method described by `how`. +/// +/// # Returns +/// The deserialised value and the number of bytes read from the stream. +pub async fn read_singleton(from: &mut S, how: &RecvOpt) -> Result<(T, usize), RecvError> +{ + let (r, v) = async move { + let mut ibuf = [0u8; std::mem::size_of::()]; + from.read_exact(&mut ibuf[..]).await?; + let n = u64::from_be_bytes(ibuf); + let mut v = Vec::with_capacity(n as usize); + tokio::io::copy(&mut from.take(n), &mut v).await + .map(move |_| (v.len() + ibuf.len(), v)) + }.await + .map_err(|err| RecvError(Box::new((err.into(), how.to_owned()))))?; + let v = de_singleton(&v[..], how).await?; + Ok((v, r)) +} + /// Serialise a single object to a stream with the method described by `how`. #[inline] pub async fn write_singleton(to: &mut S, value: &T, how: &SendOpt) -> Result {