diff --git a/src/handle.rs b/src/handle.rs index 5be0634..1c379f2 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -9,6 +9,7 @@ use tokio_uring::fs::{ }; use tokio::sync::{ mpsc, + oneshot, Semaphore, }; use std::num::NonZeroUsize; @@ -87,19 +88,41 @@ impl From<()> for Options /// Spawn a thread with a `io_uring` file hasher using these options. /// +/// When the `cancel` future completes, the operation shuts down gracefully. Otherwise it continues until all senders to this handle are dropped. +/// /// # Returns /// The input stream and output stream `(tx, rx)`. -pub fn spawn(opt: impl Into) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) +pub fn spawn_with_cancel(opt: impl Into, cancel: impl Future + 'static + Send + Unpin) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) { let opt = opt.into(); let (tx, rx) = mpsc::channel(opt.forward_pressure.into()); - let rx = file_handler(rx, match opt.max_operations { + let rx = file_handler(rx,cancel , match opt.max_operations { Some(n) => n.into(), None => 0, }, opt.back_pressure.into()); (tx, rx) } + +/// Spawn a thread with a `io_uring` file hasher using these options. +/// +/// # Returns +/// The input stream and output stream `(tx, rx)`. +pub fn spawn(opt: impl Into) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) +{ + #[derive(Debug)] + struct NeverFuture; + use std::task::Poll; + impl Future for NeverFuture + { + type Output = super::Infallible; + fn poll(self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>) -> Poll { + Poll::Pending + } + } + spawn_with_cancel(opt.into(), NeverFuture.map(|_| ())) +} + /// Raw handler for io_uring file hashing. /// /// # Parameters @@ -109,19 +132,29 @@ pub fn spawn(opt: impl Into) -> (mpsc::Sender, impl Stream, max_ops: usize, backpressure: usize) -> impl Stream)> + Send + Sync + 'static +fn file_handler(mut recv: mpsc::Receiver, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream)> + Unpin + Send + Sync + 'static +where C: Future + 'static + Unpin + Send { let (r_tx, r_rx) = mpsc::channel(backpressure); + let (h_tx, h_rx) = oneshot::channel::(); std::thread::spawn(move || { + tokio_uring::start(async move { + tokio::pin!(cancel); + // No need for Arc, this is single threaded. let sem = Some(std::rc::Rc::new(Semaphore::new(max_ops))); - - while let Some(path) = recv.recv().await { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks. + let h_tx = std::rc::Rc::new(h_tx); + while let Some(path) = tokio::select!{ + n = recv.recv() => n, + _ = &mut cancel => None, + } { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks. let ret = r_tx.clone(); let sem = sem.clone(); + let h_tx = h_tx.clone(); tokio_uring::spawn(async move { + let _h_tx = h_tx; let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await { Some(Err(_)) => return, // Semaphore has been closed. Some(Ok(v)) => Some(v), @@ -154,10 +187,10 @@ fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: let _ = ret.send((path, Err(e))).await; /* match ret.try_send((path, Err(e))) { - Ok(_) => (), - Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); }, - _ => return, - }*/ + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); }, + _ => return, + }*/ } } } @@ -165,6 +198,9 @@ fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: }); } // --- End of new inputs + + // Drop the master refcount of `h_tx`. + drop(h_tx); // Drop sender that we're cloning from drop(r_tx); @@ -172,6 +208,9 @@ fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: if let Some(sem) = sem { sem.close(); } + + // When the sender is dropped (i.e. after the thread completes), this will stop waiting and return an error. + let _ = h_rx.await; }); }); ReceiverStream::new(r_rx)