You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

228 lines
5.2 KiB

//! Encodings
use super::*;
use ext::*;
use std::{fmt, error};
use bytes::{
Buf,
Bytes,
};
use std::io;
use tokio::io::{
AsyncRead, AsyncWrite,
AsyncReadExt, AsyncWriteExt,
};
use serde::{
Serialize,
Deserialize
};
/// The compression algorithm applied to serialised data.
///
/// Variant order is significant: `PartialOrd` / `Ord` are derived from it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord)]
pub enum CompressionKind
{
    /// Brotli compression (the default).
    Brotli,
    /// Xz compression.
    Xz,
    /// GZip compression.
    GZip,
}

impl Default for CompressionKind
{
    /// The default compression kind is [`CompressionKind::Brotli`].
    #[inline]
    fn default() -> Self
    {
        CompressionKind::Brotli
    }
}
/// The encryption scheme applied to serialised data, along with its key material.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum EncryptionKind
{
    /// ChaCha20 with the given `(key, iv)` pair.
    Chacha20((key::Key, key::IV))
}

impl Default for EncryptionKind
{
    /// Builds a `Chacha20` variant from whatever `cha::keygen()` returns.
    ///
    /// NOTE(review): presumably this is a freshly generated key/IV pair, which would make
    /// two calls to `default()` compare unequal -- confirm against the `cha` module.
    #[inline]
    fn default() -> Self
    {
        let key_material = cha::keygen();
        EncryptionKind::Chacha20(key_material)
    }
}
/// The wire format an object is serialised into.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum SerialFormat
{
    /// Binary encoding: CBOR (the default).
    Binary,
    /// Textual encoding: JSON.
    Text,
}

