You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
228 lines
7.9 KiB
228 lines
7.9 KiB
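//! Helpers for serialising values: asynchronously to (optionally compressed) streams, and, with the `prealloc` feature, synchronously to and from memory-mapped files.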
|
|
use super::*;
|
|
use tokio::prelude::*;
|
|
use std::ops::{Deref, DerefMut};
|
|
|
|
use async_compression::tokio_02::write::{
|
|
BzEncoder,
|
|
BzDecoder,
|
|
};
|
|
|
|
/// Write-side compressing wrapper used by this module; currently an alias for `BzEncoder`.
type Compressor<T> = BzEncoder<T>;
|
|
/// Write-side decompressing wrapper used by this module; currently an alias for `BzDecoder`.
type Decompressor<T> = BzDecoder<T>;
|
|
|
|
/// A size threshold of `1024 * 1024` bytes.
///
/// NOTE(review): not referenced anywhere in the visible part of this file; presumably the minimum buffer size at which dropping is deferred (see the `drop!` macro) -- confirm against its users.
const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MiB (1,048,576 bytes)
|
|
|
|
#[derive(Debug)]
|
|
enum MaybeCompressor<'a, T>
|
|
{
|
|
Compressing(Compressor<&'a mut T>),
|
|
Decompressing(Decompressor<&'a mut T>),
|
|
Raw(&'a mut T),
|
|
}
|
|
|
|
/// Which direction a [`MaybeCompressor`] should operate in: compress or decompress?
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum CompKind {
    /// Compress data on its way to the stream.
    Compress,
    /// Decompress data on its way to the stream.
    Decompress,
}
|
|
|
|
impl Default for CompKind
|
|
{
|
|
#[inline]
|
|
fn default() -> Self
|
|
{
|
|
Self::Compress
|
|
}
|
|
}
|
|
|
|
impl<'a, T> MaybeCompressor<'a, T>
|
|
{
|
|
/// What kind is this compressor set to
|
|
pub fn kind(&self) -> Option<CompKind>
|
|
{
|
|
Some(match self {
|
|
Self::Raw(_) => return None,
|
|
Self::Compressing(_) => CompKind::Compress,
|
|
Self::Decompressing(_) => CompKind::Decompress,
|
|
})
|
|
}
|
|
}
|
|
|
|
impl<'a, T> MaybeCompressor<'a, T>
|
|
where T: AsyncWrite +Send + Unpin + 'a
|
|
{
|
|
pub fn new(raw: &'a mut T, compress: Option<CompKind>) -> Self
|
|
{
|
|
match compress {
|
|
Some(CompKind::Compress) => Self::Compressing(Compressor::new(raw)),
|
|
Some(CompKind::Decompress) => Self::Decompressing(Decompressor::new(raw)),
|
|
None => Self::Raw(raw),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a, T> DerefMut for MaybeCompressor<'a, T>
|
|
where T: AsyncWrite + Send + Unpin + 'a
|
|
{
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
match self {
|
|
Self::Compressing(t) => t,
|
|
Self::Decompressing(t) => t,
|
|
Self::Raw(o) => o,
|
|
}
|
|
}
|
|
}
|
|
impl<'a, T> Deref for MaybeCompressor<'a, T>
|
|
where T: AsyncWrite + Unpin + Send + 'a
|
|
{
|
|
type Target = dyn AsyncWrite + Send + Unpin+ 'a;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
match self {
|
|
Self::Compressing(t) => t,
|
|
Self::Decompressing(t) => t,
|
|
Self::Raw(o) => o,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Serialise this object asynchronously
|
|
///
|
|
/// # Note
|
|
/// This compresses the output stream.
|
|
/// It cannot be used by `prealloc` read/write functions, as they do not compress.
|
|
pub async fn write_async<T: Serialize, W>(mut to: W, item: &T, compress: bool) -> eyre::Result<()>
|
|
where W: AsyncWrite + Unpin + Send
|
|
{
|
|
let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
|
|
let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was");
|
|
|
|
let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
|
|
.wrap_err(eyre!("Failed to serialise item"))
|
|
.with_section(sect_stream_type_name.clone())
|
|
.with_section(sect_type_name.clone())?;
|
|
|
|
{
|
|
let mut stream = MaybeCompressor::new(&mut to, compress.then(|| CompKind::Compress));
|
|
|
|
cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>());
|
|
|
|
stream.write_all(&vec[..])
|
|
.await
|
|
.wrap_err(eyre!("Failed to write serialised memory to stream"))
|
|
.with_section(|| vec.len().to_string().header("Size of the serialised object was"))
|
|
.with_section(sect_stream_type_name.clone())
|
|
.with_section(sect_type_name.clone())?;
|
|
|
|
stream.flush().await.wrap_err(eyre!("Failed to flush output compression stream"))?;
|
|
stream.shutdown().await.wrap_err(eyre!("Failed to shutdown output compression stream"))?;
|
|
}
|
|
|
|
// Extremely overcomplicated concurrent flush+drop.
|
|
use futures::FutureExt;
|
|
let flush_fut = async {
|
|
to.flush().await.wrap_err(eyre!("Failed to flush output backing stream"))?;
|
|
to.shutdown().await.wrap_err(eyre!("Failed to shutdown output backing stream"))?;
|
|
Ok::<(), eyre::Report>(())
|
|
}.fuse();
|
|
|
|
tokio::pin!(flush_fut);
|
|
tokio::select!{
|
|
res = &mut flush_fut => {
|
|
return res;
|
|
}
|
|
_ = async move { drop!(async vec vec); } => {}
|
|
}
|
|
flush_fut.await
|
|
}
|
|
|
|
#[cfg(feature="prealloc")]
mod prealloc {
    use super::*;
    use std::os::unix::prelude::*;
    use std::fs::File;
    use memmap::{MmapMut, Mmap};

    /// Write this object as-is to this file descriptor.
    ///
    /// The item is serialised to CBOR, `fallocate()` is called on `file` for the serialised length, the file is memory-mapped read + write, and the bytes are copied into the map, which is then flushed.
    ///
    /// # Note
    /// This does not compress like `write_async()` can. It is just a 1-1 dump of the serialisation.
    /// Therefore, data written with `write_prealloc()` cannot then be read with `read_async()`.
    ///
    /// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
    ///
    /// NOTE(review): the body calls `tokio::task::block_in_place`; whether that is permitted on the thread this runs on depends on the tokio version and runtime flavour -- confirm.
    ///
    /// NOTE(review): the whole file is mapped, and `fallocate()` with mode `0` does not shrink a file. If `file` is already longer than the serialised data, the bytes past the new data appear to be left in place -- confirm callers always pass an empty/truncated file.
    ///
    /// # Errors
    /// Fails if serialisation fails, the length does not fit in `off_t`, `fallocate()` fails, the file cannot be mapped, the copied length does not match, or flushing the map fails.
    pub fn write_prealloc<T: Serialize>(file: &mut File, item: &T) -> eyre::Result<()> {
        let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");

        let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
            .wrap_err(eyre!("Failed to serialise item"))
            .with_section(sect_type_name)?;

        let fd = file.as_raw_fd();

        cfg_eprintln!(Verbose; config::get_global(), "Writing (raw) {} bytes of type {:?} to fd {}", vec.len(), std::any::type_name::<T>(), fd);

        // Convert the length before entering `unsafe`, so only the FFI call itself is unsafe.
        let alloc_len: libc::off_t = vec.len().try_into()
            .wrap_err(eyre!("Failed to cast buffer size to `off_t`"))
            .with_section(|| vec.len().header("Buffer size was"))
            .with_section(|| libc::off_t::MAX.to_string().header("Max value of `off_t` is"))
            .with_warning(|| "Usually `off_t` is a signed 64 bit integer. Whereas the buffer's size is unsigned. On systems where `off_t` is 64 bits or higher, this should realistically never happen and probably indicates a bug.")?;

        // SAFETY: `fd` was just obtained from `file`, which is borrowed for the whole call, so it is a live descriptor; the call passes only plain integers.
        let rc = unsafe { libc::fallocate(fd, 0, 0, alloc_len) };
        // `last_os_error()` must be read immediately after the failing call.
        let allocated = if rc < 0 {
            Err(std::io::Error::last_os_error())
        } else {
            Ok(())
        };
        allocated.wrap_err("fallocate() failed")
            .with_section(|| vec.len().header("Bytes to allocate was"))
            .with_suggestion(|| "Make sure there is enough space for the fallocate() call")
            .with_suggestion(|| "Make sure we are able to write to the file")?;

        // fallocate() succeeded in allocating `vec.len()` bytes to map.
        // SAFETY (NOTE(review)): soundness relies on nothing else truncating or modifying the file while it is mapped; that is not enforced here.
        let mut map = unsafe { MmapMut::map_mut(file) }
            .wrap_err(eyre!("Failed to map file for read + write"))
            .with_section(|| fd.header("fd was"))
            .with_suggestion(|| "Do we have the premissions for both reading and writing of this file and fd?")?;

        // SAFETY (NOTE(review)): `copy_nonoverlapping_unchecked` is defined elsewhere; `vec` is a heap buffer and `map` is a fresh file mapping, so the two regions are presumed not to overlap. Its return value is checked against `vec.len()`.
        eyre_assert!(tokio::task::block_in_place(|| unsafe {
            bytes::copy_nonoverlapping_unchecked(&vec[..], &mut map[..])
        }) == vec.len(); "Length mismatch")
            .with_section(|| vec.len().header("Expected"))
            .with_section(|| map.len().header("Got"))
            .with_warning(|| "This should never happen, it indicates a bug")?;

        tokio::task::block_in_place(move || map.flush())
            .wrap_err(eyre!("Failed to flush map in place"))?; //map is dropped here

        drop!(vec vec);
        Ok(())
    }

    /// Read this object as-is from this file descriptor.
    ///
    /// The whole file is memory-mapped read-only and deserialised from CBOR.
    ///
    /// # Note
    /// This does not decompress like `read_async()` does. It is just a 1-1 read of the serialisation.
    /// Therefore, `read_prealloc()` cannot be used with data written by `write_async()`.
    ///
    /// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
    ///
    /// # Errors
    /// Fails if the file cannot be mapped, or if the mapped bytes cannot be deserialised into `T`.
    // This must be `DeserializeOwned` because the lifetime it is bound to is that of the memory map created and destroyed in the function, not of the fd `file` itself.
    pub fn read_prealloc<T: serde::de::DeserializeOwned>(file: &File) -> eyre::Result<T> {
        // This is a read-only mapping, so the diagnostics talk about reading only.
        // SAFETY (NOTE(review)): soundness relies on nothing else truncating or modifying the file while it is mapped; that is not enforced here.
        let map = unsafe { Mmap::map(file) }
            .wrap_err(eyre!("Failed to map file for reading"))
            .with_section(|| file.as_raw_fd().header("fd was"))
            .with_suggestion(|| "Do we have the permissions for reading this file and fd?")?;

        tokio::task::block_in_place(move || serde_cbor::from_slice(&map[..]))
            .wrap_err(eyre!("Failed to deserialise from map"))
            .with_note(|| "The prealloc read and write functions handle only *uncompressed* data. Make sure you're not feeding it compressed data (written with the non-prealloc read and write functions)")
    }
}
|
|
#[cfg(feature="prealloc")] pub use prealloc::{
|
|
write_prealloc as write_sync_map,
|
|
read_prealloc as read_sync_map,
|
|
};
|