|
|
|
@ -2,6 +2,7 @@
|
|
|
|
|
use super::*;
|
|
|
|
|
use tokio::prelude::*;
|
|
|
|
|
use std::ops::{Deref, DerefMut};
|
|
|
|
|
use serde::de::DeserializeOwned;
|
|
|
|
|
|
|
|
|
|
use async_compression::tokio_02::write::{
|
|
|
|
|
BzEncoder,
|
|
|
|
@ -12,6 +13,9 @@ type Compressor<T> = BzEncoder<T>;
|
|
|
|
|
type Decompressor<T> = BzDecoder<T>;
|
|
|
|
|
|
|
|
|
|
const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB
|
|
|
|
|
const DESERIALISE_OBJECT_READ_LIMIT: usize = 1024 * 1024 * 1024 * 2; // 2GB
|
|
|
|
|
|
|
|
|
|
const BUFFER_SIZE: usize = 4096;
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
enum MaybeCompressor<'a, T>
|
|
|
|
@ -89,6 +93,63 @@ where T: AsyncWrite + Unpin + Send + 'a
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn copy_with_limit<R,W>(mut from: R, mut to: W) -> io::Result<usize>
|
|
|
|
|
where R: AsyncRead + Unpin,
|
|
|
|
|
W: AsyncWrite + Unpin
|
|
|
|
|
{
|
|
|
|
|
let mut buffer = [0u8; BUFFER_SIZE];
|
|
|
|
|
|
|
|
|
|
let mut read;
|
|
|
|
|
let mut done=0;
|
|
|
|
|
while {read = from.read(&mut buffer[..]).await?; read>0}
|
|
|
|
|
{
|
|
|
|
|
to.write_all(&buffer[..read]).await?;
|
|
|
|
|
done+=read;
|
|
|
|
|
|
|
|
|
|
if done > DESERIALISE_OBJECT_READ_LIMIT {
|
|
|
|
|
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, eyre!("Exceeded limit, aborting.")
|
|
|
|
|
.with_section(|| DESERIALISE_OBJECT_READ_LIMIT.header("Object read size limit was"))
|
|
|
|
|
.with_section(|| done.header("Currently read"))));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(done)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Deserialise an object from this stream asynchronously
|
|
|
|
|
///
|
|
|
|
|
/// # Note
|
|
|
|
|
/// If the stream is compressed, `compressed` must be set to true or an error will be produced.
|
|
|
|
|
/// An autodetect feature may be added in the future
|
|
|
|
|
pub async fn read_async<T: DeserializeOwned + Send + 'static, R>(mut from: R, compressed: bool) -> eyre::Result<T>
|
|
|
|
|
where R: AsyncRead + Unpin + Send
|
|
|
|
|
{
|
|
|
|
|
let sect_type_name = || std::any::type_name::<T>().header("Type trying to deserialise was");
|
|
|
|
|
let sect_stream_type_name = || std::any::type_name::<R>().header("Stream type was");
|
|
|
|
|
|
|
|
|
|
let vec = {
|
|
|
|
|
let mut vec = Vec::new();
|
|
|
|
|
let mut writer = MaybeCompressor::new(&mut vec, compressed.then(|| CompKind::Decompress));
|
|
|
|
|
|
|
|
|
|
copy_with_limit(&mut from, writer.deref_mut()).await
|
|
|
|
|
.wrap_err(eyre!("Failed to copy stream into in-memory buffer"))
|
|
|
|
|
.with_section(sect_type_name.clone())
|
|
|
|
|
.with_section(sect_stream_type_name.clone())?;
|
|
|
|
|
|
|
|
|
|
writer.flush().await.wrap_err(eyre!("Failed to flush decompression stream"))?;
|
|
|
|
|
writer.shutdown().await.wrap_err(eyre!("Failed to shutdown decompression stream"))?;
|
|
|
|
|
vec
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
|
|
|
(serde_cbor::from_slice(&vec[..])
|
|
|
|
|
.wrap_err(eyre!("Failed to deseralised decompressed data"))
|
|
|
|
|
.with_section(sect_type_name.clone())
|
|
|
|
|
.with_section(sect_stream_type_name.clone()),
|
|
|
|
|
|
|
|
|
|
{drop!(vec vec);}).0
|
|
|
|
|
}).await.wrap_err(eyre!("Panic while deserialising decompressed data"))?
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Serialise this object asynchronously
|
|
|
|
|
///
|
|
|
|
|
/// # Note
|
|
|
|
@ -211,7 +272,7 @@ mod prealloc {
|
|
|
|
|
pub fn read_prealloc<T: serde::de::DeserializeOwned>(file: &File) -> eyre::Result<T>
|
|
|
|
|
{
|
|
|
|
|
let map = unsafe { Mmap::map(file) }
|
|
|
|
|
.wrap_err(eyre!("Failed to map file for read + write"))
|
|
|
|
|
.wrap_err(eyre!("Failed to map file for read"))
|
|
|
|
|
.with_section(|| file.as_raw_fd().header("fd was"))
|
|
|
|
|
.with_suggestion(|| "Do we have the premissions for both reading and writing of this file and fd?")?;
|
|
|
|
|
|
|
|
|
|