Parallelised rebase's saving process

Fortune for rmdupe's current commit: Small blessing − 小吉
master
Avril 3 years ago
parent 2bf7275ca2
commit 7c52334df3
Signed by: flanchan
GPG Key ID: 284488987C31F630

4
Cargo.lock generated

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
version = "0.4.7" version = "0.4.7"
@ -578,7 +580,7 @@ dependencies = [
[[package]] [[package]]
name = "rmdupe" name = "rmdupe"
version = "2.0.1" version = "2.0.2"
dependencies = [ dependencies = [
"chrono", "chrono",
"futures", "futures",

@ -1,6 +1,6 @@
[package] [package]
name = "rmdupe" name = "rmdupe"
version = "2.0.1" version = "2.0.2"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"
@ -12,6 +12,7 @@ lto = "fat"
codegen-units = 1 codegen-units = 1
[features] [features]
default = ["threads"]
threads = ["tokio", "futures", "lzzzz/tokio-io"] threads = ["tokio", "futures", "lzzzz/tokio-io"]
[dev-dependencies] [dev-dependencies]

@ -13,9 +13,47 @@ use std::{
fmt, fmt,
}; };
/// Map of collisions: each SHA-256 hash is associated with every path whose
/// contents produced that hash.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollisionMap<'a>(HashMap<hash::Sha256Hash, Vec<&'a Path>>);
impl<'a> CollisionMap<'a>
{
    /// Number of distinct hashes in the map.
    #[inline] pub fn len(&self) -> usize
    {
        self.0.len()
    }
    /// `true` when the map contains no hashes at all.
    #[inline] pub fn is_empty(&self) -> bool
    {
        self.0.is_empty()
    }
    /// Total number of paths recorded across all hashes.
    #[inline] pub fn full_len(&self) -> usize
    {
        // `values()` is clearer (and no cheaper/dearer) than destructuring `iter()` pairs.
        self.0.values().map(Vec::len).sum()
    }
    /// Iterator over `(hash, paths)` pairs, borrowing the path lists as slices.
    #[inline] pub fn iter(&self) -> impl Iterator<Item = (&hash::Sha256Hash, &[&'a Path])>
    {
        self.0.iter().map(|(k, v)| (k, v.as_slice()))
    }
    /// All paths recorded for `name`, or an empty slice if that hash is unknown.
    #[inline] pub fn of_hash(&self, name: &hash::Sha256Hash) -> &[&'a Path]
    {
        // `map_or` with an empty-slice default replaces the manual if-let/else;
        // the `&[]` default is free, so the eager form is fine here.
        self.0.get(name).map_or(&[], |v| &v[..])
    }
    /// Iterator over just the hashes.
    #[inline] pub fn hashes(&self) -> impl Iterator<Item = &hash::Sha256Hash>
    {
        self.0.keys()
    }
    /// Consume the map, yielding owned `(hash, paths)` pairs.
    #[inline] pub fn into_iter(self) -> impl Iterator<Item = (hash::Sha256Hash, Vec<&'a Path>)>
    {
        self.0.into_iter()
    }
}
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
pub struct DupeMap pub struct DupeMap
{ {
names: HashMap<PathBuf, hash::Sha256Hash>,
iteration: HashSet<hash::Sha256Hash>, // What we calculate iteration: HashSet<hash::Sha256Hash>, // What we calculate
table: HashMap<PathBuf, (hash::Sha256Hash, bool)>, // What we save and load, and if it's transient (ignored in calculate) table: HashMap<PathBuf, (hash::Sha256Hash, bool)>, // What we save and load, and if it's transient (ignored in calculate)
} }
@ -53,7 +91,7 @@ impl DupeMap
/// Create a new empty dupe map /// Create a new empty dupe map
pub fn new() -> Self pub fn new() -> Self
{ {
Self{iteration: HashSet::new(), table: HashMap::new()} Self{iteration: HashSet::new(), table: HashMap::new(), names: HashMap::new()}
} }
/// Iterator over all hashes /// Iterator over all hashes
@ -167,14 +205,32 @@ impl DupeMap
} }
/// Try to add to store. True if adding was oke, false if already exists. /// Try to add to store. True if adding was oke, false if already exists.
pub fn try_add(&mut self, hash: hash::Sha256Hash) -> bool pub fn try_add(&mut self, hash: hash::Sha256Hash, name: impl Into<PathBuf>) -> bool
{ {
if self.iteration.contains(&hash) { if self.iteration.insert(hash) {
self.names.insert(name.into(), hash);
true
} else {
false false
}
}
/// Create a map of all collisions
pub fn get_collision_map(&self) -> CollisionMap<'_>
{
let mut cm = CollisionMap(HashMap::new());
for (name, hash) in self.names.iter()
{
if let Some(vec) = cm.0.get_mut(hash)
{
vec.push(name);
} else { } else {
self.iteration.insert(hash); cm.0.insert(*hash, vec![name]);
true }
} }
cm
} }
/// Save this list to a file /// Save this list to a file

