|
|
@ -37,7 +37,7 @@ where F: FnMut(&[u8]) -> io::Result<()>
|
|
|
|
Ok(read)
|
|
|
|
Ok(read)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn file_handler(mut recv: mpsc::Receiver<PathBuf>, backpressure: usize) -> impl Stream<Item = (PathBuf, sha256::Sha256Hash)>
|
|
|
|
fn file_handler(mut recv: mpsc::Receiver<PathBuf>, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)>
|
|
|
|
{
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
let (r_tx, r_rx) = mpsc::channel(backpressure);
|
|
|
|
let (r_tx, r_rx) = mpsc::channel(backpressure);
|
|
|
@ -51,32 +51,38 @@ fn file_handler(mut recv: mpsc::Receiver<PathBuf>, backpressure: usize) -> impl
|
|
|
|
.read(true)
|
|
|
|
.read(true)
|
|
|
|
.open(&path).await.unwrap();
|
|
|
|
.open(&path).await.unwrap();
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
match uring_read(&mut file, |buffer| {
|
|
|
|
let ring_res = uring_read(&mut file, |buffer| {
|
|
|
|
if ret.is_closed() {
|
|
|
|
if ret.is_closed() {
|
|
|
|
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation"));
|
|
|
|
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation"));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
hasher.update(buffer);
|
|
|
|
hasher.update(buffer);
|
|
|
|
Ok(())
|
|
|
|
Ok(())
|
|
|
|
}).await {
|
|
|
|
}).await;
|
|
|
|
|
|
|
|
let _ = tokio::join![
|
|
|
|
|
|
|
|
file.close(), // We are in a unique task per file, so awaiting this here concurrently with the returning async block is fine.
|
|
|
|
|
|
|
|
async move {
|
|
|
|
|
|
|
|
match ring_res {
|
|
|
|
Ok(_n) => {
|
|
|
|
Ok(_n) => {
|
|
|
|
eprintln!("Proc {} bytes from {:?}", _n, path);
|
|
|
|
eprintln!("Proc {} bytes from {:?}", _n, path);
|
|
|
|
let _ = ret.send((path, hasher.into())).await;
|
|
|
|
let _ = ret.send((path, Ok(hasher.into()))).await;
|
|
|
|
},
|
|
|
|
},
|
|
|
|
Err(e) => {
|
|
|
|
Err(e) => {
|
|
|
|
eprintln!("Proc for {:?} failed: {}", path, e);
|
|
|
|
eprintln!("Proc for {:?} failed: {}", path, e);
|
|
|
|
|
|
|
|
match ret.try_send((path, Err(e))) {
|
|
|
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
|
|
|
Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); },
|
|
|
|
|
|
|
|
_ => return,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Close the file.
|
|
|
|
}
|
|
|
|
// We are in a unique task per file, so awaiting this here is fine.
|
|
|
|
}
|
|
|
|
let _ = file.close().await;
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Drop sender that we're cloning from
|
|
|
|
// Drop sender that we're cloning from
|
|
|
|
drop(r_tx);
|
|
|
|
drop(r_tx);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
//TODO: Change stream item to io::Result<(path, hash)>
|
|
|
|
|
|
|
|
ReceiverStream::new(r_rx)
|
|
|
|
ReceiverStream::new(r_rx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//TODO: Higher-level wrapper around `file_handler()`
|
|
|
|
//TODO: Higher-level wrapper around `file_handler()`
|
|
|
|