diff --git a/src/handle.rs b/src/handle.rs index 3c021bc..e00e7a8 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -37,7 +37,7 @@ where F: FnMut(&[u8]) -> io::Result<()> Ok(read) } -fn file_handler(mut recv: mpsc::Receiver, backpressure: usize) -> impl Stream +fn file_handler(mut recv: mpsc::Receiver, backpressure: usize) -> impl Stream)> { let (r_tx, r_rx) = mpsc::channel(backpressure); @@ -51,32 +51,38 @@ fn file_handler(mut recv: mpsc::Receiver, backpressure: usize) -> impl .read(true) .open(&path).await.unwrap(); let mut hasher = Sha256::new(); - match uring_read(&mut file, |buffer| { + let ring_res = uring_read(&mut file, |buffer| { if ret.is_closed() { return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation")); } hasher.update(buffer); Ok(()) - }).await { - Ok(_n) => { - eprintln!("Proc {} bytes from {:?}", _n, path); - let _ = ret.send((path, hasher.into())).await; - }, - Err(e) => { - eprintln!("Proc for {:?} failed: {}", path, e); + }).await; + let _ = tokio::join![ + file.close(), // We are in a unique task per file, so awaiting this here concurrently with the returning async block is fine. + async move { + match ring_res { + Ok(_n) => { + eprintln!("Proc {} bytes from {:?}", _n, path); + let _ = ret.send((path, Ok(hasher.into()))).await; + }, + Err(e) => { + eprintln!("Proc for {:?} failed: {}", path, e); + match ret.try_send((path, Err(e))) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); }, + _ => return, + } + } + } } - } - // Close the file. - // We are in a unique task per file, so awaiting this here is fine. - let _ = file.close().await; - + ]; }); } // Drop sender that we're cloning from drop(r_tx); }); }); - //TODO: Change stream item to io::Result<(path, hash)> ReceiverStream::new(r_rx) } //TODO: Higher-level wrapper around `file_handler()`