diff --git a/Cargo.toml b/Cargo.toml index 9b5cecc..3a52ace 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["parallel"] +default = ["parallel", "limit-concurrency", "splash"] +# Limit the concurrent operations of parallel mode to 4096 +limit-concurrency = ["parallel"] +splash = [] parallel = ["tokio", "futures"] threads = ["parallel", "tokio/rt-threaded"] diff --git a/src/arg.rs b/src/arg.rs new file mode 100644 index 0000000..b60d2c5 --- /dev/null +++ b/src/arg.rs @@ -0,0 +1,18 @@ +//! Argument stuff +use super::*; + + +/// Name of executable +pub fn program_name() -> &'static str +{ + lazy_static! { + static ref NAME: String = std::env::args().next().unwrap(); + } + &NAME[..] +} + +/// Print usage +pub fn usage() +{ + println!(r#"Usage: {} "#, program_name()); +} diff --git a/src/ext.rs b/src/ext.rs new file mode 100644 index 0000000..56aba06 --- /dev/null +++ b/src/ext.rs @@ -0,0 +1,25 @@ +use super::*; +use std::iter; + +pub trait StringJoinExt: Sized +{ + fn join>(self, sep: P) -> String; +} + +impl StringJoinExt for I +where I: IntoIterator, + T: AsRef +{ + fn join>(self, sep: P) -> String + { + let mut string = String::new(); + for (first, s) in iter::successors(Some(true), |_| Some(false)).zip(self.into_iter()) + { + if !first { + string.push_str(sep.as_ref()); + } + string.push_str(s.as_ref()); + } + string + } +} diff --git a/src/main.rs b/src/main.rs index ccfc34a..bc242ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,27 +8,58 @@ use color_eyre::{ Help, SectionExt, }; use lazy_static::lazy_static; +use cfg_if::cfg_if; fn init() -> eyre::Result<()> { color_eyre::install()?; - pretty_env_logger::init(); //TODO: Change to builder + if let None = std::env::var_os("RUST_LOG") { + std::env::set_var("RUST_LOG", "info"); + } + pretty_env_logger::init(); trace!("Initialised"); Ok(()) } +mod ext; +use ext::*; +mod map; +use map::*; mod temp; mod error; +mod arg; + +cfg_if!{ + if #[cfg(feature="splash")] { + mod splash; + } else { + mod splash { + #[inline(always)] pub fn splash() -> ! { + super::arg::usage(); + std::process::exit(1) + } + } + } +} #[cfg(feature="parallel")] mod parallel; +fn args_or_out(i: T, low: usize) -> T +{ + if i.len() < low { + splash::splash(); + } else { + i + } +} + #[cfg(feature="parallel")] #[cfg_attr(feature="parallel", tokio::main)] async fn main() -> eyre::Result<()> { reyre!(init(), "Failed to initialise")?; - reyre!(parallel::main(std::env::args().skip(1)).await, "Jobs failed") + reyre!(parallel::main(args_or_out(std::env::args(), 2).skip(1).dedup()).await, "Jobs failed") } #[cfg(not(feature="parallel"))] @@ -37,5 +68,6 @@ mod serial; #[cfg(not(feature="parallel"))] fn main() -> eyre::Result<()> { reyre!(init(), "Failed to initialise")?; + let args = args_or_out(std::env::args(), 2).skip(1).dedup(); todo!("Sync unimplemented") } diff --git a/src/map.rs b/src/map.rs new file mode 100644 index 0000000..1dc5b3c --- /dev/null +++ b/src/map.rs @@ -0,0 +1,66 @@ +//! Map iter +use super::*; +use std::{ + iter, + collections::HashSet, + hash::Hash, +}; + +//TODO: Feature flag for SHA256 hashing +type HashOutput = u64; + +fn compute(what: &H) -> HashOutput +{ + use std::hash::Hasher; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + what.hash(&mut hasher); + hasher.finish() +} + +pub struct DedupIter(I, HashSet) +where ::Item: Hash; + +impl Iterator for DedupIter +where I::Item: Hash +{ + type Item = I::Item; + fn next(&mut self) -> Option + { + if let Some(next) = self.0.next() { + let hash = compute(&next); + if self.1.insert(hash) { + Some(next) + } else { + return self.next(); + } + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) + { + let (low, high) = self.0.size_hint(); + + (if low < 1 {0} else {1}, high) + } +} + +pub trait DedupIterExt: Iterator + Sized +where Self::Item: Hash +{ + fn dedup(self) -> DedupIter; +} + +impl DedupIterExt for I +where I::Item: Hash +{ + fn dedup(self) -> DedupIter + { + let set = match self.size_hint() { + (0, Some(0)) | (0, None) => HashSet::new(), + (x, None) | (_, Some(x)) => HashSet::with_capacity(x), + }; + DedupIter(self, set) + } +} diff --git a/src/parallel.rs b/src/parallel.rs index f3e1165..79925f8 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -25,7 +25,13 @@ use tokio::{ }; use error::{Error, ErrorKind}; -const MAX_WORKERS: Option = Some(unsafe {NonZeroUsize::new_unchecked(4096)}); +cfg_if!{ + if #[cfg(feature="limit-concurrency")] { + pub const MAX_WORKERS: Option = Some(unsafe {NonZeroUsize::new_unchecked(4096)}); + } else { + pub const MAX_WORKERS: Option = None; + } +} fn gensem() -> Option> { @@ -67,7 +73,7 @@ async fn work>(apath: P, sem: Option>) -> Result<( use std::os::unix::fs::MetadataExt; let nlink = meta.nlink(); - debug!("<{:?}> Links: {}", path, nlink); + debug!("<{:?}> has {} links", path, nlink); if nlink > 1 { //todo work i guess fuck it unlink(path).await?; @@ -80,8 +86,13 @@ async fn work>(apath: P, sem: Option>) -> Result<( pub async fn main>(list: I) -> eyre::Result<()> { let sem = gensem(); - let mut failures = 0usize; - for (i, res) in (0usize..).zip(join_all(list.into_iter().map(|file| tokio::spawn(work(file, sem.clone())))) + let list = list.into_iter(); + let mut failures = match list.size_hint() { + (0, Some(0)) | (0, None) => Vec::new(), + (x, None) | (_, Some(x)) => Vec::with_capacity(x), + }; + let mut done = 0usize; + for (i, res) in (0usize..).zip(join_all(list.map(|file| tokio::spawn(work(file, sem.clone())))) .map(|x| {trace!("--- {} Finished ---", x.len()); x}).await) { //trace!("Done on {:?}", res); @@ -96,10 +107,11 @@ pub async fn main>(list: I) -> eyre::Result<()> .with_warning(|| "This suggests a bug in the program"); } else { warn!("Child {} cancelled", i); - failures += 1; + return Ok(()); } }, - Ok(Err(kind)) if !kind.kind().is_skippable() => { // + Ok(Err(kind)) if !kind.kind().is_skippable() => { + failures.push((kind.path().to_owned(), kind.to_string())); let fuck = format!("{:?}", kind.path()); let sug = kind.kind().suggestion(); let err = Err::(kind) @@ -109,18 +121,23 @@ pub async fn main>(list: I) -> eyre::Result<()> .unwrap_err(); error!("{}", err); debug!("Error: {:?}", err); - failures += 1; }, Ok(Err(k)) => { + failures.push((k.path().to_owned(), k.to_string())); trace!("<{:?}> Failed (skipped)", k.path()); - failures+=1; }, } + done+=1; } - if failures > 0 { - return Err(eyre!("Some tasks failed to complete successfullly")) - .with_section(|| failures.to_string().header("Number of failed tasks")) + if failures.len() > 0 { + return Err(eyre!("{}/{} tasks failed to complete successfullly", failures.len(), done)) + .with_section(|| failures.into_iter() + .map(|(x, err)| format!("{}: {}", x.into_os_string() + .into_string() + .unwrap_or_else(|os| os.to_string_lossy().into_owned()), err)) + .join("\n") + .header("Failed tasks:")) .with_suggestion(|| "Run with `RUST_LOG=debug` or `RUST_LOG=trace` for verbose error reporting"); } diff --git a/src/splash.rs b/src/splash.rs new file mode 100644 index 0000000..8f02c91 --- /dev/null +++ b/src/splash.rs @@ -0,0 +1,47 @@ +//! Splash screen~ +use super::*; +use std::borrow::Cow; + +macro_rules! feature { + (in $name:tt, $desc:literal) => { + cfg_if! { + if #[cfg($name)] { + println!(" +{}\t{}", stringify!($name), $desc); + } + } + }; + ($name:literal, $desc:literal $($tt:tt)*) => { + cfg_if! { + if #[cfg(feature=$name)] { + println!(" +{}\t{}", $name, format!($desc $($tt)*)); + } else { + println!(" -{}", $name); + } + } + }; +} + +pub fn splash() -> ! { + arg::usage(); + + // splash screen + println!(r#" +> sever ({}) v{} +> Coerce hardlinks to new files + +For verbose output, set `RUST_LOG` env var to one of the following: + trace - Most verbose + debug - Verbose + info - Default + warn - Only show warnings + error - Only show errors + +Made by {} with <3 (Licensed GPL 3.0 or later)"#, arg::program_name(), env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_AUTHORS")); + println!("\nEnabled extensions: "); + feature!(in nightly, "\tCompiled with Rust nightly extensions"); + println!(); + feature!("parallel", "\tWill run up to {} operations in parallel", parallel::MAX_WORKERS.map(|x| Cow::Owned(x.to_string())).unwrap_or(Cow::Borrowed("unlimited"))); + feature!("limit-concurrency", "Concurrency is capped"); + feature!("threads", "\tUsing thread-pool"); + std::process::exit(1) +} diff --git a/src/temp.rs b/src/temp.rs index 40ebe07..9bb25a7 100644 --- a/src/temp.rs +++ b/src/temp.rs @@ -54,6 +54,7 @@ impl TempFile d } + #[cfg(feature="parallel")] /// Attempt to remove this temp file async pub async fn drop_async(mut self) -> tokio::io::Result<()> { diff --git a/test/hi b/test/hi new file mode 100644 index 0000000..e69de29