From d5b7f870db552a98c71cb12108e21f30aea949ee Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 20 Sep 2021 14:32:04 +0100 Subject: [PATCH] Started Rust verison: raw io_uring async file hashing loop in `handle::file_handler()` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for lazy-rebuild's current commit: Small curse − 小凶 --- .gitignore | 2 + Cargo.toml | 12 ++++ Makefile => go/Makefile | 0 lazy-rebuild.go => go/lazy-rebuild.go | 0 src/handle.rs | 88 +++++++++++++++++++++++++++ src/main.rs | 9 +++ 6 files changed, 111 insertions(+) create mode 100644 Cargo.toml rename Makefile => go/Makefile (100%) rename lazy-rebuild.go => go/lazy-rebuild.go (100%) create mode 100644 src/handle.rs create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore index db111e8..81e3cae 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ test/ lazy-rebuild +Cargo.lock +target/ diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..7794383 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "lazy-rebuild" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cryptohelpers = { version = "1.8.2", features = ["full", "async"] } +futures = "0.3.17" +tokio = { version = "1.11.0", features = ["full"] } +tokio-uring = "0.1.0" diff --git a/Makefile b/go/Makefile similarity index 100% rename from Makefile rename to go/Makefile diff --git a/lazy-rebuild.go b/go/lazy-rebuild.go similarity index 100% rename from lazy-rebuild.go rename to go/lazy-rebuild.go diff --git a/src/handle.rs b/src/handle.rs new file mode 100644 index 0000000..744bbe4 --- /dev/null +++ b/src/handle.rs @@ -0,0 +1,88 @@ +use super::*; +use std::io; +use cryptohelpers::sha256; +use cryptohelpers::sha2::{ + Digest, Sha256, +}; +use tokio_uring::fs::{ + File, OpenOptions, +}; +use tokio::sync::{ + mpsc, + oneshot, +}; +use futures::Future; + +async fn uring_read(file: &mut File, mut to: F) -> io::Result +where F: FnMut(&[u8]) -> io::Result<()> +{ + let mut full_buffer = vec![0u8; 4096]; // we need to allocate this so &buffer[0] is always the same. + + let mut read = 0usize; + loop { + let buffer = { + let (res, n_full_buffer) = file.read_at(full_buffer, read as u64).await; + + full_buffer = n_full_buffer; + &full_buffer[..(match res { + Ok(0) => break, + Ok(n) => n, + Err(e) => return Err(e), + })] + }; + to(buffer)?; + read += buffer.len(); + } + Ok(read) +} + +fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender)>) -> impl Future +{ + let (comp_tx, comp_rx) = oneshot::channel(); + std::thread::spawn(move || { + tokio_uring::start(async move { + let (r_tx, mut r_rx) = mpsc::unbounded_channel::(); + + while let Some((path, ret)) = recv.recv().await { + let sem = r_tx.clone(); + tokio_uring::spawn(async move { + let _sem = sem; + let mut file = OpenOptions::new() + .read(true) + .open(&path).await.unwrap(); + let mut hasher = Sha256::new(); + match uring_read(&mut file, |buffer| { + if ret.is_closed() { + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation")); + } + hasher.update(buffer); + Ok(()) + }).await { + Ok(_n) => { + eprintln!("Proc {} bytes from {:?}", _n, path); + let _ = ret.send(hasher.into()); + }, + Err(e) => { + eprintln!("Proc for {:?} failed: {}", path, e); + } + } + // Close the file. + // We are in a unique task per file, so awaiting this here is fine. + let _ = file.close().await; + + }); + } + // Drop sender + drop(r_tx); + // Calling recv() will wait until all `_sem` 'permits' inside the tasks have been dropped before returning `None`. + debug_assert_eq!(r_rx.recv().await, None); + + }); + comp_tx.send(()).unwrap(); + }); + { + use futures::prelude::*; + comp_rx.map(|_| ()) + } +} +//TODO: Higher-level wrapper around `file_handler()` diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..9d47c4b --- /dev/null +++ b/src/main.rs @@ -0,0 +1,9 @@ + +use std::convert::Infallible; +use std::path::PathBuf; + +mod handle; + +fn main() { + println!("Hello, world!"); +}