handle: Added `spawn`, `Options`.

Fortune for lazy-rebuild's current commit: Blessing − 吉
rust-version
Avril 3 years ago
parent 884b58d1f4
commit 2440cdeebb

@ -9,8 +9,9 @@ use tokio_uring::fs::{
}; };
use tokio::sync::{ use tokio::sync::{
mpsc, mpsc,
Semaphore Semaphore,
}; };
use std::num::NonZeroUsize;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use futures::prelude::*; use futures::prelude::*;
use futures::future::OptionFuture; use futures::future::OptionFuture;
@ -38,6 +39,67 @@ where F: FnMut(&[u8]) -> io::Result<()>
Ok(read) Ok(read)
} }
/// Options for spanwed file hasher
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Options
{
/// Maximum number of operations allowed to be running at a time.
/// Or `None` for unlimited.
///
/// The hasher uses a single thread.
pub max_operations: Option<NonZeroUsize>,
/// The maximum buffer size of the **output** stream.
pub back_pressure: NonZeroUsize,
/// The maximum buffer size of the **input** stream.
pub forward_pressure: NonZeroUsize,
}
impl Options
{
pub const DEFAULT: Self = Self::new();
pub const fn new() -> Self
{
Self {
max_operations: NonZeroUsize::new(32),
back_pressure: unsafe { NonZeroUsize::new_unchecked(64) },
forward_pressure: unsafe { NonZeroUsize::new_unchecked(64) },
}
}
}
impl Default for Options
{
#[inline]
fn default() -> Self
{
Self::new()
}
}
impl From<()> for Options
{
#[inline] fn from(_: ()) -> Self
{
Self::new()
}
}
/// Spawn a thread with a `io_uring` file hasher using these options.
///
/// # Returns
/// The input stream and output stream `(tx, rx)`.
pub fn spawn(opt: impl Into<Options>) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{
let opt = opt.into();
let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
let rx = file_handler(rx, match opt.max_operations {
Some(n) => n.into(),
None => 0,
}, opt.back_pressure.into());
(tx, rx)
}
/// Raw handler for io_uring file hashing. /// Raw handler for io_uring file hashing.
/// ///
/// # Parameters /// # Parameters
@ -47,7 +109,7 @@ where F: FnMut(&[u8]) -> io::Result<()>
/// ///
/// # Returns /// # Returns
/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure /// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure
fn file_handler(mut recv: mpsc::Receiver<PathBuf>, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> fn file_handler(mut recv: mpsc::Receiver<PathBuf>, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static
{ {
let (r_tx, r_rx) = mpsc::channel(backpressure); let (r_tx, r_rx) = mpsc::channel(backpressure);

Loading…
Cancel
Save