From 0a7d530068bec2bcc971d64d0eb2c91755fef48d Mon Sep 17 00:00:00 2001
From: Avril <flanchan@cumallover.me>
Date: Thu, 13 Feb 2025 19:03:30 +0000
Subject: [PATCH] format: Added `io_uring::create_write_compressed()`: Spawns a
 background thread that compresses the written data to a file using the
 `io_uring` subsystem. Compression & IO operations are performed
 asynchonously.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fortune for genmarkov's current commit: Future small blessing − 末小吉
---
 Cargo.lock    | 327 ++++++++++++++++++++++++++++++++++++++++++++++++--
 Cargo.toml    |  10 +-
 src/format.rs | 288 +++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 615 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 853bee2..0bbbac8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,6 +2,21 @@
 # It is not intended for manual editing.
 version = 4
 
+[[package]]
+name = "addr2line"
+version = "0.24.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
+dependencies = [
+ "gimli",
+]
+
+[[package]]
+name = "adler2"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
+
 [[package]]
 name = "anstream"
 version = "0.6.18"
@@ -38,7 +53,7 @@ version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
 dependencies = [
- "windows-sys",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -49,7 +64,21 @@ checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
 dependencies = [
  "anstyle",
  "once_cell",
- "windows-sys",
+ "windows-sys 0.59.0",
+]
+
+[[package]]
+name = "async-compression"
+version = "0.4.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
+dependencies = [
+ "futures-core",
+ "memchr",
+ "pin-project-lite",
+ "tokio",
+ "zstd",
+ "zstd-safe",
 ]
 
 [[package]]
@@ -58,6 +87,27 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
 
+[[package]]
+name = "backtrace"
+version = "0.3.74"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
+dependencies = [
+ "addr2line",
+ "cfg-if 1.0.0",
+ "libc",
+ "miniz_oxide",
+ "object",
+ "rustc-demangle",
+ "windows-targets",
+]
+
+[[package]]
+name = "bitflags"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
+
 [[package]]
 name = "bytes"
 version = "1.10.0"
@@ -83,6 +133,12 @@ version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
 
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
 [[package]]
 name = "clap"
 version = "4.5.29"
@@ -147,6 +203,83 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
 
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
+
+[[package]]
+name = "futures-io"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+
+[[package]]
+name = "futures-task"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
+
+[[package]]
+name = "futures-util"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
 [[package]]
 name = "getopts"
 version = "0.2.21"
@@ -162,11 +295,17 @@ version = "0.1.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
 dependencies = [
- "cfg-if",
+ "cfg-if 0.1.10",
  "libc",
- "wasi",
+ "wasi 0.9.0+wasi-snapshot-preview1",
 ]
 
+[[package]]
+name = "gimli"
+version = "0.31.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
+
 [[package]]
 name = "half"
 version = "1.8.3"
@@ -201,6 +340,16 @@ dependencies = [
  "hashbrown",
 ]
 
+[[package]]
+name = "io-uring"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
+dependencies = [
+ "bitflags",
+ "libc",
+]
+
 [[package]]
 name = "is_terminal_polyfill"
 version = "1.70.1"
@@ -227,9 +376,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.79"
+version = "0.2.169"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743"
+checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
 
 [[package]]
 name = "linked-hash-map"
@@ -239,14 +388,19 @@ checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
 
 [[package]]
 name = "markov"
-version = "0.2.1"
+version = "0.2.1+1"
 dependencies = [
+ "async-compression",
  "bytes",
  "clap",
+ "futures",
  "markov 1.1.0",
  "num_cpus",
+ "os_pipe",
  "serde",
  "serde_cbor",
+ "tokio",
+ "tokio-uring",
  "zstd",
 ]
 
@@ -265,6 +419,32 @@ dependencies = [
  "serde_yaml",
 ]
 
+[[package]]
+name = "memchr"
+version = "2.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
+
+[[package]]
+name = "miniz_oxide"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b3b1c9bd4fe1f0f8b387f6eb9eb3b4a1aa26185e5750efb9140301703f62cd1b"
+dependencies = [
+ "adler2",
+]
+
+[[package]]
+name = "mio"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
+dependencies = [
+ "libc",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "num_cpus"
 version = "1.16.0"
@@ -275,12 +455,31 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "object"
+version = "0.36.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "once_cell"
 version = "1.20.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e"
 
+[[package]]
+name = "os_pipe"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982"
+dependencies = [
+ "libc",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "petgraph"
 version = "0.5.1"
@@ -291,6 +490,18 @@ dependencies = [
  "indexmap",
 ]
 
+[[package]]
+name = "pin-project-lite"
+version = "0.2.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
+
+[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
 [[package]]
 name = "pkg-config"
 version = "0.3.31"
@@ -362,6 +573,12 @@ dependencies = [
  "rand_core",
 ]
 
+[[package]]
+name = "rustc-demangle"
+version = "0.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
+
 [[package]]
 name = "serde"
 version = "1.0.217"
@@ -404,6 +621,35 @@ dependencies = [
  "yaml-rust",
 ]
 
+[[package]]
+name = "slab"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "socket2"
+version = "0.4.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
+[[package]]
+name = "socket2"
+version = "0.5.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "strsim"
 version = "0.11.1"
@@ -421,6 +667,36 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "tokio"
+version = "1.43.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
+dependencies = [
+ "backtrace",
+ "bytes",
+ "libc",
+ "mio",
+ "pin-project-lite",
+ "socket2 0.5.8",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "tokio-uring"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "io-uring",
+ "libc",
+ "slab",
+ "socket2 0.4.10",
+ "tokio",
+]
+
 [[package]]
 name = "unicode-ident"
 version = "1.0.16"
@@ -445,6 +721,43 @@ version = "0.9.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
 
+[[package]]
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
+[[package]]
+name = "windows-sys"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
+dependencies = [
+ "windows-targets",
+]
+
 [[package]]
 name = "windows-sys"
 version = "0.59.0"
diff --git a/Cargo.toml b/Cargo.toml
index ab11ee7..6de0a65 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "markov"
-version = "0.2.1"
+version = "0.2.1+1"
 description = "Generate string of text from Markov chain fed by stdin or file(s)"
 authors = ["Avril <flanchan@cumallover.me>"]
 edition = "2018"
@@ -20,15 +20,21 @@ lto = "fat"
 strip = false
 
 [features]
-default = ["threads"]
+default = ["threads", "io_uring"]
 
 threads = ["zstd/zstdmt", "dep:num_cpus"]
+io_uring = ["dep:tokio-uring", "dep:async-compression", "dep:futures", "dep:tokio"]
 
 [dependencies]
+async-compression = { version = "0.4.18", features = ["tokio", "zstd", "zstdmt"], optional = true }
 bytes = { version = "1.10.0", features = ["serde"] }
 chain = {package = "markov", version = "1.1.0" }
 clap = { version = "4.5.29", features = ["derive"] }
+futures = { version = "0.3.31", default-features = false, optional = true, features = ["alloc", "async-await", "std"] }
 num_cpus = { version = "1.16.0", optional = true }
+os_pipe = "1.2.1"
 serde = { version = "1.0.217", features = ["derive"] }
 serde_cbor = { version = "0.11.2", features = ["alloc"] }
+tokio = { version = "1.43.0", features = ["io-util"], optional = true }
+tokio-uring = { version = "0.5.0", optional = true, features = ["bytes"] }
 zstd = { version = "0.13.2", features = [] }
diff --git a/src/format.rs b/src/format.rs
index df6610d..3bc1906 100644
--- a/src/format.rs
+++ b/src/format.rs
@@ -376,4 +376,290 @@ where S: io::Write + ?Sized
     stream.flush()
 }
 
-//TODO: Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file.
+#[cfg(feature="io_uring")] 
+mod io_uring {
+    use super::*;
+    use std::{
+	path::Path,
+    };
+    use futures::{
+	prelude::*,
+	io::{
+	    //AsyncRead,
+	    //AsyncBufRead,
+	    //AsyncWrite,
+
+	    AllowStdIo,
+	},
+    };
+    use tokio::{
+	io::{
+	    AsyncRead,
+	    AsyncBufRead,
+	    AsyncWrite,
+
+	    ReadHalf, WriteHalf,
+	    SimplexStream,
+	    simplex,
+
+	    AsyncReadExt,
+	    AsyncWriteExt,
+	},
+	
+    };
+    use tokio_uring::{
+	fs,
+	buf,
+    };
+    use async_compression::{
+	Level,
+	zstd::CParameter,
+
+	tokio::{
+	    bufread::{
+		ZstdDecoder,
+		ZstdEncoder,
+	    },
+	    write as zstd_write,
+	},
+    };
+    // Add `tokio_uring` version of `save_chain_to_file()`/`load_chain_from_file()` that spawns the `tokio_uring` runtime internally to queue reads/writes from/to a file. (Maybe default-feature-gate this?)
+
+    /// Creates an future that, when executed, reads all data from `reader` via `io_uring`, into generic byte sink `writer`.
+    ///
+    /// # Task dispatch
+    /// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context.
+    ///
+    /// # Returns
+    /// The future polls to return an `io::Result<usize>` of the number of bytes read from `reader` and written to `writer`.
+    #[inline] 
+    #[must_use]
+    fn create_read_pump<'f, B>(reader: &'f fs::File, writer: B) -> impl Future<Output = std::io::Result<usize>> + use<'f, B>
+    where B: AsyncWrite
+    {
+	async move {
+	    let mut pos = 0;
+	    let mut buf = Vec::with_capacity(4096);
+
+	    futures::pin_mut!(writer);
+
+	    loop {
+		let (bytes, nbuf) = reader.read_at(buf, pos as u64).await;
+		match bytes? {
+		    0 => break,
+		    bytes => {
+			// Push `&nbuf[..bytes]` to pump.
+			writer.write_all(&nbuf[..bytes]).await?;
+			
+			pos += bytes;
+			buf = nbuf;
+		    },
+		}
+	    }
+	    Ok::<_, std::io::Error>(pos)
+	}//).map(|_| ())
+    }
+
+    
+    /// Creates an future that, when executed, reads all data from generic byte source `reader` and writes them into `writer` through `io_uring`.
+    ///
+    /// # Task dispatch
+    /// This function *creates* the async function, it should be executed via `spawn()` *inside* a `tokio_uring` context.
+    ///
+    /// # Returns
+    /// The future polls to return an `io::Result<usize>` of the number of bytes read from `reader` and written to `writer`.
+    fn create_write_pump<'f, B>(reader: B, writer: &'f fs::File) -> impl Future<Output = std::io::Result<usize>> + use<'f, B>
+    where B: AsyncRead
+    {
+	async move {
+	    let mut pos =0;
+	    let mut buf = vec![0u8; 4096];
+
+	    futures::pin_mut!(reader);
+
+	    loop {
+		let sz = reader.read(&mut buf[..]).await?;
+		if sz == 0 {
+		    break;
+		}
+		
+		buf.truncate(sz);
+		
+		let (res, nbuf) = writer.write_all_at(buf, pos as u64).await;
+		res?;
+		pos += sz;
+		buf = nbuf;
+	    }
+
+	    Ok::<usize, std::io::Error>(pos)
+	}
+    }
+
+    /// Spawns a task that reads all bytes from `reader` via `io_uring` and writes them into generic byte sink `writer`.
+    /// Returns a handle to the async task that completes when the task has completed.
+    ///
+    /// # Runtime
+    /// The file is closed after the operation completes, ownership is transfered into the background task.
+    ///
+    /// __NOTE__: **Must** be called from within an active `tokio_uring` runtime.
+    fn spawn_read_pump<B>(reader: fs::File, writer: B) -> impl Future<Output = std::io::Result<usize>> + Unpin + 'static
+    where B: AsyncWrite + Send + 'static
+    {
+	tokio_uring::spawn(async move {
+	    let sz = create_read_pump(&reader, writer).await?;
+	    let _ = reader.sync_data().await;
+	    reader.close().await.map(move |_| sz)
+	}).map(|handle| handle.expect("Background reading task panic"))
+    }
+
+    /// Spawns a task that reads all bytes from generic byte source `reader` and writes them into `writer` via `io_uring`.
+    /// Returns a handle to the async task that completes when the task has completed.
+    ///
+    /// # Runtime
+    /// The file is closed after the operation completes, ownership is transfered into the background task.
+    ///
+    /// __NOTE__: **Must** be called from within an active `tokio_uring` runtime.
+    fn spawn_write_pump<B>(reader: B, writer: fs::File) -> impl Future<Output = std::io::Result<usize>> + Unpin + 'static
+    where B: AsyncRead + Send + 'static
+    {
+	tokio_uring::spawn(async move {
+	    let sz = create_write_pump(reader, &writer).await?;
+	    let _ = writer.sync_data().await;
+	    writer.close().await.map(move |_| sz)
+	}).map(|handle| handle.expect("Background writing task panic"))
+    }
+
+    /// Create a wrapper over `output_stream` in which bytes written are compressed via `zstd`.
+    fn create_write_encoder<S>(output_stream: S) -> zstd_write::ZstdEncoder<S>
+    where S: AsyncWrite
+    {
+	use zstd_write::*;
+	if cfg!(feature="threads") {
+	    ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)])
+	} else {
+	    ZstdEncoder::with_quality(output_stream, Level::Precise(22))
+	}
+    }
+
+    /// Creates a wrapper over `output_stream` in which bytes written to are **de**compressed via `zstd`.
+    fn create_write_decoder<S>(output_stream: S) -> zstd_write::ZstdDecoder<S>
+    where S: AsyncWrite
+    {
+	use zstd_write::*;
+	ZstdDecoder::new(output_stream)
+    }
+    
+    /// Create a wrapper over `output_stream` in which bytes read are compressed via `zstd`.
+    fn create_read_encoder<S>(output_stream: S) -> ZstdEncoder<S>
+    where S: AsyncBufRead
+    {
+	if cfg!(feature="threads") {
+	    ZstdEncoder::with_quality_and_params(output_stream, Level::Precise(22), &[CParameter::nb_workers(num_cpus::get() as u32)])
+	} else {
+	    ZstdEncoder::with_quality(output_stream, Level::Precise(22))
+	}
+    }
+
+    /// Creates a wrapper over `output_stream` in which bytes read from are **de**compressed via `zstd`.
+    fn create_read_decoder<S>(output_stream: S) -> ZstdDecoder<S>
+    where S: AsyncBufRead
+    {
+	ZstdDecoder::new(output_stream)
+    }
+
+    pub fn pipe_async_uring() -> std::io::Result<(fs::File, fs::File)>
+    {
+	use std::os::unix::io::*;
+	
+	let (rx, tx) = os_pipe::pipe()?;
+	
+	let tx = unsafe {
+	    std::fs::File::from_raw_fd(tx.into_raw_fd())
+	};
+	let rx = unsafe {
+	    std::fs::File::from_raw_fd(rx.into_raw_fd())
+	};
+
+	Ok((fs::File::from_std(rx), fs::File::from_std(tx)))
+    }
+
+    /// Create a pipe *reader* that decompresses the bytes read from `input_file` on a backing task.
+    ///
+    /// Must be called within a `tokio_uring` context.
+    ///
+    /// # Returns
+    /// A tuple of:
+    /// - Backing task join-handle. It will complete when all bytes have been copied (__XXX__: and the returned read stream has been closed? Does it?)
+    /// - Read end of async pipe. When all data has been read, it should be closed and then the handle should be awaited to ensure the data is flushed.
+    fn pipe_to_decoder(input_file: fs::File) -> (impl Future<Output = std::io::Result<usize>> + Unpin + Send + 'static, ReadHalf<SimplexStream>)
+    {
+	let (rx, tx) = simplex(4096 << 1);
+	let tx = create_write_decoder(tx);
+	
+	let pump = spawn_read_pump(input_file, tx);
+
+	(pump, rx)
+    }
+
+    /// Create a pipe *writer* that compresses the bytes written into `output_file` on a backing task.
+    ///
+    /// Must be called within a `tokio_uring` context.
+    ///
+    /// # Returns
+    /// A tuple of:
+    /// - Backing task join-handle. It will complete when all bytes have been copied and the returned write stream has been closed.
+    /// - Write end of async pipe. When all data has been written, it should be closed and then the handle should be awaited to ensure the data is flushed.
+    fn pipe_to_encoder(output_file: fs::File) -> (impl Future<Output = std::io::Result<usize>> + Unpin + Send + 'static, WriteHalf<SimplexStream>)
+    {
+	let (rx, tx) = simplex(4096 << 1);
+	let rx = create_read_encoder(tokio::io::BufReader::new(rx));
+	
+	let pump = spawn_write_pump(rx, output_file);
+
+	(pump, tx)
+    }
+    
+    /// Write `data` to file `output` using `io_uring` (overwriting if `force` is `true`.)
+    /// 
+    /// # Synchonicity
+    /// This runs on a background thread, the encoding of the object bytes into the stream can be done on the current thread, the compression and writing to the file is done on a backing thread.
+    ///
+    /// # Returns
+    /// A synchonous writer to write the data to, and a `JoinHandle` to the backing thread that completes to the number of bytes written to the file.
+    pub fn create_write_compressed(output_file: std::fs::File) -> std::io::Result<(os_pipe::PipeWriter, std::thread::JoinHandle<std::io::Result<usize>>)>
+    {
+	let (rx, tx) = os_pipe::pipe()?;
+	
+	let b = std::thread::spawn(move || {
+	    tokio_uring::start(async move {
+		// let output_file = fs::OpenOptions::new()
+		//     .create_new(! force)
+		//     .create(true)
+		//     .truncate(force)
+		//     .write(true)
+		//     .open(output).await?;
+		let output_file = fs::File::from_std(output_file);
+
+		let (bg, tx) = pipe_to_encoder(output_file);
+		let rx = fs::File::from_std(unsafe {
+		    use std::os::unix::io::*;
+		    std::fs::File::from_raw_fd(rx.into_raw_fd())
+		});
+
+		let bgt = spawn_read_pump(rx, tx);
+		// Await the two spawned tasks together.
+		let (sz, szf) = futures::future::try_join(bgt, bg).await?;
+
+		if sz != szf {
+		    //XXX: Should we expect these to be the same?
+		    panic!("Invalid size transfer! {} bytes sent to backing, {} bytes sent to file.", sz, szf);
+		}
+
+		Ok::<_, std::io::Error>(szf)
+	    })
+	});
+	Ok((tx, b))
+    }
+
+    //TODO: create_read_compressed()
+}