Rework `file_handle()` to return a stream of `(path, hash)` instead of taking a oneshot for return

Fortune for lazy-rebuild's current commit: Great curse − 大凶
rust-version
Avril 3 years ago
parent d5b7f870db
commit fce66309d1

@ -9,4 +9,5 @@ edition = "2018"
cryptohelpers = { version = "1.8.2", features = ["full", "async"] }
futures = "0.3.17"
tokio = { version = "1.11.0", features = ["full"] }
tokio-stream = "0.1.7"
tokio-uring = "0.1.0"

@ -9,9 +9,10 @@ use tokio_uring::fs::{
};
use tokio::sync::{
mpsc,
oneshot,
oneshot,
};
use futures::Future;
use tokio_stream::wrappers::ReceiverStream;
use futures::prelude::*;
async fn uring_read<F>(file: &mut File, mut to: F) -> io::Result<usize>
where F: FnMut(&[u8]) -> io::Result<()>
@ -36,17 +37,16 @@ where F: FnMut(&[u8]) -> io::Result<()>
Ok(read)
}
fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender<sha256::Sha256Hash>)>) -> impl Future<Output = ()>
fn file_handler(mut recv: mpsc::Receiver<PathBuf>, backpressure: usize) -> impl Stream<Item = (PathBuf, sha256::Sha256Hash)>
{
let (comp_tx, comp_rx) = oneshot::channel();
let (r_tx, r_rx) = mpsc::channel(backpressure);
std::thread::spawn(move || {
tokio_uring::start(async move {
let (r_tx, mut r_rx) = mpsc::unbounded_channel::<super::Infallible>();
while let Some((path, ret)) = recv.recv().await {
let sem = r_tx.clone();
while let Some(path) = recv.recv().await {
let ret = r_tx.clone();
tokio_uring::spawn(async move {
let _sem = sem;
let mut file = OpenOptions::new()
.read(true)
.open(&path).await.unwrap();
@ -60,7 +60,7 @@ fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender<sha256::Sha25
}).await {
Ok(_n) => {
eprintln!("Proc {} bytes from {:?}", _n, path);
let _ = ret.send(hasher.into());
let _ = ret.send((path, hasher.into())).await;
},
Err(e) => {
eprintln!("Proc for {:?} failed: {}", path, e);
@ -72,17 +72,11 @@ fn file_handler(mut recv: mpsc::Receiver<(PathBuf, oneshot::Sender<sha256::Sha25
});
}
// Drop sender
// Drop sender that we're cloning from
drop(r_tx);
// Calling recv() will wait until all `_sem` 'permits' inside the tasks have been dropped before returning `None`.
debug_assert_eq!(r_rx.recv().await, None);
});
comp_tx.send(()).unwrap();
});
{
use futures::prelude::*;
comp_rx.map(|_| ())
}
//TODO: Change stream item to io::Result<(path, hash)>
ReceiverStream::new(r_rx)
}
//TODO: Higher-level wrapper around `file_handler()`

Loading…
Cancel
Save