@ -12,9 +12,11 @@ use futures::{
Future ,
Future ,
OptionFuture ,
OptionFuture ,
FutureExt ,
FutureExt ,
BoxFuture ,
join_all ,
join_all ,
} ,
} ,
stream ::{
stream ::{
self ,
Stream ,
Stream ,
StreamExt ,
StreamExt ,
} ,
} ,
@ -22,6 +24,7 @@ use futures::{
use tokio ::{
use tokio ::{
sync ::{
sync ::{
Semaphore ,
Semaphore ,
mpsc ,
} ,
} ,
fs ::{
fs ::{
OpenOptions ,
OpenOptions ,
@ -39,6 +42,8 @@ cfg_if!{
}
}
}
}
#[ cfg(feature= " recursive " ) ] use recurse ::{ MAX_DEPTH , Recursion } ;
fn gensem ( ) -> Option < Arc < Semaphore > >
fn gensem ( ) -> Option < Arc < Semaphore > >
{
{
trace ! ( "Limiting concurrency to {:?}" , MAX_WORKERS ) ;
trace ! ( "Limiting concurrency to {:?}" , MAX_WORKERS ) ;
@ -90,7 +95,7 @@ async fn work<P: AsRef<Path>>(apath: P, sem: Option<Arc<Semaphore>>) -> Result<(
}
}
async fn join_stream < I : Stream > ( stream : I ) -> impl Iterator < Item = < I ::Item as Future > ::Output > + ExactSizeIterator
async fn join_stream < I : Stream > ( stream : I ) -> impl Iterator < Item = < I ::Item as Future > ::Output > + ExactSizeIterator
where I ::Item : Future
where I ::Item : Future
{
{
//gotta be a better way than heap allocating here, right?
//gotta be a better way than heap allocating here, right?
stream . then ( | x | async move { x . await } ) . collect ::< Vec < _ > > ( ) . await . into_iter ( )
stream . then ( | x | async move { x . await } ) . collect ::< Vec < _ > > ( ) . await . into_iter ( )
@ -108,7 +113,6 @@ pub async fn main<I: Stream<Item=String>>(list: I) -> eyre::Result<()>
for ( i , res ) in ( 0 usize .. ) . zip ( join_stream ( list . map ( | file | tokio ::spawn ( work ( file , sem . clone ( ) ) ) ) )
for ( i , res ) in ( 0 usize .. ) . zip ( join_stream ( list . map ( | file | tokio ::spawn ( work ( file , sem . clone ( ) ) ) ) )
. map ( | x | { trace ! ( "--- {} Finished ---" , x . len ( ) ) ; x } ) . await )
. map ( | x | { trace ! ( "--- {} Finished ---" , x . len ( ) ) ; x } ) . await )
{
{
//trace!("Done on {:?}", res);
match res {
match res {
Ok ( Ok ( ( path , true ) ) ) = > info ! ( "<{:?}> OK (processed)" , path ) ,
Ok ( Ok ( ( path , true ) ) ) = > info ! ( "<{:?}> OK (processed)" , path ) ,
Ok ( Ok ( ( path , false ) ) ) = > info ! ( "<{:?}> OK (skipped)" , path ) ,
Ok ( Ok ( ( path , false ) ) ) = > info ! ( "<{:?}> OK (skipped)" , path ) ,
@ -156,3 +160,73 @@ pub async fn main<I: Stream<Item=String>>(list: I) -> eyre::Result<()>
Ok ( ( ) )
Ok ( ( ) )
}
}
#[ cfg(feature= " recursive " ) ]
fn push_dir < ' a > ( path : & ' a Path , depth : usize , to : mpsc ::Sender < String > ) -> BoxFuture < ' a , tokio ::io ::Result < ( ) > >
{
async move {
let mut dir = fs ::read_dir ( path ) . await ? ;
let mut workers = match dir . size_hint ( ) {
( 0 , Some ( 0 ) ) | ( 0 , None ) = > Vec ::new ( ) ,
( x , None ) | ( _ , Some ( x ) ) = > Vec ::with_capacity ( x ) ,
} ;
let can_recurse = match MAX_DEPTH {
Recursion ::All = > true ,
Recursion ::N ( n ) if depth < usize ::from ( n ) = > true ,
_ = > false ,
} ;
while let Some ( item ) = dir . next_entry ( ) . await ? {
let mut to = to . clone ( ) ;
workers . push ( async move {
match path . join ( item . file_name ( ) ) . into_os_string ( ) . into_string ( ) {
Ok ( name ) = > {
if item . file_type ( ) . await ? . is_dir ( ) {
if can_recurse {
if let Err ( e ) = push_dir ( name . as_ref ( ) , depth + 1 , to ) . await {
error ! ( "Walking dir {:?} failed: {}" , item . file_name ( ) , e ) ;
}
}
} else {
to . send ( name ) . await . unwrap ( ) ;
}
} ,
Err ( err ) = > {
error ! ( "Couldn't process file {:?} because it contains invalid UTF-8" , err ) ;
} ,
}
Ok ::< _ , std ::io ::Error > ( ( ) )
} ) ;
}
join_all ( workers ) . await ;
Ok ( ( ) )
} . boxed ( )
}
pub async fn expand_dir ( p : String ) -> impl Stream < Item = String >
{
cfg_if ! {
if #[ cfg(feature= " recursive " ) ] {
let ( mut tx , rx ) = mpsc ::channel ( 16 ) ;
tokio ::spawn ( async move {
let path = Path ::new ( & p ) ;
if path . is_dir ( ) {
if let Err ( err ) = push_dir ( path , 0 , tx ) . await {
error ! ( "Walking dir {:?} failed: {}" , path , err ) ;
}
} else {
tx . send ( p ) . await . unwrap ( ) ;
}
} ) ;
rx
} else {
stream ::iter ( iter ::once ( p ) . filter_map ( | p | {
if Path ::new ( & p ) . is_dir ( ) {
warn ! ( "{:?} is a directory, skipping" , p ) ;
None
} else {
Some ( p )
}
} ) )
}
}
}