You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

232 lines
6.8 KiB

use super::*;
use std::io;
use cryptohelpers::sha256;
use cryptohelpers::sha2::{
Digest, Sha256,
};
use tokio_uring::fs::{
File, OpenOptions,
};
use tokio::sync::{
mpsc,
oneshot,
Semaphore,
};
use std::num::NonZeroUsize;
use tokio_stream::wrappers::ReceiverStream;
use futures::prelude::*;
use futures::future::OptionFuture;
/// Read all of `file`, starting at offset 0, through `io_uring`, passing each chunk read to `to`.
///
/// A single 4096-byte buffer is allocated once and reused: `read_at` takes ownership of it for the
/// duration of each read and hands it back together with the result.
///
/// # Errors
/// Returns the first error from either a read or the `to` callback; reading stops at that point.
///
/// # Returns
/// The total number of bytes read (every one of which was passed to `to`).
async fn uring_read<F>(file: &mut File, mut to: F) -> io::Result<usize>
where F: FnMut(&[u8]) -> io::Result<()>
{
    let mut buf = vec![0u8; 4096];
    let mut offset = 0usize;
    loop {
        let (result, returned) = file.read_at(buf, offset as u64).await;
        buf = returned;
        let n = result?;
        // A zero-length read means end of file.
        if n == 0 {
            break;
        }
        let chunk = &buf[..n];
        to(chunk)?;
        offset += chunk.len();
    }
    Ok(offset)
}
/// Options for a spawned file hasher.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Options
{
	/// Maximum number of operations allowed to be running at a time.
	/// Or `None` for unlimited.
	///
	/// The hasher uses a single thread; this bounds how many files it processes concurrently on it.
	pub max_operations: Option<NonZeroUsize>,
	/// The maximum buffer size of the **output** stream (finished results waiting to be taken).
	pub back_pressure: NonZeroUsize,
	/// The maximum buffer size of the **input** stream (paths waiting to be hashed).
	pub forward_pressure: NonZeroUsize,
}
impl Options
{
	/// The default options as a constant, usable in `const` contexts. Equal to `Options::new()`.
	pub const DEFAULT: Self = Self::new();

