//! For serializing
use super::*;
use tokio::prelude::*;
use std::ops::{Deref, DerefMut};
use serde::{Serialize, de::DeserializeOwned};
use async_compression::tokio_02::write::{
    BzEncoder,
    BzDecoder,
};

type Compressor<T> = BzEncoder<T>;
type Decompressor<T> = BzDecoder<T>;

const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB
const DESERIALISE_OBJECT_READ_LIMIT: usize = 1024 * 1024 * 1024 * 2; // 2 GB
const BUFFER_SIZE: usize = 4096;

/// A writer that compresses, decompresses, or passes through to the backing stream.
#[derive(Debug)]
enum MaybeCompressor<'a, T> {
    Compressing(Compressor<&'a mut T>),
    Decompressing(Decompressor<&'a mut T>),
    Raw(&'a mut T),
}

/// Compress or decompress?
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord)]
enum CompKind {
    Compress,
    Decompress,
}

impl Default for CompKind {
    #[inline]
    fn default() -> Self {
        Self::Compress
    }
}

impl<'a, T> MaybeCompressor<'a, T> {
    /// What kind is this compressor set to?
    pub fn kind(&self) -> Option<CompKind> {
        Some(match self {
            Self::Raw(_) => return None,
            Self::Compressing(_) => CompKind::Compress,
            Self::Decompressing(_) => CompKind::Decompress,
        })
    }
}

impl<'a, T> MaybeCompressor<'a, T>
where
    T: AsyncWrite + Send + Unpin + 'a,
{
    pub fn new(raw: &'a mut T, compress: Option<CompKind>) -> Self {
        match compress {
            Some(CompKind::Compress) => Self::Compressing(Compressor::new(raw)),
            Some(CompKind::Decompress) => Self::Decompressing(Decompressor::new(raw)),
            None => Self::Raw(raw),
        }
    }
}

impl<'a, T> DerefMut for MaybeCompressor<'a, T>
where
    T: AsyncWrite + Send + Unpin + 'a,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Compressing(t) => t,
            Self::Decompressing(t) => t,
            Self::Raw(o) => o,
        }
    }
}

impl<'a, T> Deref for MaybeCompressor<'a, T>
where
    T: AsyncWrite + Unpin + Send + 'a,
{
    type Target = dyn AsyncWrite + Send + Unpin + 'a;

    fn deref(&self) -> &Self::Target {
        match self {
            Self::Compressing(t) => t,
            Self::Decompressing(t) => t,
            Self::Raw(o) => o,
        }
    }
}

/// Copy `from` into `to`, aborting if more than `DESERIALISE_OBJECT_READ_LIMIT` bytes are copied.
async fn copy_with_limit<R, W>(mut from: R, mut to: W) -> io::Result<usize>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    let mut buffer = [0u8; BUFFER_SIZE];
    let mut read;
    let mut done = 0;
    while {
        read = from.read(&mut buffer[..]).await?;
        read > 0
    } {
        to.write_all(&buffer[..read]).await?;
        done += read;
        if done > DESERIALISE_OBJECT_READ_LIMIT {
            return Err(io::Error::new(
                io::ErrorKind::ConnectionAborted,
                eyre!("Exceeded limit, aborting.")
                    .with_section(|| DESERIALISE_OBJECT_READ_LIMIT.header("Object read size limit was"))
                    .with_section(|| done.header("Currently read")),
            ));
        }
    }
    Ok(done)
}
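// A minimal, test-only sketch of the `copy_with_limit` contract: data is
// streamed through a fixed-size buffer and the total byte count is returned,
// while anything over `DESERIALISE_OBJECT_READ_LIMIT` aborts the copy.
// Assumes tokio's `macros` feature (for `#[tokio::test]`) is enabled in this
// project; adjust to the project's test harness if it differs.
#[cfg(test)]
mod copy_limit_tests {
    use super::*;

    #[tokio::test]
    async fn copies_small_buffer_within_limit() {
        // Deliberately not a multiple of BUFFER_SIZE, to exercise the final short read.
        let input = vec![0x5au8; BUFFER_SIZE * 3 + 7];
        let mut output = Vec::new();

        let copied = copy_with_limit(&input[..], &mut output).await
            .expect("a copy well under the limit should succeed");

        assert_eq!(copied, input.len());
        assert_eq!(output, input);
    }
}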
/// Deserialise an object from this stream asynchronously.
///
/// # Note
/// If the stream is compressed, `compressed` must be set to true or an error will be produced.
/// An autodetect feature may be added in the future.
pub async fn read_async<T, R>(mut from: R, compressed: bool) -> eyre::Result<T>
where
    T: DeserializeOwned + Send + 'static,
    R: AsyncRead + Unpin + Send,
{
    let sect_type_name = || std::any::type_name::<T>().header("Type trying to deserialise was");
    let sect_stream_type_name = || std::any::type_name::<R>().header("Stream type was");

    let vec = {
        let mut vec = Vec::new();
        let mut writer = MaybeCompressor::new(&mut vec, compressed.then(|| CompKind::Decompress));

        copy_with_limit(&mut from, writer.deref_mut())
            .await
            .wrap_err(eyre!("Failed to copy stream into in-memory buffer"))
            .with_section(sect_type_name.clone())
            .with_section(sect_stream_type_name.clone())?;

        writer.flush().await.wrap_err(eyre!("Failed to flush decompression stream"))?;
        writer.shutdown().await.wrap_err(eyre!("Failed to shutdown decompression stream"))?;
        vec
    };
    tokio::task::spawn_blocking(move || {
        // Deserialise on a blocking task, then defer-drop the (potentially large)
        // buffer once the deserialised value no longer borrows it.
        (
            serde_cbor::from_slice(&vec[..])
                .wrap_err(eyre!("Failed to deserialise decompressed data"))
                .with_section(sect_type_name.clone())
                .with_section(sect_stream_type_name.clone()),
            {
                drop!(vec vec);
            },
        )
            .0
    })
    .await
    .wrap_err(eyre!("Panic while deserialising decompressed data"))?
}

/// Serialise this object asynchronously.
///
/// # Note
/// This compresses the output stream.
/// It cannot be used by the `prealloc` read/write functions, as they do not compress.
pub async fn write_async<T, W>(mut to: W, item: &T, compress: bool) -> eyre::Result<()>
where
    T: Serialize,
    W: AsyncWrite + Unpin + Send,
{
    let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
    let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was");

    let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
        .wrap_err(eyre!("Failed to serialise item"))
        .with_section(sect_stream_type_name.clone())
        .with_section(sect_type_name.clone())?;
    {
        let mut stream = MaybeCompressor::new(&mut to, compress.then(|| CompKind::Compress));

        cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>());

        stream
            .write_all(&vec[..])
            .await
            .wrap_err(eyre!("Failed to write serialised memory to stream"))
            .with_section(|| vec.len().to_string().header("Size of the serialised object was"))
            .with_section(sect_stream_type_name.clone())
            .with_section(sect_type_name.clone())?;

        stream.flush().await.wrap_err(eyre!("Failed to flush output compression stream"))?;
        stream.shutdown().await.wrap_err(eyre!("Failed to shutdown output compression stream"))?;
    }

    // Extremely overcomplicated concurrent flush+drop: flush and shut down the
    // backing stream while the serialised buffer is dropped concurrently.
    use futures::FutureExt;
    let flush_fut = async {
        to.flush().await.wrap_err(eyre!("Failed to flush output backing stream"))?;
        to.shutdown().await.wrap_err(eyre!("Failed to shutdown output backing stream"))?;
        Ok::<(), eyre::Report>(())
    }
    .fuse();
    tokio::pin!(flush_fut);

    tokio::select! {
        res = &mut flush_fut => {
            return res;
        }
        _ = async move { drop!(async vec vec); } => {}
    }
    flush_fut.await
}
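// A minimal, test-only sketch of the intended `write_async`/`read_async`
// round trip over an in-memory buffer. The `compress` flag on the writer must
// match the `compressed` flag on the reader, since there is no autodetection.
// Assumes tokio 0.2's `#[tokio::test(threaded_scheduler)]` is available; the
// threaded scheduler is needed because `write_async` uses `block_in_place`.
#[cfg(test)]
mod round_trip_tests {
    use super::*;

    #[tokio::test(threaded_scheduler)]
    async fn compressed_round_trip() {
        let item = vec!["hello".to_string(), "world".to_string()];
        let mut buffer = Vec::new();

        write_async(&mut buffer, &item, true).await
            .expect("writing to an in-memory buffer should succeed");
        let read: Vec<String> = read_async(&buffer[..], true).await
            .expect("reading back the same buffer should succeed");

        assert_eq!(read, item);
    }
}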
#[cfg(feature = "prealloc")]
mod prealloc {
    use super::*;
    use std::convert::TryInto;
    use std::os::unix::prelude::*;
    use std::fs::File;
    use memmap::{MmapMut, Mmap};

    /// Write this object as-is to this file descriptor.
    ///
    /// # Note
    /// This does not compress like `write_async()` does. It is just a 1-1 dump of the serialisation.
    /// Therefore, data written with `write_prealloc()` cannot then be read with `read_async()`.
    ///
    /// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
    pub fn write_prealloc<T: Serialize>(file: &mut File, item: &T) -> eyre::Result<()> {
        let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
        let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
            .wrap_err(eyre!("Failed to serialise item"))
            .with_section(sect_type_name.clone())?;

        let fd = file.as_raw_fd();
        cfg_eprintln!(Verbose; config::get_global(), "Writing (raw) {} bytes of type {:?} to fd {}", vec.len(), std::any::type_name::<T>(), fd);

        unsafe {
            if libc::fallocate(fd, 0, 0, vec.len().try_into()
                .wrap_err(eyre!("Failed to cast buffer size to `off_t`"))
                .with_section(|| vec.len().header("Buffer size was"))
                .with_section(|| libc::off_t::MAX.to_string().header("Max value of `off_t` is"))
                .with_warning(|| "Usually `off_t` is a signed 64-bit integer, whereas the buffer's size is unsigned. On systems where `off_t` is 64 bits or wider, this should realistically never happen and probably indicates a bug.")?) < 0
            {
                // Error
                Err(std::io::Error::last_os_error())
            } else {
                Ok(())
            }
        }
        .wrap_err("fallocate() failed")
        .with_section(|| vec.len().header("Bytes to allocate was"))
        .with_suggestion(|| "Make sure there is enough space for the fallocate() call")
        .with_suggestion(|| "Make sure we are able to write to the file")?;

        // fallocate() succeeded in allocating `vec.len()` bytes to map.
        let mut map = unsafe { MmapMut::map_mut(file) }
            .wrap_err(eyre!("Failed to map file for read + write"))
            .with_section(|| fd.header("fd was"))
            .with_suggestion(|| "Do we have the permissions for both reading and writing of this file and fd?")?;

        eyre_assert!(tokio::task::block_in_place(|| unsafe {
            bytes::copy_nonoverlapping_unchecked(&vec[..], &mut map[..])
        }) == vec.len(); "Length mismatch")
            .with_section(|| vec.len().header("Expected"))
            .with_section(|| map.len().header("Got"))
            .with_warning(|| "This should never happen, it indicates a bug")?;

        tokio::task::block_in_place(move || map.flush())
            .wrap_err(eyre!("Failed to flush map in place"))?; // map is dropped here

        drop!(vec vec);
        Ok(())
    }

    /// Read this object as-is from this file descriptor.
    ///
    /// # Note
    /// This does not decompress like `read_async()` does. It is just a 1-1 read of the serialisation.
    /// Therefore, `read_prealloc()` cannot be used with data written by `write_async()`.
    ///
    /// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
    // This must be `DeserializeOwned` because the lifetime it is bound to is that of the memory map created and destroyed in the function, not of the fd `file` itself.
    pub fn read_prealloc<T: DeserializeOwned>(file: &File) -> eyre::Result<T> {
        let map = unsafe { Mmap::map(file) }
            .wrap_err(eyre!("Failed to map file for read"))
            .with_section(|| file.as_raw_fd().header("fd was"))
            .with_suggestion(|| "Do we have the permissions for reading this file and fd?")?;

        tokio::task::block_in_place(move || serde_cbor::from_slice(&map[..]))
            .wrap_err(eyre!("Failed to deserialise from map"))
            .with_note(|| "The prealloc read and write functions handle only *uncompressed* data. Make sure you're not feeding it compressed data (written with the non-prealloc read and write functions)")
    }
}

#[cfg(feature = "prealloc")]
pub use prealloc::{
    write_prealloc as write_sync_map,
    read_prealloc as read_sync_map,
};
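// A minimal, test-only sketch of the prealloc round trip. This is hedged on
// several assumptions: the `prealloc` feature is enabled, the platform is
// Linux-like (fallocate(2)), the working directory is writable (a real test
// would use a tempfile crate), and tokio 0.2's threaded scheduler is running,
// since the prealloc functions call `block_in_place` internally.
#[cfg(all(test, feature = "prealloc"))]
mod prealloc_tests {
    use super::*;
    use std::fs::OpenOptions;

    #[tokio::test(threaded_scheduler)]
    async fn raw_round_trip() {
        let path = "prealloc_round_trip.test.bin";
        // The file must be opened read+write: `write_sync_map` maps it mutably.
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .expect("failed to create the test file");

        let item = vec![1u32, 2, 3, 4];
        write_sync_map(&mut file, &item).expect("raw write should succeed");
        let read: Vec<u32> = read_sync_map(&file).expect("raw read should succeed");

        std::fs::remove_file(path).ok();
        assert_eq!(read, item);
    }
}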