#[macro_use] extern crate log; #[macro_use] extern crate lazy_static; use color_eyre::{ eyre::{ self, eyre, WrapErr as _, }, SectionExt as _, Help as _, }; use tokio::{ sync::{ mpsc, }, }; use jemallocator::Jemalloc; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; mod args; mod order; mod work; mod walk; fn init_logging() -> eyre::Result<()> { color_eyre::install()?; pretty_env_logger::init(); Ok(()) } async fn work_on(cfg: work::Config, mut into: mpsc::Receiver) -> eyre::Result { use work::*; let mut map = FSTimeMap::new(cfg); while let Some(file) = into.recv().await { if cfg!(debug_assertions) { trace!("insert +{}", map.len()); } map.insert(file); } Ok(map) } async fn walk_paths(paths: I, cfg: walk::Config, worker_cfg: &std::sync::Arc, to: mpsc::Sender) -> eyre::Result where I: futures::stream::Stream, P: AsRef { use futures::prelude::*; let children: Vec = paths.map(|path| futures::stream::once(walk::start_walk(cfg.clone(), std::sync::Arc::clone(worker_cfg), path, to.clone())).boxed_local()).flatten_unordered(None).try_collect().await?; Ok(children.into_iter().sum()) } fn print_help(to: &mut W) -> std::io::Result<()> where W: std::io::Write, { let execp = args::prog_name(); writeln!(to, "{} v{} - {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_DESCRIPTION"))?; writeln!(to, " GPL'd with <3 by {}", env!("CARGO_PKG_AUTHORS"))?; writeln!(to, "\nUsage:")?; writeln!(to, "{execp} [OPTIONS] [--] []")?; writeln!(to, "\tAccording to OPTIONS, given input file paths `files...` (or, if empty, paths read from `stdin`), write them to `stdout` ordered by their metadata's timecodes")?; writeln!(to, "{execp} --help")?; writeln!(to, "\tPrint this message to `stderr`, then exit with code 0.")?; writeln!(to, "")?; writeln!(to, "OPTIONS:")?; macro_rules! write_opt { ($($name:literal),+ => $explain:literal $(, $format:expr)*) => { { let names = [$($name),+].into_iter().fold(String::default(), |prev, n| if prev.is_empty() { n.to_string() } else { format!("{prev}, {n}") }); writeln!(to, concat!(" {}\t", $explain), names $(, $format)*) } }; } write_opt!("-r", "--recursive " => "Recursively sort input files, up to `` (set to 0 for infniite); if limit is not specified, recursion is infinite")?; write_opt!("-z", "-0", "--nul" => "Seperate lines when reading/writing by the ascii NUL character (0) instead of a newline. This applies to reading input from `stdin` as well as writing output")?; write_opt!("-I", "--delim ifs" => "Read the first byte of the IFS environment variable as the I/O line seperator.")?; write_opt!("--delim " => "Use this user-provided byte as the I/O line seperator")?; write_opt!("-a", "--atime" => "Sort by atime")?; write_opt!("-c", "--ctime" => "Sort by ctime")?; write_opt!("-m", "--mtime" => "Sort by mtime")?; write_opt!("-b", "--btime" => "Sort by birth (default)")?; write_opt!("-n", "--reverse" => "Print output in reverse")?; write_opt!("-p", "--parallel cpus|" => "Run tasks in parallel, with a max number of tasks being equal ``, or, if 0, to infinity (see `-P`), if 'cpus', to the number of logical CPU cores ({}, default)", *walk::NUM_CPUS)?; write_opt!("-P", "--parallel 0" => "Run tasks with unbounded parallelism, no limit to the number of walker tasks running at once (note: the physical thread pool will always be the same size regardless of these flags)")?; write_opt!("-1", "--parallel 1" => "Only let one directory be processed at once")?; write_opt!("-", "--" => "Stop parsing arguments, treat the rest as input paths")?; //TODO: Allow controlling failure modes (currently it's hardcoded when walking will fail and why and also kind arbitary; it being controllable would be better). writeln!(to, "")?; writeln!(to, "ENV VARS:")?; writeln!(to, "`RUST_LOG` - Control the logging (to stderr) level.")?; writeln!(to, r#" "none" - No output. "error" - Errors only. "warn" - Warnings and above. "info" - Information and above. "debug" - Debug information and above. "trace" - All recorded information."#)?; Ok(()) } #[tokio::main] async fn main() -> eyre::Result<()> { init_logging().wrap_err("Failed to set logging handlers")?; let (tx, rx) = mpsc::channel(4096); //Read main config from args let args = match args::parse_args() .wrap_err("Failed to parse command line args") .with_suggestion(|| "Try `--help`")? { args::Mode::Normal(n) => n, args::Mode::Help => return print_help(&mut std::io::stderr().lock()).wrap_err("Failed to write help to stderr"), }; debug!("Parsed args: {:?}", args); let worker_cfg = { //Read worker config from main config std::sync::Arc::new(args.worker.clone()) }; let walker_cfg = { //Read walker config from main config args.walker.clone() }; // Spin up ordering task. let ordering = { let cfg = (*worker_cfg).clone(); tokio::spawn(async move { trace!("spun up ordering backing thread with config: {:?}", &cfg); work_on(cfg, rx).await //TODO: Parse config from args }) }; trace!("Starting recursive walk of input locations with config: {:?}", &walker_cfg); //TODO: Trace directory trees from paths in `args` and/or `stdin` and pass results to `tx` let walk = walk_paths(args.paths(), walker_cfg, &worker_cfg, tx); tokio::pin!(walk); let set = async move { ordering.await.wrap_err("Ordering task panic")? .wrap_err(eyre!("Failed to collect ordered files")) }; tokio::pin!(set); /*let set = tokio::select! { n = walk => { let n =n.wrap_err("Walker failed")?; info!("Walked {} files", n); }, res = set => { let set = res.wrap_err("Ordering task failed before walk completed")?; return Err(eyre!("Ordering task exited before walker task")); } };*/ let (walk, set) = { let (w, s) = tokio::join!(walk, set); (w?, s?) }; info!("Walked {} files", walk); // Write the output in a blocking task - There's not much benefit from async here. tokio::task::spawn_blocking(move || -> eyre::Result<()> { use std::io::Write; use std::os::unix::prelude::*; let mut stdout = { let lock = std::io::stdout().lock(); std::io::BufWriter::new(lock) }; trace!("Writing ordered results to stdout... (buffered, sync, rev: {})", args.reverse); #[inline] fn operate_on(stdout: &mut W, set: I, delim: &[u8]) -> eyre::Result<()> where W: Write, I: IntoIterator + ExactSizeIterator + DoubleEndedIterator + std::iter::FusedIterator + 'static { for info in set { stdout.write_all(info.path().as_os_str().as_bytes()) .and_then(|_| stdout.write_all(delim)) .wrap_err("Failed to write raw pathname for entry to stdout") .with_context(|| format!("{:?}", info.path()).header("Pathname was"))?; } Ok(()) } let delim = &[args.delim]; if args.reverse { operate_on(&mut stdout, set.into_iter().rev(), delim) } else { operate_on(&mut stdout, set.into_iter(), delim) }.wrap_err("Abandoning output write due to intermittent failure")?; stdout.flush().wrap_err("Failed to flush buffered output to stdout")?; Ok(()) }).await.wrap_err("Writer (blocking) task panic")? .wrap_err("Failed to write results to stdout")?; trace!("Finished output task"); Ok(()) }