Added graceful shutdown to background hashing task.

Fortune for lazy-rebuild's current commit: Curse − 凶
rust-version
Avril 3 years ago
parent 2440cdeebb
commit 2362a3d2d3

@ -9,6 +9,7 @@ use tokio_uring::fs::{
}; };
use tokio::sync::{ use tokio::sync::{
mpsc, mpsc,
oneshot,
Semaphore, Semaphore,
}; };
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
@ -87,19 +88,41 @@ impl From<()> for Options
/// Spawn a thread with a `io_uring` file hasher using these options. /// Spawn a thread with a `io_uring` file hasher using these options.
/// ///
/// When the `cancel` future completes, the operation shuts down gracefully. Otherwise it continues until all senders to this handle are dropped.
///
/// # Returns /// # Returns
/// The input stream and output stream `(tx, rx)`. /// The input stream and output stream `(tx, rx)`.
pub fn spawn(opt: impl Into<Options>) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static) pub fn spawn_with_cancel(opt: impl Into<Options>, cancel: impl Future<Output = ()> + 'static + Send + Unpin) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{ {
let opt = opt.into(); let opt = opt.into();
let (tx, rx) = mpsc::channel(opt.forward_pressure.into()); let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
let rx = file_handler(rx, match opt.max_operations { let rx = file_handler(rx,cancel , match opt.max_operations {
Some(n) => n.into(), Some(n) => n.into(),
None => 0, None => 0,
}, opt.back_pressure.into()); }, opt.back_pressure.into());
(tx, rx) (tx, rx)
} }
/// Spawn a thread with a `io_uring` file hasher using these options.
///
/// # Returns
/// The input stream and output stream `(tx, rx)`.
pub fn spawn(opt: impl Into<Options>) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{
#[derive(Debug)]
struct NeverFuture;
use std::task::Poll;
impl Future for NeverFuture
{
type Output = super::Infallible;
fn poll(self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
spawn_with_cancel(opt.into(), NeverFuture.map(|_| ()))
}
/// Raw handler for io_uring file hashing. /// Raw handler for io_uring file hashing.
/// ///
/// # Parameters /// # Parameters
@ -109,19 +132,29 @@ pub fn spawn(opt: impl Into<Options>) -> (mpsc::Sender<PathBuf>, impl Stream<Ite
/// ///
/// # Returns /// # Returns
/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure /// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure
fn file_handler(mut recv: mpsc::Receiver<PathBuf>, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static fn file_handler<C>(mut recv: mpsc::Receiver<PathBuf>, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Unpin + Send + Sync + 'static
where C: Future<Output = ()> + 'static + Unpin + Send
{ {
let (r_tx, r_rx) = mpsc::channel(backpressure); let (r_tx, r_rx) = mpsc::channel(backpressure);
let (h_tx, h_rx) = oneshot::channel::<super::Infallible>();
std::thread::spawn(move || { std::thread::spawn(move || {
tokio_uring::start(async move { tokio_uring::start(async move {
tokio::pin!(cancel);
// No need for Arc, this is single threaded. // No need for Arc, this is single threaded.
let sem = Some(std::rc::Rc::new(Semaphore::new(max_ops))); let sem = Some(std::rc::Rc::new(Semaphore::new(max_ops)));
let h_tx = std::rc::Rc::new(h_tx);
while let Some(path) = recv.recv().await { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks. while let Some(path) = tokio::select!{
n = recv.recv() => n,
_ = &mut cancel => None,
} { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks.
let ret = r_tx.clone(); let ret = r_tx.clone();
let sem = sem.clone(); let sem = sem.clone();
let h_tx = h_tx.clone();
tokio_uring::spawn(async move { tokio_uring::spawn(async move {
let _h_tx = h_tx;
let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await { let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await {
Some(Err(_)) => return, // Semaphore has been closed. Some(Err(_)) => return, // Semaphore has been closed.
Some(Ok(v)) => Some(v), Some(Ok(v)) => Some(v),
@ -166,12 +199,18 @@ fn file_handler(mut recv: mpsc::Receiver<PathBuf>, max_ops: usize, backpressure:
} }
// --- End of new inputs // --- End of new inputs
// Drop the master refcount of `h_tx`.
drop(h_tx);
// Drop sender that we're cloning from // Drop sender that we're cloning from
drop(r_tx); drop(r_tx);
// Drop the semaphore source refcount // Drop the semaphore source refcount
if let Some(sem) = sem { if let Some(sem) = sem {
sem.close(); sem.close();
} }
// When the sender is dropped (i.e. after the thread completes), this will stop waiting and return an error.
let _ = h_rx.await;
}); });
}); });
ReceiverStream::new(r_rx) ReceiverStream::new(r_rx)

Loading…
Cancel
Save