From 45b55b85a86b483bb3a6e90ea4f2c9d5626a1330 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 20 Sep 2021 18:11:13 +0100 Subject: [PATCH] Added semaphore to control max concurrent operations. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for lazy-rebuild's current commit: Small curse − 小凶 --- src/.# *lsp-ui-doc-23068998* | 1 + src/handle.rs | 33 +++++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 4 deletions(-) create mode 120000 src/.# *lsp-ui-doc-23068998* diff --git a/src/.# *lsp-ui-doc-23068998* b/src/.# *lsp-ui-doc-23068998* new file mode 120000 index 0000000..7e0cfda --- /dev/null +++ b/src/.# *lsp-ui-doc-23068998* @@ -0,0 +1 @@ +avril@eientei.14071:1631818511 \ No newline at end of file diff --git a/src/handle.rs b/src/handle.rs index e00e7a8..4ad01a1 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -9,10 +9,11 @@ use tokio_uring::fs::{ }; use tokio::sync::{ mpsc, - oneshot, + Semaphore }; use tokio_stream::wrappers::ReceiverStream; use futures::prelude::*; +use futures::future::OptionFuture; async fn uring_read(file: &mut File, mut to: F) -> io::Result where F: FnMut(&[u8]) -> io::Result<()> @@ -37,16 +38,34 @@ where F: FnMut(&[u8]) -> io::Result<()> Ok(read) } -fn file_handler(mut recv: mpsc::Receiver, backpressure: usize) -> impl Stream)> +/// Raw handler for io_uring file hashing. +/// +/// # Parameters +/// * `recv` - Takes the incoming file path to hash +/// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.) +/// * `backpressure` - The maximum allowed number of **successful** results that are published to the output stream. A failed operation pushes into the stream in the background if it has to. +/// +/// # Returns +/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure +fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: usize) -> impl Stream)> { let (r_tx, r_rx) = mpsc::channel(backpressure); std::thread::spawn(move || { tokio_uring::start(async move { - - while let Some(path) = recv.recv().await { + // No need for Arc, this is single threaded. + let sem = Some(std::rc::Rc::new(Semaphore::new(max_ops))); + + while let Some(path) = recv.recv().await { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks. let ret = r_tx.clone(); + let sem = sem.clone(); tokio_uring::spawn(async move { + let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await { + Some(Err(_)) => return, // Semaphore has been closed. + Some(Ok(v)) => Some(v), + None => None, + }; + let mut file = OpenOptions::new() .read(true) .open(&path).await.unwrap(); @@ -79,8 +98,14 @@ fn file_handler(mut recv: mpsc::Receiver, backpressure: usize) -> impl ]; }); } + // --- End of new inputs + // Drop sender that we're cloning from drop(r_tx); + // Drop the semaphore source refcount + if let Some(sem) = sem { + sem.close(); + } }); }); ReceiverStream::new(r_rx)