format: Added `io_uring::create_write_compressed()`: Spawns a background thread that compresses the written data to a file using the `io_uring` subsystem. Compression & IO operations are performed asynchonously.

Fortune for genmarkov's current commit: Future small blessing − 末小吉
io-uring-async-support
Avril 2 months ago
parent 066811444a
commit 0a7d530068
Signed by: flanchan
GPG Key ID: 284488987C31F630

327
Cargo.lock generated

@ -2,6 +2,21 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "addr2line"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "anstream"
version = "0.6.18"
@ -38,7 +53,7 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys",
"windows-sys 0.59.0",
]
[[package]]
@ -49,7 +64,21 @@ checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
dependencies = [
"anstyle",
"once_cell",
"windows-sys",
"windows-sys 0.59.0",
]
[[package]]
name = "async-compression"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
dependencies = [
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"zstd",
"zstd-safe",
]
[[package]]
@ -58,6 +87,27 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backtrace"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cfg-if 1.0.0",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bytes"
version = "1.10.0"
@ -83,6 +133,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "4.5.29"
@ -147,6 +203,83 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getopts"
version = "0.2.21"
@ -162,11 +295,17 @@ version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"wasi",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "gimli"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "half"
version = "1.8.3"
@ -201,6 +340,16 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "io-uring"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
@ -227,9 +376,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.79"
version = "0.2.169"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
[[package]]
name = "linked-hash-map"
@ -239,14 +388,19 @@ checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
[[package]]
name = "markov"
version = "0.2.1"
version = "0.2.1+1"
dependencies = [
"async-compression",
"bytes",
"clap",
"futures",
"markov 1.1.0",
"num_cpus",
"os_pipe",
"serde",
"serde_cbor",
"tokio",
"tokio-uring",
"zstd",
]
@ -265,6 +419,32 @@ dependencies = [
"serde_yaml",
]
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3b1c9bd4fe1f0f8b387f6eb9eb3b4a1aa26185e5750efb9140301703f62cd1b"
dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
@ -275,12 +455,31 @@ dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e"
[[package]]
name = "os_pipe"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982"
dependencies = [
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "petgraph"
version = "0.5.1"
@ -291,6 +490,18 @@ dependencies = [
"indexmap",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.31"
@ -362,6 +573,12 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "serde"
version = "1.0.217"
@ -404,6 +621,35 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "strsim"
version = "0.11.1"
@ -421,6 +667,36 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.43.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"pin-project-lite",
"socket2 0.5.8",
"windows-sys 0.52.0",
]
[[package]]
name = "tokio-uring"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967"
dependencies = [
"bytes",
"futures-util",
"io-uring",
"libc",
"slab",
"socket2 0.4.10",
"tokio",
]
[[package]]
name = "unicode-ident"
version = "1.0.16"
@ -445,6 +721,43 @@ version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.59.0"

@ -1,6 +1,6 @@
[package]
name = "markov"
version = "0.2.1"
version = "0.2.1+1"
description = "Generate string of text from Markov chain fed by stdin or file(s)"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
@ -20,15 +20,21 @@ lto = "fat"
strip = false
[features]
default = ["threads"]
default = ["threads", "io_uring"]
threads = ["zstd/zstdmt", "dep:num_cpus"]
io_uring = ["dep:tokio-uring", "dep:async-compression", "dep:futures", "dep:tokio"]
[dependencies]
async-compression = { version = "0.4.18", features = ["tokio", "zstd", "zstdmt"], optional = true }
bytes = { version = "1.10.0", features = ["serde"] }
chain = {package = "markov", version = "1.1.0" }
clap = { version = "4.5.29", features = ["derive"] }
futures = { version = "0.3.31", default-features = false, optional = true, features = ["alloc", "async-await", "std"] }
num_cpus = { version = "1.16.0", optional = true }
os_pipe = "1.2.1"
serde = { version = "1.0.217", features = ["derive"] }
serde_cbor = { version = "0.11.2", features = ["alloc"] }
tokio = { version = "1.43.0", features = ["io-util"], optional = true }
tokio-uring = { version = "0.5.0", optional = true, features = ["bytes"] }
zstd = { version = "0.13.2", features = [] }

@ -376,4 +376,290 @@ where S: io::Write + ?Sized
stream.flush()
}
//TODO: Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file.
#[cfg(feature="io_uring")]
mod io_uring {
use super::*;
use std::{
path::Path,
};
use futures::{
prelude::*,
io::{
//AsyncRead,
//AsyncBufRead,
//AsyncWrite,
AllowStdIo,
},
};
use tokio::{
io::{
AsyncRead,
AsyncBufRead,
AsyncWrite,
ReadHalf, WriteHalf,
SimplexStream,
simplex,
AsyncReadExt,
AsyncWriteExt,
},
};
use tokio_uring::{
fs,
buf,
};
use async_compression::{
Level,
zstd::CParameter,
tokio::{
bufread::{
ZstdDecoder,
ZstdEncoder,
},
write as zstd_write,
},
};
// Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file. (Maybe default-feature-gate this?)
/// Creates an future that, when executed, reads all data from `reader` via `io_uring`, into generic byte sink `writer`.
///
/// # Task dispatch
/// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context.
///
/// # Returns
/// The future polls to return an `io::Result<usize>` of the number of bytes read from `reader` and written to `writer`.
#[inline]
#[must_use]
fn create_read_pump<'f, B>(reader: &'f fs::File, writer: B) -> impl Future<Output = std::io::Result<usize>> + use<'f, B>
where B: AsyncWrite
{
async move {
let mut pos = 0;
let mut buf = Vec::with_capacity(4096);
futures::pin_mut!(writer);
loop {
let (bytes, nbuf) = reader.read_at(buf, pos as u64).await;
match bytes? {
0 => break,
bytes => {
// Push `&nbuf[..bytes]` to pump.
writer.write_all(&nbuf[..bytes]).await?;
pos += bytes;
buf = nbuf;
},
}
}
Ok::<_, std::io::Error>(pos)
}//).map(|_| ())
}
/// Creates an future that, when executed, reads all data from generic byte source `reader` and writes them into `writer` through `io_uring`.
///
/// # Task dispatch
/// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context.
///
/// # Returns
/// The future polls to return an `io::Result<usize>` of the number of bytes read from `reader` and written to `writer`.
fn create_write_pump<'f, B>(reader: B, writer: &'f fs::File) -> impl Future<Output = std::io::Result<usize>> + use<'f, B>
where B: AsyncRead
{
async move {
let mut pos =0;
let mut buf = vec![0u8; 4096];
futures::pin_mut!(reader);
loop {
let sz = reader.read(&mut buf[..]).await?;
if sz == 0 {
break;
}
buf.truncate(sz);
let (res, nbuf) = writer.write_all_at(buf, pos as u64).await;
res?;
pos += sz;
buf = nbuf;
}
Ok::<usize, std::io::Error>(pos)
}
}
/// Spawns a task that reads all bytes from `reader` via `io_uring` and writes them into generic byte sink `writer`.
/// Returns a handle to the async task that completes when the task has completed.
///
/// # Runtime
/// The file is closed after the operation completes, ownership is transfered into the background task.
///
/// __NOTE__: **Must** be called from within an active `tokio_uring` runtime.
fn spawn_read_pump<B>(reader: fs::File, writer: B) -> impl Future<Output = std::io::Result<usize>> + Unpin + 'static
where B: AsyncWrite + Send + 'static
{
tokio_uring::spawn(async move {
let sz = create_read_pump(&reader, writer).await?;
let _ = reader.sync_data().await;
reader.close().await.map(move |_| sz)
}).map(|handle| handle.expect("Background reading task panic"))
}
/// Spawns a task that reads all bytes from generic byte source `reader` and writes them into `writer` via `io_uring`.
/// Returns a handle to the async task that completes when the task has completed.
///
/// # Runtime
/// The file is closed after the operation completes, ownership is transfered into the background task.
///
/// __NOTE__: **Must** be called from within an active `tokio_uring` runtime.
fn spawn_write_pump<B>(reader: B, writer: fs::File) -> impl Future<Output = std::io::Result<usize>> + Unpin + 'static
where B: AsyncRead + Send + 'static
{
tokio_uring::spawn(async move {
let sz = create_write_pump(reader, &writer).await?;
let _ = writer.sync_data().await;
writer.close().await.map(move |_| sz)
}).map(|handle| handle.expect("Background writing task panic"))
}
/// Create a wrapper over `output_stream` in which bytes written are compressed via `zstd`.
fn create_write_encoder<S>(output_stream: S) -> zstd_write::ZstdEncoder<S>
where S: AsyncWrite
{
use zstd_write::*;
if cfg!(feature="threads") {
ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)])
} else {
ZstdEncoder::with_quality(output_stream, Level::Precise(22))
}
}
/// Creates a wrapper over `output_stream` in which bytes written to are **de**compressed via `zstd`.
fn create_write_decoder<S>(output_stream: S) -> zstd_write::ZstdDecoder<S>
where S: AsyncWrite
{
use zstd_write::*;
ZstdDecoder::new(output_stream)
}
/// Create a wrapper over `output_stream` in which bytes read are compressed via `zstd`.
fn create_read_encoder<S>(output_stream: S) -> ZstdEncoder<S>
where S: AsyncBufRead
{
if cfg!(feature="threads") {
ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)])
} else {
ZstdEncoder::with_quality(output_stream, Level::Precise(22))
}
}
/// Creates a wrapper over `output_stream` in which bytes read from are **de**compressed via `zstd`.
fn create_read_decoder<S>(output_stream: S) -> ZstdDecoder<S>
where S: AsyncBufRead
{
ZstdDecoder::new(output_stream)
}
pub fn pipe_async_uring() -> std::io::Result<(fs::File, fs::File)>
{
use std::os::unix::io::*;
let (rx, tx) = os_pipe::pipe()?;
let tx = unsafe {
std::fs::File::from_raw_fd(tx.into_raw_fd())
};
let rx = unsafe {
std::fs::File::from_raw_fd(rx.into_raw_fd())
};
Ok((fs::File::from_std(rx), fs::File::from_std(tx)))
}
/// Create a pipe *reader* that decompresses the bytes read from `input_file` on a backing task.
///
/// Must be called within a `tokio_uring` context.
///
/// # Returns
/// A tuple of:
/// - Backing task join-handle. It will complete when all bytes have been copied (__XXX__: and the returned read stream has been closed? Does it?)
/// - Read end of async pipe. When all data has been read, it should be closed and then the handle should be awaited to ensure the data is flushed.
fn pipe_to_decoder(input_file: fs::File) -> (impl Future<Output = std::io::Result<usize>> + Unpin + Send + 'static, ReadHalf<SimplexStream>)
{
let (rx, tx) = simplex(4096 << 1);
let tx = create_write_decoder(tx);
let pump = spawn_read_pump(input_file, tx);
(pump, rx)
}
/// Create a pipe *writer* that compresses the bytes written into `output_file` on a backing task.
///
/// Must be called within a `tokio_uring` context.
///
/// # Returns
/// A tuple of:
/// - Backing task join-handle. It will complete when all bytes have been copied and the returned write stream has been closed.
/// - Write end of async pipe. When all data has been written, it should be closed and then the handle should be awaited to ensure the data is flushed.
fn pipe_to_encoder(output_file: fs::File) -> (impl Future<Output = std::io::Result<usize>> + Unpin + Send + 'static, WriteHalf<SimplexStream>)
{
let (rx, tx) = simplex(4096 << 1);
let rx = create_read_encoder(tokio::io::BufReader::new(rx));
let pump = spawn_write_pump(rx, output_file);
(pump, tx)
}
/// Write `data` to file `output` using `io_uring` (overwriting if `force` is `true`.)
///
/// # Synchonicity
/// This runs on a background thread, the encoding of the object bytes into the stream can be done on the current thread, the compression and writing to the file is done on a backing thread.
///
/// # Returns
/// A synchonous writer to write the data to, and a `JoinHandle` to the backing thread that completes to the number of bytes written to the file.
pub fn create_write_compressed(output_file: std::fs::File) -> std::io::Result<(os_pipe::PipeWriter, std::thread::JoinHandle<std::io::Result<usize>>)>
{
let (rx, tx) = os_pipe::pipe()?;
let b = std::thread::spawn(move || {
tokio_uring::start(async move {
// let output_file = fs::OpenOptions::new()
// .create_new(! force)
// .create(true)
// .truncate(force)
// .write(true)
// .open(output).await?;
let output_file = fs::File::from_std(output_file);
let (bg, tx) = pipe_to_encoder(output_file);
let rx = fs::File::from_std(unsafe {
use std::os::unix::io::*;
std::fs::File::from_raw_fd(rx.into_raw_fd())
});
let bgt = spawn_read_pump(rx, tx);
// Await the two spawned tasks together.
let (sz, szf) = futures::future::try_join(bgt, bg).await?;
if sz != szf {
//XXX: Should we expect these to be the same?
panic!("Invalid size transfer! {} bytes sent to backing, {} bytes sent to file.", sz, szf);
}
Ok::<_, std::io::Error>(szf)
})
});
Ok((tx, b))
}
//TODO: create_read_compressed()
}

Loading…
Cancel
Save