//! Directory walking
use super ::* ;
use std ::num ::NonZeroUsize ;
use std ::{
sync ::Arc ,
path ::{
Path , PathBuf ,
} ,
} ;
use tokio ::{
sync ::{
RwLock ,
Semaphore ,
} ,
} ;
use futures ::future ::{
Future ,
OptionFuture ,
BoxFuture ,
} ;
/// User-facing configuration for a directory walk.
///
/// The `Default` value places no bound on recursion depth or on the number of walkers.
/// Every field is `Copy`, so the whole configuration is cheaply copyable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
pub struct Config
{
    /// Recursion bound. `None`: unlimited. `Some(0)`: no recursion.
    pub recursion_depth: Option<usize>,
    /// Upper bound on concurrently running walker tasks. `None`: unlimited tasks.
    pub max_walkers: Option<NonZeroUsize>,
}
/// State owned by one iteration of the walk; each recursion level gets its own copy.
#[derive(Debug, Clone)]
struct UniqueState
{
    /// Remaining recursion budget (`None`: unbounded).
    /// It shrinks with every deeper level; once it is used up, no further children are created.
    recursion_depth: Option<usize>,
    /// Channel on which the `work::FileInfo` of every walked file is sent.
    output_sender: mpsc::Sender<work::FileInfo>,
}
/// State created once per walk and shared by every iteration of it.
#[derive(Debug)]
struct SharedState
{
    /// Worker configuration passed along to every `work::FileInfo` that is created.
    worker_config: Arc<work::Config>,
    /// The configuration the walk was started with.
    config: Config,
    /// Permit pool bounding the number of concurrently running walkers.
    /// `None` when the number of walkers is unlimited.
    new_worker_semaphore: Option<Semaphore>,
}
/// Complete state of one walk iteration: the walk-wide part plus the per-iteration part.
#[derive(Debug, Clone)]
struct State
{
    /// Walk-wide state, identical for all iterations.
    shared: Arc<SharedState>, //TODO: XXX: *another* Arc slowdown...
    /// State belonging to the current iteration only.
    current: UniqueState,
}
impl State
{
#[ inline ]
pub fn create_file_info_generator ( & self ) -> impl Fn ( PathBuf , std ::fs ::Metadata ) -> work ::FileInfo + ' _
{
work ::FileInfo ::factory ( Arc ::clone ( & self . shared . worker_config ) ) //XXX: ANOTHER unneeded Arc clone..
}
#[ inline(always) ]
pub fn shared ( & self ) -> & SharedState
{
& self . shared
}
#[ inline(always) ]
pub fn unique ( & self ) -> & UniqueState
{
& self . current
}
#[ inline(always) ]
pub fn unique_mut ( & mut self ) -> & mut UniqueState
{
& mut self . current
}
/// Create a `State` for a recursed child walk.
#[ inline ]
pub fn create_child ( & self ) -> Option < Self >
{
let recursion_depth = match self . current . recursion_depth . map ( | depth | depth . saturating_sub ( 1 ) ) {
// Prevent child creation if recursion bound hits 0.
Some ( 0 ) = > return None ,
x = > x ,
} ;
Some ( Self {
shared : Arc ::clone ( & self . shared ) ,
current : UniqueState { recursion_depth , output_sender : self . current . output_sender . clone ( ) } ,
} )
}
}
/// Walk a single directory in the current task. Dispatch async results to `walk
async fn walk_directory < F > ( state : & State , mut push_child : F , whence : impl AsRef < Path > ) -> eyre ::Result < usize >
where F : FnMut ( State , PathBuf )
{
//use futures::future::FutureExt;
let mut read = tokio ::fs ::read_dir ( whence ) . await . wrap_err ( "Failed to read-dir" ) ? ;
let sender = & state . current . output_sender ;
let file_info = state . create_file_info_generator ( ) ;
let mut n = 0 ;
while let Some ( fso ) = read . next_entry ( ) . await . wrap_err ( "Failed to enumerate directory entry" ) ?
{
let metadata = match fso . metadata ( ) . await {
Ok ( metadata ) = > metadata ,
Err ( e ) = > {
error ! ( "Failed to stat {:?}: {}" , fso . file_name ( ) , e ) ;
continue ;
}
} ;
if metadata . is_file ( ) {
if let Err ( _ ) = sender . send ( file_info ( fso . path ( ) , metadata ) ) . await {
warn ! ( "Worker shut down, stopping iteration" ) ;
break ;
} else {
n + = 1 ;
}
} else if metadata . is_dir ( ) {
let Some ( state ) = state . create_child ( ) else { continue } ;
push_child ( state , fso . path ( ) ) ;
} // Ignore symlinks.
}
Ok ( n )
//todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks")
}
/// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules.
/// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily.
async fn walk_inner ( state : State , whence : PathBuf ) -> eyre ::Result < usize >
{
use futures ::prelude ::* ;
//async move
{
let backing_res = tokio ::spawn ( async move {
let _permit = match & state . shared . new_worker_semaphore {
Some ( sem ) = > Some ( sem . acquire ( ) . await . expect ( "Failed to acquire permit" ) ) ,
None = > None ,
} ;
// `JoinHandle<eyre::Result<size>>`s to recursion children
// XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found.
let mut children = Vec ::new ( ) ;
// Number of *files* sent to tx from this iteration.
let counted = if whence . is_dir ( ) {
walk_directory ( & state , | state , whence | {
fn walk_inner2 ( state : State , whence : PathBuf ) -> BoxFuture < ' static , eyre ::Result < usize > >
{
walk_inner ( state , whence ) . boxed ( )
}
children . push ( walk_inner2 ( state , whence ) ) ;
} , & whence ) . await ?
} else {
let metadata = tokio ::fs ::metadata ( & whence ) . await . wrap_err ( "Failed to stat top-level file" ) ? ;
if let Err ( _ ) = state . current . output_sender . send ( work ::FileInfo ::new ( Arc ::clone ( & state . shared . worker_config ) , whence , metadata ) ) . await {
return Err ( eyre ! ( "Failed to send top-level file to backing sorter: Sorter closed." ) ) ;
}
1
} ;
Ok ( ( counted , children ) )
} ) . await . expect ( "Panic in backing walker thread" ) ;
trace ! ( "Spawning and collecting child workers" ) ;
match backing_res {
Ok ( ( counted , children ) ) = > {
use futures ::prelude ::* ;
Ok ( counted + futures ::future ::join_all ( children . into_iter ( ) ) . await . into_iter ( ) . filter_map ( | res | match res {
Ok ( n ) = > Some ( n ) ,
Err ( e ) = > {
error ! ( "Child failed to walk: {}" , e ) ;
None
} ,
} ) . sum ::< usize > ( ) )
} ,
Err ( e ) = > Err ( e ) ,
}
}
}
#[ inline ]
pub fn start_walk ( cfg : Config , worker : Arc < work ::Config > , whence : impl AsRef < Path > , whereto : mpsc ::Sender < work ::FileInfo > ) -> impl Future < Output = eyre ::Result < usize > > + ' static
{
use futures ::prelude ::* ;
walk_inner ( State {
current : UniqueState {
recursion_depth : cfg . recursion_depth ,
output_sender : whereto ,
} ,
shared : Arc ::new ( SharedState {
new_worker_semaphore : cfg . max_walkers . as_ref ( ) . map ( | max | Semaphore ::new ( max . get ( ) ) ) ,
config : cfg ,
worker_config : worker ,
} ) ,
} , whence . as_ref ( ) . to_owned ( ) )
//.flatten()
}