	/// Create the default options:
	/// * `max_operations`: `Some(32)`
	/// * `back_pressure`: `64`
	/// * `forward_pressure`: `64`
	pub const fn new() -> Self
	{
		Self {
			max_operations: NonZeroUsize::new(32),
			// SAFETY: 64 is a non-zero literal.
			back_pressure: unsafe { NonZeroUsize::new_unchecked(64) },
			// SAFETY: 64 is a non-zero literal.
			forward_pressure: unsafe { NonZeroUsize::new_unchecked(64) },
		}
	}
}
impl Default for Options
{
	/// Same as `Options::new()`.
	#[inline]
	fn default() -> Self
	{
		Self::new()
	}
}
impl From<()> for Options
{
	/// Allows `()` to be passed where options are expected, meaning "use the defaults".
	#[inline] fn from(_: ()) -> Self
	{
		Self::new()
	}
}
/// Spawn a thread with a `io_uring` file hasher using these options.
///
/// When the `cancel` future completes, the operation shuts down gracefully. Otherwise it continues until all senders to this handle are dropped.
///
/// # Returns
/// The input stream and output stream `(tx, rx)`.
pub fn spawn_with_cancel(opt: Options, cancel: impl Future<Output = ()> + 'static + Send) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{
let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
let rx = file_handler(rx,cancel , match opt.max_operations {
Some(n) => n.into(),
None => 0,
}, opt.back_pressure.into());
(tx, rx)
}
/// Spawn a thread with a `io_uring` file hasher using these options.
///
/// The hasher is never cancelled. It keeps taking paths until every sender for the returned input
/// channel (including clones) has been dropped. Use `spawn_with_cancel` to be able to stop it early.
///
/// # Returns
/// The input stream and output stream `(tx, rx)`.
pub fn spawn(opt: Options) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{
	// A cancellation future that never completes, so cancellation never triggers.
	spawn_with_cancel(opt, std::future::pending::<()>())
}
/// Raw handler for io_uring file hashing.
///
/// Spawns a new OS thread running a `tokio_uring` runtime. Every path received is hashed in its own
/// task on that runtime.
///
/// # Parameters
/// * `recv` - Takes the incoming file path to hash
/// * `cancel` - When this completes, no further paths are taken from `recv`. Operations already spawned still run to completion.
/// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.)
/// * `backpressure` - The maximum backing size of the output stream. Operations sending results will wait for there to be space before returning them. If results are not taken from the stream, the operation will wait until there is space.
///
/// # Returns
/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure.
/// The stream ends once input has stopped (all senders dropped, or `cancel` completed) and every spawned operation has finished.
/// After input has stopped, further sends on the input channel fail.
fn file_handler<C>(mut recv: mpsc::Receiver<PathBuf>, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Unpin + Send + Sync + 'static
where C: Future<Output = ()> + 'static + Send
{
	let (r_tx, r_rx) = mpsc::channel(backpressure);
	// `h_tx` is never sent on; it only acts as a reference count. Every task holds an `Rc` clone of it,
	// and `h_rx` resolves (with an error) once the last clone has been dropped, i.e. once every task is done.
	let (h_tx, h_rx) = oneshot::channel::<super::Infallible>();
	std::thread::spawn(move || {
		tokio_uring::start(async move {
			tokio::pin!(cancel);
			// No need for Arc, this is single threaded.
			// `max_ops == 0` means unlimited, so no semaphore is used at all. A semaphore created with
			// zero permits would make every task wait in `acquire()` forever (deadlocking the hasher).
			let sem = if max_ops == 0 {
				None
			} else {
				Some(std::rc::Rc::new(Semaphore::new(max_ops)))
			};
			let h_tx = std::rc::Rc::new(h_tx);
			while let Some(path) = tokio::select!{
				n = recv.recv() => n,
				_ = &mut cancel => None,
			} {
				let ret = r_tx.clone();
				let sem = sem.clone();
				let h_tx = h_tx.clone();
				tokio_uring::spawn(async move {
					// Hold this task's share of the completion refcount until the task returns.
					let _h_tx = h_tx;
					// Wait for a permit only when concurrency is limited; the permit is released on drop.
					let _permit = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await {
						Some(Err(_e)) => return, // Semaphore has been closed.
						Some(Ok(permit)) => Some(permit),
						None => None,
					};
					let mut file = match OpenOptions::new()
						.read(true)
						.open(&path).await {
							Ok(v) => v,
							Err(e) => {
								let _ = ret.send((path, Err(e))).await;
								return;
							},
						};
					let mut hasher = Sha256::new();
					let ring_res = uring_read(&mut file, |buffer| {
						// Stop reading early if nobody is listening for the result any more.
						if ret.is_closed() {
							return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation"));
						}
						hasher.update(buffer);
						Ok(())
					}).await;
					let _ = tokio::join![
						file.close(), // We are in a unique task per file, so awaiting this here concurrently with the returning async block is fine.
						async move {
							match ring_res {
								Ok(_n) => {
									let _ = ret.send((path, Ok(hasher.into()))).await;
								},
								Err(e) => {
									// To prevent DOSing this task, failed results are not deferred: if the output stream is full, we wait regardless of the result.
									let _ = ret.send((path, Err(e))).await;
								}
							}
						}
					];
				});
			}
			// --- End of new inputs
			// Close the input channel so that senders waiting on a full channel (e.g. after `cancel`
			// completed) fail immediately instead of blocking until every in-flight operation finishes.
			recv.close();
			// Let the most recently spawned task get polled before the master handles below are dropped.
			// Not required for correctness: every task already owns its own `Rc` clones of `sem` and `h_tx`.
			tokio::task::yield_now().await;
			// Drop the master refcount of `h_tx`.
			drop(h_tx);
			// Drop the sender the tasks' result senders were cloned from, so the output stream ends
			// once the last task has finished.
			drop(r_tx);
			// Resolves (with an error, since nothing is ever sent) once every task has dropped its `h_tx` clone.
			let _ = h_rx.await;
			// Close and drop the semaphore source refcount.
			if let Some(sem) = &sem {
				sem.close();
			}
			drop(sem);
		});
	});
	ReceiverStream::new(r_rx)
}