// dirstat/src/serial.rs

//! For serialising and deserialising objects to and from byte streams, with optional bzip2 compression.
use super::*;
use tokio::prelude::*;
use std::ops::{Deref, DerefMut};
use serde::de::DeserializeOwned;
use async_compression::tokio_02::write::{
    BzEncoder,
    BzDecoder,
};
type Compressor<T> = BzEncoder<T>;
type Decompressor<T> = BzDecoder<T>;
const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MiB
const DESERIALISE_OBJECT_READ_LIMIT: usize = 1024 * 1024 * 1024 * 2; // 2 GiB
const BUFFER_SIZE: usize = 4096;
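/// A write adaptor that either compresses, decompresses, or passes bytes through unchanged to the wrapped stream.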
#[derive(Debug)]
enum MaybeCompressor<'a, T>
{
    Compressing(Compressor<&'a mut T>),
    Decompressing(Decompressor<&'a mut T>),
    Raw(&'a mut T),
}
/// Compress or decompress?
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord)]
enum CompKind
{
    Compress,
    Decompress
}
impl Default for CompKind
{
    #[inline]
    fn default() -> Self
    {
        Self::Compress
    }
}
impl<'a, T> MaybeCompressor<'a, T>
{
    /// What kind is this compressor set to?
    pub fn kind(&self) -> Option<CompKind>
    {
        Some(match self {
            Self::Raw(_) => return None,
            Self::Compressing(_) => CompKind::Compress,
            Self::Decompressing(_) => CompKind::Decompress,
        })
    }
}
impl<'a, T> MaybeCompressor<'a, T>
where T: AsyncWrite + Send + Unpin + 'a
{
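    /// Wrap `raw`, compressing, decompressing, or passing bytes through unchanged according to `compress`.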
    pub fn new(raw: &'a mut T, compress: Option<CompKind>) -> Self
    {
        match compress {
            Some(CompKind::Compress) => Self::Compressing(Compressor::new(raw)),
            Some(CompKind::Decompress) => Self::Decompressing(Decompressor::new(raw)),
            None => Self::Raw(raw),
        }
    }
}
impl<'a, T> DerefMut for MaybeCompressor<'a, T>
where T: AsyncWrite + Send + Unpin + 'a
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Compressing(t) => t,
            Self::Decompressing(t) => t,
            Self::Raw(o) => o,
        }
    }
}
impl<'a, T> Deref for MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + Send + 'a
{
    type Target = dyn AsyncWrite + Send + Unpin + 'a;
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Compressing(t) => t,
            Self::Decompressing(t) => t,
            Self::Raw(o) => o,
        }
    }
}
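/// Copy `from` into `to` in `BUFFER_SIZE` chunks, aborting with an error once more than
/// `DESERIALISE_OBJECT_READ_LIMIT` bytes have been copied.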
async fn copy_with_limit<R, W>(mut from: R, mut to: W) -> io::Result<usize>
where R: AsyncRead + Unpin,
      W: AsyncWrite + Unpin
{
    let mut buffer = [0u8; BUFFER_SIZE];
    let mut done = 0;
    loop {
        let read = from.read(&mut buffer[..]).await?;
        if read == 0 {
            break;
        }
        to.write_all(&buffer[..read]).await?;
        done += read;
        if done > DESERIALISE_OBJECT_READ_LIMIT {
            return Err(io::Error::new(io::ErrorKind::ConnectionAborted, eyre!("Exceeded limit, aborting.")
                .with_section(|| DESERIALISE_OBJECT_READ_LIMIT.header("Object read size limit was"))
                .with_section(|| done.header("Currently read"))));
        }
    }
    Ok(done)
}
/// Deserialise an object from this stream asynchronously
///
/// # Note
/// If the stream is compressed, `compressed` must be set to true or an error will be produced.
/// An autodetect feature may be added in the future.
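///
/// # Example
/// A minimal sketch of reading back a compressed, serialised value (the path and the
/// `MyStats` type are hypothetical, and a tokio 0.2 runtime is assumed):
///
/// ```ignore
/// let file = tokio::fs::File::open("cache.cbor.bz2").await?;
/// // `compressed` must match how the stream was written.
/// let stats: MyStats = read_async(file, true).await?;
/// ```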
pub async fn read_async<T: DeserializeOwned + Send + 'static, R>(mut from: R, compressed: bool) -> eyre::Result<T>
where R: AsyncRead + Unpin + Send
{
    let sect_type_name = || std::any::type_name::<T>().header("Type trying to deserialise was");
    let sect_stream_type_name = || std::any::type_name::<R>().header("Stream type was");
    let vec = {
        let mut vec = Vec::new();
        let mut writer = MaybeCompressor::new(&mut vec, compressed.then(|| CompKind::Decompress));
        copy_with_limit(&mut from, writer.deref_mut()).await
            .wrap_err(eyre!("Failed to copy stream into in-memory buffer"))
            .with_section(sect_type_name.clone())
            .with_section(sect_stream_type_name.clone())?;
        writer.flush().await.wrap_err(eyre!("Failed to flush decompression stream"))?;
        writer.shutdown().await.wrap_err(eyre!("Failed to shutdown decompression stream"))?;
        vec
    };
    // Deserialise on a blocking task; the tuple trick defer-drops `vec` before returning the result.
    tokio::task::spawn_blocking(move || {
        (serde_cbor::from_slice(&vec[..])
            .wrap_err(eyre!("Failed to deserialise decompressed data"))
            .with_section(sect_type_name.clone())
            .with_section(sect_stream_type_name.clone()),
         {drop!(vec vec);}).0
    }).await.wrap_err(eyre!("Panic while deserialising decompressed data"))?
}
/// Serialise this object asynchronously
///
/// # Note
/// This compresses the output stream (when `compress` is set).
/// Its output cannot be read back with the `prealloc` read/write functions, as they do not handle compression.
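///
/// # Example
/// A minimal sketch of the write side (the `stats` value and the output path are
/// hypothetical, and a tokio 0.2 runtime is assumed):
///
/// ```ignore
/// let file = tokio::fs::File::create("cache.cbor.bz2").await?;
/// // Serialise `stats` as CBOR and bzip2-compress it into `file`.
/// write_async(file, &stats, true).await?;
/// ```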
pub async fn write_async<T: Serialize, W>(mut to: W, item: &T, compress: bool) -> eyre::Result<()>
where W: AsyncWrite + Unpin + Send
{
    let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
    let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was");
    let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
        .wrap_err(eyre!("Failed to serialise item"))
        .with_section(sect_stream_type_name.clone())
        .with_section(sect_type_name.clone())?;
    {
        let mut stream = MaybeCompressor::new(&mut to, compress.then(|| CompKind::Compress));
        cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>());
        stream.write_all(&vec[..])
            .await
            .wrap_err(eyre!("Failed to write serialised memory to stream"))
            .with_section(|| vec.len().to_string().header("Size of the serialised object was"))
            .with_section(sect_stream_type_name.clone())
            .with_section(sect_type_name.clone())?;
        stream.flush().await.wrap_err(eyre!("Failed to flush output compression stream"))?;
        stream.shutdown().await.wrap_err(eyre!("Failed to shutdown output compression stream"))?;
    }
    // Extremely overcomplicated concurrent flush+drop.
    use futures::FutureExt;
    let flush_fut = async {
        to.flush().await.wrap_err(eyre!("Failed to flush output backing stream"))?;
        to.shutdown().await.wrap_err(eyre!("Failed to shutdown output backing stream"))?;
        Ok::<(), eyre::Report>(())
    }.fuse();
    tokio::pin!(flush_fut);
    tokio::select!{
        res = &mut flush_fut => {
            return res;
        }
        _ = async move { drop!(async vec vec); } => {}
    }
    flush_fut.await
}
#[cfg(feature="prealloc")]
mod prealloc {
use super::*;
use std::os::unix::prelude::*;
use std::fs::File;
use memmap::{MmapMut, Mmap};
    /// Write this object as-is to this file descriptor.
    ///
    /// # Note
    /// This does not compress like `write_async()` does. It is just a 1-1 dump of the serialisation.
    /// Therefore, data written with `write_prealloc()` cannot then be read with `read_async()`.
    ///
    /// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
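    ///
    /// # Example
    /// A minimal sketch (the `stats` value and path are hypothetical); since this blocks,
    /// it is wrapped in `spawn_blocking` as suggested above:
    ///
    /// ```ignore
    /// tokio::task::spawn_blocking(move || {
    ///     let mut file = std::fs::File::create("stats.cbor")?;
    ///     write_prealloc(&mut file, &stats)
    /// }).await??;
    /// ```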
    pub fn write_prealloc<T: Serialize>(file: &mut File, item: &T) -> eyre::Result<()>
    {
        let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
        let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
            .wrap_err(eyre!("Failed to serialise item"))
            .with_section(sect_type_name.clone())?;
        let fd = file.as_raw_fd();
        cfg_eprintln!(Verbose; config::get_global(), "Writing (raw) {} bytes of type {:?} to fd {}", vec.len(), std::any::type_name::<T>(), fd);
        unsafe {
            if libc::fallocate(fd, 0, 0, vec.len().try_into()
                .wrap_err(eyre!("Failed to cast buffer size to `off_t`"))
                .with_section(|| vec.len().header("Buffer size was"))
                .with_section(|| libc::off_t::MAX.to_string().header("Max value of `off_t` is"))
                .with_warning(|| "Usually `off_t` is a signed 64-bit integer, whereas the buffer's size is unsigned. On systems where `off_t` is 64 bits or wider, this should realistically never happen and probably indicates a bug.")?) < 0 {
                // Error
                Err(std::io::Error::last_os_error())
            } else {
                Ok(())
            }
        }.wrap_err("fallocate() failed")
            .with_section(|| vec.len().header("Bytes to allocate was"))
            .with_suggestion(|| "Make sure there is enough space for the fallocate() call")
            .with_suggestion(|| "Make sure we are able to write to the file")?;
        // fallocate() succeeded in allocating `vec.len()` bytes to map.
        let mut map = unsafe { MmapMut::map_mut(file) }
            .wrap_err(eyre!("Failed to map file for read + write"))
            .with_section(|| fd.header("fd was"))
            .with_suggestion(|| "Do we have the permissions for both reading and writing of this file and fd?")?;
        eyre_assert!(tokio::task::block_in_place(|| unsafe {
            bytes::copy_nonoverlapping_unchecked(&vec[..], &mut map[..])
        }) == vec.len(); "Length mismatch")
            .with_section(|| vec.len().header("Expected"))
            .with_section(|| map.len().header("Got"))
            .with_warning(|| "This should never happen; it indicates a bug")?;
        tokio::task::block_in_place(move || map.flush())
            .wrap_err(eyre!("Failed to flush map in place"))?; // `map` is dropped here
        drop!(vec vec);
        Ok(())
    }
    /// Read this object as-is from this file descriptor.
    ///
    /// # Note
    /// This does not decompress like `read_async()` does. It is just a 1-1 read of the serialisation.
    /// Therefore, `read_prealloc()` cannot be used with data written by `write_async()`.
    ///
    /// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
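    ///
    /// # Example
    /// A minimal sketch (the path and the `MyStats` type are hypothetical); wrapped in
    /// `spawn_blocking` as suggested above:
    ///
    /// ```ignore
    /// let stats: MyStats = tokio::task::spawn_blocking(|| {
    ///     let file = std::fs::File::open("stats.cbor")?;
    ///     read_prealloc(&file)
    /// }).await??;
    /// ```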
    // This must be `DeserializeOwned` because the lifetime it is bound to is that of the memory map created and destroyed in the function, not of the fd `file` itself.
    pub fn read_prealloc<T: serde::de::DeserializeOwned>(file: &File) -> eyre::Result<T>
    {
        let map = unsafe { Mmap::map(file) }
            .wrap_err(eyre!("Failed to map file for read"))
            .with_section(|| file.as_raw_fd().header("fd was"))
            .with_suggestion(|| "Do we have the permissions for reading this file and fd?")?;
        tokio::task::block_in_place(move || serde_cbor::from_slice(&map[..]))
            .wrap_err(eyre!("Failed to deserialise from map"))
            .with_note(|| "The prealloc read and write functions handle only *uncompressed* data. Make sure you're not feeding it compressed data (written with the non-prealloc read and write functions)")
    }
}
#[cfg(feature="prealloc")] pub use prealloc::{
write_prealloc as write_sync_map,
read_prealloc as read_sync_map,
};