Compare commits
No commits in common. 'io-uring-async-support' and 'master' have entirely different histories.
io-uring-a
...
master
@ -1,688 +0,0 @@
|
|||||||
//! Handles the chain load/save format
|
|
||||||
use super::*;
|
|
||||||
use std::{
|
|
||||||
io::{
|
|
||||||
self,
|
|
||||||
Read, Write, BufRead,
|
|
||||||
},
|
|
||||||
fmt,
|
|
||||||
};
|
|
||||||
use bytes::{
|
|
||||||
Buf, BufMut, Bytes,
|
|
||||||
};
|
|
||||||
use zstd::{
|
|
||||||
Encoder, Decoder,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// The chain that can be saved / loaded.
///
/// Alias of the crate-level chain type; token type defaults to `String`.
pub type Chain<T = String> = crate::Chain<T>;
|
|
||||||
|
|
||||||
/// The version of the encoded format stream.
///
/// Four components, serialized as four bytes in field order
/// (see the `BinaryFormat` impl below); displayed as `major.minor.patch`
/// with an optional `r<rev>` suffix.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
// NOTE(review): all fields are `u8` (alignment 1), so `packed` does not
// change the layout here — the struct is 4 contiguous bytes either way.
#[repr(packed)]
pub struct Version(pub u8,pub u8,pub u8,pub u8);
|
|
||||||
|
|
||||||
impl fmt::Display for Version
|
|
||||||
{
|
|
||||||
#[inline]
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
||||||
{
|
|
||||||
write!(f, "{}.{}.{}", self.0,self.1, self.2)?;
|
|
||||||
if self.3 != 0 {
|
|
||||||
write!(f, "r{}", self.3)
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl Version {
|
|
||||||
/// Current save version
|
|
||||||
pub const CURRENT: Self = Version(0,0,0,0);
|
|
||||||
|
|
||||||
/// Current value as a native integer
|
|
||||||
const CURRENT_VALUE: u32 = Self::CURRENT.as_native();
|
|
||||||
|
|
||||||
pub const fn as_native(&self) -> u32 {
|
|
||||||
u32::from_be_bytes([self.0, self.1, self.2, self.3])
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub const fn from_native(value: u32) -> Self {
|
|
||||||
let [a,b,c,d] = u32::to_be_bytes(value);
|
|
||||||
Self(a,b,c,d)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Version
|
|
||||||
{
|
|
||||||
#[inline]
|
|
||||||
fn default() -> Self
|
|
||||||
{
|
|
||||||
Self::CURRENT
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Marker trait for types whose in-memory representation *is* their on-disk
/// binary format: they are read and written as raw bytes of `Self`.
///
/// # Safety
/// Implementors must guarantee that every possible bit pattern of
/// `size_of::<Self>()` bytes is a valid value of `Self` (no niche/invariant
/// bits, no pointers), because `BinaryFormat::read_from` will overwrite the
/// value with arbitrary bytes from the stream. Note the byte order is
/// whatever the host uses, so this format is not endian-portable.
pub unsafe trait AutoBinaryFormat: Sized {
    /// Raw byte view over `self`, used when encoding.
    #[inline]
    fn as_raw_for_encode(&self) -> *const [u8] {
        let ptr = self as *const Self;
        std::ptr::slice_from_raw_parts(ptr as *const u8, std::mem::size_of::<Self>())
    }
    /// Mutable raw byte view over `self`, used when decoding.
    #[inline]
    fn as_raw_for_decode(&mut self) -> *mut [u8] {
        let ptr = self as *mut Self;
        std::ptr::slice_from_raw_parts_mut(ptr as *mut u8, std::mem::size_of::<Self>())
    }

    /// Number of bytes a decode will consume: always `size_of::<Self>()`.
    fn raw_format_read_size(&mut self) -> usize {
        std::mem::size_of::<Self>()
    }
    /// Number of bytes an encode will produce: always `size_of::<Self>()`.
    fn raw_format_write_size(&self) -> usize {
        std::mem::size_of::<Self>()
    }
}
|
|
||||||
|
|
||||||
// SAFETY: NOTE(review): this blanket impl is only sound when `T` itself is
// plain-old-data (every bit pattern valid, no padding whose content matters,
// no pointers). For an arbitrary `T` that is *not* guaranteed — confirm all
// instantiations use byte-like element types (e.g. `[u8; N]`).
unsafe impl<T, const N: usize> AutoBinaryFormat for [T; N] {}
|
|
||||||
|
|
||||||
/// A value that can be serialized to / deserialized from a byte stream in
/// the chain save format.
pub trait BinaryFormat {
    /// Reads `self` in place from `stream`.
    ///
    /// Returns the number of bytes consumed on success.
    fn read_from<S: ?Sized>(&mut self, stream: &mut S) -> io::Result<usize>
    where S: io::Read;

    /// Writes `self` to `stream`.
    ///
    /// Returns the number of bytes written on success.
    fn write_to<S: ?Sized>(&self, stream: &mut S) -> io::Result<usize>
    where S: io::Write;

    /// Exact number of bytes `read_from` would consume, or `None` when it
    /// cannot be known up front.
    fn binary_format_read_size(&mut self) -> Option<usize>;
    /// Exact number of bytes `write_to` would produce.
    fn binary_format_write_size(&self) -> usize;
}
|
|
||||||
|
|
||||||
/// Blanket impl: any `AutoBinaryFormat` type serializes by copying its raw
/// in-memory bytes directly to/from the stream.
impl<T> BinaryFormat for T
where T: AutoBinaryFormat
{
    #[inline(always)]
    fn read_from<S: ?Sized>(&mut self, stream: &mut S) -> io::Result<usize>
    where S: io::Read {
        let ptr = self.as_raw_for_decode();
        // SAFETY: `ptr` spans exactly the `size_of::<Self>()` bytes of `self`
        // (see `as_raw_for_decode`), and the `unsafe` contract of
        // `AutoBinaryFormat` requires every bit pattern to be a valid `Self`,
        // so filling those bytes from the stream is sound.
        Ok(unsafe {
            stream.read_exact(&mut *ptr)?;
            (*ptr).len()
        })
    }
    #[inline(always)]
    fn write_to<S: ?Sized>(&self, stream: &mut S) -> io::Result<usize>
    where S: io::Write {
        let ptr = self.as_raw_for_encode();
        // SAFETY: `ptr` spans the initialized bytes of the live value `self`;
        // reading them for output cannot violate any invariant.
        Ok(unsafe {
            stream.write_all(&*ptr)?;
            (*ptr).len()
        })
    }
    /// Raw-format types always know their size: `size_of::<Self>()`.
    #[inline]
    fn binary_format_read_size(&mut self) -> Option<usize> {
        Some(self.raw_format_read_size())
    }
    #[inline]
    fn binary_format_write_size(&self) -> usize {
        self.raw_format_write_size()
    }
}
|
|
||||||
|
|
||||||
impl<T> BinaryFormat for [T]
|
|
||||||
where T: BinaryFormat
|
|
||||||
{
|
|
||||||
#[inline]
|
|
||||||
fn read_from<S: ?Sized>(&mut self, stream: &mut S) -> io::Result<usize>
|
|
||||||
where S: io::Read {
|
|
||||||
let mut sz = 0;
|
|
||||||
for i in self.iter_mut() {
|
|
||||||
sz += i.read_from(stream)?;
|
|
||||||
}
|
|
||||||
Ok(sz)
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn write_to<S: ?Sized>(&self, stream: &mut S) -> io::Result<usize>
|
|
||||||
where S: io::Write {
|
|
||||||
let mut sz =0;
|
|
||||||
for i in self.iter() {
|
|
||||||
sz += i.write_to(stream)?;
|
|
||||||
}
|
|
||||||
Ok(sz)
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn binary_format_read_size(&mut self) -> Option<usize> {
|
|
||||||
self.iter_mut().map(|x| x.binary_format_read_size()).try_fold(0, |x, y| {
|
|
||||||
match (x, y) {
|
|
||||||
(x, Some(y)) => Some(x + y),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn binary_format_write_size(&self) -> usize {
|
|
||||||
self.iter().map(|x| x.binary_format_write_size()).sum()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BinaryFormat for Version {
|
|
||||||
#[inline]
|
|
||||||
fn read_from<S: ?Sized>(&mut self, stream: &mut S) -> io::Result<usize>
|
|
||||||
where S: io::Read {
|
|
||||||
let mut vi = [0u8; 4];
|
|
||||||
stream.read_exact(&mut vi[..])?;
|
|
||||||
Ok(4)
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn write_to<S: ?Sized>(&self, stream: &mut S) -> io::Result<usize>
|
|
||||||
where S: io::Write {
|
|
||||||
let vi = [self.0,self.1,self.2,self.3];
|
|
||||||
stream.write_all(&vi[..])?;
|
|
||||||
Ok(4)
|
|
||||||
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn binary_format_read_size(&mut self) -> Option<usize> {
|
|
||||||
Some(std::mem::size_of::<u8>() * 4)
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn binary_format_write_size(&self) -> usize {
|
|
||||||
std::mem::size_of::<u8>() * 4
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Compression applied to the serialized chain payload.
///
/// Stored on disk as a big-endian `u32` (see `FormatMetadata`); the explicit
/// discriminants are part of the file format and must not be renumbered.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, Default)]
#[repr(u32)]
pub enum Compressed {
    /// Payload is written uncompressed.
    #[default]
    No = 0,
    /// Payload is zstd-compressed.
    Zstd = 1,
}
|
|
||||||
|
|
||||||
impl Compressed {
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
const fn to_int(&self) -> u32 {
|
|
||||||
*self as u32
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn try_from_int(val: u32) -> Option<Self> {
|
|
||||||
match val {
|
|
||||||
// SAFETY: These variants are known
|
|
||||||
0..=1 => Some(unsafe {
|
|
||||||
std::mem::transmute(val)
|
|
||||||
}),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Header written immediately after the magic number and before the
/// (optionally compressed) chain payload.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct FormatMetadata {
    /// Save-format version the payload was written with.
    pub version: Version,

    /// Compression applied to the payload that follows this header.
    pub compressed: Compressed,
    pub chain_size: usize, // NOTE: Unused
    pub checksum: u64, // NOTE: Unused
}
|
|
||||||
|
|
||||||
impl FormatMetadata {
    /// File-magic tag written at the very start of a save stream:
    /// the ASCII bytes `MARKOV` followed by `0x00 0xCF`.
    const MAGIC_NUM: &[u8; 8] = b"MARKOV\x00\xcf";
}
|
|
||||||
|
|
||||||
/// On-disk layout: version (4 bytes), compression flag (`u32` BE),
/// chain size (`u64` BE), checksum (`u64` BE) — 24 bytes total.
impl BinaryFormat for FormatMetadata
{
    fn write_to<S: ?Sized>(&self, mut stream: &mut S) -> io::Result<usize>
    where S: io::Write {
        let sz = self.version.write_to(&mut stream)?;

        // Fixed-size scratch buffer for the three numeric fields.
        let mut obuf = [0u8; std::mem::size_of::<u32>() +std::mem::size_of::<u64>() + std::mem::size_of::<u64>()];
        {
            // `bytes::BufMut` on a `&mut [u8]` advances through the buffer
            // as each field is appended, writing big-endian.
            let mut obuf = &mut obuf[..];
            use std::convert::TryInto;

            obuf.put_u32(self.compressed.to_int());
            obuf.put_u64(self.chain_size.try_into().map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Chain size attribute out-of-bounds for format size"))?);
            obuf.put_u64(self.checksum);
        }
        stream.write_all(&obuf[..])?;

        Ok(sz + obuf.len())
    }

    fn read_from<S: ?Sized>(&mut self, mut stream: &mut S) -> io::Result<usize>
    where S: io::Read {
        let sz = self.version.read_from(&mut stream)?;

        // Reject payloads written by a newer save format than this build.
        if self.version > Version::CURRENT {
            return Err(io::Error::new(io::ErrorKind::Unsupported, format!("Unknown format version {}", self.version)));
        }

        let mut ibuf = [0u8; std::mem::size_of::<u32>() +std::mem::size_of::<u64>() + std::mem::size_of::<u64>()];
        stream.read_exact(&mut ibuf[..])?;
        {
            // `bytes::Buf` on a `&[u8]` consumes the buffer field-by-field,
            // reading big-endian (mirror of `write_to` above).
            let mut ibuf = &ibuf[..];
            use std::convert::TryInto;

            self.compressed = Compressed::try_from_int(ibuf.get_u32()).ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid compression attribute"))?;
            self.chain_size = ibuf.get_u64().try_into().map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Chain size attribute out-of-bounds for native size"))?;
            self.checksum = ibuf.get_u64();
        }

        Ok(sz + ibuf.len())
    }

    /// Version size plus the three fixed numeric fields.
    #[inline]
    fn binary_format_read_size(&mut self) -> Option<usize> {
        let szm = self.version.binary_format_read_size()?;
        Some(szm + std::mem::size_of::<u32>()
            + std::mem::size_of::<u64>()
            + std::mem::size_of::<u64>())
    }

    /// Must stay in sync with `binary_format_read_size` above.
    #[inline(always)]
    fn binary_format_write_size(&self) -> usize {
        self.version.binary_format_write_size()
            + std::mem::size_of::<u32>()
            + std::mem::size_of::<u64>()
            + std::mem::size_of::<u64>()
    }
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Load a chain from a stream
|
|
||||||
#[inline]
|
|
||||||
pub fn load_chain_from_sync<S>(stream: &mut S) -> io::Result<Chain<String>>
|
|
||||||
where S: io::Read + ?Sized
|
|
||||||
{
|
|
||||||
let mut stream = io::BufReader::new(stream);
|
|
||||||
{
|
|
||||||
let mut magic = FormatMetadata::MAGIC_NUM.clone();
|
|
||||||
stream.read_exact(&mut magic[..])?;
|
|
||||||
|
|
||||||
if &magic != FormatMetadata::MAGIC_NUM {
|
|
||||||
return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid file header tag magic number"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let metadata = {
|
|
||||||
let mut metadata = FormatMetadata::default();
|
|
||||||
metadata.read_from(&mut stream)?;
|
|
||||||
metadata
|
|
||||||
};
|
|
||||||
|
|
||||||
match metadata.version {
|
|
||||||
Version::CURRENT => {
|
|
||||||
let read = |read: &mut (dyn io::Read)| serde_cbor::from_reader(read).expect("Failed to read chain from input stream"); // TODO: Error type
|
|
||||||
|
|
||||||
match metadata.compressed {
|
|
||||||
Compressed::No =>
|
|
||||||
Ok(read(&mut stream)),
|
|
||||||
Compressed::Zstd => {
|
|
||||||
let mut stream = zstd::Decoder::with_buffer(stream)?;
|
|
||||||
|
|
||||||
//#[cfg(feature="threads")]
|
|
||||||
//stream.multithread(num_cpus::get() as i32);
|
|
||||||
|
|
||||||
//NOTE: Not required here: //stream.finish()?;
|
|
||||||
Ok(read(&mut stream))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
unsup => {
|
|
||||||
return Err(io::Error::new(io::ErrorKind::Unsupported, format!("Unsupported payload version {}", unsup)));
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Save a chain to a stream with optional compression.
|
|
||||||
#[inline]
|
|
||||||
pub fn save_chain_to_sync<S>(stream: &mut S, chain: &Chain<String>, compress: bool) -> io::Result<()>
|
|
||||||
where S: io::Write + ?Sized
|
|
||||||
{
|
|
||||||
let mut stream = io::BufWriter::new(stream);
|
|
||||||
|
|
||||||
let metadata = FormatMetadata {
|
|
||||||
compressed: compress
|
|
||||||
.then_some(Compressed::Zstd)
|
|
||||||
.unwrap_or(Compressed::No),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
stream.write_all(FormatMetadata::MAGIC_NUM)?;
|
|
||||||
metadata.write_to(&mut stream)?;
|
|
||||||
|
|
||||||
let write = |stream: &mut (dyn io::Write)| serde_cbor::to_writer(stream, chain).expect("Failed to write chain to output stream"); // TODO: Error type
|
|
||||||
|
|
||||||
let mut stream = match metadata.compressed {
|
|
||||||
Compressed::No => {
|
|
||||||
write(&mut stream);
|
|
||||||
stream
|
|
||||||
},
|
|
||||||
Compressed::Zstd => {
|
|
||||||
let mut stream = zstd::Encoder::new(stream, 22)?;
|
|
||||||
|
|
||||||
#[cfg(feature="threads")]
|
|
||||||
stream.multithread(num_cpus::get() as u32)?;
|
|
||||||
|
|
||||||
write(&mut stream);
|
|
||||||
// XXX: Should we flush after write here..?
|
|
||||||
|
|
||||||
// NOTE: Required here.
|
|
||||||
stream.finish()?
|
|
||||||
},
|
|
||||||
};
|
|
||||||
stream.flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature="io_uring")]
#[deprecated = "Runs through compression *twice* (one on the sync side, even) see below. Must be re-written to properly use `io_uring::*`"]
pub fn save_chain_to_file(output_file: std::fs::File, chain: &Chain<String>, compress: bool) -> io::Result<()>
{
    // NOTE(review): this function is intentionally disabled — the
    // `compile_error!` below fails any build with the `io_uring` feature
    // enabled. Everything after it is an unfinished sketch, not a working
    // code path.
    compile_error!("TODO: Make `save_to_chain() that takes callback for write stream to create compression stream & write to it, since io_uring backing takes care of that, but we want the uncompressed metadata writing to be the same.");
    // let output_file = fs::OpenOptions::new()
    // .create_new(! force)
    // .create(true)
    // .truncate(force)
    // .write(true)
    // .open(path);

    let (mut stream, bg) = io_uring::create_write_compressed(output_file)?;
    todo!("^^^ Ehh... No, this isn't right... `stream` is the *encoder* stream, we still need to do the metadata thing done in `_sync`()... XXX: Can we factor that out to something that will take a callback maybe??? Idk... im tired...");

    save_chain_to_sync(&mut stream, chain, compress)?;
    stream.flush()?;
    drop(stream); // NOTE: Drop the sending side so the recv side can know there is no more data. **MUST** be done before the `.join()`.

    bg.join().expect("Fatal error in background I/O thread")?;
    Ok(())
}
|
|
||||||
|
|
||||||
#[cfg(feature="io_uring")]
|
|
||||||
mod io_uring {
|
|
||||||
use super::*;
|
|
||||||
use std::{
|
|
||||||
path::Path,
|
|
||||||
};
|
|
||||||
use futures::{
|
|
||||||
prelude::*,
|
|
||||||
io::{
|
|
||||||
//AsyncRead,
|
|
||||||
//AsyncBufRead,
|
|
||||||
//AsyncWrite,
|
|
||||||
|
|
||||||
AllowStdIo,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use tokio::{
|
|
||||||
io::{
|
|
||||||
AsyncRead,
|
|
||||||
AsyncBufRead,
|
|
||||||
AsyncWrite,
|
|
||||||
|
|
||||||
ReadHalf, WriteHalf,
|
|
||||||
SimplexStream,
|
|
||||||
simplex,
|
|
||||||
|
|
||||||
AsyncReadExt,
|
|
||||||
AsyncWriteExt,
|
|
||||||
},
|
|
||||||
|
|
||||||
};
|
|
||||||
use tokio_uring::{
|
|
||||||
fs,
|
|
||||||
buf,
|
|
||||||
};
|
|
||||||
use async_compression::{
|
|
||||||
Level,
|
|
||||||
zstd::CParameter,
|
|
||||||
|
|
||||||
tokio::{
|
|
||||||
bufread::{
|
|
||||||
ZstdDecoder,
|
|
||||||
ZstdEncoder,
|
|
||||||
},
|
|
||||||
write as zstd_write,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
// Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file. (Maybe default-feature-gate this?)
|
|
||||||
|
|
||||||
/// Creates an future that, when executed, reads all data from `reader` via `io_uring`, into generic byte sink `writer`.
|
|
||||||
///
|
|
||||||
/// # Task dispatch
|
|
||||||
/// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// The future polls to return an `io::Result<usize>` of the number of bytes read from `reader` and written to `writer`.
|
|
||||||
#[inline]
|
|
||||||
#[must_use]
|
|
||||||
fn create_read_pump<'f, B>(reader: &'f fs::File, writer: B) -> impl Future<Output = std::io::Result<usize>> + use<'f, B>
|
|
||||||
where B: AsyncWrite
|
|
||||||
{
|
|
||||||
async move {
|
|
||||||
let mut pos = 0;
|
|
||||||
let mut buf = Vec::with_capacity(4096);
|
|
||||||
|
|
||||||
futures::pin_mut!(writer);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (bytes, nbuf) = reader.read_at(buf, pos as u64).await;
|
|
||||||
match bytes? {
|
|
||||||
0 => break,
|
|
||||||
bytes => {
|
|
||||||
// Push `&nbuf[..bytes]` to pump.
|
|
||||||
writer.write_all(&nbuf[..bytes]).await?;
|
|
||||||
|
|
||||||
pos += bytes;
|
|
||||||
buf = nbuf;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok::<_, std::io::Error>(pos)
|
|
||||||
}//).map(|_| ())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Creates an future that, when executed, reads all data from generic byte source `reader` and writes them into `writer` through `io_uring`.
|
|
||||||
///
|
|
||||||
/// # Task dispatch
|
|
||||||
/// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// The future polls to return an `io::Result<usize>` of the number of bytes read from `reader` and written to `writer`.
|
|
||||||
fn create_write_pump<'f, B>(reader: B, writer: &'f fs::File) -> impl Future<Output = std::io::Result<usize>> + use<'f, B>
|
|
||||||
where B: AsyncRead
|
|
||||||
{
|
|
||||||
async move {
|
|
||||||
let mut pos =0;
|
|
||||||
let mut buf = vec![0u8; 4096];
|
|
||||||
|
|
||||||
futures::pin_mut!(reader);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let sz = reader.read(&mut buf[..]).await?;
|
|
||||||
if sz == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
buf.truncate(sz);
|
|
||||||
|
|
||||||
let (res, nbuf) = writer.write_all_at(buf, pos as u64).await;
|
|
||||||
res?;
|
|
||||||
pos += sz;
|
|
||||||
buf = nbuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok::<usize, std::io::Error>(pos)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawns a task that reads all bytes from `reader` via `io_uring` and writes them into generic byte sink `writer`.
/// Returns a handle to the async task that completes when the task has completed.
///
/// # Runtime
/// The file is closed after the operation completes, ownership is transfered into the background task.
///
/// __NOTE__: **Must** be called from within an active `tokio_uring` runtime.
fn spawn_read_pump<B>(reader: fs::File, writer: B) -> impl Future<Output = std::io::Result<usize>> + Unpin + 'static
where B: AsyncWrite + Send + 'static
{
    tokio_uring::spawn(async move {
        let sz = create_read_pump(&reader, writer).await?;
        // NOTE(review): `sync_data()` on a file we only *read* from looks
        // unnecessary — presumably copied from the write-side twin below;
        // confirm. Its error is deliberately ignored either way.
        let _ = reader.sync_data().await;
        reader.close().await.map(move |_| sz)
    }).map(|handle| handle.expect("Background reading task panic"))
}
|
|
||||||
|
|
||||||
/// Spawns a task that reads all bytes from generic byte source `reader` and writes them into `writer` via `io_uring`.
/// Returns a handle to the async task that completes when the task has completed.
///
/// # Runtime
/// The file is closed after the operation completes, ownership is transfered into the background task.
///
/// __NOTE__: **Must** be called from within an active `tokio_uring` runtime.
fn spawn_write_pump<B>(reader: B, writer: fs::File) -> impl Future<Output = std::io::Result<usize>> + Unpin + 'static
where B: AsyncRead + Send + 'static
{
    tokio_uring::spawn(async move {
        let sz = create_write_pump(reader, &writer).await?;
        // Best-effort persistence of file contents before closing; the
        // `sync_data` error is deliberately ignored.
        let _ = writer.sync_data().await;
        writer.close().await.map(move |_| sz)
    }).map(|handle| handle.expect("Background writing task panic"))
}
|
|
||||||
|
|
||||||
/// Create a wrapper over `output_stream` in which bytes written are compressed via `zstd`.
///
/// Compression level is fixed at 22; with the `threads` feature enabled the
/// encoder is configured with one worker per CPU.
fn create_write_encoder<S>(output_stream: S) -> zstd_write::ZstdEncoder<S>
where S: AsyncWrite
{
    use zstd_write::*;
    // `cfg!` is a compile-time constant, so the dead branch is optimized
    // out — but unlike `#[cfg]`, both branches must type-check.
    if cfg!(feature="threads") {
        ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)])
    } else {
        ZstdEncoder::with_quality(output_stream, Level::Precise(22))
    }
}
|
|
||||||
|
|
||||||
/// Creates a wrapper over `output_stream` in which bytes written to are **de**compressed via `zstd`.
fn create_write_decoder<S>(output_stream: S) -> zstd_write::ZstdDecoder<S>
where S: AsyncWrite
{
    use zstd_write::*;
    ZstdDecoder::new(output_stream)
}
|
|
||||||
|
|
||||||
/// Create a wrapper over `output_stream` in which bytes read are compressed via `zstd`.
///
/// Compression level is fixed at 22; with the `threads` feature enabled the
/// encoder is configured with one worker per CPU.
fn create_read_encoder<S>(output_stream: S) -> ZstdEncoder<S>
where S: AsyncBufRead
{
    // `cfg!` is a compile-time constant, so the dead branch is optimized
    // out — but unlike `#[cfg]`, both branches must type-check.
    if cfg!(feature="threads") {
        ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)])
    } else {
        ZstdEncoder::with_quality(output_stream, Level::Precise(22))
    }
}
|
|
||||||
|
|
||||||
/// Creates a wrapper over `output_stream` in which bytes read from are **de**compressed via `zstd`.
fn create_read_decoder<S>(output_stream: S) -> ZstdDecoder<S>
where S: AsyncBufRead
{
    ZstdDecoder::new(output_stream)
}
|
|
||||||
|
|
||||||
/// Creates a unidirectional OS pipe and wraps both ends as `tokio_uring`
/// files.
///
/// # Returns
/// `(read_end, write_end)` of the pipe.
pub fn pipe_async_uring() -> std::io::Result<(fs::File, fs::File)>
{
    use std::os::unix::io::*;

    let (rx, tx) = os_pipe::pipe()?;

    // SAFETY: `into_raw_fd()` transfers ownership of the descriptor to
    // exactly one `std::fs::File`, so the fd is neither leaked nor
    // double-closed.
    let tx = unsafe {
        std::fs::File::from_raw_fd(tx.into_raw_fd())
    };
    // SAFETY: as above — exclusive ownership of `rx`'s fd is transferred.
    let rx = unsafe {
        std::fs::File::from_raw_fd(rx.into_raw_fd())
    };

    Ok((fs::File::from_std(rx), fs::File::from_std(tx)))
}
|
|
||||||
|
|
||||||
/// Create a pipe *reader* that decompresses the bytes read from `input_file` on a backing task.
///
/// Must be called within a `tokio_uring` context.
///
/// # Returns
/// A tuple of:
/// - Backing task join-handle. It will complete when all bytes have been copied (__XXX__: and the returned read stream has been closed? Does it?)
/// - Read end of async pipe. When all data has been read, it should be closed and then the handle should be awaited to ensure the data is flushed.
fn pipe_to_decoder(input_file: fs::File) -> (impl Future<Output = std::io::Result<usize>> + Unpin + Send + 'static, ReadHalf<SimplexStream>)
{
    // In-memory byte pipe (8 KiB buffer) between the uring reader task and
    // the caller.
    let (rx, tx) = simplex(4096 << 1);
    // Bytes pumped into `tx` are decompressed before landing in the pipe.
    let tx = create_write_decoder(tx);

    let pump = spawn_read_pump(input_file, tx);

    (pump, rx)
}
|
|
||||||
|
|
||||||
/// Create a pipe *writer* that compresses the bytes written into `output_file` on a backing task.
///
/// Must be called within a `tokio_uring` context.
///
/// # Returns
/// A tuple of:
/// - Backing task join-handle. It will complete when all bytes have been copied and the returned write stream has been closed.
/// - Write end of async pipe. When all data has been written, it should be closed and then the handle should be awaited to ensure the data is flushed.
fn pipe_to_encoder(output_file: fs::File) -> (impl Future<Output = std::io::Result<usize>> + Unpin + Send + 'static, WriteHalf<SimplexStream>)
{
    // In-memory byte pipe (8 KiB buffer) between the caller and the uring
    // writer task.
    let (rx, tx) = simplex(4096 << 1);
    // Bytes drained from `rx` are compressed on the way to disk. Buffered,
    // since the bufread-style encoder requires `AsyncBufRead`.
    let rx = create_read_encoder(tokio::io::BufReader::new(rx));

    let pump = spawn_write_pump(rx, output_file);

    (pump, tx)
}
|
|
||||||
|
|
||||||
/// Write `data` to file `output` using `io_uring` (overwriting if `force` is `true`.)
///
/// # Synchonicity
/// This runs on a background thread, the encoding of the object bytes into the stream can be done on the current thread, the compression and writing to the file is done on a backing thread.
///
/// # Returns
/// A synchonous writer to write the data to, and a `JoinHandle` to the backing thread that completes to the number of bytes written to the file.
pub fn create_write_compressed(output_file: std::fs::File) -> std::io::Result<(os_pipe::PipeWriter, std::thread::JoinHandle<std::io::Result<usize>>)>
{
    // OS pipe: the caller writes synchronously into `tx`; the background
    // thread drains `rx`.
    let (rx, tx) = os_pipe::pipe()?;

    let b = std::thread::spawn(move || {
        // The backing thread hosts its own `tokio_uring` runtime.
        tokio_uring::start(async move {
            // let output_file = fs::OpenOptions::new()
            // .create_new(! force)
            // .create(true)
            // .truncate(force)
            // .write(true)
            // .open(output).await?;
            let output_file = fs::File::from_std(output_file);

            // Task A: compress bytes arriving on an in-memory pipe into the
            // output file.
            let (bg, tx) = pipe_to_encoder(output_file);
            // Wrap the OS pipe's read end for uring reads.
            // SAFETY: `into_raw_fd()` transfers exclusive ownership of the fd
            // into the new `std::fs::File` (no leak, no double close).
            let rx = fs::File::from_std(unsafe {
                use std::os::unix::io::*;
                std::fs::File::from_raw_fd(rx.into_raw_fd())
            });

            // Task B: pump the OS pipe into the encoder's write end.
            let bgt = spawn_read_pump(rx, tx);
            // Await the two spawned tasks together.
            let (sz, szf) = futures::future::try_join(bgt, bg).await?;

            // NOTE(review): `sz` counts bytes pumped *into* the encoder
            // (uncompressed) while `szf` counts bytes written to the file
            // (compressed) — these are not obviously expected to match, so
            // this panic may fire on any compressible input; confirm intent.
            if sz != szf {
                //XXX: Should we expect these to be the same?
                panic!("Invalid size transfer! {} bytes sent to backing, {} bytes sent to file.", sz, szf);
            }

            Ok::<_, std::io::Error>(szf)
        })
    });
    Ok((tx, b))
}
|
|
||||||
|
|
||||||
//TODO: create_read_compressed()
|
|
||||||
}
|
|
Loading…
Reference in new issue