diff --git a/src/handle.rs b/src/handle.rs index f77e309..5be0634 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -9,8 +9,9 @@ use tokio_uring::fs::{ }; use tokio::sync::{ mpsc, - Semaphore + Semaphore, }; +use std::num::NonZeroUsize; use tokio_stream::wrappers::ReceiverStream; use futures::prelude::*; use futures::future::OptionFuture; @@ -38,6 +39,67 @@ where F: FnMut(&[u8]) -> io::Result<()> Ok(read) } +/// Options for spanwed file hasher +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Options +{ + /// Maximum number of operations allowed to be running at a time. + /// Or `None` for unlimited. + /// + /// The hasher uses a single thread. + pub max_operations: Option, + /// The maximum buffer size of the **output** stream. + pub back_pressure: NonZeroUsize, + /// The maximum buffer size of the **input** stream. + pub forward_pressure: NonZeroUsize, +} + +impl Options +{ + pub const DEFAULT: Self = Self::new(); + pub const fn new() -> Self + { + Self { + max_operations: NonZeroUsize::new(32), + back_pressure: unsafe { NonZeroUsize::new_unchecked(64) }, + forward_pressure: unsafe { NonZeroUsize::new_unchecked(64) }, + } + } +} + +impl Default for Options +{ + #[inline] + fn default() -> Self + { + Self::new() + } +} + +impl From<()> for Options +{ + #[inline] fn from(_: ()) -> Self + { + Self::new() + } +} + + +/// Spawn a thread with a `io_uring` file hasher using these options. +/// +/// # Returns +/// The input stream and output stream `(tx, rx)`. +pub fn spawn(opt: impl Into) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) +{ + let opt = opt.into(); + let (tx, rx) = mpsc::channel(opt.forward_pressure.into()); + let rx = file_handler(rx, match opt.max_operations { + Some(n) => n.into(), + None => 0, + }, opt.back_pressure.into()); + (tx, rx) +} + /// Raw handler for io_uring file hashing. /// /// # Parameters @@ -47,7 +109,7 @@ where F: FnMut(&[u8]) -> io::Result<()> /// /// # Returns /// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure -fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: usize) -> impl Stream)> +fn file_handler(mut recv: mpsc::Receiver, max_ops: usize, backpressure: usize) -> impl Stream)> + Send + Sync + 'static { let (r_tx, r_rx) = mpsc::channel(backpressure);