Started Rust verison: raw io_uring async file hashing loop in `handle::file_handler()`

Fortune for lazy-rebuild's current commit: Small curse − 小凶
rust-version
Avril 3 years ago
parent 11fa5b6724
commit d5b7f870db
Signed by: flanchan
GPG Key ID: 284488987C31F630

2
.gitignore vendored

@ -1,2 +1,4 @@
test/
lazy-rebuild
Cargo.lock
target/

@ -0,0 +1,12 @@
[package]
name = "lazy-rebuild"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
cryptohelpers = { version = "1.8.2", features = ["full", "async"] }
futures = "0.3.17"
tokio = { version = "1.11.0", features = ["full"] }
tokio-uring = "0.1.0"

@ -0,0 +1,88 @@
use super::*;
use std::io;
use cryptohelpers::sha256;
use cryptohelpers::sha2::{
Digest, Sha256,
};
use tokio_uring::fs::{
File, OpenOptions,
};
use tokio::sync::{
mpsc,
oneshot,
};
use futures::Future;
async fn uring_read<F>(file: &mut File, mut to: F) -> io::Result<usize>
where F: FnMut(&[u8]) -> io::Result<()>
{
let mut full_buffer = vec![0u8; 4096]; // we need to allocate this so &buffer[0] is always the same.
let mut read = 0usize;
loop {
let buffer = {
let (res, n_full_buffer) = file.read_at(full_buffer, read as u64).await;
full_buffer = n_full_buffer;
&full_buffer[..(match res {
Ok(0) => break,
Ok(n) => n,
Err(e) => return Err(e),
})]
};
to(buffer)?;
read += buffer.len();
}
Ok(read)
}
fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender<sha256::Sha256Hash>)>) -> impl Future<Output = ()>
{
let (comp_tx, comp_rx) = oneshot::channel();
std::thread::spawn(move || {
tokio_uring::start(async move {
let (r_tx, mut r_rx) = mpsc::unbounded_channel::<super::Infallible>();
while let Some((path, ret)) = recv.recv().await {
let sem = r_tx.clone();
tokio_uring::spawn(async move {
let _sem = sem;
let mut file = OpenOptions::new()
.read(true)
.open(&path).await.unwrap();
let mut hasher = Sha256::new();
match uring_read(&mut file, |buffer| {
if ret.is_closed() {
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation"));
}
hasher.update(buffer);
Ok(())
}).await {
Ok(_n) => {
eprintln!("Proc {} bytes from {:?}", _n, path);
let _ = ret.send(hasher.into());
},
Err(e) => {
eprintln!("Proc for {:?} failed: {}", path, e);
}
}
// Close the file.
// We are in a unique task per file, so awaiting this here is fine.
let _ = file.close().await;
});
}
// Drop sender
drop(r_tx);
// Calling recv() will wait until all `_sem` 'permits' inside the tasks have been dropped before returning `None`.
debug_assert_eq!(r_rx.recv().await, None);
});
comp_tx.send(()).unwrap();
});
{
use futures::prelude::*;
comp_rx.map(|_| ())
}
}
//TODO: Higher-level wrapper around `file_handler()`

@ -0,0 +1,9 @@
use std::convert::Infallible;
use std::path::PathBuf;
mod handle;
fn main() {
println!("Hello, world!");
}
Loading…
Cancel
Save