diff --git a/Cargo.lock b/Cargo.lock index 853bee2..0bbbac8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "anstream" version = "0.6.18" @@ -38,7 +53,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" dependencies = [ - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -49,7 +64,21 @@ checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" dependencies = [ "anstyle", "once_cell", - "windows-sys", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-compression" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", ] [[package]] @@ -58,6 +87,27 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bytes" version = "1.10.0" @@ -83,6 +133,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "clap" version = "4.5.29" @@ -147,6 +203,83 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getopts" version = "0.2.21" @@ -162,11 +295,17 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", - "wasi", + "wasi 0.9.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + [[package]] name = "half" version = "1.8.3" @@ -201,6 +340,16 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -227,9 +376,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.79" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "linked-hash-map" @@ -239,14 +388,19 @@ checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" [[package]] name = "markov" -version = "0.2.1" +version = "0.2.1+1" dependencies = [ + "async-compression", "bytes", "clap", + "futures", "markov 1.1.0", "num_cpus", + "os_pipe", "serde", "serde_cbor", + "tokio", + "tokio-uring", "zstd", ] @@ -265,6 +419,32 @@ dependencies = [ "serde_yaml", ] +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3b1c9bd4fe1f0f8b387f6eb9eb3b4a1aa26185e5750efb9140301703f62cd1b" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.52.0", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -275,12 +455,31 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" +[[package]] +name = "os_pipe" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "petgraph" version = "0.5.1" @@ -291,6 +490,18 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.31" @@ -362,6 +573,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + [[package]] name = "serde" version = "1.0.217" @@ -404,6 +621,35 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "strsim" version = "0.11.1" @@ -421,6 +667,36 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "pin-project-lite", + "socket2 0.5.8", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-uring" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967" +dependencies = [ + "bytes", + "futures-util", + "io-uring", + "libc", + "slab", + "socket2 0.4.10", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.16" @@ -445,6 +721,43 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/Cargo.toml index ab11ee7..6de0a65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "markov" -version = "0.2.1" +version = "0.2.1+1" description = "Generate string of text from Markov chain fed by stdin or file(s)" authors = ["Avril "] edition = "2018" @@ -20,15 +20,21 @@ lto = "fat" strip = false [features] -default = ["threads"] +default = ["threads", "io_uring"] threads = ["zstd/zstdmt", "dep:num_cpus"] +io_uring = ["dep:tokio-uring", "dep:async-compression", "dep:futures", "dep:tokio"] [dependencies] +async-compression = { version = "0.4.18", features = ["tokio", "zstd", "zstdmt"], optional = true } bytes = { version = "1.10.0", features = ["serde"] } chain = {package = "markov", version = "1.1.0" } clap = { version = "4.5.29", features = ["derive"] } +futures = { version = "0.3.31", default-features = false, optional = true, features = ["alloc", "async-await", "std"] } num_cpus = { version = "1.16.0", optional = true } +os_pipe = "1.2.1" serde = { version = "1.0.217", features = ["derive"] } serde_cbor = { version = "0.11.2", features = ["alloc"] } +tokio = { version = "1.43.0", features = ["io-util"], optional = true } +tokio-uring = { version = "0.5.0", optional = true, features = ["bytes"] } zstd = { version = "0.13.2", features = [] } diff --git a/src/format.rs b/src/format.rs index df6610d..3bc1906 100644 --- a/src/format.rs +++ b/src/format.rs @@ -376,4 +376,290 @@ where S: io::Write + ?Sized stream.flush() } -//TODO: Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file. +#[cfg(feature="io_uring")] +mod io_uring { + use super::*; + use std::{ + path::Path, + }; + use futures::{ + prelude::*, + io::{ + //AsyncRead, + //AsyncBufRead, + //AsyncWrite, + + AllowStdIo, + }, + }; + use tokio::{ + io::{ + AsyncRead, + AsyncBufRead, + AsyncWrite, + + ReadHalf, WriteHalf, + SimplexStream, + simplex, + + AsyncReadExt, + AsyncWriteExt, + }, + + }; + use tokio_uring::{ + fs, + buf, + }; + use async_compression::{ + Level, + zstd::CParameter, + + tokio::{ + bufread::{ + ZstdDecoder, + ZstdEncoder, + }, + write as zstd_write, + }, + }; + // Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file. (Maybe default-feature-gate this?) + + /// Creates an future that, when executed, reads all data from `reader` via `io_uring`, into generic byte sink `writer`. + /// + /// # Task dispatch + /// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context. + /// + /// # Returns + /// The future polls to return an `io::Result` of the number of bytes read from `reader` and written to `writer`. + #[inline] + #[must_use] + fn create_read_pump<'f, B>(reader: &'f fs::File, writer: B) -> impl Future> + use<'f, B> + where B: AsyncWrite + { + async move { + let mut pos = 0; + let mut buf = Vec::with_capacity(4096); + + futures::pin_mut!(writer); + + loop { + let (bytes, nbuf) = reader.read_at(buf, pos as u64).await; + match bytes? { + 0 => break, + bytes => { + // Push `&nbuf[..bytes]` to pump. + writer.write_all(&nbuf[..bytes]).await?; + + pos += bytes; + buf = nbuf; + }, + } + } + Ok::<_, std::io::Error>(pos) + }//).map(|_| ()) + } + + + /// Creates an future that, when executed, reads all data from generic byte source `reader` and writes them into `writer` through `io_uring`. + /// + /// # Task dispatch + /// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context. + /// + /// # Returns + /// The future polls to return an `io::Result` of the number of bytes read from `reader` and written to `writer`. + fn create_write_pump<'f, B>(reader: B, writer: &'f fs::File) -> impl Future> + use<'f, B> + where B: AsyncRead + { + async move { + let mut pos =0; + let mut buf = vec![0u8; 4096]; + + futures::pin_mut!(reader); + + loop { + let sz = reader.read(&mut buf[..]).await?; + if sz == 0 { + break; + } + + buf.truncate(sz); + + let (res, nbuf) = writer.write_all_at(buf, pos as u64).await; + res?; + pos += sz; + buf = nbuf; + } + + Ok::(pos) + } + } + + /// Spawns a task that reads all bytes from `reader` via `io_uring` and writes them into generic byte sink `writer`. + /// Returns a handle to the async task that completes when the task has completed. + /// + /// # Runtime + /// The file is closed after the operation completes, ownership is transfered into the background task. + /// + /// __NOTE__: **Must** be called from within an active `tokio_uring` runtime. + fn spawn_read_pump(reader: fs::File, writer: B) -> impl Future> + Unpin + 'static + where B: AsyncWrite + Send + 'static + { + tokio_uring::spawn(async move { + let sz = create_read_pump(&reader, writer).await?; + let _ = reader.sync_data().await; + reader.close().await.map(move |_| sz) + }).map(|handle| handle.expect("Background reading task panic")) + } + + /// Spawns a task that reads all bytes from generic byte source `reader` and writes them into `writer` via `io_uring`. + /// Returns a handle to the async task that completes when the task has completed. + /// + /// # Runtime + /// The file is closed after the operation completes, ownership is transfered into the background task. + /// + /// __NOTE__: **Must** be called from within an active `tokio_uring` runtime. + fn spawn_write_pump(reader: B, writer: fs::File) -> impl Future> + Unpin + 'static + where B: AsyncRead + Send + 'static + { + tokio_uring::spawn(async move { + let sz = create_write_pump(reader, &writer).await?; + let _ = writer.sync_data().await; + writer.close().await.map(move |_| sz) + }).map(|handle| handle.expect("Background writing task panic")) + } + + /// Create a wrapper over `output_stream` in which bytes written are compressed via `zstd`. + fn create_write_encoder(output_stream: S) -> zstd_write::ZstdEncoder + where S: AsyncWrite + { + use zstd_write::*; + if cfg!(feature="threads") { + ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)]) + } else { + ZstdEncoder::with_quality(output_stream, Level::Precise(22)) + } + } + + /// Creates a wrapper over `output_stream` in which bytes written to are **de**compressed via `zstd`. + fn create_write_decoder(output_stream: S) -> zstd_write::ZstdDecoder + where S: AsyncWrite + { + use zstd_write::*; + ZstdDecoder::new(output_stream) + } + + /// Create a wrapper over `output_stream` in which bytes read are compressed via `zstd`. + fn create_read_encoder(output_stream: S) -> ZstdEncoder + where S: AsyncBufRead + { + if cfg!(feature="threads") { + ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)]) + } else { + ZstdEncoder::with_quality(output_stream, Level::Precise(22)) + } + } + + /// Creates a wrapper over `output_stream` in which bytes read from are **de**compressed via `zstd`. + fn create_read_decoder(output_stream: S) -> ZstdDecoder + where S: AsyncBufRead + { + ZstdDecoder::new(output_stream) + } + + pub fn pipe_async_uring() -> std::io::Result<(fs::File, fs::File)> + { + use std::os::unix::io::*; + + let (rx, tx) = os_pipe::pipe()?; + + let tx = unsafe { + std::fs::File::from_raw_fd(tx.into_raw_fd()) + }; + let rx = unsafe { + std::fs::File::from_raw_fd(rx.into_raw_fd()) + }; + + Ok((fs::File::from_std(rx), fs::File::from_std(tx))) + } + + /// Create a pipe *reader* that decompresses the bytes read from `input_file` on a backing task. + /// + /// Must be called within a `tokio_uring` context. + /// + /// # Returns + /// A tuple of: + /// - Backing task join-handle. It will complete when all bytes have been copied (__XXX__: and the returned read stream has been closed? Does it?) + /// - Read end of async pipe. When all data has been read, it should be closed and then the handle should be awaited to ensure the data is flushed. + fn pipe_to_decoder(input_file: fs::File) -> (impl Future> + Unpin + Send + 'static, ReadHalf) + { + let (rx, tx) = simplex(4096 << 1); + let tx = create_write_decoder(tx); + + let pump = spawn_read_pump(input_file, tx); + + (pump, rx) + } + + /// Create a pipe *writer* that compresses the bytes written into `output_file` on a backing task. + /// + /// Must be called within a `tokio_uring` context. + /// + /// # Returns + /// A tuple of: + /// - Backing task join-handle. It will complete when all bytes have been copied and the returned write stream has been closed. + /// - Write end of async pipe. When all data has been written, it should be closed and then the handle should be awaited to ensure the data is flushed. + fn pipe_to_encoder(output_file: fs::File) -> (impl Future> + Unpin + Send + 'static, WriteHalf) + { + let (rx, tx) = simplex(4096 << 1); + let rx = create_read_encoder(tokio::io::BufReader::new(rx)); + + let pump = spawn_write_pump(rx, output_file); + + (pump, tx) + } + + /// Write `data` to file `output` using `io_uring` (overwriting if `force` is `true`.) + /// + /// # Synchonicity + /// This runs on a background thread, the encoding of the object bytes into the stream can be done on the current thread, the compression and writing to the file is done on a backing thread. + /// + /// # Returns + /// A synchonous writer to write the data to, and a `JoinHandle` to the backing thread that completes to the number of bytes written to the file. + pub fn create_write_compressed(output_file: std::fs::File) -> std::io::Result<(os_pipe::PipeWriter, std::thread::JoinHandle>)> + { + let (rx, tx) = os_pipe::pipe()?; + + let b = std::thread::spawn(move || { + tokio_uring::start(async move { + // let output_file = fs::OpenOptions::new() + // .create_new(! force) + // .create(true) + // .truncate(force) + // .write(true) + // .open(output).await?; + let output_file = fs::File::from_std(output_file); + + let (bg, tx) = pipe_to_encoder(output_file); + let rx = fs::File::from_std(unsafe { + use std::os::unix::io::*; + std::fs::File::from_raw_fd(rx.into_raw_fd()) + }); + + let bgt = spawn_read_pump(rx, tx); + // Await the two spawned tasks together. + let (sz, szf) = futures::future::try_join(bgt, bg).await?; + + if sz != szf { + //XXX: Should we expect these to be the same? + panic!("Invalid size transfer! {} bytes sent to backing, {} bytes sent to file.", sz, szf); + } + + Ok::<_, std::io::Error>(szf) + }) + }); + Ok((tx, b)) + } + + //TODO: create_read_compressed() +}