impl Default for SerialFormat
{
    /// The default serial format is [`SerialFormat::Binary`].
    #[inline]
    fn default() -> Self
    {
        SerialFormat::Binary
    }
}
/// Options describing how an object is encoded before it is sent.
///
/// The derived `Default` yields no compression, no encryption and `SerialFormat::Binary`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, Default)]
pub struct SendOpt
{
/// Compression to apply to the serialised data, or `None` to leave it uncompressed.
pub comp: Option<CompressionKind>,
/// Encryption to apply to the (possibly compressed) data, or `None` to leave it unencrypted.
pub encrypt: Option<EncryptionKind>,
/// Serialisation format used to encode the object.
pub format: SerialFormat,
}
// NOTE(review): presumably implements `AsRef<SendOpt>` for `SendOpt` (`ser_singleton_inner`
// accepts `impl AsRef<SendOpt>`) -- confirm against the macro definition.
ref_self!(SendOpt);
/// Default buffer size for encryption transform stream copying.
///
/// Measured in bytes; `ser_singleton_inner` passes it as the `BUFSIZE` parameter of `cha_copy`.
pub const DEFAULT_BUFSIZE: usize = 4096;
/// Copy everything from `from` into `to`, passing the data through a ChaCha20 transform.
///
/// Data is read in chunks of at most `BUFSIZE` bytes until `from` reports end of stream.
/// Each chunk is run through the crypter and the transformed bytes are written to `to`.
///
/// # Type parameters
/// * `BUFSIZE` - size in bytes of the two stack buffers (plain input and transformed output).
/// * `DECRYPT` - when `true` a decrypter is built from `key` / `iv`, otherwise an encrypter.
///
/// # Returns
/// `(written, read)`: the number of bytes written to `to` and the number of bytes read from
/// `from`, in that order.
///
/// # Errors
/// Fails if the crypter cannot be constructed, or if reading, transforming or writing fails.
///
/// NOTE(review): the crypter is never finalised and `to` is not flushed here; presumably
/// acceptable for a stream cipher and for the current callers -- confirm against `cha`.
async fn cha_copy<F, T, const BUFSIZE: usize, const DECRYPT: bool>(from: &mut F, to: &mut T, key: &key::Key, iv: &key::IV) -> io::Result<(usize, usize)>
where F: AsyncRead + Unpin + ?Sized,
T: AsyncWrite + Unpin + ?Sized
{
    let mut written = 0;
    let mut read = 0;

    let mut buffer = [0u8; BUFSIZE];
    let mut cbuffer = [0u8; BUFSIZE];

    let mut crypter = if DECRYPT {
        cha::decrypter(key, iv)
    } else {
        cha::encrypter(key, iv)
    }?;

    loop {
        let got = from.read(&mut buffer[..]).await?;
        if got == 0 {
            // End of input.
            break;
        }
        read += got;

        let produced = crypter.update(&buffer[..got], &mut cbuffer[..])?;
        // `write` may accept only part of the buffer; `write_all` keeps going until the
        // whole transformed chunk has been handed to the writer, so no bytes are dropped
        // and `written` stays accurate.
        to.write_all(&cbuffer[..produced]).await?;
        written += produced;
    }

    Ok((written, read))
}
async fn ser_singleton_inner<T: Serialize, V: AsyncWrite + Unpin, F>(to: F, value: &T, how: impl AsRef<SendOpt>) -> Result<(V, usize), SendErrorKind>
where F: FnOnce(&Vec<u8>) -> V
{
let how = how.as_ref();
let ser = match how.format {
SerialFormat::Text => serde_json::to_vec(value)?,
SerialFormat::Binary => serde_cbor::to_vec(value)?,
};
let mut a;
let mut b;
let reader: &mut (dyn AsyncRead + Unpin) =
if let Some(comp) = &how.comp {
match comp {
CompressionKind::Brotli => {
a = async_compression::tokio::bufread::BrotliEncoder::new(tokio::io::BufReader::new(&ser[..]));
&mut a
},
_ => unimplemented!("Xz and GZip currently unimplemented."),
}
} else {
b = &ser[..];
&mut b
};
let mut ser = to(&ser);
let w= if let Some(enc) = &how.encrypt {
match enc {
EncryptionKind::Chacha20((k, iv)) => {
self::cha_copy::<_, _, DEFAULT_BUFSIZE, false>(reader, &mut ser, k, iv).await?.0
},
}
} else {
tokio::io::copy(reader, &mut ser).await? as usize
};
Ok((ser, w))
// inner(value, how).map(|res| res.map_err(|k| SendError(Box::new((k, how.clone())))))
}
/// Serialise a single object into a new byte vector with the method described by `how`.
///
/// The vector is created with a capacity equal to the length of the serialised
/// (pre-compression, pre-encryption) data. On failure the error kind is paired with a copy
/// of `how` inside a `SendError`.
#[inline(always)] pub fn ser_singleton<'a, T: Serialize>(value: &'a T, how: &'a SendOpt) -> impl Future<Output = Result<Vec<u8>, SendError>> + 'a
{
    async move {
        let outcome = ser_singleton_inner(|encoded| Vec::with_capacity(encoded.len()), value, how).await;
        match outcome {
            // Only the filled buffer is of interest; the byte count is dropped.
            Ok((bytes, _)) => Ok(bytes),
            Err(kind) => Err(SendError(Box::new((kind, *how)))),
        }
    }
}
/// Serialise a single object to the stream `to` with the method described by `how`.
///
/// Returns the number of bytes written to `to`. On failure the error kind is paired with a
/// copy of `how` inside a `SendError`.
#[inline] pub async fn write_singleton<T: Serialize, S: ?Sized + AsyncWrite + Unpin>(to: &mut S, value: &T, how: &SendOpt) -> Result<usize, SendError>
{
    match ser_singleton_inner(|_| to, value, &how).await {
        // The writer handed back is just `to` again; only the count is returned.
        Ok((_, count)) => Ok(count),
        Err(kind) => Err(SendError(Box::new((kind, *how)))),
    }
}
/// The category of failure that occurred while sending / serialising an object.
#[derive(Debug)]
pub enum SendErrorKind
{
/// Invalid serialised format
///
/// Produced when converting from a `serde_cbor` or `serde_json` error.
Format,
/// Compression
Compress,
/// Encryption
Encrypt,
/// Misc. IO
///
/// Wraps the underlying `io::Error`.
//TODO: Disambiguate when this happens into the two above cases.
IO(io::Error),
}
/// An error when sending / serialising an object.
///
/// Holds the kind of failure together with the `SendOpt` that was in use; both live in a
/// single boxed tuple.
#[derive(Debug)]
pub struct SendError(Box<(SendErrorKind, SendOpt)>);
impl error::Error for SendError
{
    /// The underlying `io::Error` for I/O failures; `None` for every other kind.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match &self.0.0 {
            SendErrorKind::IO(inner) => Some(inner),
            SendErrorKind::Format
            | SendErrorKind::Compress
            | SendErrorKind::Encrypt => None,
        }
    }
}
impl fmt::Display for SendError
{
    /// Writes a prefix containing the `Debug` form of the options in use, followed by a
    /// short description of the failure kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let (kind, params) = &*self.0;
        write!(f, "error when serialising object with params {:?}: ", params)?;

        let description = match kind {
            SendErrorKind::Format => "failed to serialise object to data",
            SendErrorKind::Compress => "failed to compress data",
            SendErrorKind::Encrypt => "failed to encrypt data",
            SendErrorKind::IO(_) => "i/o failure",
        };
        f.write_str(description)
    }
}
impl From<io::Error> for SendErrorKind
{
fn from(from: io::Error) -> Self
{
Self::IO(from)
}
}
impl From<serde_cbor::Error> for SendErrorKind
{
#[inline] fn from(_: serde_cbor::Error) -> Self
{
Self::Format
}
}
impl From<serde_json::Error> for SendErrorKind
{
#[inline] fn from(_: serde_json::Error) -> Self
{
Self::Format
}
}