Error results now wait for the mpsc channel to have space instead of deferring themselves.

Fortune for lazy-rebuild's current commit: Small blessing − 小吉
rust-version
Avril 3 years ago
parent 45b55b85a8
commit 884b58d1f4
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1 +0,0 @@
avril@eientei.14071:1631818511

@ -43,7 +43,7 @@ where F: FnMut(&[u8]) -> io::Result<()>
/// # Parameters /// # Parameters
/// * `recv` - Takes the incoming file path to hash /// * `recv` - Takes the incoming file path to hash
/// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.) /// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.)
/// * `backpressure` - The maximum allowed number of **successful** results that are published to the output stream. A failed operation pushes into the stream in the background if it has to. /// * `backpressure` - The maximum backing size of the output stream. Operations sending results will wait for there to be space before returning them. If results are not taken from the stream, the operation will wait until there is space.
/// ///
/// # Returns /// # Returns
/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure /// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure
@ -87,11 +87,15 @@ fn file_handler(mut recv: mpsc::Receiver<PathBuf>, max_ops: usize, backpressure:
}, },
Err(e) => { Err(e) => {
eprintln!("Proc for {:?} failed: {}", path, e); eprintln!("Proc for {:?} failed: {}", path, e);
// To prevent DOSing this task, we do not defer the writing of failed results like we used to. If the stream is full, we wait regardless of the result.
let _ = ret.send((path, Err(e))).await;
/*
match ret.try_send((path, Err(e))) { match ret.try_send((path, Err(e))) {
Ok(_) => (), Ok(_) => (),
Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); }, Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); },
_ => return, _ => return,
} }*/
} }
} }
} }

Loading…
Cancel
Save