You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
60 lines
2.0 KiB
60 lines
2.0 KiB
//! Utilities for serialising objects to compressed asynchronous streams.
|
|
use super::*;
|
|
use tokio::prelude::*;
|
|
|
|
use async_compression::tokio_02::write::{
|
|
BzEncoder,
|
|
BzDecoder,
|
|
};
|
|
|
|
/// Compressing writer wrapped around the output stream when serialising.
/// Aliased so the compression codec (here the `BzEncoder`) is named in one place.
type Compressor<T> = BzEncoder<T>;
|
|
/// Decoding counterpart of `Compressor` (here the `BzDecoder`).
// NOTE(review): not referenced in the visible part of this file — presumably used by a
// matching read path elsewhere; confirm.
type Decompressor<T> = BzDecoder<T>;
|
|
|
|
/// Size floor of 1024 * 1024 bytes.
// NOTE(review): going by the name, presumably the minimum buffer size for which a drop is
// deferred; it is not referenced in the visible part of this file — confirm against its users.
const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MiB
|
|
|
|
/// Serialise this object asynchronously
|
|
pub async fn write_async<T: Serialize, W>(mut to: W, item: &T) -> eyre::Result<()>
|
|
where W: AsyncWrite + Unpin
|
|
{
|
|
let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
|
|
let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was");
|
|
|
|
let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
|
|
.wrap_err(eyre!("Failed to serialise item"))
|
|
.with_section(sect_stream_type_name.clone())
|
|
.with_section(sect_type_name.clone())?;
|
|
|
|
{
|
|
let mut stream = Compressor::new(&mut to);
|
|
|
|
cfg_eprintln!(Verbose; config::get_global(), "Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>());
|
|
|
|
stream.write_all(&vec[..])
|
|
.await
|
|
.wrap_err(eyre!("Failed to write serialised memory to stream"))
|
|
.with_section(|| vec.len().to_string().header("Size of the serialised object was"))
|
|
.with_section(sect_stream_type_name.clone())
|
|
.with_section(sect_type_name.clone())?;
|
|
|
|
stream.flush().await.wrap_err(eyre!("Failed to flush output compression stream"))?;
|
|
stream.shutdown().await.wrap_err(eyre!("Failed to shutdown output compression stream"))?;
|
|
}
|
|
|
|
// Extremely overcomplicated concurrent flush+drop.
|
|
use futures::FutureExt;
|
|
let flush_fut = async {
|
|
to.flush().await.wrap_err(eyre!("Failed to flush output backing stream"))?;
|
|
to.shutdown().await.wrap_err(eyre!("Failed to shutdown output backing stream"))?;
|
|
Ok::<(), eyre::Report>(())
|
|
}.fuse();
|
|
|
|
tokio::pin!(flush_fut);
|
|
tokio::select!{
|
|
res = &mut flush_fut => {
|
|
return res;
|
|
}
|
|
_ = async move { drop!(async vec vec); } => {}
|
|
}
|
|
flush_fut.await
|
|
}
|