use super::*;
use std::{
    path::Path,
    fs::{
        self,
        OpenOptions,
    },
    convert::TryInto,
    ops,
};

/// Handle a detected dupe
fn handle_dupe<P>(path: P, mode: &config::Mode) -> Result<(), error::Error>
where P: AsRef<Path>
{
    log!(Info, mode.logging_mode => " -> {:?}", path.as_ref());
    match mode.operation_mode {
        // Only `Delete` acts on the dupe; other modes just report it.
        config::OperationMode::Delete => {
            mode.error_mode.handle(std::fs::remove_file(path.as_ref()))?;
        },
        _ => (),
    }
    Ok(())
}

/// Handle a detected dupe (async)
#[inline(always)]
#[cfg(feature="threads")]
async fn handle_dupe_async<P>(path: P, mode: &config::Mode) -> Result<(), error::Error>
where P: AsRef<Path>
{
    log!(Info, mode.logging_mode => " -> {:?}", path.as_ref());
    match mode.operation_mode {
        // Only `Delete` acts on the dupe; other modes just report it.
        config::OperationMode::Delete => {
            mode.error_mode.handle(tokio::fs::remove_file(path.as_ref()).await)?;
        },
        _ => (),
    }
    Ok(())
}

/// A running count of files processed and dupes found.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DupeCount {
    pub total: usize,
    pub dupes: usize,
}

impl From<bool> for DupeCount
{
    /// Convert a single `process_file` result (`true` = not a dupe) into a count of one file.
    fn from(b: bool) -> Self
    {
        Self {
            total: 1,
            dupes: if b {0} else {1},
        }
    }
}

impl ops::Add for DupeCount
{
    type Output = Self;
    fn add(self, other: Self) -> Self
    {
        Self {
            total: self.total + other.total,
            dupes: self.dupes + other.dupes,
        }
    }
}

impl ops::AddAssign for DupeCount
{
    fn add_assign(&mut self, other: Self)
    {
        *self = *self + other;
    }
}

impl Default for DupeCount
{
    fn default() -> Self
    {
        Self{total: 0, dupes: 0}
    }
}

/// Process a file and add it to the table. Returns `true` if it is not a dupe.
pub fn process_file<P: AsRef<Path>>(file: P, set: &mut container::DupeMap) -> Result<bool, error::Error>
{
    let mut file = OpenOptions::new()
        .read(true)
        .open(file)?;
    let sz: usize = file.metadata()?.len().try_into()
        .or(Err(error::Error::Arch(Some("Filesize is too large to be known. You have likely compiled the binary for a 32-bit or smaller architecture; this shouldn't happen on 64-bit systems."))))?;
    let mut result = hash::Sha256Hash::default();
    error::check_size(sz, hash::compute(&mut file, &mut result)?)?;
    Ok(set.try_add(result))
}

/// Process a file and add it to the table. Returns `true` if it is not a dupe.
#[cfg(feature="threads")]
pub async fn process_file_async<P: AsRef<Path>>(file: P, set: &std::sync::Arc<tokio::sync::Mutex<container::DupeMap>>) -> Result<bool, error::Error>
{
    use tokio::fs::OpenOptions;

    let mut file = OpenOptions::new()
        .read(true)
        .open(file).await?;
    let sz: usize = file.metadata().await?.len().try_into()
        .or(Err(error::Error::Arch(Some("Filesize is too large to be known. You have likely compiled the binary for a 32-bit or smaller architecture; this shouldn't happen on 64-bit systems."))))?;
    let mut result = hash::Sha256Hash::default();
    error::check_size(sz, hash::compute_async(&mut file, &mut result).await?)?;
    let mut set = set.lock().await;
    Ok(set.try_add(result))
}
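// A minimal sketch (test-only) of how `DupeCount` composes: `From<bool>` interprets a
// single `process_file` result (`true` = not a dupe) as a one-file count, and
// `Add`/`AddAssign` fold per-entry counts into a directory total, which is how `do_dir`
// below accumulates its return value. Everything referenced here is defined above.
#[cfg(test)]
mod dupe_count_sketch {
    use super::DupeCount;

    #[test]
    fn counts_fold_as_expected()
    {
        // Three files processed: two unique (`true`), one dupe (`false`).
        let mut count = DupeCount::default();
        for unique in [true, true, false].iter().copied() {
            count += DupeCount::from(unique);
        }
        assert_eq!(count, DupeCount{total: 3, dupes: 1});

        // Merging the counts of two sub-directories works the same way via `Add`.
        assert_eq!(count + DupeCount{total: 2, dupes: 2}, DupeCount{total: 5, dupes: 3});
    }
}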
/// Walk a dir structure and remove all dupes in it
pub fn do_dir<P: AsRef<Path>>(dir: P, depth: usize, set: &mut container::DupeMap, mode: &config::Mode) -> Result<DupeCount, error::Error>
{
    let recurse = match mode.recursion_mode {
        config::RecursionMode::N(n) if n > depth => true,
        config::RecursionMode::All => true,
        _ => false,
    };
    let cmode = mode;
    let mode = &mode.error_mode;

    let mut count = DupeCount::default();
    for obj in fs::read_dir(dir.as_ref())? // always return an error if the directory itself can't be read
    {
        // Each entry is allowed to fail if `mode` says so
        if let Some(obj) = mode.handle(obj)? {
            let obj = obj.path();
            if obj.is_dir() && recurse {
                count += mode.handle(do_dir(obj, depth+1, set, cmode))?.unwrap_or_default();
            } else {
                count += if mode.handle(process_file(&obj, set))?.unwrap_or_default() {
                    log!(Info, cmode.logging_mode => "OK {:?}", obj);
                    DupeCount{total: 1, dupes: 0}
                } else {
                    mode.handle(handle_dupe(obj, cmode))?;
                    DupeCount{total: 1, dupes: 1}
                };
            }
        }
    }
    Ok(count)
}

/// Walk a dir structure and remove all dupes in it (async)
#[cfg(feature="threads")]
pub fn do_dir_async<P: AsRef<Path> + std::marker::Send + std::marker::Sync + 'static>(dir: P, depth: usize, set: std::sync::Arc<tokio::sync::Mutex<container::DupeMap>>, mode: config::Mode) -> futures::future::BoxFuture<'static, Result<DupeCount, error::Error>>
{
    use std::sync::Arc;
    use futures::future::FutureExt;

    async move {
        let recurse = match mode.recursion_mode {
            config::RecursionMode::N(n) if n > depth => true,
            config::RecursionMode::All => true,
            _ => false,
        };
        let cmode = mode;
        let mode = &cmode.error_mode;

        let mut children = Vec::new(); // sub-directory tasks
        let mut workers = Vec::new();  // per-file tasks

        let mut dir = tokio::fs::read_dir(dir.as_ref()).await?; // always return an error if the directory itself can't be read
        while let Some(Some(obj)) = mode.handle(dir.next_entry().await)? {
            let obj = obj.path();
            if obj.is_dir() && recurse {
                let set = Arc::clone(&set);
                let cmode = cmode.clone();
                let mode = mode.clone();
                children.push(tokio::task::spawn(async move {
                    log!(Info, cmode.logging_mode => "OK {:?}", obj);
                    match mode.handle(do_dir_async(obj, depth+1, set, cmode).await) {
                        Ok(v) => Ok(v.unwrap_or_default()),
                        Err(v) => Err(v),
                    }
                }));
            } else {
                let set = Arc::clone(&set);
                let mode = mode.clone();
                let cmode = cmode.clone();
                workers.push(tokio::task::spawn(async move {
                    match mode.handle(process_file_async(&obj, &set).await) {
                        Ok(v) => {
                            if v.unwrap_or_default() {
                                log!(Info, cmode.logging_mode => "OK {:?}", obj);
                                Ok(true)
                            } else if let Err(e) = mode.handle(handle_dupe_async(obj, &cmode).await) {
                                Err(e)
                            } else {
                                Ok(false)
                            }
                        },
                        Err(v) => Err(v),
                    }
                }));
            }
        }

        // Await a set of spawned tasks and fold their results into a single `DupeCount`.
        async fn wait_on<T, U>(children: T, mode: &error::Mode) -> Result<DupeCount, error::Error>
        where T: IntoIterator<Item = tokio::task::JoinHandle<Result<U, error::Error>>>,
              U: Default + Into<DupeCount>
        {
            let mut count = DupeCount::default();
            for child in children.into_iter() {
                count += mode.handle(error::internal(child.await)? /* task panicked */)?.unwrap_or_default().into();
            }
            Ok(count)
        }

        // Wait for all children to complete before error checking.
        let er1 = wait_on(workers, &mode).await;
        let er2 = wait_on(children, &mode).await;
        Ok(er1? + er2?)
    }.boxed()
}
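// Rough usage sketches for the walkers. `DupeMap::new()` and `Mode::default()` are
// hypothetical constructors used for illustration only; build these values however the
// crate's `container` and `config` modules actually expose them (e.g. from parsed CLI
// arguments).
//
// Synchronous walk:
// ```ignore
// let mut seen = container::DupeMap::new();   // hypothetical constructor
// let mode = config::Mode::default();         // hypothetical constructor
// let count = do_dir("./target/dir", 0, &mut seen, &mode)?;
// println!("{} dupes out of {} files", count.dupes, count.total);
// ```
//
// Async walk (with the `threads` feature), sharing the map behind an async mutex:
// ```ignore
// let seen = std::sync::Arc::new(tokio::sync::Mutex::new(container::DupeMap::new()));
// let count = do_dir_async("./target/dir", 0, seen, mode).await?;
// println!("{} dupes out of {} files", count.dupes, count.total);
// ```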