#[ macro_use ] extern crate log ;
#[ macro_use ] extern crate lazy_static ;
use color_eyre ::{
eyre ::{
self ,
eyre ,
WrapErr as _ ,
} ,
SectionExt as _ ,
Help as _ ,
} ;
use tokio ::{
sync ::{
mpsc ,
} ,
} ;
use jemallocator ::Jemalloc ;
#[ global_allocator ]
static GLOBAL : Jemalloc = Jemalloc ;
mod args ;
mod order ;
mod work ;
mod walk ;
fn init_logging ( ) -> eyre ::Result < ( ) >
{
color_eyre ::install ( ) ? ;
pretty_env_logger ::init ( ) ;
Ok ( ( ) )
}
async fn work_on ( cfg : work ::Config , mut into : mpsc ::Receiver < work ::FileInfo > ) -> eyre ::Result < work ::FSTimeMap >
{
use work ::* ;
let mut map = FSTimeMap ::new ( cfg ) ;
while let Some ( file ) = into . recv ( ) . await {
if cfg! ( debug_assertions ) {
trace ! ( "insert +{}" , map . len ( ) ) ;
}
map . insert ( file ) ;
}
Ok ( map )
}
async fn walk_paths < I , P > ( paths : I , cfg : walk ::Config , worker_cfg : & std ::sync ::Arc < work ::Config > , to : mpsc ::Sender < work ::FileInfo > ) -> eyre ::Result < usize >
where I : futures ::stream ::Stream < Item = P > ,
P : AsRef < std ::path ::Path >
{
use futures ::prelude ::* ;
let children : Vec < usize > =
paths . map ( | path | futures ::stream ::once ( walk ::start_walk ( cfg . clone ( ) , std ::sync ::Arc ::clone ( worker_cfg ) , path , to . clone ( ) ) ) . boxed_local ( ) ) . flatten_unordered ( None ) . try_collect ( ) . await ? ;
Ok ( children . into_iter ( ) . sum ( ) )
}
fn print_help < W : ? Sized > ( to : & mut W ) -> std ::io ::Result < ( ) >
where W : std ::io ::Write ,
{
let execp = args ::prog_name ( ) ;
writeln! ( to , "{} v{} - {}" , env! ( "CARGO_PKG_NAME" ) , env! ( "CARGO_PKG_VERSION" ) , env! ( "CARGO_PKG_DESCRIPTION" ) ) ? ;
writeln! ( to , " GPL'd with <3 by {}" , env! ( "CARGO_PKG_AUTHORS" ) ) ? ;
writeln! ( to , "\nUsage:" ) ? ;
writeln! ( to , "{execp} [OPTIONS] [--] [<files...>]" ) ? ;
writeln! ( to , "\tAccording to OPTIONS, given input file paths `files...` (or, if empty, paths read from `stdin`), write them to `stdout` ordered by their metadata's timecodes" ) ? ;
writeln! ( to , "{execp} --help" ) ? ;
writeln! ( to , "\tPrint this message to `stderr`, then exit with code 0." ) ? ;
writeln! ( to , "" ) ? ;
writeln! ( to , "OPTIONS:" ) ? ;
macro_rules! write_opt {
( $( $name :literal ) , + = > $explain :literal $(, $format :expr ) * ) = > {
{
let names = [ $( $name ) , + ] . into_iter ( ) . fold ( String ::default ( ) , | prev , n | if prev . is_empty ( ) { n . to_string ( ) } else { format! ( "{prev}, {n}" ) } ) ;
writeln! ( to , concat! ( " {}\t" , $explain ) , names $(, $format ) * )
}
} ;
}
write_opt ! ( "-r" , "--recursive <limit>" = > "Recursively sort input files, up to `<limit>` (set to 0 for infniite); if limit is not specified, recursion is infinite" ) ? ;
write_opt ! ( "-z" , "-0" , "--nul" = > "Seperate lines when reading/writing by the ascii NUL character (0) instead of a newline. This applies to reading input from `stdin` as well as writing output" ) ? ;
write_opt ! ( "-I" , "--delim ifs" = > "Read the first byte of the IFS environment variable as the I/O line seperator." ) ? ;
write_opt ! ( "--delim <byte>" = > "Use this user-provided byte as the I/O line seperator" ) ? ;
write_opt ! ( "-a" , "--atime" = > "Sort by atime" ) ? ;
write_opt ! ( "-c" , "--ctime" = > "Sort by ctime" ) ? ;
write_opt ! ( "-m" , "--mtime" = > "Sort by mtime" ) ? ;
write_opt ! ( "-b" , "--btime" = > "Sort by birth (default)" ) ? ;
write_opt ! ( "-n" , "--reverse" = > "Print output in reverse" ) ? ;
write_opt ! ( "-p" , "--parallel cpus|<max tasks>" = > "Run tasks in parallel, with a max number of tasks being equal `<max tasks>`, or, if 0, to infinity (see `-P`), if 'cpus', to the number of logical CPU cores ({}, default)" , * walk ::NUM_CPUS ) ? ;
write_opt ! ( "-P" , "--parallel 0" = > "Run tasks with unbounded parallelism, no limit to the number of walker tasks running at once (note: the physical thread pool will always be the same size regardless of these flags)" ) ? ;
write_opt ! ( "-1" , "--parallel 1" = > "Only let one directory be processed at once" ) ? ;
write_opt ! ( "-" , "--" = > "Stop parsing arguments, treat the rest as input paths" ) ? ;
//TODO: Allow controlling failure modes (currently it's hardcoded when walking will fail and why and also kind arbitary; it being controllable would be better).
writeln! ( to , "" ) ? ;
writeln! ( to , "ENV VARS:" ) ? ;
writeln! ( to , "`RUST_LOG` - Control the logging (to stderr) level." ) ? ;
writeln! ( to , r#" "none" - No output .
"error" - Errors only .
"warn" - Warnings and above .
"info" - Information and above .
"debug" - Debug information and above .
"trace" - All recorded information . " #) ? ;
Ok ( ( ) )
}
#[ tokio::main ]
async fn main ( ) -> eyre ::Result < ( ) > {
init_logging ( ) . wrap_err ( "Failed to set logging handlers" ) ? ;
let ( tx , rx ) = mpsc ::channel ( 4096 ) ;
//Read main config from args
let args = match args ::parse_args ( )
. wrap_err ( "Failed to parse command line args" )
. with_suggestion ( | | "Try `--help`" ) ?
{
args ::Mode ::Normal ( n ) = > n ,
args ::Mode ::Help = > return print_help ( & mut std ::io ::stderr ( ) . lock ( ) ) . wrap_err ( "Failed to write help to stderr" ) ,
} ;
debug ! ( "Parsed args: {:?}" , args ) ;
let worker_cfg = {
//Read worker config from main config
std ::sync ::Arc ::new ( args . worker . clone ( ) )
} ;
let walker_cfg = {
//Read walker config from main config
args . walker . clone ( )
} ;
// Spin up ordering task.
let ordering = {
let cfg = ( * worker_cfg ) . clone ( ) ;
tokio ::spawn ( async move {
trace ! ( "spun up ordering backing thread with config: {:?}" , & cfg ) ;
work_on ( cfg , rx ) . await //TODO: Parse config from args
} )
} ;
trace ! ( "Starting recursive walk of input locations with config: {:?}" , & walker_cfg ) ;
//TODO: Trace directory trees from paths in `args` and/or `stdin` and pass results to `tx`
let walk = walk_paths ( args . paths ( ) , walker_cfg , & worker_cfg , tx ) ;
tokio ::pin ! ( walk ) ;
let set = async move {
ordering . await . wrap_err ( "Ordering task panic" ) ?
. wrap_err ( eyre ! ( "Failed to collect ordered files" ) )
} ;
tokio ::pin ! ( set ) ;
/* let set = tokio::select! {
n = walk = > {
let n = n . wrap_err ( "Walker failed" ) ? ;
info ! ( "Walked {} files" , n ) ;
} ,
res = set = > {
let set = res . wrap_err ( "Ordering task failed before walk completed" ) ? ;
return Err ( eyre ! ( "Ordering task exited before walker task" ) ) ;
}
} ; * /
let ( walk , set ) = {
let ( w , s ) = tokio ::join ! ( walk , set ) ;
( w ? , s ? )
} ;
info ! ( "Walked {} files" , walk ) ;
// Write the output in a blocking task - There's not much benefit from async here.
tokio ::task ::spawn_blocking ( move | | -> eyre ::Result < ( ) > {
use std ::io ::Write ;
use std ::os ::unix ::prelude ::* ;
let mut stdout = {
let lock = std ::io ::stdout ( ) . lock ( ) ;
std ::io ::BufWriter ::new ( lock )
} ;
trace ! ( "Writing ordered results to stdout... (buffered, sync, rev: {})" , args . reverse ) ;
#[ inline ]
fn operate_on < W : ? Sized , I > ( stdout : & mut W , set : I , delim : & [ u8 ] ) -> eyre ::Result < ( ) >
where W : Write ,
I : IntoIterator < Item = work ::FileInfo > + ExactSizeIterator + DoubleEndedIterator + std ::iter ::FusedIterator + ' static
{
for info in set
{
stdout . write_all ( info . path ( ) . as_os_str ( ) . as_bytes ( ) )
. and_then ( | _ | stdout . write_all ( delim ) )
. wrap_err ( "Failed to write raw pathname for entry to stdout" )
. with_context ( | | format! ( "{:?}" , info . path ( ) ) . header ( "Pathname was" ) ) ? ;
}
Ok ( ( ) )
}
let delim = & [ args . delim ] ;
if args . reverse {
operate_on ( & mut stdout , set . into_iter ( ) . rev ( ) , delim )
} else {
operate_on ( & mut stdout , set . into_iter ( ) , delim )
} . wrap_err ( "Abandoning output write due to intermittent failure" ) ? ;
stdout . flush ( ) . wrap_err ( "Failed to flush buffered output to stdout" ) ? ;
Ok ( ( ) )
} ) . await . wrap_err ( "Writer (blocking) task panic" ) ?
. wrap_err ( "Failed to write results to stdout" ) ? ;
trace ! ( "Finished output task" ) ;
Ok ( ( ) )
}