enumerate-ordered/src/main.rs

#[macro_use] extern crate log;
#[macro_use] extern crate lazy_static;
use color_eyre::{
    eyre::{
        self,
        eyre,
        WrapErr as _,
    },
    SectionExt as _,
    Help as _,
};
use tokio::{
    sync::{
        mpsc,
    },
};
use jemallocator::Jemalloc;
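// Use jemalloc as the global allocator.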
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
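// Submodules: `args` (command-line parsing), `order`, `work` (the `FileInfo` and
// `FSTimeMap` types), and `walk` (directory traversal).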
mod args;
mod order;
mod work;
mod walk;
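/// Install the `color-eyre` error/panic report handlers and initialise
/// `pretty_env_logger` (log level controlled by `RUST_LOG`).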
fn init_logging() -> eyre::Result<()>
{
    color_eyre::install()?;
    pretty_env_logger::init();
    Ok(())
}
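/// Receive `FileInfo` entries from the walkers and collect them into an
/// `FSTimeMap` (the timestamp-ordered set that is later written to stdout),
/// returning the map once the sending side of the channel is closed.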
async fn work_on(cfg: work::Config, mut into: mpsc::Receiver<work::FileInfo>) -> eyre::Result<work::FSTimeMap>
{
    use work::*;
    let mut map = FSTimeMap::new(cfg);
    while let Some(file) = into.recv().await {
        if cfg!(debug_assertions) {
            trace!("insert +{}", map.len());
        }
        map.insert(file);
    }
    Ok(map)
}
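/// Start a `walk::start_walk()` for each input path, giving every walker a clone
/// of the `to` channel to report discovered files on, and return the sum of the
/// per-path file counts.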
async fn walk_paths<I, P>(paths: I, cfg: walk::Config, worker_cfg: &std::sync::Arc<work::Config>, to: mpsc::Sender<work::FileInfo>) -> eyre::Result<usize>
where I: futures::stream::Stream<Item = P>,
      P: AsRef<std::path::Path>
{
    use futures::prelude::*;
    let children: Vec<usize> = paths
        .map(|path| futures::stream::once(walk::start_walk(cfg.clone(), std::sync::Arc::clone(worker_cfg), path, to.clone())).boxed_local())
        .flatten_unordered(None)
        .try_collect()
        .await?;
    Ok(children.into_iter().sum())
}
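/// Print the usage/option help text to `to` (main passes a locked stderr handle).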
fn print_help<W: ?Sized>(to: &mut W) -> std::io::Result<()>
where W: std::io::Write,
{
    let execp = args::prog_name();
    writeln!(to, "{} v{} - {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_DESCRIPTION"))?;
    writeln!(to, " GPL'd with <3 by {}", env!("CARGO_PKG_AUTHORS"))?;
    writeln!(to, "\nUsage:")?;
    writeln!(to, "{execp} [OPTIONS] [--] [<files...>]")?;
    writeln!(to, "\tAccording to OPTIONS, given input file paths `files...` (or, if empty, paths read from `stdin`), write them to `stdout` ordered by their metadata's timestamps")?;
    writeln!(to, "{execp} --help")?;
    writeln!(to, "\tPrint this message to `stderr`, then exit with code 0.")?;
    writeln!(to, "")?;
    writeln!(to, "OPTIONS:")?;
    macro_rules! write_opt {
        ($($name:literal),+ => $explain:literal $(, $format:expr)*) => {
            {
                let names = [$($name),+].into_iter().fold(String::default(), |prev, n| if prev.is_empty() { n.to_string() } else { format!("{prev}, {n}") });
                writeln!(to, concat!(" {}\t", $explain), names $(, $format)*)
            }
        };
    }
write_opt!("-r", "--recursive <limit>" => "Recursively sort input files, up to `<limit>` (set to 0 for infniite); if limit is not specified, recursion is infinite")?;
write_opt!("-z", "-0", "--nul" => "Seperate lines when reading/writing by the ascii NUL character (0) instead of a newline. This applies to reading input from `stdin` as well as writing output")?;
write_opt!("-I", "--delim ifs" => "Read the first byte of the IFS environment variable as the I/O line seperator.")?;
write_opt!("--delim <byte>" => "Use this user-provided byte as the I/O line seperator")?;
write_opt!("-a", "--atime" => "Sort by atime")?;
write_opt!("-c", "--ctime" => "Sort by ctime")?;
write_opt!("-m", "--mtime" => "Sort by mtime")?;
write_opt!("-b", "--btime" => "Sort by birth (default)")?;
write_opt!("-n", "--reverse" => "Print output in reverse")?;
write_opt!("-p", "--parallel cpus|<max tasks>" => "Run tasks in parallel, with a max number of tasks being equal `<max tasks>`, or, if 0, to infinity (see `-P`), if 'cpus', to the number of logical CPU cores ({}, default)", *walk::NUM_CPUS)?;
write_opt!("-P", "--parallel 0" => "Run tasks with unbounded parallelism, no limit to the number of walker tasks running at once (note: the physical thread pool will always be the same size regardless of these flags)")?;
write_opt!("-1", "--parallel 1" => "Only let one directory be processed at once")?;
write_opt!("-", "--" => "Stop parsing arguments, treat the rest as input paths")?;
//TODO: Allow controlling failure modes (currently it's hardcoded when walking will fail and why and also kind arbitary; it being controllable would be better).
writeln!(to, "")?;
writeln!(to, "ENV VARS:")?;
writeln!(to, "`RUST_LOG` - Control the logging (to stderr) level.")?;
writeln!(to, r#" "none" - No output.
"error" - Errors only.
"warn" - Warnings and above.
"info" - Information and above.
"debug" - Debug information and above.
"trace" - All recorded information."#)?;
Ok(())
}
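/// Entry point: parse arguments, spawn the ordering task, walk every input path
/// concurrently (feeding results to the ordering task over a channel), then write
/// the ordered paths to stdout from a blocking task.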
#[tokio::main]
async fn main() -> eyre::Result<()> {
    init_logging().wrap_err("Failed to set logging handlers")?;
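    // Channel carrying discovered `FileInfo`s from the walkers to the ordering task.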
    let (tx, rx) = mpsc::channel(4096);
    //Read main config from args
    let args = match args::parse_args()
        .wrap_err("Failed to parse command line args")
        .with_suggestion(|| "Try `--help`")?
    {
        args::Mode::Normal(n) => n,
        args::Mode::Help => return print_help(&mut std::io::stderr().lock()).wrap_err("Failed to write help to stderr"),
    };
    debug!("Parsed args: {:?}", args);
    let worker_cfg = {
        //Read worker config from main config
        std::sync::Arc::new(args.worker.clone())
    };
    let walker_cfg = {
        //Read walker config from main config
        args.walker.clone()
    };
    // Spin up ordering task.
    let ordering = {
        let cfg = (*worker_cfg).clone();
        tokio::spawn(async move {
            trace!("spun up ordering backing thread with config: {:?}", &cfg);
            work_on(cfg, rx).await //TODO: Parse config from args
        })
    };
    trace!("Starting recursive walk of input locations with config: {:?}", &walker_cfg);
    //TODO: Trace directory trees from paths in `args` and/or `stdin` and pass results to `tx`
    let walk = walk_paths(args.paths(), walker_cfg, &worker_cfg, tx);
    tokio::pin!(walk);
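    // Future that awaits the ordering task, flattening the join (panic) error and
    // the task's own error into one result.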
    let set = async move {
        ordering.await.wrap_err("Ordering task panic")?
            .wrap_err(eyre!("Failed to collect ordered files"))
    };
    tokio::pin!(set);
    /*let set = tokio::select! {
        n = walk => {
            let n = n.wrap_err("Walker failed")?;
            info!("Walked {} files", n);
        },
        res = set => {
            let set = res.wrap_err("Ordering task failed before walk completed")?;
            return Err(eyre!("Ordering task exited before walker task"));
        }
    };*/
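    // Run the walker and the ordering collector concurrently; both must complete
    // before the results can be written out.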
    let (walk, set) = {
        let (w, s) = tokio::join!(walk, set);
        (w?, s?)
    };
    info!("Walked {} files", walk);
    // Write the output in a blocking task - There's not much benefit from async here.
    tokio::task::spawn_blocking(move || -> eyre::Result<()> {
        use std::io::Write;
        use std::os::unix::prelude::*;
        let mut stdout = {
            let lock = std::io::stdout().lock();
            std::io::BufWriter::new(lock)
        };
        trace!("Writing ordered results to stdout... (buffered, sync, rev: {})", args.reverse);
        #[inline]
        fn operate_on<W: ?Sized, I>(stdout: &mut W, set: I, delim: &[u8]) -> eyre::Result<()>
        where W: Write,
              I: IntoIterator<Item = work::FileInfo> + ExactSizeIterator + DoubleEndedIterator + std::iter::FusedIterator + 'static
        {
            for info in set
            {
                stdout.write_all(info.path().as_os_str().as_bytes())
                    .and_then(|_| stdout.write_all(delim))
                    .wrap_err("Failed to write raw pathname for entry to stdout")
                    .with_context(|| format!("{:?}", info.path()).header("Pathname was"))?;
            }
            Ok(())
        }
        let delim = &[args.delim];
        if args.reverse {
            operate_on(&mut stdout, set.into_iter().rev(), delim)
        } else {
            operate_on(&mut stdout, set.into_iter(), delim)
        }.wrap_err("Abandoning output write due to intermittent failure")?;
        stdout.flush().wrap_err("Failed to flush buffered output to stdout")?;
        Ok(())
    }).await.wrap_err("Writer (blocking) task panic")?
        .wrap_err("Failed to write results to stdout")?;
    trace!("Finished output task");
    Ok(())
}