From 1a88bc149df8a1249d5bcdce4c8c280ccbdc19f6 Mon Sep 17 00:00:00 2001 From: Avril Date: Fri, 19 Feb 2021 01:00:07 +0000 Subject: [PATCH] added async read --- src/info.rs | 13 +++++++++++ src/main.rs | 7 +++--- src/serial.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 src/info.rs diff --git a/src/info.rs b/src/info.rs new file mode 100644 index 0000000..8367cb8 --- /dev/null +++ b/src/info.rs @@ -0,0 +1,13 @@ +//! Prints the information found in graph in different ways +use super::*; + +use data::HierarchicalINodeGraph; +use config::Config; + +/// Print the most basic info +pub fn print_basic_max_info(cfg: &Config, graph: &HierarchicalINodeGraph) +{ + cfg_println!(Quiet; cfg, "Max size file: {:?}", graph.path_max_size_for(data::FsKind::File)); + cfg_println!(Quiet; cfg, "Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory)); + cfg_println!(Quiet; cfg, "Max size all: {:?}", graph.path_max_size()); +} diff --git a/src/main.rs b/src/main.rs index 61078ad..1837d4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,6 +37,7 @@ mod config; mod state; mod arg; mod work; +mod info; #[cfg(feature="inspect")] mod serial; #[cfg(feature="defer-drop")] mod defer_drop; @@ -150,10 +151,8 @@ async fn normal(cfg: config::Config) -> eyre::Result<()> } } }; - - cfg_println!(Quiet; cfg, "Max size file: {:?}", graph.path_max_size_for(data::FsKind::File)); - cfg_println!(Quiet; cfg, "Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory)); - cfg_println!(Quiet; cfg, "Max size all: {:?}", graph.path_max_size()); + + info::print_basic_max_info(&cfg, &graph); #[cfg(feature="inspect")] match writer.await { diff --git a/src/serial.rs b/src/serial.rs index e90effc..c98756e 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -2,6 +2,7 @@ use super::*; use tokio::prelude::*; use std::ops::{Deref, DerefMut}; +use serde::de::DeserializeOwned; use async_compression::tokio_02::write::{ BzEncoder, @@ -12,6 +13,9 @@ type Compressor = BzEncoder; type Decompressor = BzDecoder; const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB +const DESERIALISE_OBJECT_READ_LIMIT: usize = 1024 * 1024 * 1024 * 2; // 2GB + +const BUFFER_SIZE: usize = 4096; #[derive(Debug)] enum MaybeCompressor<'a, T> @@ -89,6 +93,63 @@ where T: AsyncWrite + Unpin + Send + 'a } } +async fn copy_with_limit(mut from: R, mut to: W) -> io::Result +where R: AsyncRead + Unpin, + W: AsyncWrite + Unpin +{ + let mut buffer = [0u8; BUFFER_SIZE]; + + let mut read; + let mut done=0; + while {read = from.read(&mut buffer[..]).await?; read>0} + { + to.write_all(&buffer[..read]).await?; + done+=read; + + if done > DESERIALISE_OBJECT_READ_LIMIT { + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, eyre!("Exceeded limit, aborting.") + .with_section(|| DESERIALISE_OBJECT_READ_LIMIT.header("Object read size limit was")) + .with_section(|| done.header("Currently read")))); + } + } + Ok(done) +} + +/// Deserialise an object from this stream asynchronously +/// +/// # Note +/// If the stream is compressed, `compressed` must be set to true or an error will be produced. +/// An autodetect feature may be added in the future +pub async fn read_async(mut from: R, compressed: bool) -> eyre::Result +where R: AsyncRead + Unpin + Send +{ + let sect_type_name = || std::any::type_name::().header("Type trying to deserialise was"); + let sect_stream_type_name = || std::any::type_name::().header("Stream type was"); + + let vec = { + let mut vec = Vec::new(); + let mut writer = MaybeCompressor::new(&mut vec, compressed.then(|| CompKind::Decompress)); + + copy_with_limit(&mut from, writer.deref_mut()).await + .wrap_err(eyre!("Failed to copy stream into in-memory buffer")) + .with_section(sect_type_name.clone()) + .with_section(sect_stream_type_name.clone())?; + + writer.flush().await.wrap_err(eyre!("Failed to flush decompression stream"))?; + writer.shutdown().await.wrap_err(eyre!("Failed to shutdown decompression stream"))?; + vec + }; + + tokio::task::spawn_blocking(move || { + (serde_cbor::from_slice(&vec[..]) + .wrap_err(eyre!("Failed to deseralised decompressed data")) + .with_section(sect_type_name.clone()) + .with_section(sect_stream_type_name.clone()), + + {drop!(vec vec);}).0 + }).await.wrap_err(eyre!("Panic while deserialising decompressed data"))? +} + /// Serialise this object asynchronously /// /// # Note @@ -211,7 +272,7 @@ mod prealloc { pub fn read_prealloc(file: &File) -> eyre::Result { let map = unsafe { Mmap::map(file) } - .wrap_err(eyre!("Failed to map file for read + write")) + .wrap_err(eyre!("Failed to map file for read")) .with_section(|| file.as_raw_fd().header("fd was")) .with_suggestion(|| "Do we have the premissions for both reading and writing of this file and fd?")?;