From 884b58d1f404db068516ed96d98cb8286913d3c9 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 20 Sep 2021 18:14:19 +0100 Subject: [PATCH] Error results now wait for the mpsc channel to have space instead of deferring themselves. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for lazy-rebuild's current commit: Small blessing − 小吉 --- src/.# *lsp-ui-doc-23068998* | 1 - src/handle.rs | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) delete mode 120000 src/.# *lsp-ui-doc-23068998* diff --git a/src/.# *lsp-ui-doc-23068998* b/src/.# *lsp-ui-doc-23068998* deleted file mode 120000 index 7e0cfda..0000000 --- a/src/.# *lsp-ui-doc-23068998* +++ /dev/null @@ -1 +0,0 @@ -avril@eientei.14071:1631818511 \ No newline at end of file diff --git a/src/handle.rs b/src/handle.rs index 4ad01a1..f77e309 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -43,7 +43,7 @@ where F: FnMut(&[u8]) -> io::Result<()> /// # Parameters /// * `recv` - Takes the incoming file path to hash /// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.) -/// * `backpressure` - The maximum allowed number of **successful** results that are published to the output stream. A failed operation pushes into the stream in the background if it has to. +/// * `backpressure` - The maximum backing size of the output stream. Operations sending results will wait for there to be space before returning them. If results are not taken from the stream, the operation will wait until there is space. /// /// # Returns /// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure @@ -87,11 +87,15 @@ fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: }, Err(e) => { eprintln!("Proc for {:?} failed: {}", path, e); + // To prevent DOSing this task, we do not defer the writing of failed results like we used to. If the stream is full, we wait regardless of the result. + + let _ = ret.send((path, Err(e))).await; + /* match ret.try_send((path, Err(e))) { Ok(_) => (), Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); }, _ => return, - } + }*/ } } }