You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

447 lines
11 KiB

//! Encodings
use super::*;
use ext::*;
use std::{fmt, error};
use bytes::{
use std::io;
use tokio::io::{
AsyncRead, AsyncWrite,
AsyncReadExt, AsyncWriteExt,
use serde::{
/// Compression algorithm that may be applied to serialised data.
///
/// Of the kinds matched in this file's (de)serialisation helpers only `Brotli` is handled;
/// every other kind falls through to `unimplemented!`.
// NOTE(review): the variant list is not visible in this view — confirm against the full file.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord)]
pub enum CompressionKind
// Supplies the compression kind used when none is chosen explicitly.
impl Default for CompressionKind
fn default() -> Self
/// Encryption scheme that may be applied to serialised data.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum EncryptionKind
/// ChaCha20, carrying the key and IV that are handed to the crypter.
Chacha20((key::Key, key::IV))
// Supplies the encryption kind used when none is chosen explicitly.
impl Default for EncryptionKind
fn default() -> Self
/// Wire format used to (de)serialise values.
///
/// The helpers in this file map `Binary` to `serde_cbor` and `Text` to `serde_json`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum SerialFormat
/// CBOR
/// JSON
// Supplies the format used when none is chosen explicitly.
impl Default for SerialFormat
fn default() -> Self
/// Options describing how a value is transformed when it is serialised.
///
/// The same structure is reused for the receiving side through the `RecvOpt` alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, Default)]
pub struct SendOpt
/// Compression to apply, or `None` for no compression.
pub comp: Option<CompressionKind>,
/// Encryption to apply, or `None` for no encryption.
pub encrypt: Option<EncryptionKind>,
/// Serialisation format of the payload.
pub format: SerialFormat,
impl SendOpt
/// Does the binary data of this format require special handling?
/// True if encryption and/or compression are specified.
///
/// The serialisation format alone never makes this return `true`.
fn is_spec(&self) -> bool
self.comp.is_some() || self.encrypt.is_some()
/// Options describing how received data is decoded.
///
/// This is the same type as `SendOpt`: decoding must use the options the sender encoded with.
pub type RecvOpt = SendOpt;
/// Default buffer size for encryption transform stream copying.
///
/// Passed as the `BUFSIZE` const parameter of `cha_copy` by both the serialising and
/// deserialising helpers in this file, where it sizes the per-chunk stack buffers (in bytes).
pub const DEFAULT_BUFSIZE: usize = 4096;
/// Copy data from `from` towards `to`, passing each chunk through a ChaCha20 crypter.
///
/// The `DECRYPT` const parameter selects `cha::decrypter` (when `true`) or `cha::encrypter`
/// (when `false`), both built from `key` and `iv`. `BUFSIZE` is the size of the two stack
/// buffers used per chunk.
///
/// # Returns
/// `(written, read)`: the sum of the lengths returned by `crypter.update`, and the sum of
/// the chunk lengths obtained from the reading side, in that order.
async fn cha_copy<F, T, const BUFSIZE: usize, const DECRYPT: bool>(from: &mut F, to: &mut T, key: &key::Key, iv: &key::IV) -> io::Result<(usize, usize)>
where F: AsyncRead + Unpin + ?Sized,
T: AsyncWrite + Unpin + ?Sized
let mut written=0;
let mut read=0;
let mut r;
// `buffer` holds the raw input chunk; `cbuffer` receives the crypter's output for that chunk.
let mut buffer = [0u8; BUFSIZE];
let mut cbuffer = [0u8; BUFSIZE];
let mut crypter = if DECRYPT {
cha::decrypter(key, iv)
} else {
cha::encrypter(key, iv)
// Keep going for as long as the awaited value assigned to `r` is non-zero.
// NOTE(review): the expression on the next line looks truncated in this view — confirm against the full file.
while { r = buffer[..]).await?; r > 0 } {
read += r;
// `r` is reused: from here on it is the number of bytes the crypter produced for this chunk.
r = crypter.update(&buffer[..r], &mut cbuffer[..])?;
written += r;
Ok((written, read))
/// Deserialise a `T` from the bytes in `from`, first undoing the encryption and/or
/// compression described by `how`.
///
/// `buf` is a factory for the intermediate output buffer `B`; it takes a byte slice.
/// When both transformations are configured, decryption runs first and writes into the
/// decompressor, so the input is expected to have been compressed and then encrypted.
/// The resulting bytes are parsed with `serde_json` for `SerialFormat::Text` and with
/// `serde_cbor` for `SerialFormat::Binary`.
///
/// # Errors
/// Failures from the crypter copy and from either parser are converted into
/// `TransformErrorKind` by `?`.
// NOTE(review): several branch bodies are not visible in this view — confirm details against the full file.
async fn de_singleton_inner<T: DeserializeOwned, B, F>(buf: F, mut from: &[u8], how: &RecvOpt) -> Result<T, TransformErrorKind>
where B: AsRef<[u8]> + AsyncWrite + Unpin + Default,
F: FnOnce(&[u8]) -> B
// Decompressor
// The output is written to this (through writer)
let mut is_spec = false; // This is set later. The value will sometimes differ from `how.is_spec()` depending on combinations of options.
// The `spec` output buffer. Used if there are transformations that need to be done to the data before deserialisation
let mut buf = if how.is_spec() {
} else {
//let mut buf = Vec::with_capacity(from.len());
// `from` is rebound to whichever byte slice should finally be parsed.
from = {
// Declared up front so the decoder outlives the `writer` trait-object borrow taken below.
let mut b;
let writer: &mut (dyn AsyncWrite + Unpin) =
if let Some(comp) = &how.comp {
is_spec = true;
match comp {
CompressionKind::Brotli => {
// Decompressed output lands in `buf`.
b = async_compression::tokio::write::BrotliDecoder::new(&mut buf);
&mut b
_ => unimplemented!(),
} else {
// No compression: write straight into `buf`.
&mut buf
// Decrypt into `writer`.
if let Some(dec) = &how.encrypt {
// There is decryption to be done, decrypt into `writer` (which will handle decompression if needed).
// Return its output buffer
match dec {
EncryptionKind::Chacha20((k, iv)) => {
self::cha_copy::<_, _, DEFAULT_BUFSIZE, true>(&mut &from[..], writer, k, iv).await?;
// Required for decompression to complete
} else if is_spec {
// There is decompression to be done through `writer`. Return its output buffer
// Required for decompression to complete
} else {
// There is neither decompression nor decryption to be done, return the input reference itself
// Deserialise
let v = match how.format {
SerialFormat::Text => serde_json::from_slice(&from[..])?,
SerialFormat::Binary => serde_cbor::from_slice(&from[..])?,
/// Serialise `value` according to `how` and write the result into a sink created by `to`.
///
/// The value is first encoded with `serde_json` (`SerialFormat::Text`) or `serde_cbor`
/// (`SerialFormat::Binary`). If compression is configured the encoded bytes are read back
/// through a Brotli encoder; the resulting reader is then either encrypted into the sink
/// with `cha_copy` or copied verbatim with `tokio::io::copy`.
///
/// `to` receives a reference to the encoded (pre-transformation) bytes and returns the sink `V`.
///
/// # Returns
/// The sink together with a byte count: the `written` component of `cha_copy`'s result when
/// encrypting, otherwise the count reported by `tokio::io::copy`.
///
/// # Panics
/// Hits `unimplemented!` for any compression kind other than `Brotli`.
async fn ser_singleton_inner<T: Serialize, V: AsyncWrite + Unpin, F>(to: F, value: &T, how: impl AsRef<SendOpt>) -> Result<(V, usize), TransformErrorKind>
where F: FnOnce(&Vec<u8>) -> V
let how = how.as_ref();
let ser = match how.format {
SerialFormat::Text => serde_json::to_vec(value)?,
SerialFormat::Binary => serde_cbor::to_vec(value)?,
// Declared up front so whichever reader is chosen outlives the trait-object borrow below.
let mut a;
let mut b;
let reader: &mut (dyn AsyncRead + Unpin) =
if let Some(comp) = &how.comp {
match comp {
CompressionKind::Brotli => {
a = async_compression::tokio::bufread::BrotliEncoder::new(tokio::io::BufReader::new(&ser[..]));
&mut a
_ => unimplemented!("Xz and GZip currently unimplemented."),
} else {
// No compression: read the encoded bytes directly.
b = &ser[..];
&mut b
// Shadows the encoded bytes with the output sink; `reader` still borrows the original vector.
let mut ser = to(&ser);
let w= if let Some(enc) = &how.encrypt {
let n = match enc {
EncryptionKind::Chacha20((k, iv)) => {
// `.0` is the number of bytes the crypter produced.
self::cha_copy::<_, _, DEFAULT_BUFSIZE, false>(reader, &mut ser, k, iv).await?.0
// Required for compression to complete
} else {
tokio::io::copy(reader, &mut ser).await? as usize
Ok((ser, w))
// inner(value, how).map(|res| res.map_err(|k| SendError(Box::new((k, how.clone())))))
/// Deserialise a single object from an in-memory byte buffer with the method described by `how`.
///
/// Delegates to `de_singleton_inner`, using a `Vec` pre-sized from the slice it is given as
/// the intermediate buffer. On failure the error kind is wrapped in a `RecvError` together
/// with a copy of `how`.
#[inline(always)] pub fn de_singleton<'a, T: DeserializeOwned + 'a, B: ?Sized + AsRef<[u8]> + 'a>(from: &'a B, how: &'a RecvOpt) -> impl Future<Output = Result<T, RecvError>> + 'a
// Brings the future combinator `map_err` used below into scope.
use futures::prelude::*;
de_singleton_inner(|from| Vec::with_capacity(from.as_ref().len()), from.as_ref(), how)
.map_err(|k| RecvError(Box::new((k, how.clone()))))
/// Serialise a single object into a new `Vec<u8>` with the method described by `how`.
///
/// Delegates to `ser_singleton_inner` with a `Vec` sink and discards the byte count it
/// returns. On failure the error kind is wrapped in a `SendError` together with a copy of `how`.
#[inline(always)] pub fn ser_singleton<'a, T: Serialize>(value: &'a T, how: &'a SendOpt) -> impl Future<Output = Result<Vec<u8>, SendError>> + 'a
// Brings the future combinators `map_ok` / `map_err` used below into scope.
use futures::prelude::*;
// hack to avoid having to enable `try{}` feature :/
ser_singleton_inner(|c| Vec::with_capacity(c.len()), value, how)
.map_ok(|(v, _)| v)
.map_err(|k| SendError(Box::new((k, how.clone()))))
/// Deserialise a single object from a stream with the method described by `how`.
/// # Returns
/// The deserialised value and the number of bytes read from the stream.
pub async fn read_singleton<T: DeserializeOwned, S: ?Sized + AsyncRead + Unpin>(from: &mut S, how: &RecvOpt) -> Result<(T, usize), RecvError>
let (r, v) = async move {
let mut ibuf = [0u8; std::mem::size_of::<u64>()];
from.read_exact(&mut ibuf[..]).await?;
let n = u64::from_be_bytes(ibuf);
let mut v = Vec::with_capacity(n as usize);
tokio::io::copy(&mut from.take(n), &mut v).await
.map(move |_| (v.len() + ibuf.len(), v))
.map_err(|err| RecvError(Box::new((err.into(), how.to_owned()))))?;
let v = de_singleton(&v[..], how).await?;
Ok((v, r))
/// Serialise a single object to a stream with the method described by `how`.
///
/// The value is first serialised into an in-memory buffer by `ser_singleton_inner`; the byte
/// count it reports is then written to the stream as a big-endian `u64` prefix.
///
/// # Returns
/// The size of the `u64` prefix plus the length of the serialised buffer.
///
/// # Errors
/// Both serialisation failures and stream write failures are returned as a `SendError`
/// carrying a copy of `how`.
// NOTE(review): the statement that writes the payload itself is not visible in this view — confirm against the full file.
#[inline] pub async fn write_singleton<T: Serialize, S: ?Sized + AsyncWrite + Unpin>(to: &mut S, value: &T, how: &SendOpt) -> Result<usize, SendError>
// `cont` is the serialised payload, `v` the byte count reported by the serialiser.
let (cont, v) = ser_singleton_inner(|n| Vec::with_capacity(n.len()), value, &how).await
.map_err(|k| SendError(Box::new((k, how.to_owned()))))?;
let n = async move {
// Length prefix: the reported byte count as 8 big-endian bytes.
to.write_all(&(v as u64).to_be_bytes()[..]).await?;
.map(|_| std::mem::size_of::<u64>() + cont.len())
.map_err(|k| SendError(Box::new((k.into(), how.to_owned()))))?;
/// Kind of error for a send (serialise) or receive (deserialise) operation
///
/// Shared by both `SendError` and `RecvError`, which pair it with the options in effect.
// NOTE(review): the variant declarations are not visible in this view; the comments below document them in order.
pub enum TransformErrorKind
/// Invalid serialised format
/// Compression
/// Encryption
/// Misc. IO
//TODO: Disambiguate when this happens into the two above cases.
/// An error when receiving / deserialising an object.
///
/// Pairs the `TransformErrorKind` with the `RecvOpt` that was in effect, stored behind a single `Box`.
pub struct RecvError(Box<(TransformErrorKind, RecvOpt)>);
impl RecvError
/// The kind of failure that occurred while receiving / deserialising.
#[inline] pub fn kind(&self) -> &TransformErrorKind
impl SendError
/// The kind of failure that occurred while sending / serialising.
#[inline] pub fn kind(&self) -> &TransformErrorKind
impl error::Error for RecvError
/// The underlying I/O error when the kind is `TransformErrorKind::IO`; `None` for every other kind.
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(match &self.0.0
TransformErrorKind::IO(io) => io,
// Every other kind carries no inner error to expose.
_ => return None,
impl fmt::Display for RecvError
/// Writes a prefix containing the `Debug` form of the receive options, followed by a short
/// message specific to the error kind.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
write!(f, "error when deserialising object with params {:?}: ", self.0.1)?;
match self.0.0 {
TransformErrorKind::Format => write!(f, "failed to deserialise object to data"),
TransformErrorKind::Compress => write!(f, "failed to decompress data"),
TransformErrorKind::Encrypt => write!(f, "failed to decrypt data"),
// The inner I/O error is not printed here; it is exposed through `source()`.
TransformErrorKind::IO(_) => write!(f, "i/o failure"),
/// An error when sending / serialising an object.
///
/// Pairs the `TransformErrorKind` with the `SendOpt` that was in effect, stored behind a single `Box`.
pub struct SendError(Box<(TransformErrorKind, SendOpt)>);
impl error::Error for SendError
/// The underlying I/O error when the kind is `TransformErrorKind::IO`; `None` for every other kind.
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(match &self.0.0
TransformErrorKind::IO(io) => io,
// Every other kind carries no inner error to expose.
_ => return None,
impl fmt::Display for SendError
/// Writes a prefix containing the `Debug` form of the send options, followed by a short
/// message specific to the error kind.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
write!(f, "error when serialising object with params {:?}: ", self.0.1)?;
match self.0.0 {
TransformErrorKind::Format => write!(f, "failed to serialise object to data"),
TransformErrorKind::Compress => write!(f, "failed to compress data"),
TransformErrorKind::Encrypt => write!(f, "failed to encrypt data"),
// The inner I/O error is not printed here; it is exposed through `source()`.
TransformErrorKind::IO(_) => write!(f, "i/o failure"),
// Conversions that let `?` turn lower-level errors into a `TransformErrorKind`.
// NOTE(review): the function bodies are not visible in this view — confirm the target variants against the full file.
impl From<io::Error> for TransformErrorKind
/// Convert an I/O error; the error value is taken by name, so it is available to be kept.
fn from(from: io::Error) -> Self
impl From<serde_cbor::Error> for TransformErrorKind
/// Convert a CBOR error; the original error value is discarded (`_`).
#[inline] fn from(_: serde_cbor::Error) -> Self
impl From<serde_json::Error> for TransformErrorKind
/// Convert a JSON error; the original error value is discarded (`_`).
#[inline] fn from(_: serde_json::Error) -> Self
// Round-trip tests: serialise a value with `ser_singleton`, deserialise it with
// `de_singleton` using the same options, and check the result equals the input.
mod test
use super::*;
/// Round-trips the string "Hello world" through `ser_singleton` / `de_singleton` with `how`,
/// printing the serialised bytes as hex and the decoded value to stderr.
async fn ser_de_with(how: SendOpt) -> eyre::Result<()>
use ext::*;
let obj = String::from("Hello world");
let var = ser_singleton(&obj, &how).await?;
eprintln!("Ser: {}", var.hex());
let des: String = de_singleton(&var, &how).await?;
eprintln!("De: {:?}", des);
assert_eq!(obj, des);
// NOTE(review): the body of `ser_de` is not visible in this view.
async fn ser_de() -> eyre::Result<()>
// Round trip with Brotli compression.
async fn ser_de_comp() -> eyre::Result<()>
ser_de_with(SendOpt {
comp: Some(CompressionKind::Brotli),
// Round trip with ChaCha20 encryption, using a freshly generated key and IV.
async fn ser_de_enc() -> eyre::Result<()>
ser_de_with(SendOpt {
encrypt: Some(EncryptionKind::Chacha20(cha::keygen())),
// Round trip with both ChaCha20 encryption and Brotli compression.
async fn ser_de_comp_enc() -> eyre::Result<()>
ser_de_with(SendOpt {
encrypt: Some(EncryptionKind::Chacha20(cha::keygen())),
comp: Some(CompressionKind::Brotli),