@ -9,10 +9,11 @@ use tokio_uring::fs::{
} ;
} ;
use tokio ::sync ::{
use tokio ::sync ::{
mpsc ,
mpsc ,
oneshot,
Semaphore
} ;
} ;
use tokio_stream ::wrappers ::ReceiverStream ;
use tokio_stream ::wrappers ::ReceiverStream ;
use futures ::prelude ::* ;
use futures ::prelude ::* ;
use futures ::future ::OptionFuture ;
async fn uring_read < F > ( file : & mut File , mut to : F ) -> io ::Result < usize >
async fn uring_read < F > ( file : & mut File , mut to : F ) -> io ::Result < usize >
where F : FnMut ( & [ u8 ] ) -> io ::Result < ( ) >
where F : FnMut ( & [ u8 ] ) -> io ::Result < ( ) >
@ -37,16 +38,34 @@ where F: FnMut(&[u8]) -> io::Result<()>
Ok ( read )
Ok ( read )
}
}
fn file_handler ( mut recv : mpsc ::Receiver < PathBuf > , backpressure : usize ) -> impl Stream < Item = ( PathBuf , io ::Result < sha256 ::Sha256Hash > ) >
/// Raw handler for io_uring file hashing.
///
/// # Parameters
/// * `recv` - Takes the incoming file path to hash
/// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.)
/// * `backpressure` - The maximum allowed number of **successful** results that are published to the output stream. A failed operation pushes into the stream in the background if it has to.
///
/// # Returns
/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure
fn file_handler ( mut recv : mpsc ::Receiver < PathBuf > , max_ops : usize , backpressure : usize ) -> impl Stream < Item = ( PathBuf , io ::Result < sha256 ::Sha256Hash > ) >
{
{
let ( r_tx , r_rx ) = mpsc ::channel ( backpressure ) ;
let ( r_tx , r_rx ) = mpsc ::channel ( backpressure ) ;
std ::thread ::spawn ( move | | {
std ::thread ::spawn ( move | | {
tokio_uring ::start ( async move {
tokio_uring ::start ( async move {
// No need for Arc, this is single threaded.
while let Some ( path ) = recv . recv ( ) . await {
let sem = Some ( std ::rc ::Rc ::new ( Semaphore ::new ( max_ops ) ) ) ;
while let Some ( path ) = recv . recv ( ) . await { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks.
let ret = r_tx . clone ( ) ;
let ret = r_tx . clone ( ) ;
let sem = sem . clone ( ) ;
tokio_uring ::spawn ( async move {
tokio_uring ::spawn ( async move {
let _sem = match OptionFuture ::from ( sem . as_ref ( ) . map ( | x | Semaphore ::acquire ( x ) ) ) . await {
Some ( Err ( _ ) ) = > return , // Semaphore has been closed.
Some ( Ok ( v ) ) = > Some ( v ) ,
None = > None ,
} ;
let mut file = OpenOptions ::new ( )
let mut file = OpenOptions ::new ( )
. read ( true )
. read ( true )
. open ( & path ) . await . unwrap ( ) ;
. open ( & path ) . await . unwrap ( ) ;
@ -79,8 +98,14 @@ fn file_handler(mut recv: mpsc::Receiver<PathBuf>, backpressure: usize) -> impl
] ;
] ;
} ) ;
} ) ;
}
}
// --- End of new inputs
// Drop sender that we're cloning from
// Drop sender that we're cloning from
drop ( r_tx ) ;
drop ( r_tx ) ;
// Drop the semaphore source refcount
if let Some ( sem ) = sem {
sem . close ( ) ;
}
} ) ;
} ) ;
} ) ;
} ) ;
ReceiverStream ::new ( r_rx )
ReceiverStream ::new ( r_rx )