From 7c7125dcd946012e3a15341c52cf0d3449426375 Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 23 Jul 2020 21:08:48 +0100 Subject: [PATCH] fixed canonical paths --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/arg.rs | 25 +++++ src/config.rs | 9 ++ src/container.rs | 35 +++++++ src/main.rs | 262 ++++++++++++++++++++++++++++++++++++----------- 6 files changed, 272 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a42663..a8ef6e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,7 +555,7 @@ dependencies = [ [[package]] name = "rmdupe" -version = "1.1.1" +version = "1.2.1" dependencies = [ "chrono", "futures", diff --git a/Cargo.toml b/Cargo.toml index 91e0bb2..a81cc3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmdupe" -version = "1.1.1" +version = "1.2.1" authors = ["Avril "] edition = "2018" diff --git a/src/arg.rs b/src/arg.rs index 74c86a9..b5f7420 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -20,6 +20,7 @@ pub fn program() -> &'static str pub fn usage() -> ! { println!("Usage: {} [OPTIONS] [--] ", program()); + println!("Usage: {} --rebase []", program()); println!("Usage: {} --help", program()); println!("OPTIONS:"); println!(" --load -l:\t\tLoad the hashes from `load-file` if possible."); @@ -39,6 +40,7 @@ pub fn usage() -> ! println!(" --\t\t\tStop reading args"); println!("Other:"); println!(" --help -h:\t\tPrint this message"); + println!(" --rebase:\t\tRebuild hash stores created by `--save`. If no files are provided, use default."); #[cfg(feature="threads")] println!("Compiled with threading support"); std::process::exit(1) @@ -97,6 +99,28 @@ pub enum Output { Normal(config::Config), Help, + Rebase(config::Rebase), +} + +/// Arg parse for rebase +fn parse_rebase(args: I) -> Result +where I: IntoIterator +{ + let mut files = Vec::new(); + + for file in args.into_iter().map(|path| validate_path(path, Ensure::File, true)) + { + files.push(file?); + } + + if files.len() < 1 { + files.push(validate_path(config::DEFAULT_HASHNAME.to_string(), Ensure::File, false)?.to_owned()); + } + + Ok(config::Rebase{ + save: files.clone(), //TODO: Seperate save+loads + load: files, + }) } /// Try to parse args @@ -139,6 +163,7 @@ where I: IntoIterator if reading && arg.chars().next().unwrap_or('\0') == '-' { match &arg[..] { "--help" => return Ok(Output::Help), + "--rebase" => return Ok(Output::Rebase(parse_rebase(args)?)), "--" => reading = false, diff --git a/src/config.rs b/src/config.rs index 354caa2..433eb86 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,3 +65,12 @@ pub struct Config /// Load hashes from pub load: Vec, } + +#[derive(Debug)] +pub struct Rebase +{ + /// Load from here + pub load: Vec, + /// Rebase to here + pub save: Vec, +} diff --git a/src/container.rs b/src/container.rs index 45f78b7..11490a5 100644 --- a/src/container.rs +++ b/src/container.rs @@ -62,6 +62,41 @@ impl DupeMap self.iteration.iter() } + /// Forcefully update the cache + pub fn cache_force(&mut self, id: impl AsRef, hash: hash::Sha256Hash, trans: bool) + { + if let Some((h,t)) = self.table.get_mut(id.as_ref()) { + *h = hash; + *t = trans; + } else { + self.table.insert(id.as_ref().to_owned(), (hash,true)); + } + } + + /// Remove from the cache if it exists + pub fn uncache(&mut self, id: impl AsRef) -> Option<(hash::Sha256Hash, bool)> + { + self.table.remove(id.as_ref()) + } + + /// The amount of cached items (inc. transient ones) + pub fn cache_len(&self) -> usize + { + self.table.len() + } + + /// Iterate through the cache + pub fn cache_iter(&self) -> std::collections::hash_map::Iter + { + self.table.iter() + } + + /// Iterate through the cache + pub fn cache_iter_mut(&mut self) -> std::collections::hash_map::IterMut + { + self.table.iter_mut() + } + /// Cache this path's hash /// /// # Returns diff --git a/src/main.rs b/src/main.rs index f5ecfc4..7066d8d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,104 +72,244 @@ mod test { } } -fn parse_args() -> Result + +#[inline] +fn absolute(path: impl AsRef) -> std::path::PathBuf { - match arg::parse_args()? { - arg::Output::Normal(conf) => Ok(conf), - _ => arg::usage(), - } + std::fs::canonicalize(path).expect("Invalid path internal") } -#[cfg_attr(feature="threads", tokio::main)] #[cfg(feature="threads")] -async fn main() -> Result<(), Box> +async fn rebase_one_async(path: impl AsRef, hash: hash::Sha256Hash) -> Result, error::Error> { + use std::{ + convert::TryInto, + }; use tokio::{ fs::{ OpenOptions, }, - sync::{ - Mutex - }, }; - use std::{ - path::Path, - sync::Arc, - }; - let args = parse_args().into_string()?; - let lmode = &args.mode.logging_mode; - - log!(Debug, lmode => "Args parsed: {:?}", args); + let path = path.as_ref(); + let mut file = OpenOptions::new() + .read(true) + .open(path).await?; + let sz: usize = file.metadata().await?.len().try_into().or(Err(error::Error::Arch(Some("Filesize is too large to be known. you have likely compiled the binary for 32-bit architecture or less. This shouldn't happen on 64-bit systems."))))?; + let mut result = hash::Sha256Hash::default(); + error::check_size(sz, hash::compute_async(&mut file, &mut result).await?)?; - let mut children = Vec::new(); + println!("Computed {:?}", path); + if hash != result { + Ok(Some((path.to_owned(), result))) + } else { + Ok(None) + } +} +#[cfg(feature="threads")] +async fn rebase(config: config::Rebase) -> Result<(), Box> +{ + use std::{ + path::{ + Path, + }, + }; + use tokio::{ + fs::{ + OpenOptions, + }, + }; let mut hashes = container::DupeMap::new(); - - // Load hashes - for (transient, load) in args.load.iter().map(|x| (false, x)).chain(args.save.iter().map(|x| (true, x))) + for (transient, load) in config.load.iter().map(|x| (false, x)).chain(config.save.iter().map(|x| (true, x))) { let load = Path::new(load); if load.exists() { if load.is_file() { - if let Some(mut file) = OpenOptions::new() + if let Ok(mut file) = OpenOptions::new() .read(true) - .open(load).await.log_and_forget(lmode, log::Level::Warning)? + .open(load).await { - log!(Info, lmode => "Hashes loading from {:?}", load); - args.mode.error_mode.handle(hashes.load_async(&mut file, transient).await).log_and_forget(lmode, if transient {log::Level::Info} else {log::Level::Warning})?; + match hashes.load_async(&mut file, transient).await { + Err(e) if !transient=> return Err(format!("Failed to load required {:?}: {}", file, e))?, + _ => (), + }; } - } else { - log!(Warning, lmode => "Exclusing directory from load path {:?}", load); } - } else { - log!(Info, lmode => "Ignoring non-existant load path {:?}", load); } } - log!(Debug, lmode => "Loaded hashes: {}", hashes); - log!(Info, lmode => "Starting checks (threaded)"); - let hashes = Arc::new(Mutex::new(hashes)); - for path in args.paths.iter() + let mut remove = Vec::new(); + let mut children = Vec::with_capacity(hashes.cache_len()); + for (path, (hash, trans)) in hashes.cache_iter() { - let path = Path::new(path); - if path.is_dir() { - log!(Debug, lmode => "Spawning for {:?}", path); - let mode = args.mode.clone(); - let path = path.to_owned(); - let hashes= Arc::clone(&hashes); - children.push(tokio::task::spawn(async move { - log!(Debug, mode.logging_mode => " + {:?}", path); - let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone()).await).log_and_forget(&mode.logging_mode, log::Level::Error); - log!(Info, mode.logging_mode => " - {:?}", path); - res - })); + if !trans { //Don't rebuild transient ones, this is desired I think? Maybe not... Dunno. + if path.exists() && path.is_file() { + //Getting hash + let path = path.clone(); + let hash = *hash; + children.push(tokio::task::spawn(async move { + rebase_one_async(path, hash).await + })); + } else { + remove.push(path.clone()); + } } } - log!(Info, lmode => "Waiting on children"); - let mut done = proc::DupeCount::default(); + + let (mut changed, mut removed) = (0usize, 0usize); for child in children.into_iter() { - done += args.mode.error_mode.handle(child.await?)?.unwrap_or_default().unwrap_or_default().unwrap_or_default(); + if let Some((path, hash)) = child.await.expect("Child panic")? + { + println!("Updating {:?} -> {}", path, hash); + hashes.cache_force(path, hash, false); + changed +=1; + } + } + for remove in remove.into_iter() + { + println!("Removing {:?}", remove); + hashes.uncache(remove); + removed +=1; } - log!(Info, lmode => "Found: {:?}", done); - - let hashes = hashes.lock().await; - log!(Debug, lmode => "New hashes: {}", hashes); - for save in args.save.iter() + println!("Updated. {} changed, {} removed.", changed, removed); + + for save in config.save.iter() { let save = Path::new(save); - log!(Info, lmode => "Saving hashes to {:?}", save); - if let Some(mut file) = OpenOptions::new() + let mut file = match OpenOptions::new() .create(true) - //.append(true) .truncate(true) .write(true) - .open(save).await.log_and_forget(lmode, log::Level::Warning)? - { - args.mode.error_mode.handle(hashes.save_async(&mut file).await).log_and_forget(lmode, log::Level::Warning)?; - } + .open(&save).await { + Ok(v) => v, + Err(e) => {println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); continue;}, + }; + + match hashes.save_async(&mut file).await { + Err(e) => println!("Warning: Failed to write to output {:?}: ignoring: {}", file, e), + _ => (), + }; + } + + Ok(()) +} + +#[cfg(not(feature="threads"))] +fn rebase(config: config::Rebase) -> Result<(), Box> +{ + todo!() +} + + +fn parse_args() -> Result +{ + match arg::parse_args()? { + arg::Output::Help => arg::usage(), + conf => Ok(conf), + } +} + +#[cfg_attr(feature="threads", tokio::main)] +#[cfg(feature="threads")] +async fn main() -> Result<(), Box> +{ + use tokio::{ + fs::{ + OpenOptions, + }, + sync::{ + Mutex + }, + }; + use std::{ + path::Path, + sync::Arc, + }; + + match parse_args().into_string()? { + arg::Output::Rebase(r) => { + return rebase(r).await; + }, + arg::Output::Normal(args) => { + let lmode = &args.mode.logging_mode; + + log!(Debug, lmode => "Args parsed: {:?}", args); + + let mut children = Vec::new(); + + let mut hashes = container::DupeMap::new(); + + // Load hashes + for (transient, load) in args.load.iter().map(|x| (false, x)).chain(args.save.iter().map(|x| (true, x))) + { + let load = Path::new(load); + if load.exists() { + if load.is_file() { + if let Some(mut file) = OpenOptions::new() + .read(true) + .open(load).await.log_and_forget(lmode, log::Level::Warning)? + { + log!(Info, lmode => "Hashes loading from {:?}", load); + args.mode.error_mode.handle(hashes.load_async(&mut file, transient).await).log_and_forget(lmode, if transient {log::Level::Info} else {log::Level::Warning})?; + } + } else { + log!(Warning, lmode => "Exclusing directory from load path {:?}", load); + } + } else { + log!(Info, lmode => "Ignoring non-existant load path {:?}", load); + } + } + + log!(Debug, lmode => "Loaded hashes: {}", hashes); + log!(Info, lmode => "Starting checks (threaded)"); + let hashes = Arc::new(Mutex::new(hashes)); + for path in args.paths.iter() + { + let path = Path::new(path); + if path.is_dir() { + log!(Debug, lmode => "Spawning for {:?}", path); + let mode = args.mode.clone(); + let path = absolute(&path); + let hashes= Arc::clone(&hashes); + children.push(tokio::task::spawn(async move { + log!(Debug, mode.logging_mode => " + {:?}", path); + let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone()).await).log_and_forget(&mode.logging_mode, log::Level::Error); + log!(Info, mode.logging_mode => " - {:?}", path); + res + })); + } + } + log!(Info, lmode => "Waiting on children"); + let mut done = proc::DupeCount::default(); + for child in children.into_iter() + { + done += args.mode.error_mode.handle(child.await?)?.unwrap_or_default().unwrap_or_default().unwrap_or_default(); + } + log!(Info, lmode => "Found: {:?}", done); + + let hashes = hashes.lock().await; + log!(Debug, lmode => "New hashes: {}", hashes); + + for save in args.save.iter() + { + let save = Path::new(save); + log!(Info, lmode => "Saving hashes to {:?}", save); + if let Some(mut file) = OpenOptions::new() + .create(true) + //.append(true) + .truncate(true) + .write(true) + .open(save).await.log_and_forget(lmode, log::Level::Warning)? + { + args.mode.error_mode.handle(hashes.save_async(&mut file).await).log_and_forget(lmode, log::Level::Warning)?; + } + } + }, + _ => unreachable!(), + }; Ok(()) }