From 1a23ce4c4404208bb9f3071f66079b32fcb53d99 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 7 Oct 2020 01:31:51 +0100 Subject: [PATCH] serialise oke --- Cargo.lock | 12 ++++++ Cargo.toml | 1 + src/main.rs | 11 +++++- src/map.rs | 2 +- src/parallel.rs | 8 +--- src/recurse.rs | 94 ++++++++++++++++++++++++++++++++++++++++++++ src/serial.rs | 83 ++++++++++++++++++++++++++++++++++++++ src/splash.rs | 36 ++++++++++++----- test/hi2 | 0 test/hi3 | 0 test/hi4 | 0 test/test2/test3/oof | 0 12 files changed, 228 insertions(+), 19 deletions(-) create mode 100644 test/hi2 create mode 100644 test/hi3 create mode 100644 test/hi4 create mode 100644 test/test2/test3/oof diff --git a/Cargo.lock b/Cargo.lock index 83ce25c..e08429f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -557,6 +557,17 @@ dependencies = [ "rand_core", ] +[[package]] +name = "recolored" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1584c92dd8a87686229f766bb3a62d263a90c47c81e45a49f1a6d684a1b7968d" +dependencies = [ + "atty", + "lazy_static", + "winapi 0.3.9", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -621,6 +632,7 @@ dependencies = [ "lazy_static", "log", "pretty_env_logger", + "recolored", "rustc_version", "tokio", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 922f99c..3b848ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ color-eyre = {version = "0.5.6", default-features=false} futures = {version = "0.3.5", optional = true} lazy_static = "1.4.0" uuid = {version = "0.8.1", features = ["v4"]} +recolored = "1.9.3" [build-dependencies] rustc_version = "0.2" diff --git a/src/main.rs b/src/main.rs index bbc1ea4..e811b70 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,6 +82,13 @@ mod serial; #[cfg(not(feature="parallel"))] fn main() -> eyre::Result<()> { reyre!(init(), "Failed to initialise")?; - let args = args_or_out(std::env::args(), 2).skip(1).dedup(); - todo!("Sync unimplemented") + cfg_if!{ + if #[cfg(feature="recursive")] { + let args = recurse::walk_dirs(args_or_out(std::env::args(), 2).skip(1)).dedup(); + } else { + let args = args_or_out(std::env::args(), 2).skip(1).dedup(); + } + }; + reyre!(serial::main(args), + "Jobs failed") } diff --git a/src/map.rs b/src/map.rs index 1dc5b3c..79b5721 100644 --- a/src/map.rs +++ b/src/map.rs @@ -59,7 +59,7 @@ where I::Item: Hash { let set = match self.size_hint() { (0, Some(0)) | (0, None) => HashSet::new(), - (x, None) | (_, Some(x)) => HashSet::with_capacity(x), + (_, Some(x)) | (x, None) => HashSet::with_capacity(x), }; DedupIter(self, set) } diff --git a/src/parallel.rs b/src/parallel.rs index 66de235..1089498 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -107,7 +107,7 @@ pub async fn main>(list: I) -> eyre::Result<()> //let list = list.into_iter(); let mut failures = match list.size_hint() { (0, Some(0)) | (0, None) => Vec::new(), - (x, None) | (_, Some(x)) => Vec::with_capacity(x), + (_, Some(x)) | (x, None) => Vec::with_capacity(x), }; let mut done = 0usize; for (i, res) in (0usize..).zip(join_stream(list.map(|file| tokio::spawn(work(file, sem.clone())))) @@ -170,11 +170,7 @@ fn push_dir<'a>(path: &'a Path, depth: usize, to: mpsc::Sender) -> BoxFu (0, Some(0)) | (0, None) => Vec::new(), (x, None) | (_, Some(x)) => Vec::with_capacity(x), }; - let can_recurse = match MAX_DEPTH { - Recursion::All => true, - Recursion::N(n) if depth < usize::from(n) => true, - _ => false, - }; + let can_recurse = MAX_DEPTH.can_recurse(depth); while let Some(item) = dir.next_entry().await? { let mut to = to.clone(); workers.push(async move { diff --git a/src/recurse.rs b/src/recurse.rs index daee62e..deb0601 100644 --- a/src/recurse.rs +++ b/src/recurse.rs @@ -12,6 +12,20 @@ pub enum Recursion N(NonZeroUsize), } +impl Recursion { + pub fn can_recurse(&self, depth: usize) -> bool + { + match self { + Recursion::All => true, + Recursion::N(n) if depth < usize::from(*n) => true, + _ => { + warn!("Depth {} exceeds max recursion depth of {}, ignoring", depth, self); + false + }, + } + } +} + cfg_if!{ if #[cfg(feature="limit-recursion")] { pub const MAX_DEPTH: Recursion = Recursion::N(unsafe{NonZeroUsize::new_unchecked(256)}); @@ -31,3 +45,83 @@ impl fmt::Display for Recursion } } } + +#[cfg(not(feature="parallel"))] +mod iter +{ + use super::*; + use std::{ + path::Path, + collections::VecDeque, + fs, + iter::{ + Fuse, + }, + }; + + #[derive(Debug)] + pub struct DirWalker{ + paths: VecDeque<(String, usize)>, + iter: Fuse + } + + /// Walk any amount of directories + pub fn walk_dirs(iter: I) -> DirWalker + where I::Item: Into + { + let iter = iter.into_iter(); + let paths = match iter.size_hint() { + (0, Some(0)) | (0, None) => VecDeque::new(), + (x, None) | (_, Some(x)) => VecDeque::with_capacity(x), + }; + DirWalker { + paths, + iter: iter.fuse(), + } + } + + impl Iterator for DirWalker + where I::Item: Into + { + type Item = String; + + fn next(&mut self) -> Option + { + fn process(inserter: &mut VecDeque<(String, usize)>, p: String, depth: usize) + { + match fs::read_dir(&p) { + Ok(dir) => { + for file in dir { + match file { + Ok(file) => match file.path().into_os_string().into_string() { + Ok(string) => inserter.push_front((string, depth)), + Err(err) => error!("Couldn't process file {:?} because it contains invalid UTF-8", err), + }, + Err(err) => error!("Failed to enumerate dir {:?}: {}", p, err), + } + } + }, + Err(err) => error!("Walking dir {:?} failed: {}", p, err), + } + } + if let Some(next) = self.iter.next() { + self.paths.push_front((next.into(), 0)); + } + if let Some((path, depth)) = self.paths.pop_back() { + if Path::new(&path).is_dir() { + if MAX_DEPTH.can_recurse(depth) { + process(&mut self.paths, path, depth+1); + } + self.next() + } else { + Some(path) + } + } else { + None + } + } + } +} + +#[cfg(not(feature="parallel"))] +pub use iter::*; diff --git a/src/serial.rs b/src/serial.rs index ec6e2a0..d1ff719 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -1,3 +1,86 @@ //! Sync operations use super::*; +use std::{ + fs::{ + self, + OpenOptions, + }, + path::{ + Path, + }, +}; +use error::{ + Error, ErrorKind, +}; +fn unlink(path: &Path) -> Result<(), Error> +{ + let tmp = temp::TempFile::new_in(path.parent().unwrap()); + fs::copy(path, &tmp).map_err(|e| Error::new(ErrorKind::Copy(e), path.to_owned()))?; + fs::remove_file(path).map_err(|e| Error::new(ErrorKind::Unlink(e), path.to_owned()))?; + fs::rename(&tmp, path).map_err(|e| Error::new(ErrorKind::Move(e), path.to_owned()))?; + tmp.release(); // file no longer exists, so no need to drop; + Ok(()) +} + +fn work>(apath: P) -> Result<(P, bool), Error> +{ + let path = apath.as_ref(); + let file = OpenOptions::new() + .read(true) + .open(path) + .map_err(|e| (ErrorKind::Open(e), path))?; + + let meta = match file.metadata() { + Ok(meta) => meta, + Err(err) => { + debug!("Failed to stat file: {}", err); + warn!("Failed to stat {:?}, skipping", path); + return Err((ErrorKind::Stat(err), path).into()); + }, + }; + use std::os::unix::fs::MetadataExt; + + let nlink = meta.nlink(); + debug!("<{:?}> has {} links", path, nlink); + if nlink > 1 { + unlink(path)?; + Ok((apath, true)) + } else { + Ok((apath, false)) + } +} + +pub fn main>(list: I) -> eyre::Result<()> +{ + let list = list.into_iter(); + let mut failures = match list.size_hint() { + (0, Some(0)) | (0, None) => Vec::new(), + (_, Some(x)) | (x, None) => Vec::with_capacity(x), + }; + let mut done =0; + for file in list + { + match work(file) { + Ok((path, true)) => info!("<{:?}> OK (processed)", path), + Ok((path, false)) => info!("<{:?}> OK (skipped)", path), + Err(k) => { + failures.push((k.path().to_owned(), k.to_string())); + trace!("<{:?}> Failed (skipped)", k.path()); + }, + } + done+=1; + } + + if failures.len() > 0 { + return Err(eyre!("{}/{} tasks failed to complete successfullly", failures.len(), done)) + .with_section(|| failures.into_iter() + .map(|(x, err)| format!("{}: {}", x.into_os_string() + .into_string() + .unwrap_or_else(|os| os.to_string_lossy().into_owned()), err)) + .join("\n") + .header("Failed tasks:")) + .with_suggestion(|| "Run with `RUST_LOG=debug` or `RUST_LOG=trace` for verbose error reporting"); + } + Ok(()) +} diff --git a/src/splash.rs b/src/splash.rs index 493fdce..b86c8c8 100644 --- a/src/splash.rs +++ b/src/splash.rs @@ -1,21 +1,33 @@ //! Splash screen~ use super::*; use std::borrow::Cow; +use recolored::Colorize; macro_rules! feature { (in $name:tt, $desc:literal) => { cfg_if! { if #[cfg($name)] { - println!(" +{}\t{}", stringify!($name), $desc); + println!(" +{}\t{}", stringify!($name).bright_green(), $desc); + } else { + println!(" -{}", stringify!($name)); } } }; - ($name:literal, $desc:literal $($tt:tt)*) => { + (on $name:literal, $desc:literal $($tt:tt)*) => { cfg_if! { if #[cfg(feature=$name)] { - println!(" +{}\t{}", $name, format!($desc $($tt)*)); + println!(" +{}\t{}", $name.red(), format!($desc $($tt)*)); } else { - println!(" -{}", $name); + println!(" -{}", $name.bright_blue()); + } + } + }; + (off $name:literal, $desc:literal $($tt:tt)*) => { + cfg_if! { + if #[cfg(feature=$name)] { + println!(" +{}\t{}", $name.bright_red(), format!($desc $($tt)*)); + } else { + println!(" -{}", $name.blue()); } } }; @@ -38,13 +50,17 @@ For verbose output, set `RUST_LOG` env var to one of the following: Made by {} with <3 (Licensed GPL 3.0 or later)"#, arg::program_name(), env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_AUTHORS")); println!("\nEnabled extensions: "); + feature!(in nightly, "\tCompiled with Rust nightly extensions"); - println!("Features:"); - feature!("parallel", "\tWill run up to {} operations in parallel", parallel::MAX_WORKERS.map(|x| Cow::Owned(x.to_string())).unwrap_or(Cow::Borrowed("unlimited"))); - feature!("limit-concurrency", "Concurrency is capped"); - feature!("threads", "\tUsing thread-pool"); - feature!("recursive", "\tRecursivly process files up to {} directories deep", recurse::MAX_DEPTH); - feature!("limit-recursion", "Concurrency is capped"); + + println!("\nFeatures:"); + + feature!(on "splash", "\tShow this message"); + feature!(on "parallel", "\tWill run up to {} operations in parallel", parallel::MAX_WORKERS.map(|x| Cow::Owned(x.to_string())).unwrap_or(Cow::Borrowed("unlimited"))); + feature!(on "limit-concurrency", "Concurrency is capped"); + feature!(off "threads", "\tUsing thread-pool"); + feature!(on "recursive", "\tRecursivly process files up to {} directories deep", recurse::MAX_DEPTH); + feature!(on "limit-recursion", "Concurrency is capped"); std::process::exit(1) } diff --git a/test/hi2 b/test/hi2 new file mode 100644 index 0000000..e69de29 diff --git a/test/hi3 b/test/hi3 new file mode 100644 index 0000000..e69de29 diff --git a/test/hi4 b/test/hi4 new file mode 100644 index 0000000..e69de29 diff --git a/test/test2/test3/oof b/test/test2/test3/oof new file mode 100644 index 0000000..e69de29