diff --git a/src/main.rs b/src/main.rs index 6b304b4..b4cfba8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,7 +101,7 @@ async fn normal(cfg: config::Config) -> eyre::Result<()> // Some(path) -> prealloc Some((stream_fut, None)) => { let stream = stream_fut.await?; - serial::write_async(stream, &graph).await + serial::write_async(stream, &graph, true).await // TODO: raw version .wrap_err(eyre!("Failed to serialise graph to stream"))?; }, #[cfg(feature="prealloc")] Some((_task_fut, Some(output_file))) => { diff --git a/src/serial.rs b/src/serial.rs index 364b4d6..47f3b69 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -1,6 +1,7 @@ //! For serializing use super::*; use tokio::prelude::*; +use std::ops::{Deref, DerefMut}; use async_compression::tokio_02::write::{ BzEncoder, @@ -12,12 +13,55 @@ type Decompressor = BzDecoder; const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB +#[derive(Debug)] +enum MaybeCompressor<'a, T> +{ + Compressing(Compressor<&'a mut T>), + Raw(&'a mut T), +} + +impl<'a, T> MaybeCompressor<'a, T> +where T: AsyncWrite + Unpin + 'a +{ + pub fn new(raw: &'a mut T, compress: bool) -> Self + { + if compress { + Self::Compressing(Compressor::new(raw)) + } else { + Self::Raw(raw) + } + } +} + +impl<'a, T> DerefMut for MaybeCompressor<'a, T> +where T: AsyncWrite + Unpin + 'a +{ + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::Compressing(t) => t, + Self::Raw(o) => o, + } + } +} +impl<'a, T> Deref for MaybeCompressor<'a, T> +where T: AsyncWrite + Unpin + 'a +{ + type Target = dyn AsyncWrite + Unpin+ 'a; + + fn deref(&self) -> &Self::Target { + match self { + Self::Compressing(t) => t, + Self::Raw(o) => o, + } + } +} + /// Serialise this object asynchronously /// /// # Note /// This compresses the output stream. /// It cannot be used by `prealloc` read/write functions, as they do not compress. -pub async fn write_async(mut to: W, item: &T) -> eyre::Result<()> +pub async fn write_async(mut to: W, item: &T, compress: bool) -> eyre::Result<()> where W: AsyncWrite + Unpin { let sect_type_name = || std::any::type_name::().header("Type trying to serialise was"); @@ -29,7 +73,7 @@ where W: AsyncWrite + Unpin .with_section(sect_type_name.clone())?; { - let mut stream = Compressor::new(&mut to); + let mut stream = MaybeCompressor::new(&mut to, compress); cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::(), std::any::type_name::());