You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
7.5 KiB
216 lines
7.5 KiB
|
|
#[macro_use] extern crate log;
|
|
#[macro_use] extern crate lazy_static;
|
|
use color_eyre::{
|
|
eyre::{
|
|
self,
|
|
eyre,
|
|
WrapErr as _,
|
|
},
|
|
SectionExt as _,
|
|
Help as _,
|
|
};
|
|
use tokio::{
|
|
sync::{
|
|
mpsc,
|
|
},
|
|
};
|
|
|
|
use jemallocator::Jemalloc;
|
|
|
|
// Route every heap allocation in the process through jemalloc instead of the
// system allocator (see the `jemallocator::Jemalloc` import above).
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
|
|
|
|
mod args;
|
|
mod order;
|
|
mod work;
|
|
mod walk;
|
|
|
|
fn init_logging() -> eyre::Result<()>
|
|
{
|
|
color_eyre::install()?;
|
|
pretty_env_logger::init();
|
|
Ok(())
|
|
}
|
|
|
|
async fn work_on(cfg: work::Config, mut into: mpsc::Receiver<work::FileInfo>) -> eyre::Result<work::FSTimeMap>
|
|
{
|
|
use work::*;
|
|
let mut map = FSTimeMap::new(cfg);
|
|
while let Some(file) = into.recv().await {
|
|
if cfg!(debug_assertions) {
|
|
trace!("insert +{}", map.len());
|
|
}
|
|
map.insert(file);
|
|
}
|
|
Ok(map)
|
|
}
|
|
|
|
/// Walk every root path yielded by `paths` concurrently, sending each
/// discovered file's info down `to` for the ordering task, and return the
/// total number of entries walked across all roots.
///
/// NOTE(review): assumes `walk::start_walk` is a future resolving to
/// `eyre::Result<usize>` (the per-root entry count) — confirm against the
/// `walk` module.
async fn walk_paths<I, P>(paths: I, cfg: walk::Config, worker_cfg: &std::sync::Arc<work::Config>, to: mpsc::Sender<work::FileInfo>) -> eyre::Result<usize>
where I: futures::stream::Stream<Item = P>,
      P: AsRef<std::path::Path>
{
	use futures::prelude::*;
	// Wrap each root's walk future in a single-item stream; `flatten_unordered(None)`
	// then polls all of them concurrently with no concurrency cap, and
	// `try_collect` short-circuits on the first walker error.
	let children: Vec<usize> =
		paths.map(|path| futures::stream::once(walk::start_walk(cfg.clone(), std::sync::Arc::clone(worker_cfg), path, to.clone())).boxed_local()).flatten_unordered(None).try_collect().await?;

	// Grand total = sum of the per-root counts.
	Ok(children.into_iter().sum())
}
|
|
|
|
|
|
fn print_help<W: ?Sized>(to: &mut W) -> std::io::Result<()>
|
|
where W: std::io::Write,
|
|
{
|
|
let execp = args::prog_name();
|
|
writeln!(to, "{} v{} - {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_DESCRIPTION"))?;
|
|
writeln!(to, " GPL'd with <3 by {}", env!("CARGO_PKG_AUTHORS"))?;
|
|
writeln!(to, "\nUsage:")?;
|
|
writeln!(to, "{execp} [OPTIONS] [--] [<files...>]")?;
|
|
writeln!(to, "\tAccording to OPTIONS, given input file paths `files...` (or, if empty, paths read from `stdin`), write them to `stdout` ordered by their metadata's timecodes")?;
|
|
writeln!(to, "{execp} --help")?;
|
|
writeln!(to, "\tPrint this message to `stderr`, then exit with code 0.")?;
|
|
writeln!(to, "")?;
|
|
writeln!(to, "OPTIONS:")?;
|
|
macro_rules! write_opt {
|
|
($($name:literal),+ => $explain:literal $(, $format:expr)*) => {
|
|
{
|
|
let names = [$($name),+].into_iter().fold(String::default(), |prev, n| if prev.is_empty() { n.to_string() } else { format!("{prev}, {n}") });
|
|
writeln!(to, concat!(" {}\t", $explain), names $(, $format)*)
|
|
}
|
|
};
|
|
}
|
|
|
|
write_opt!("-r", "--recursive <limit>" => "Recursively sort input files, up to `<limit>` (set to 0 for infniite); if limit is not specified, recursion is infinite")?;
|
|
write_opt!("-z", "-0", "--nul" => "Seperate lines when reading/writing by the ascii NUL character (0) instead of a newline. This applies to reading input from `stdin` as well as writing output")?;
|
|
write_opt!("-I", "--delim ifs" => "Read the first byte of the IFS environment variable as the I/O line seperator.")?;
|
|
write_opt!("--delim <byte>" => "Use this user-provided byte as the I/O line seperator")?;
|
|
write_opt!("-a", "--atime" => "Sort by atime")?;
|
|
write_opt!("-c", "--ctime" => "Sort by ctime")?;
|
|
write_opt!("-m", "--mtime" => "Sort by mtime")?;
|
|
write_opt!("-b", "--btime" => "Sort by birth (default)")?;
|
|
write_opt!("-n", "--reverse" => "Print output in reverse")?;
|
|
write_opt!("-p", "--parallel cpus|<max tasks>" => "Run tasks in parallel, with a max number of tasks being equal `<max tasks>`, or, if 0, to infinity (see `-P`), if 'cpus', to the number of logical CPU cores ({}, default)", *walk::NUM_CPUS)?;
|
|
write_opt!("-P", "--parallel 0" => "Run tasks with unbounded parallelism, no limit to the number of walker tasks running at once (note: the physical thread pool will always be the same size regardless of these flags)")?;
|
|
write_opt!("-1", "--parallel 1" => "Only let one directory be processed at once")?;
|
|
write_opt!("-", "--" => "Stop parsing arguments, treat the rest as input paths")?;
|
|
|
|
//TODO: Allow controlling failure modes (currently it's hardcoded when walking will fail and why and also kind arbitary; it being controllable would be better).
|
|
|
|
writeln!(to, "")?;
|
|
|
|
writeln!(to, "ENV VARS:")?;
|
|
writeln!(to, "`RUST_LOG` - Control the logging (to stderr) level.")?;
|
|
writeln!(to, r#" "none" - No output.
|
|
"error" - Errors only.
|
|
"warn" - Warnings and above.
|
|
"info" - Information and above.
|
|
"debug" - Debug information and above.
|
|
"trace" - All recorded information."#)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Entry point: parse arguments, spawn the ordering task, walk every input
/// root concurrently while streaming results into the orderer, then write
/// the time-ordered paths to stdout from a blocking task.
#[tokio::main]
async fn main() -> eyre::Result<()> {
	init_logging().wrap_err("Failed to set logging handlers")?;

	// Bounded channel from the walker tasks (producers) to the single
	// ordering task (consumer).
	let (tx, rx) = mpsc::channel(4096);

	//Read main config from args
	let args = match args::parse_args()
		.wrap_err("Failed to parse command line args")
		.with_suggestion(|| "Try `--help`")?
	{
		args::Mode::Normal(n) => n,
		// `--help`: print to stderr and exit successfully without doing any work.
		args::Mode::Help => return print_help(&mut std::io::stderr().lock()).wrap_err("Failed to write help to stderr"),
	};
	debug!("Parsed args: {:?}", args);

	let worker_cfg = {
		//Read worker config from main config
		std::sync::Arc::new(args.worker.clone())
	};
	let walker_cfg = {
		//Read walker config from main config
		args.walker.clone()
	};

	// Spin up ordering task.
	let ordering = {
		// The spawned task needs an owned copy of the worker config.
		let cfg = (*worker_cfg).clone();
		tokio::spawn(async move {
			trace!("spun up ordering backing thread with config: {:?}", &cfg);
			work_on(cfg, rx).await //TODO: Parse config from args
		})
	};

	trace!("Starting recursive walk of input locations with config: {:?}", &walker_cfg);
	//TODO: Trace directory trees from paths in `args` and/or `stdin` and pass results to `tx`
	// Note: `tx` is moved into `walk_paths`, so once the walk completes all
	// senders are dropped and the ordering task's receiver sees end-of-stream.
	let walk = walk_paths(args.paths(), walker_cfg, &worker_cfg, tx);
	tokio::pin!(walk);

	// Flatten the JoinHandle's two error layers (panic vs. task failure).
	let set = async move {
		ordering.await.wrap_err("Ordering task panic")?
			.wrap_err(eyre!("Failed to collect ordered files"))
	};
	tokio::pin!(set);

	/*let set = tokio::select! {
		n = walk => {
			let n =n.wrap_err("Walker failed")?;
			info!("Walked {} files", n);
		},
		res = set => {
			let set = res.wrap_err("Ordering task failed before walk completed")?;
			return Err(eyre!("Ordering task exited before walker task"));
		}
	};*/
	// Drive the walker and the orderer to completion together; either
	// failing aborts with its error.
	let (walk, set) = {
		let (w, s) = tokio::join!(walk, set);
		(w?, s?)
	};
	info!("Walked {} files", walk);

	// Write the output in a blocking task - There's not much benefit from async here.
	tokio::task::spawn_blocking(move || -> eyre::Result<()> {
		use std::io::Write;
		use std::os::unix::prelude::*;
		// Lock stdout once and buffer all writes.
		let mut stdout = {
			let lock = std::io::stdout().lock();
			std::io::BufWriter::new(lock)
		};
		trace!("Writing ordered results to stdout... (buffered, sync, rev: {})", args.reverse);

		// Write each entry's raw path bytes followed by the configured
		// delimiter; shared by the forward and reverse cases below.
		#[inline]
		fn operate_on<W: ?Sized, I>(stdout: &mut W, set: I, delim: &[u8]) -> eyre::Result<()>
		where W: Write,
		      I: IntoIterator<Item = work::FileInfo> + ExactSizeIterator + DoubleEndedIterator + std::iter::FusedIterator + 'static
		{
			for info in set
			{
				stdout.write_all(info.path().as_os_str().as_bytes())
					.and_then(|_| stdout.write_all(delim))
					.wrap_err("Failed to write raw pathname for entry to stdout")
					.with_context(|| format!("{:?}", info.path()).header("Pathname was"))?;
			}
			Ok(())
		}

		// Single-byte output separator (newline or NUL etc., per the args).
		let delim = &[args.delim];

		if args.reverse {
			operate_on(&mut stdout, set.into_iter().rev(), delim)
		} else {
			operate_on(&mut stdout, set.into_iter(), delim)
		}.wrap_err("Abandoning output write due to intermittent failure")?;

		// Flush explicitly: Drop would swallow any flush error.
		stdout.flush().wrap_err("Failed to flush buffered output to stdout")?;
		Ok(())
	}).await.wrap_err("Writer (blocking) task panic")?
		.wrap_err("Failed to write results to stdout")?;

	trace!("Finished output task");

	Ok(())
}
|