diff --git a/Cargo.toml b/Cargo.toml index 7794383..d668c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,4 +9,5 @@ edition = "2018" cryptohelpers = { version = "1.8.2", features = ["full", "async"] } futures = "0.3.17" tokio = { version = "1.11.0", features = ["full"] } +tokio-stream = "0.1.7" tokio-uring = "0.1.0" diff --git a/src/handle.rs b/src/handle.rs index 744bbe4..3c021bc 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -9,9 +9,10 @@ use tokio_uring::fs::{ }; use tokio::sync::{ mpsc, - oneshot, + oneshot, }; -use futures::Future; +use tokio_stream::wrappers::ReceiverStream; +use futures::prelude::*; async fn uring_read(file: &mut File, mut to: F) -> io::Result where F: FnMut(&[u8]) -> io::Result<()> @@ -36,17 +37,16 @@ where F: FnMut(&[u8]) -> io::Result<()> Ok(read) } -fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender)>) -> impl Future +fn file_handler(mut recv: mpsc::Receiver, backpressure: usize) -> impl Stream { - let (comp_tx, comp_rx) = oneshot::channel(); + + let (r_tx, r_rx) = mpsc::channel(backpressure); std::thread::spawn(move || { tokio_uring::start(async move { - let (r_tx, mut r_rx) = mpsc::unbounded_channel::(); - while let Some((path, ret)) = recv.recv().await { - let sem = r_tx.clone(); + while let Some(path) = recv.recv().await { + let ret = r_tx.clone(); tokio_uring::spawn(async move { - let _sem = sem; let mut file = OpenOptions::new() .read(true) .open(&path).await.unwrap(); @@ -60,7 +60,7 @@ fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender { eprintln!("Proc {} bytes from {:?}", _n, path); - let _ = ret.send(hasher.into()); + let _ = ret.send((path, hasher.into())).await; }, Err(e) => { eprintln!("Proc for {:?} failed: {}", path, e); @@ -72,17 +72,11 @@ fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender + ReceiverStream::new(r_rx) } //TODO: Higher-level wrapper around `file_handler()`