#![allow(dead_code)]

pub const BUFFER_SIZE: usize = 4096;

#[macro_use] mod log;
mod bytes;
mod ext;
pub use ext::*;
mod error;
mod hash;
mod container;
mod config;
mod arg;
mod proc;
//#[cfg(feature="threads")]
//mod pipe; //No longer needed

#[cfg(test)]
mod test {
    use super::*;
    use std::path::Path;

    #[test]
    pub fn args() -> Result<(), arg::Error> {
        macro_rules! string_literal {
            ($($strings:expr),*) => {
                vec![$(format!($strings),)*]
            }
        }
        let args = string_literal!["-lsd", "--load-file", "hello", "--load-save", "load-save!", "--", "test-input", "test-input", "test-input-3", "test-input-2"];
        println!("{:?}", arg::parse(args)?);
        Ok(())
    }

    #[test]
    pub fn test() -> Result<(), error::Error> {
        let mut cont = container::DupeMap::new();
        let mode = config::Mode::default();
        let path = Path::new("test-input");
        assert_eq!(proc::DupeCount { total: 4, dupes: 2 }, proc::do_dir(path, 0, &mut cont, &mode)?);
        Ok(())
    }

    #[cfg(feature="threads")]
    pub async fn _test_async() -> Result<(), error::Error> {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let cont = Arc::new(Mutex::new(container::DupeMap::new()));
        let mode = config::Mode::default();
        let path = Path::new("test-input");
        assert_eq!(proc::DupeCount { total: 4, dupes: 2 }, proc::do_dir_async(path, 0, cont, mode, None).await?);
        Ok(())
    }

    #[cfg(feature="threads")]
    #[test]
    pub fn test_async() -> Result<(), error::Error> {
        tokio_test::block_on(_test_async())
    }
}

/// Canonicalise a path, panicking if it cannot be resolved; callers only pass
/// paths that have already been checked to exist.
#[inline]
fn absolute(path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
    std::fs::canonicalize(path).expect("Invalid path internal")
}

/// Re-hash a single cached file, returning the new hash if it differs from
/// the cached one, or `None` if the cache entry is still valid.
#[cfg(feature="threads")]
async fn rebase_one_async(path: impl AsRef<std::path::Path>, hash: hash::Sha256Hash, semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>>) -> Result<Option<(std::path::PathBuf, hash::Sha256Hash)>, error::Error> {
    use std::convert::TryInto;
    use tokio::fs::OpenOptions;

    let path = path.as_ref();
    // Hold a permit for the duration of the hash if a concurrency cap is set.
    let _lock = match semaphore {
        Some(sem) => Some(sem.acquire_owned().await),
        None => None,
    };
    let mut file = OpenOptions::new()
        .read(true)
        .open(path).await?;
    let sz: usize = file.metadata().await?.len().try_into()
        .or(Err(error::Error::Arch(Some("Filesize is too large to be known. You have likely compiled the binary for a 32-bit (or smaller) architecture; this shouldn't happen on 64-bit systems."))))?;
    let mut result = hash::Sha256Hash::default();
    error::check_size(sz, hash::compute_async(&mut file, &mut result).await?)?;
    println!("Computed {:?}", path);
    if hash != result {
        Ok(Some((path.to_owned(), result)))
    } else {
        Ok(None)
    }
}
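// The `_lock` pattern above (an optional semaphore capping how many tasks do
// real work at once, with `None` meaning unbounded) is reused by `rebase` and
// the threaded `main` below. The test-only module that follows is a minimal,
// self-contained sketch of that pattern using only tokio's public API; the
// names and workload in it are illustrative, not part of this program.
#[cfg(all(test, feature="threads"))]
mod gating_sketch {
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // Stand-in for the hashing work done by `rebase_one_async`.
    async fn gated_work(id: usize, semaphore: Option<Arc<Semaphore>>) -> usize {
        // Binding the permit keeps it alive for the whole body, exactly like
        // `_lock` in `rebase_one_async`; dropping it releases the slot.
        let _permit = match semaphore {
            Some(sem) => Some(sem.acquire_owned().await),
            None => None,
        };
        id * 2
    }

    #[test]
    fn permits_gate_tasks() {
        tokio_test::block_on(async {
            // At most two `gated_work` bodies make progress at once.
            let sem = Some(Arc::new(Semaphore::new(2)));
            let children: Vec<_> = (0..8usize)
                .map(|i| tokio::task::spawn(gated_work(i, sem.as_ref().map(Arc::clone))))
                .collect();
            let mut sum = 0;
            for child in children {
                sum += child.await.expect("Child panic");
            }
            assert_eq!(sum, (0..8usize).map(|i| i * 2).sum::<usize>());
        });
    }
}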
/// Re-hash every non-transient cached entry, pruning entries whose files no
/// longer exist, then write the updated set back out to each save file.
#[cfg(feature="threads")]
async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>> {
    use std::{
        path::Path,
        sync::Arc,
    };
    use tokio::{
        fs::OpenOptions,
        sync::Semaphore,
    };

    let mut hashes = container::DupeMap::new();
    // `load` entries are required; `save` entries are transient, so read
    // failures on them are silently ignored.
    for (transient, load) in config.load.iter().map(|x| (false, x)).chain(config.save.iter().map(|x| (true, x))) {
        let load = Path::new(load);
        if load.exists() && load.is_file() {
            if let Ok(mut file) = OpenOptions::new()
                .read(true)
                .open(load).await
            {
                match hashes.load_async(&mut file, transient).await {
                    Err(e) if !transient => return Err(format!("Failed to load required {:?}: {}", file, e))?,
                    _ => (),
                };
            }
        }
    }

    let mut remove = Vec::new();
    let mut children = Vec::with_capacity(hashes.cache_len());
    let semaphore = config.max_threads.map(|num| Arc::new(Semaphore::new(num.into())));
    for (path, (hash, trans)) in hashes.cache_iter() {
        if !trans { //Don't rebuild transient ones, this is desired I think? Maybe not... Dunno.
            if path.exists() && path.is_file() {
                //Getting hash
                let path = path.clone();
                let hash = *hash;
                let semaphore = semaphore.as_ref().map(Arc::clone);
                children.push(tokio::task::spawn(async move { rebase_one_async(path, hash, semaphore).await }));
            } else {
                remove.push(path.clone());
            }
        }
    }

    let (mut changed, mut removed) = (0usize, 0usize);
    for child in children.into_iter() {
        if let Some((path, hash)) = child.await.expect("Child panic")? {
            println!("Updating {:?} -> {}", path, hash);
            hashes.cache_force(path, hash, false);
            changed += 1;
        }
    }
    for remove in remove.into_iter() {
        println!("Removing {:?}", remove);
        hashes.uncache(remove);
        removed += 1;
    }
    println!("Updated. {} changed, {} removed.", changed, removed);

    // Save the hash set to each output file concurrently.
    let hashes = Arc::new(hashes);
    futures::future::join_all(config.save.iter().map(|save| {
        let hashes = hashes.clone();
        let save = save.to_owned();
        tokio::task::spawn(async move {
            let save = Path::new(&save);
            let mut file = match OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(&save).await
            {
                Ok(v) => v,
                Err(e) => { println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); return; },
            };
            match hashes.save_async(&mut file).await {
                Err(e) => println!("Warning: Failed to write to output {:?}, ignoring: {}", file, e),
                _ => (),
            };
            match file.sync_data().await {
                Err(e) => println!("Warning: Failed to sync output {:?}, ignoring: {}", file, e),
                _ => (),
            }
        })
    })).await;
    Ok(())
}

#[cfg(not(feature="threads"))]
fn rebase(_config: config::Rebase) -> Result<(), Box<dyn std::error::Error>> {
    todo!()
}

/// Parse command-line arguments, printing usage instead of returning a
/// configuration when help is requested.
fn parse_args() -> Result<arg::Output, arg::Error> {
    match arg::parse_args()? {
        arg::Output::Help => arg::usage(),
        conf => Ok(conf),
    }
}
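// `into_string` (called on `parse_args`'s result in `main`) comes from the
// unshown `ext` module that is glob-exported above, so its exact shape is an
// assumption. From its call sites it plausibly stringifies the error so that
// `?` can lift it into `Box<dyn std::error::Error>` via std's
// `From<String> for Box<dyn Error>` impl. The sketch below uses a
// deliberately different name so it cannot collide with the real trait:
#[cfg(test)]
mod ext_sketch {
    pub trait IntoStringSketch<T> {
        fn into_string_sketch(self) -> Result<T, String>;
    }

    impl<T, E: std::fmt::Display> IntoStringSketch<T> for Result<T, E> {
        fn into_string_sketch(self) -> Result<T, String> {
            self.map_err(|e| e.to_string())
        }
    }

    #[test]
    fn stringifies_errors() {
        let parsed: Result<(), std::num::ParseIntError> = "x".parse::<i32>().map(|_| ());
        assert!(parsed.into_string_sketch().is_err());
        assert_eq!(Ok::<_, std::num::ParseIntError>(1).into_string_sketch(), Ok(1));
    }
}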
#[cfg_attr(feature="threads", tokio::main)]
#[cfg(feature="threads")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use tokio::{
        fs::OpenOptions,
        sync::Mutex,
    };
    use std::{
        path::Path,
        sync::Arc,
    };
    match parse_args().into_string()? {
        arg::Output::Rebase(r) => {
            return rebase(r).await;
        },
        arg::Output::Normal(args) => {
            let lmode = &args.mode.logging_mode;
            log!(Debug, lmode => "Args parsed: {:?}", args);

            let mut children = Vec::new();
            let mut hashes = container::DupeMap::new();
            // Load hashes: `load` entries are required, `save` entries are transient.
            for (transient, load) in args.load.iter().map(|x| (false, x)).chain(args.save.iter().map(|x| (true, x))) {
                let load = Path::new(load);
                if load.exists() {
                    if load.is_file() {
                        if let Some(mut file) = OpenOptions::new()
                            .read(true)
                            .open(load).await.log_and_forget(lmode, log::Level::Warning)?
                        {
                            log!(Info, lmode => "Hashes loading from {:?}", load);
                            args.mode.error_mode.handle(hashes.load_async(&mut file, transient).await).log_and_forget(lmode, if transient { log::Level::Info } else { log::Level::Warning })?;
                        }
                    } else {
                        log!(Warning, lmode => "Excluding directory from load path {:?}", load);
                    }
                } else {
                    log!(Info, lmode => "Ignoring non-existent load path {:?}", load);
                }
            }
            log!(Debug, lmode => "Loaded hashes: {}", hashes);

            log!(Info, lmode => "Starting checks (threaded)");
            let hashes = Arc::new(Mutex::new(hashes));
            let semaphore = args.max_threads.map(|num| Arc::new(tokio::sync::Semaphore::new(num.into())));
            for path in args.paths.iter() {
                let path = Path::new(path);
                if path.is_dir() {
                    log!(Debug, lmode => "Spawning for {:?}", path);
                    let mode = args.mode.clone();
                    let path = absolute(&path);
                    let hashes = Arc::clone(&hashes);
                    let semaphore = semaphore.as_ref().map(Arc::clone);
                    children.push(tokio::task::spawn(async move {
                        log!(Debug, mode.logging_mode => " + {:?}", path);
                        let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone(), semaphore).await).log_and_forget(&mode.logging_mode, log::Level::Error);
                        log!(Info, mode.logging_mode => " - {:?}", path);
                        res
                    }));
                }
            }

            log!(Info, lmode => "Waiting on children");
            let mut done = proc::DupeCount::default();
            for child in children.into_iter() {
                // Each `unwrap_or_default` collapses one layer of
                // already-handled (logged and forgotten) failure into a zero
                // count, so one failed walk doesn't abort the total.
                done += args.mode.error_mode.handle(child.await?)?.unwrap_or_default().unwrap_or_default().unwrap_or_default();
            }
            log!(Info, lmode => "Found: {:?}", done);

            let hashes = hashes.lock().await;
            log!(Debug, lmode => "New hashes: {}", hashes);
            // Save hashes
            for save in args.save.iter() {
                let save = Path::new(save);
                log!(Info, lmode => "Saving hashes to {:?}", save);
                if let Some(mut file) = OpenOptions::new()
                    .create(true)
                    //.append(true)
                    .truncate(true)
                    .write(true)
                    .open(save).await.log_and_forget(lmode, log::Level::Warning)?
                {
                    args.mode.error_mode.handle(hashes.save_async(&mut file).await).log_and_forget(lmode, log::Level::Warning)?;
                    use tokio::prelude::*;
                    file.shutdown().await.log_and_forget(lmode, log::Level::Warning)?; //One of these is probably redundant.
                    file.sync_data().await.log_and_forget(lmode, log::Level::Warning)?;
                }
            }
        },
        _ => unreachable!(),
    };
    Ok(())
}
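// `log_and_forget` (also from `ext`) is applied throughout both `main`s. From
// its call sites it appears to log a failure at the given level and demote it
// to `None` rather than propagating it, which is why successful opens arrive
// as `Some(file)` and the counts need the `unwrap_or_default` chains. The
// real helper evidently returns a `Result` (a `?` still follows it); the
// free-standing sketch below captures just the log-then-forget contract, with
// `eprintln!` standing in for the `log!` machinery (both are assumptions):
#[cfg(test)]
mod log_and_forget_sketch {
    // `Ok(v)` -> `Some(v)`; `Err(e)` -> logged, then `None`.
    fn log_and_forget_sketch<T, E: std::fmt::Display>(res: Result<T, E>, level: &str) -> Option<T> {
        match res {
            Ok(v) => Some(v),
            Err(e) => {
                eprintln!("[{}] {}", level, e);
                None
            },
        }
    }

    #[test]
    fn demotes_errors_to_none() {
        assert_eq!(log_and_forget_sketch::<i32, &str>(Ok(1), "warning"), Some(1));
        assert_eq!(log_and_forget_sketch::<i32, &str>(Err("boom"), "warning"), None);
    }
}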
#[cfg(not(feature="threads"))]
fn main() -> Result<(), Box<dyn std::error::Error>> {
    use std::{
        path::Path,
        fs::OpenOptions,
    };
    let args = parse_args().into_string()?;
    let lmode = &args.mode.logging_mode;
    log!(Debug, lmode => "Args parsed: {:?}", args);

    let mut hashes = container::DupeMap::new();
    // Load hashes
    for load in args.load.iter() {
        let load = Path::new(load);
        if load.exists() {
            if load.is_file() {
                if let Some(mut file) = OpenOptions::new()
                    .read(true)
                    .open(load).log_and_forget(lmode, log::Level::Warning)?
                {
                    log!(Info, lmode => "Hashes loading from {:?}", load);
                    args.mode.error_mode.handle(hashes.load(&mut file)).log_and_forget(lmode, log::Level::Warning)?;
                }
            } else {
                log!(Warning, lmode => "Excluding directory from load path {:?}", load);
            }
        } else {
            log!(Info, lmode => "Ignoring non-existent load path {:?}", load);
        }
    }
    log!(Debug, lmode => "Loaded hashes: {:?}", hashes);

    log!(Info, lmode => "Starting checks (single-threaded)");
    let mut done = proc::DupeCount::default();
    for path in args.paths.iter() {
        let path = Path::new(path);
        if path.is_dir() {
            log!(Debug, lmode => " + {:?}", path);
            done += args.mode.error_mode.handle(proc::do_dir(path, 0, &mut hashes, &args.mode)).log_and_forget(lmode, log::Level::Error)?.unwrap_or_default().unwrap_or_default();
            log!(Info, lmode => " - {:?}", path);
        }
    }
    log!(Info, lmode => "Found: {:?}", done);

    log!(Debug, lmode => "New hashes: {}", hashes);
    // Save hashes
    for save in args.save.iter() {
        let save = Path::new(save);
        log!(Info, lmode => "Saving hashes to {:?}", save);
        if let Some(mut file) = OpenOptions::new()
            .create(true)
            //.append(true)
            .truncate(true)
            .write(true)
            .open(save).log_and_forget(lmode, log::Level::Warning)?
        {
            args.mode.error_mode.handle(hashes.save(&mut file)).log_and_forget(lmode, log::Level::Warning)?;
        }
    }
    Ok(())
}
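// For reference, the argument shapes accepted by `main` mirror the `args`
// test at the top of this file. A hypothetical invocation (the binary name is
// not fixed anywhere in this file) would look like:
//
//     <binary> -lsd --load-file hello --load-save load-save! -- test-input test-input-2
//
// with `--` separating the flags from the directories to scan, as in the test.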