From 7c52334df3b2ea985e13c9325e85473924986742 Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 13 Jan 2022 23:25:52 +0000 Subject: [PATCH] Parallelised rebase"s saving process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for rmdupe's current commit: Small blessing − 小吉 --- Cargo.lock | 4 ++- Cargo.toml | 3 ++- src/container.rs | 68 +++++++++++++++++++++++++++++++++++++++++++----- src/main.rs | 51 ++++++++++++++++++++---------------- src/proc.rs | 10 +++---- 5 files changed, 100 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fae70b3..5ab0d83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "arc-swap" version = "0.4.7" @@ -578,7 +580,7 @@ dependencies = [ [[package]] name = "rmdupe" -version = "2.0.1" +version = "2.0.2" dependencies = [ "chrono", "futures", diff --git a/Cargo.toml b/Cargo.toml index add8f6d..f173c6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmdupe" -version = "2.0.1" +version = "2.0.2" authors = ["Avril "] edition = "2018" @@ -12,6 +12,7 @@ lto = "fat" codegen-units = 1 [features] +default = ["threads"] threads = ["tokio", "futures", "lzzzz/tokio-io"] [dev-dependencies] diff --git a/src/container.rs b/src/container.rs index 3d35bb7..52190d4 100644 --- a/src/container.rs +++ b/src/container.rs @@ -13,9 +13,47 @@ use std::{ fmt, }; +/// Map of collisions +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CollisionMap<'a>(HashMap>); + +impl<'a> CollisionMap<'a> +{ + #[inline] pub fn len(&self) -> usize + { + self.0.len() + } + #[inline] pub fn full_len(&self) -> usize + { + self.0.iter().map(|(_, v)| v.len()).sum() + } + #[inline] pub fn iter(&self) -> impl Iterator + { + self.0.iter().map(|(k, v)| (k, v.as_slice())) + } + #[inline] pub fn of_hash(&self, name: &hash::Sha256Hash) -> &[&'a Path] + { + if let Some(vec) = self.0.get(name) + { + &vec[..] + } else { + &[] + } + } + #[inline] pub fn hashes(&self) -> impl Iterator + { + self.0.iter().map(|(k, _)| k) + } + #[inline] pub fn into_iter(self) -> impl Iterator)> + { + self.0.into_iter() + } +} + #[derive(Clone, PartialEq, Eq, Debug)] pub struct DupeMap { + names: HashMap, iteration: HashSet, // What we calculate table: HashMap, // What we save and load, and if it's transient (ignored in calculate) } @@ -53,7 +91,7 @@ impl DupeMap /// Create a new empty dupe map pub fn new() -> Self { - Self{iteration: HashSet::new(), table: HashMap::new()} + Self{iteration: HashSet::new(), table: HashMap::new(), names: HashMap::new()} } /// Iterator over all hashes @@ -167,14 +205,32 @@ impl DupeMap } /// Try to add to store. True if adding was oke, false if already exists. - pub fn try_add(&mut self, hash: hash::Sha256Hash) -> bool + pub fn try_add(&mut self, hash: hash::Sha256Hash, name: impl Into) -> bool { - if self.iteration.contains(&hash) { - false - } else { - self.iteration.insert(hash); + if self.iteration.insert(hash) { + self.names.insert(name.into(), hash); true + } else { + false + } + } + + /// Create a map of all collisions + pub fn get_collision_map(&self) -> CollisionMap<'_> + { + let mut cm = CollisionMap(HashMap::new()); + + for (name, hash) in self.names.iter() + { + if let Some(vec) = cm.0.get_mut(hash) + { + vec.push(name); + } else { + cm.0.insert(*hash, vec![name]); + } } + + cm } /// Save this list to a file diff --git a/src/main.rs b/src/main.rs index 6542624..3851b63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -62,7 +62,7 @@ mod test { let mode = config::Mode::default(); let path = Path::new("test-input"); - assert_eq!(proc::DupeCount{total:4, dupes:2}, proc::do_dir_async(path, 0, cont, mode).await?); + assert_eq!(proc::DupeCount{total:4, dupes:2}, proc::do_dir_async(path, 0, cont, mode, None).await?); Ok(()) } @@ -184,29 +184,34 @@ async fn rebase(config: config::Rebase) -> Result<(), Box } println!("Updated. {} changed, {} removed.", changed, removed); - - for save in config.save.iter() - { - let save = Path::new(save); - let mut file = match OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&save).await { - Ok(v) => v, - Err(e) => {println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); continue;}, - }; - - match hashes.save_async(&mut file).await { - Err(e) => println!("Warning: Failed to write to output {:?}: ignoring: {}", file, e), - _ => (), - }; - match file.sync_data().await { - Err(e) => println!("Warning: Failed to sync output {:?}: ignoring: {}", file, e), - _ => (), - } - } + // Save the hashes set in concurrently to each file. + let hashes = Arc::new(hashes); + futures::future::join_all(config.save.iter().map(|save| { + let hashes = hashes.clone(); + let save = save.to_owned(); + tokio::task::spawn(async move { + let save = Path::new(&save); + let mut file = match OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&save).await { + Ok(v) => v, + Err(e) => {println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); return;}, + }; + + match hashes.save_async(&mut file).await { + Err(e) => println!("Warning: Failed to write to output {:?}: ignoring: {}", file, e), + _ => (), + }; + match file.sync_data().await { + Err(e) => println!("Warning: Failed to sync output {:?}: ignoring: {}", file, e), + _ => (), + } + }) + })).await; + Ok(()) } diff --git a/src/proc.rs b/src/proc.rs index 949cdb6..4cf6284 100644 --- a/src/proc.rs +++ b/src/proc.rs @@ -33,7 +33,7 @@ where P: AsRef } /// Handle a detected dupe async -#[inline(always)] +#[inline] #[cfg(feature="threads")] async fn handle_dupe_async

(path: P, mode: &config::Mode) -> Result<(), error::Error> where P: AsRef @@ -104,7 +104,7 @@ pub fn process_file>(path: P, set: &mut container::DupeMap) -> Re { let path = path.as_ref(); if let Some(&hash) = set.get_cache(path) { - Ok(set.try_add(hash)) + Ok(set.try_add(hash, path)) } else { let mut file = OpenOptions::new() .read(true) @@ -114,7 +114,7 @@ pub fn process_file>(path: P, set: &mut container::DupeMap) -> Re let mut result = hash::Sha256Hash::default(); error::check_size(sz, hash::compute(&mut file, &mut result)?)?; set.cache(path, result); - Ok(set.try_add(result)) + Ok(set.try_add(result, path)) } } @@ -134,7 +134,7 @@ pub async fn process_file_async>(path: P, set: &std::sync::Arc Some(sem.acquire_owned().await), @@ -150,7 +150,7 @@ pub async fn process_file_async>(path: P, set: &std::sync::Arc