added raw / no-compressing mode to write_async

arg-parsing-better
Avril 4 years ago
parent 00798609b2
commit 18c7c12025
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -101,7 +101,7 @@ async fn normal(cfg: config::Config) -> eyre::Result<()>
// Some(path) -> prealloc // Some(path) -> prealloc
Some((stream_fut, None)) => { Some((stream_fut, None)) => {
let stream = stream_fut.await?; let stream = stream_fut.await?;
serial::write_async(stream, &graph).await serial::write_async(stream, &graph, true).await // TODO: raw version
.wrap_err(eyre!("Failed to serialise graph to stream"))?; .wrap_err(eyre!("Failed to serialise graph to stream"))?;
}, },
#[cfg(feature="prealloc")] Some((_task_fut, Some(output_file))) => { #[cfg(feature="prealloc")] Some((_task_fut, Some(output_file))) => {

@ -1,6 +1,7 @@
//! For serializing //! For serializing
use super::*; use super::*;
use tokio::prelude::*; use tokio::prelude::*;
use std::ops::{Deref, DerefMut};
use async_compression::tokio_02::write::{ use async_compression::tokio_02::write::{
BzEncoder, BzEncoder,
@ -12,12 +13,55 @@ type Decompressor<T> = BzDecoder<T>;
const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB
#[derive(Debug)]
enum MaybeCompressor<'a, T>
{
Compressing(Compressor<&'a mut T>),
Raw(&'a mut T),
}
impl<'a, T> MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + 'a
{
pub fn new(raw: &'a mut T, compress: bool) -> Self
{
if compress {
Self::Compressing(Compressor::new(raw))
} else {
Self::Raw(raw)
}
}
}
impl<'a, T> DerefMut for MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + 'a
{
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Self::Compressing(t) => t,
Self::Raw(o) => o,
}
}
}
impl<'a, T> Deref for MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + 'a
{
type Target = dyn AsyncWrite + Unpin+ 'a;
fn deref(&self) -> &Self::Target {
match self {
Self::Compressing(t) => t,
Self::Raw(o) => o,
}
}
}
/// Serialise this object asynchronously /// Serialise this object asynchronously
/// ///
/// # Note /// # Note
/// This compresses the output stream. /// This compresses the output stream.
/// It cannot be used by `prealloc` read/write functions, as they do not compress. /// It cannot be used by `prealloc` read/write functions, as they do not compress.
pub async fn write_async<T: Serialize, W>(mut to: W, item: &T) -> eyre::Result<()> pub async fn write_async<T: Serialize, W>(mut to: W, item: &T, compress: bool) -> eyre::Result<()>
where W: AsyncWrite + Unpin where W: AsyncWrite + Unpin
{ {
let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was"); let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
@ -29,7 +73,7 @@ where W: AsyncWrite + Unpin
.with_section(sect_type_name.clone())?; .with_section(sect_type_name.clone())?;
{ {
let mut stream = Compressor::new(&mut to); let mut stream = MaybeCompressor::new(&mut to, compress);
cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>()); cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>());

Loading…
Cancel
Save