//! For serializing
use super ::* ;
use tokio ::prelude ::* ;
use std ::ops ::{ Deref , DerefMut } ;
use serde ::de ::DeserializeOwned ;
use async_compression ::tokio_02 ::write ::{
BzEncoder ,
BzDecoder ,
} ;
type Compressor < T > = BzEncoder < T > ;
type Decompressor < T > = BzDecoder < T > ;
const DEFER_DROP_SIZE_FLOOR : usize = 1024 * 1024 ; // 1 MB
const DESERIALISE_OBJECT_READ_LIMIT : usize = 1024 * 1024 * 1024 * 2 ; // 2GB
const BUFFER_SIZE : usize = 4096 ;
#[ derive(Debug) ]
enum MaybeCompressor < ' a , T >
{
Compressing ( Compressor < & ' a mut T > ) ,
Decompressing ( Decompressor < & ' a mut T > ) ,
Raw ( & ' a mut T ) ,
}
/// Compress or decompress?
#[ derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord) ]
enum CompKind
{
Compress ,
Decompress
}
impl Default for CompKind
{
#[ inline ]
fn default ( ) -> Self
{
Self ::Compress
}
}
impl < ' a , T > MaybeCompressor < ' a , T >
{
/// What kind is this compressor set to
pub fn kind ( & self ) -> Option < CompKind >
{
Some ( match self {
Self ::Raw ( _ ) = > return None ,
Self ::Compressing ( _ ) = > CompKind ::Compress ,
Self ::Decompressing ( _ ) = > CompKind ::Decompress ,
} )
}
}
impl < ' a , T > MaybeCompressor < ' a , T >
where T : AsyncWrite + Send + Unpin + ' a
{
pub fn new ( raw : & ' a mut T , compress : Option < CompKind > ) -> Self
{
match compress {
Some ( CompKind ::Compress ) = > Self ::Compressing ( Compressor ::new ( raw ) ) ,
Some ( CompKind ::Decompress ) = > Self ::Decompressing ( Decompressor ::new ( raw ) ) ,
None = > Self ::Raw ( raw ) ,
}
}
}
impl < ' a , T > DerefMut for MaybeCompressor < ' a , T >
where T : AsyncWrite + Send + Unpin + ' a
{
fn deref_mut ( & mut self ) -> & mut Self ::Target {
match self {
Self ::Compressing ( t ) = > t ,
Self ::Decompressing ( t ) = > t ,
Self ::Raw ( o ) = > o ,
}
}
}
impl < ' a , T > Deref for MaybeCompressor < ' a , T >
where T : AsyncWrite + Unpin + Send + ' a
{
type Target = dyn AsyncWrite + Send + Unpin + ' a ;
fn deref ( & self ) -> & Self ::Target {
match self {
Self ::Compressing ( t ) = > t ,
Self ::Decompressing ( t ) = > t ,
Self ::Raw ( o ) = > o ,
}
}
}
async fn copy_with_limit < R , W > ( mut from : R , mut to : W ) -> io ::Result < usize >
where R : AsyncRead + Unpin ,
W : AsyncWrite + Unpin
{
let mut buffer = [ 0 u8 ; BUFFER_SIZE ] ;
let mut read ;
let mut done = 0 ;
while { read = from . read ( & mut buffer [ .. ] ) . await ? ; read > 0 }
{
to . write_all ( & buffer [ .. read ] ) . await ? ;
done + = read ;
if done > DESERIALISE_OBJECT_READ_LIMIT {
return Err ( io ::Error ::new ( io ::ErrorKind ::ConnectionAborted , eyre ! ( "Exceeded limit, aborting." )
. with_section ( | | DESERIALISE_OBJECT_READ_LIMIT . header ( "Object read size limit was" ) )
. with_section ( | | done . header ( "Currently read" ) ) ) ) ;
}
}
Ok ( done )
}
/// Deserialise an object from this stream asynchronously
///
/// # Note
/// If the stream is compressed, `compressed` must be set to true or an error will be produced.
/// An autodetect feature may be added in the future
pub async fn read_async < T : DeserializeOwned + Send + ' static , R > ( mut from : R , compressed : bool ) -> eyre ::Result < T >
where R : AsyncRead + Unpin + Send
{
let sect_type_name = | | std ::any ::type_name ::< T > ( ) . header ( "Type trying to deserialise was" ) ;
let sect_stream_type_name = | | std ::any ::type_name ::< R > ( ) . header ( "Stream type was" ) ;
let vec = {
let mut vec = Vec ::new ( ) ;
let mut writer = MaybeCompressor ::new ( & mut vec , compressed . then ( | | CompKind ::Decompress ) ) ;
copy_with_limit ( & mut from , writer . deref_mut ( ) ) . await
. wrap_err ( eyre ! ( "Failed to copy stream into in-memory buffer" ) )
. with_section ( sect_type_name . clone ( ) )
. with_section ( sect_stream_type_name . clone ( ) ) ? ;
writer . flush ( ) . await . wrap_err ( eyre ! ( "Failed to flush decompression stream" ) ) ? ;
writer . shutdown ( ) . await . wrap_err ( eyre ! ( "Failed to shutdown decompression stream" ) ) ? ;
vec
} ;
tokio ::task ::spawn_blocking ( move | | {
( serde_cbor ::from_slice ( & vec [ .. ] )
. wrap_err ( eyre ! ( "Failed to deseralised decompressed data" ) )
. with_section ( sect_type_name . clone ( ) )
. with_section ( sect_stream_type_name . clone ( ) ) ,
{ drop ! ( vec vec ) ; } ) . 0
} ) . await . wrap_err ( eyre ! ( "Panic while deserialising decompressed data" ) ) ?
}
/// Serialise this object asynchronously
///
/// # Note
/// This compresses the output stream.
/// It cannot be used by `prealloc` read/write functions, as they do not compress.
pub async fn write_async < T : Serialize , W > ( mut to : W , item : & T , compress : bool ) -> eyre ::Result < ( ) >
where W : AsyncWrite + Unpin + Send
{
let sect_type_name = | | std ::any ::type_name ::< T > ( ) . header ( "Type trying to serialise was" ) ;
let sect_stream_type_name = | | std ::any ::type_name ::< W > ( ) . header ( "Stream type was" ) ;
let vec = tokio ::task ::block_in_place ( | | serde_cbor ::to_vec ( item ) )
. wrap_err ( eyre ! ( "Failed to serialise item" ) )
. with_section ( sect_stream_type_name . clone ( ) )
. with_section ( sect_type_name . clone ( ) ) ? ;
{
let mut stream = MaybeCompressor ::new ( & mut to , compress . then ( | | CompKind ::Compress ) ) ;
cfg_eprintln ! ( Verbose ; config ::get_global ( ) , "Writing {} bytes of type {:?} to stream of type {:?}" , vec . len ( ) , std ::any ::type_name ::< T > ( ) , std ::any ::type_name ::< W > ( ) ) ;
stream . write_all ( & vec [ .. ] )
. await
. wrap_err ( eyre ! ( "Failed to write serialised memory to stream" ) )
. with_section ( | | vec . len ( ) . to_string ( ) . header ( "Size of the serialised object was" ) )
. with_section ( sect_stream_type_name . clone ( ) )
. with_section ( sect_type_name . clone ( ) ) ? ;
stream . flush ( ) . await . wrap_err ( eyre ! ( "Failed to flush output compression stream" ) ) ? ;
stream . shutdown ( ) . await . wrap_err ( eyre ! ( "Failed to shutdown output compression stream" ) ) ? ;
}
// Extremely overcomplicated concurrent flush+drop.
use futures ::FutureExt ;
let flush_fut = async {
to . flush ( ) . await . wrap_err ( eyre ! ( "Failed to flush output backing stream" ) ) ? ;
to . shutdown ( ) . await . wrap_err ( eyre ! ( "Failed to shutdown output backing stream" ) ) ? ;
Ok ::< ( ) , eyre ::Report > ( ( ) )
} . fuse ( ) ;
tokio ::pin ! ( flush_fut ) ;
tokio ::select ! {
res = & mut flush_fut = > {
return res ;
}
_ = async move { drop ! ( async vec vec ) ; } = > { }
}
flush_fut . await
}
#[ cfg(feature= " prealloc " ) ]
mod prealloc {
use super ::* ;
use std ::os ::unix ::prelude ::* ;
use std ::fs ::File ;
use memmap ::{ MmapMut , Mmap } ;
/// Write this object as-is to this file descriptor.
///
/// # Note
/// This does not compress like `write_aynsc()` does. It is just a 1-1 dump of the serialisation.
/// Therefore, data written with `write_prealloc()` cannot be then read used with `read_async()`.
///
/// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
pub fn write_prealloc < T : Serialize > ( file : & mut File , item : & T ) -> eyre ::Result < ( ) >
{
let sect_type_name = | | std ::any ::type_name ::< T > ( ) . header ( "Type trying to serialise was" ) ;
let vec = tokio ::task ::block_in_place ( | | serde_cbor ::to_vec ( item ) )
. wrap_err ( eyre ! ( "Failed to serialise item" ) )
. with_section ( sect_type_name . clone ( ) ) ? ;
let fd = file . as_raw_fd ( ) ;
cfg_eprintln ! ( Verbose ; config ::get_global ( ) , "Writing (raw) {} bytes of type {:?} to fd {}" , vec . len ( ) , std ::any ::type_name ::< T > ( ) , fd ) ;
unsafe {
if libc ::fallocate ( fd , 0 , 0 , vec . len ( ) . try_into ( )
. wrap_err ( eyre ! ( "Failed to cast buffer size to `off_t`" ) )
. with_section ( | | vec . len ( ) . header ( "Buffer size was" ) )
. with_section ( | | libc ::off_t ::MAX . to_string ( ) . header ( "Max value of `off_t` is" ) )
. with_warning ( | | "Usually `off_t` is a signed 64 bit integer. Whereas the buffer's size is unsigned. On systems where `off_t` is 64 bits or higher, this should realistically never happen and probably indicates a bug." ) ? ) < 0 {
// Error
Err ( std ::io ::Error ::last_os_error ( ) )
} else {
Ok ( ( ) )
}
} . wrap_err ( "fallocate() failed" )
. with_section ( | | vec . len ( ) . header ( "Bytes to allocate was" ) )
. with_suggestion ( | | "Make sure there is enough space for the fallocate() call" )
. with_suggestion ( | | "Make sure we are able to write to the file" ) ? ;
// fallocate() succeeded in allocating `vec.len()` bytes to map.
let mut map = unsafe { MmapMut ::map_mut ( file ) }
. wrap_err ( eyre ! ( "Failed to map file for read + write" ) )
. with_section ( | | fd . header ( "fd was" ) )
. with_suggestion ( | | "Do we have the premissions for both reading and writing of this file and fd?" ) ? ;
eyre_assert ! ( tokio ::task ::block_in_place ( | | unsafe {
bytes ::copy_nonoverlapping_unchecked ( & vec [ .. ] , & mut map [ .. ] )
} ) = = vec . len ( ) ; "Length mismatch" )
. with_section ( | | vec . len ( ) . header ( "Expected" ) )
. with_section ( | | map . len ( ) . header ( "Got" ) )
. with_warning ( | | "This should never happen, it indicates a bug" ) ? ;
tokio ::task ::block_in_place ( move | | map . flush ( ) )
. wrap_err ( eyre ! ( "Failed to flush map in place" ) ) ? ; //map is dropped here
drop ! ( vec vec ) ;
Ok ( ( ) )
}
/// Read this object as-is from this file descriptor.
///
/// # Note
/// This does not decompress like `read_aynsc()` does. It is just a 1-1 read of the serialisation.
/// Therefore, `read_prealloc()` cannot be used with data written by `write_async()`.
///
/// This is a completely synchronous operation. You should use it with `spawn_blocking` et al. to prevent task hangups.
// This must be `DeserializeOwned` because the lifetime it is bound to is that of the memory map created and destroyed in the function, not of the fd `file` itself.
pub fn read_prealloc < T : serde ::de ::DeserializeOwned > ( file : & File ) -> eyre ::Result < T >
{
let map = unsafe { Mmap ::map ( file ) }
. wrap_err ( eyre ! ( "Failed to map file for read" ) )
. with_section ( | | file . as_raw_fd ( ) . header ( "fd was" ) )
. with_suggestion ( | | "Do we have the premissions for both reading and writing of this file and fd?" ) ? ;
tokio ::task ::
block_in_place ( move | | serde_cbor ::from_slice ( & map [ .. ] ) )
. wrap_err ( eyre ! ( "Failed to deserialise from map" ) )
. with_note ( | | "The prealloc read and write functions handle only *uncompressed* data. Make sure you're not feeding it compressed data (written with the non-prealloc read and write functions)" )
}
}
#[ cfg(feature= " prealloc " ) ] pub use prealloc ::{
write_prealloc as write_sync_map ,
read_prealloc as read_sync_map ,
} ;