@ -62,7 +62,7 @@ mod test {
let mode = config::Mode::default(); let mode = config::Mode::default();
let path = Path::new("test-input"); let path = Path::new("test-input");
assert_eq!(proc::DupeCount{total:4, dupes:2}, proc::do_dir_async(path, 0, cont, mode).await?); assert_eq!(proc::DupeCount{total:4, dupes:2}, proc::do_dir_async(path, 0, cont, mode, None).await?);
Ok(()) Ok(())
} }
@ -185,16 +185,20 @@ async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>
println!("Updated. {} changed, {} removed.", changed, removed); println!("Updated. {} changed, {} removed.", changed, removed);
for save in config.save.iter() // Save the hashes set in concurrently to each file.
{ let hashes = Arc::new(hashes);
let save = Path::new(save); futures::future::join_all(config.save.iter().map(|save| {
let hashes = hashes.clone();
let save = save.to_owned();
tokio::task::spawn(async move {
let save = Path::new(&save);
let mut file = match OpenOptions::new() let mut file = match OpenOptions::new()
.create(true) .create(true)
.truncate(true) .truncate(true)
.write(true) .write(true)
.open(&save).await { .open(&save).await {
Ok(v) => v, Ok(v) => v,
Err(e) => {println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); continue;}, Err(e) => {println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); return;},
}; };
match hashes.save_async(&mut file).await { match hashes.save_async(&mut file).await {
@ -205,7 +209,8 @@ async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>
Err(e) => println!("Warning: Failed to sync output {:?}: ignoring: {}", file, e), Err(e) => println!("Warning: Failed to sync output {:?}: ignoring: {}", file, e),
_ => (), _ => (),
} }
} })
})).await;
Ok(()) Ok(())
} }

@ -33,7 +33,7 @@ where P: AsRef<Path>
} }
/// Handle a detected dupe async /// Handle a detected dupe async
#[inline(always)] #[inline]
#[cfg(feature="threads")] #[cfg(feature="threads")]
async fn handle_dupe_async<P>(path: P, mode: &config::Mode) -> Result<(), error::Error> async fn handle_dupe_async<P>(path: P, mode: &config::Mode) -> Result<(), error::Error>
where P: AsRef<Path> where P: AsRef<Path>
@ -104,7 +104,7 @@ pub fn process_file<P: AsRef<Path>>(path: P, set: &mut container::DupeMap) -> Re
{ {
let path = path.as_ref(); let path = path.as_ref();
if let Some(&hash) = set.get_cache(path) { if let Some(&hash) = set.get_cache(path) {
Ok(set.try_add(hash)) Ok(set.try_add(hash, path))
} else { } else {
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.read(true) .read(true)
@ -114,7 +114,7 @@ pub fn process_file<P: AsRef<Path>>(path: P, set: &mut container::DupeMap) -> Re
let mut result = hash::Sha256Hash::default(); let mut result = hash::Sha256Hash::default();
error::check_size(sz, hash::compute(&mut file, &mut result)?)?; error::check_size(sz, hash::compute(&mut file, &mut result)?)?;
set.cache(path, result); set.cache(path, result);
Ok(set.try_add(result)) Ok(set.try_add(result, path))
} }
} }
@ -134,7 +134,7 @@ pub async fn process_file_async<P: AsRef<Path>>(path: P, set: &std::sync::Arc<to
set.get_cache(path).and_then(|&h| Some(h)) set.get_cache(path).and_then(|&h| Some(h))
} { } {
let mut set = set.lock().await; let mut set = set.lock().await;
Ok(set.try_add(hash)) Ok(set.try_add(hash, path))
} else { } else {
let _lock = match sem { let _lock = match sem {
Some(sem) => Some(sem.acquire_owned().await), Some(sem) => Some(sem.acquire_owned().await),
@ -150,7 +150,7 @@ pub async fn process_file_async<P: AsRef<Path>>(path: P, set: &std::sync::Arc<to
let mut set = set.lock().await; let mut set = set.lock().await;
set.cache(path, result); set.cache(path, result);
Ok(set.try_add(result)) Ok(set.try_add(result, path))
} }
} }

Loading…
Cancel
Save