//! Duplicate-file scanner binary: loads/saves hash caches (`container::DupeMap`)
//! and counts duplicates under the given paths (`proc::do_dir`/`do_dir_async`).
#![allow(dead_code)]
|
|
|
|
pub const BUFFER_SIZE: usize = 4096;
|
|
|
|
#[macro_use]
|
|
mod log;
|
|
mod bytes;
|
|
mod ext;
|
|
pub use ext::*;
|
|
mod error;
|
|
mod hash;
|
|
mod container;
|
|
mod config;
|
|
mod arg;
|
|
mod proc;
|
|
|
|
//#[cfg(feature="threads")]
|
|
//mod pipe; //No longer needed
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use std::{
        path::Path,
    };

    /// Feeds a representative command line through the argument parser and
    /// prints the parsed result.
    #[test]
    pub fn args() -> Result<(), arg::Error>
    {
        // Build the owned argument vector directly instead of going through
        // a local helper macro.
        let argv = [
            "-lsd",
            "--load-file",
            "hello",
            "--load-save",
            "load-save!",
            "--",
            "test-input",
            "test-input",
            "test-input-3",
            "test-input-2",
        ];
        let args: Vec<String> = argv.iter().map(|s| s.to_string()).collect();
        println!("{:?}", arg::parse(args)?);
        Ok(())
    }

    /// Scans the `test-input` fixture directory synchronously and checks the
    /// expected duplicate count.
    #[test]
    pub fn test() -> Result<(), error::Error>
    {
        let mut map = container::DupeMap::new();
        let mode = config::Mode::default();
        let found = proc::do_dir(Path::new("test-input"), 0, &mut map, &mode)?;

        assert_eq!(proc::DupeCount{total:4, dupes:2}, found);

        Ok(())
    }

    /// Async body for `test_async`: scans the same fixture through the
    /// task-based path with a shared, mutex-guarded map.
    #[cfg(feature="threads")]
    pub async fn _test_async() -> Result<(), error::Error>
    {
        use std::sync::Arc;
        use tokio::{
            sync::Mutex,
        };
        let map = Arc::new(Mutex::new(container::DupeMap::new()));
        let mode = config::Mode::default();
        let found = proc::do_dir_async(Path::new("test-input"), 0, map, mode, None).await?;

        assert_eq!(proc::DupeCount{total:4, dupes:2}, found);

        Ok(())
    }

    /// Drives `_test_async` to completion on a blocking test executor.
    #[cfg(feature="threads")]
    #[test]
    pub fn test_async() -> Result<(), error::Error>
    {
        tokio_test::block_on(_test_async())
    }
}
|
|
|
|
|
|
/// Resolve `path` to its canonical, absolute form.
///
/// # Panics
/// Panics if the path does not exist or cannot be canonicalized; callers
/// are expected to pass only paths already validated to exist.
#[inline]
fn absolute(path: impl AsRef<std::path::Path>) -> std::path::PathBuf
{
    let canonical = std::fs::canonicalize(path);
    canonical.expect("Invalid path internal")
}
|
|
|
|
/// Re-hash one cached file and report whether its stored hash is stale.
///
/// When `semaphore` is `Some`, a permit is acquired first so the caller can
/// bound how many files are being hashed concurrently.
///
/// Returns `Ok(Some((path, new_hash)))` when the freshly computed SHA-256
/// differs from the cached `hash`, and `Ok(None)` when they match.
///
/// # Errors
/// Fails if the file cannot be opened or read, or if its size does not fit
/// in `usize` (`Error::Arch`, e.g. on 32-bit targets).
#[cfg(feature="threads")]
async fn rebase_one_async(path: impl AsRef<std::path::Path>, hash: hash::Sha256Hash, semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>>) -> Result<Option<(std::path::PathBuf, hash::Sha256Hash)>, error::Error>
{
    use std::{
        convert::TryInto,
    };
    use tokio::{
        fs::{
            OpenOptions,
        },
    };
    let path = path.as_ref();
    // Hold the permit (if any) for the remainder of the function; it is
    // released when `_lock` is dropped at the end of the call.
    let _lock = match semaphore {
        Some(sem) => Some(sem.acquire_owned().await),
        None => None,
    };
    let mut file = OpenOptions::new()
        .read(true)
        .open(path).await?;
    // Reject files whose size cannot be represented in `usize`.
    let sz: usize = file.metadata().await?.len().try_into().or(Err(error::Error::Arch(Some("Filesize is too large to be known. you have likely compiled the binary for 32-bit architecture or less. This shouldn't happen on 64-bit systems."))))?;
    let mut result = hash::Sha256Hash::default();
    // NOTE(review): presumably checks that the number of bytes hashed matches
    // the size reported by metadata — confirm against `error::check_size`.
    error::check_size(sz, hash::compute_async(&mut file, &mut result).await?)?;

    println!("Computed {:?}", path);
    if hash != result {
        Ok(Some((path.to_owned(), result)))
    } else {
        Ok(None)
    }
}
|
|
|
|
/// Rebuild the hash caches named in `config`.
///
/// Pipeline: (1) load every existing cache file — `config.load` entries are
/// required (failure aborts), `config.save` entries are loaded best-effort
/// as "transient"; (2) re-hash each non-transient entry whose file still
/// exists, one spawned task per file, optionally bounded by
/// `config.max_threads`; (3) drop entries whose file vanished; (4) write the
/// updated set back to every `config.save` target concurrently, warning (not
/// failing) on individual save errors.
#[cfg(feature="threads")]
async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>>
{
    use std::{
        path::{
            Path,
        },
        sync::Arc,
    };
    use tokio::{
        fs::{
            OpenOptions,
        },
        sync::Semaphore,
    };
    let mut hashes = container::DupeMap::new();
    // Load phase: pair each path with its transience flag (load => required,
    // save => best-effort).
    for (transient, load) in config.load.iter().map(|x| (false, x)).chain(config.save.iter().map(|x| (true, x)))
    {
        let load = Path::new(load);
        if load.exists() {
            if load.is_file() {
                if let Ok(mut file) = OpenOptions::new()
                    .read(true)
                    .open(load).await
                {
                    match hashes.load_async(&mut file, transient).await {
                        // A required (non-transient) cache failing to load aborts the rebase.
                        Err(e) if !transient=> return Err(format!("Failed to load required {:?}: {}", file, e))?,
                        _ => (),
                    };
                }
            }
        }
    }

    // `remove`: cached paths that no longer exist on disk.
    // `children`: join handles of the per-file re-hash tasks.
    let mut remove = Vec::new();
    let mut children = Vec::with_capacity(hashes.cache_len());
    // Optional cap on the number of concurrently hashed files.
    let semaphore = config.max_threads.map(|num| Arc::new(Semaphore::new(num.into())));
    for (path, (hash, trans)) in hashes.cache_iter()
    {
        if !trans { //Don't rebuild transient ones, this is desired I think? Maybe not... Dunno.
            if path.exists() && path.is_file() {
                //Getting hash
                let path = path.clone();
                let hash = *hash;
                let semaphore = semaphore.as_ref().map(|semaphore| Arc::clone(semaphore));
                children.push(tokio::task::spawn(async move {
                    rebase_one_async(path, hash, semaphore).await
                }));
            } else {
                // File is gone; queue the cache entry for removal.
                remove.push(path.clone());
            }
        }
    }

    // Collect results: update entries whose hash changed, drop vanished ones.
    let (mut changed, mut removed) = (0usize, 0usize);
    for child in children.into_iter()
    {
        if let Some((path, hash)) = child.await.expect("Child panic")?
        {
            println!("Updating {:?} -> {}", path, hash);
            hashes.cache_force(path, hash, false);
            changed +=1;
        }
    }
    for remove in remove.into_iter()
    {
        println!("Removing {:?}", remove);
        hashes.uncache(remove);
        removed +=1;
    }

    println!("Updated. {} changed, {} removed.", changed, removed);

    // Save the hashes set in concurrently to each file.
    let hashes = Arc::new(hashes);
    futures::future::join_all(config.save.iter().map(|save| {
        let hashes = hashes.clone();
        let save = save.to_owned();
        tokio::task::spawn(async move {
            let save = Path::new(&save);
            let mut file = match OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(&save).await {
                Ok(v) => v,
                // Save errors are warnings only; the rebase itself succeeded.
                Err(e) => {println!("Warning: Failed to open output {:?}, ignoring: {}", save, e); return;},
            };

            match hashes.save_async(&mut file).await {
                Err(e) => println!("Warning: Failed to write to output {:?}: ignoring: {}", file, e),
                _ => (),
            };
            match file.sync_data().await {
                Err(e) => println!("Warning: Failed to sync output {:?}: ignoring: {}", file, e),
                _ => (),
            }
        })
    })).await;

    Ok(())
}
|
|
|
|
#[cfg(not(feature="threads"))]
|
|
fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>>
|
|
{
|
|
todo!()
|
|
}
|
|
|
|
|
|
fn parse_args() -> Result<arg::Output, error::Error>
|
|
{
|
|
match arg::parse_args()? {
|
|
arg::Output::Help => arg::usage(),
|
|
conf => Ok(conf),
|
|
}
|
|
}
|
|
|
|
/// Threaded entry point.
///
/// Dispatches on the parsed arguments: `Output::Rebase` delegates to
/// `rebase`; `Output::Normal` loads the hash caches, spawns one task per
/// top-level directory in `args.paths`, sums the resulting `DupeCount`s,
/// and writes the updated caches back to every `args.save` target.
#[cfg_attr(feature="threads", tokio::main)]
#[cfg(feature="threads")]
async fn main() -> Result<(), Box<dyn std::error::Error>>
{
    use tokio::{
        fs::{
            OpenOptions,
        },
        sync::{
            Mutex
        },
    };
    use std::{
        path::Path,
        sync::Arc,
    };

    match parse_args().into_string()? {
        arg::Output::Rebase(r) => {
            return rebase(r).await;
        },
        arg::Output::Normal(args) => {
            let lmode = &args.mode.logging_mode;

            log!(Debug, lmode => "Args parsed: {:?}", args);

            let mut children = Vec::new();

            let mut hashes = container::DupeMap::new();

            // Load hashes
            // `args.load` entries are required (transient == false); `args.save`
            // entries are reloaded best-effort (transient == true), logged at a
            // lower severity on failure.
            for (transient, load) in args.load.iter().map(|x| (false, x)).chain(args.save.iter().map(|x| (true, x)))
            {
                let load = Path::new(load);
                if load.exists() {
                    if load.is_file() {
                        if let Some(mut file) = OpenOptions::new()
                            .read(true)
                            .open(load).await.log_and_forget(lmode, log::Level::Warning)?
                        {
                            log!(Info, lmode => "Hashes loading from {:?}", load);
                            args.mode.error_mode.handle(hashes.load_async(&mut file, transient).await).log_and_forget(lmode, if transient {log::Level::Info} else {log::Level::Warning})?;
                        }
                    } else {
                        log!(Warning, lmode => "Exclusing directory from load path {:?}", load);
                    }
                } else {
                    log!(Info, lmode => "Ignoring non-existant load path {:?}", load);
                }
            }

            log!(Debug, lmode => "Loaded hashes: {}", hashes);
            log!(Info, lmode => "Starting checks (threaded)");
            // Shared, mutex-guarded dupe map for the worker tasks, plus an
            // optional cap on concurrently hashed files.
            let hashes = Arc::new(Mutex::new(hashes));
            let semaphore = args.max_threads.map(|num| Arc::new(tokio::sync::Semaphore::new(num.into())));
            for path in args.paths.iter()
            {
                let path = Path::new(path);
                // Non-directories are skipped silently.
                if path.is_dir() {
                    log!(Debug, lmode => "Spawning for {:?}", path);
                    let mode = args.mode.clone();
                    // Canonicalize before moving into the task (panics on an
                    // invalid path — `absolute`'s documented contract).
                    let path = absolute(&path);
                    let hashes= Arc::clone(&hashes);
                    let semaphore = semaphore.as_ref().map(|sem| Arc::clone(sem));
                    children.push(tokio::task::spawn(async move {
                        log!(Debug, mode.logging_mode => " + {:?}", path);
                        let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone(), semaphore).await).log_and_forget(&mode.logging_mode, log::Level::Error);
                        log!(Info, mode.logging_mode => " - {:?}", path);
                        res
                    }));
                }
            }
            log!(Info, lmode => "Waiting on children");
            let mut done = proc::DupeCount::default();
            for child in children.into_iter()
            {
                // NOTE(review): the stacked unwrap_or_default calls flatten the
                // Option layers produced by error_mode.handle / log_and_forget —
                // confirm against those signatures in `ext`/`config`.
                done += args.mode.error_mode.handle(child.await?)?.unwrap_or_default().unwrap_or_default().unwrap_or_default();
            }
            log!(Info, lmode => "Found: {:?}", done);

            // All workers are done; take the map back out of the mutex.
            let hashes = hashes.lock().await;
            log!(Debug, lmode => "New hashes: {}", hashes);

            for save in args.save.iter()
            {
                let save = Path::new(save);
                log!(Info, lmode => "Saving hashes to {:?}", save);
                if let Some(mut file) = OpenOptions::new()
                    .create(true)
                    //.append(true)
                    .truncate(true)
                    .write(true)
                    .open(save).await.log_and_forget(lmode, log::Level::Warning)?
                {
                    args.mode.error_mode.handle(hashes.save_async(&mut file).await).log_and_forget(lmode, log::Level::Warning)?;
                    use tokio::prelude::*;
                    file.shutdown().await.log_and_forget(lmode, log::Level::Warning)?; //One of these is probably redundant.
                    file.sync_data().await.log_and_forget(lmode, log::Level::Warning)?;
                }
            }
        },
        // `parse_args` already mapped Output::Help through `arg::usage`.
        _ => unreachable!(),
    };

    Ok(())
}
|
|
|
|
/// Single-threaded entry point.
///
/// Loads the hash caches from `args.load`, scans each directory in
/// `args.paths` sequentially with `proc::do_dir`, accumulates the
/// `DupeCount`, then writes the updated caches to every `args.save` target.
#[cfg(not(feature="threads"))]
fn main() -> Result<(), Box<dyn std::error::Error>>
{
    use std::{
        path::Path,
        fs::{
            OpenOptions,
        },
    };
    let args = parse_args().into_string()?;
    let lmode = &args.mode.logging_mode;

    log!(Debug, lmode => "Args parsed: {:?}", args);

    let mut hashes = container::DupeMap::new();

    // Load hashes
    // NOTE(review): unlike the threaded build, this loop reads only
    // `args.load` (save files are not reloaded) and `load` takes no
    // transience flag — confirm this asymmetry is intended.
    for load in args.load.iter()
    {
        let load = Path::new(load);
        if load.exists() {
            if load.is_file() {
                if let Some(mut file) = OpenOptions::new()
                    .read(true)
                    .open(load).log_and_forget(lmode, log::Level::Warning)?
                {
                    log!(Info, lmode => "Hashes loading from {:?}", load);
                    args.mode.error_mode.handle(hashes.load(&mut file)).log_and_forget(lmode, log::Level::Warning)?;
                }
            } else {
                log!(Warning, lmode => "Exclusing directory from load path {:?}", load);
            }
        } else {
            log!(Info, lmode => "Ignoring non-existant load path {:?}", load);
        }
    }

    log!(Debug, lmode => "Loaded hashes: {:?}", hashes);
    // NOTE(review): this build is single-threaded; the "(threaded)" in the
    // message below looks stale (copied from the threaded main).
    log!(Info, lmode => "Starting checks (threaded)");

    let mut done = proc::DupeCount::default();
    for path in args.paths.iter()
    {
        let path = Path::new(path);
        // Non-directories are skipped silently.
        if path.is_dir() {
            log!(Debug, lmode => " + {:?}", path);
            done += args.mode.error_mode.handle(proc::do_dir(path.clone(), 0, &mut hashes, &args.mode)).log_and_forget(lmode, log::Level::Error)?.unwrap_or_default().unwrap_or_default();
            log!(Info, lmode => " - {:?}", path);
        }
    }

    log!(Info, lmode => "Found: {:?}", done);
    log!(Debug, lmode => "New hashes: {}", hashes);

    for save in args.save.iter()
    {
        let save = Path::new(save);
        log!(Info, lmode => "Saving hashes to {:?}", save);
        if let Some(mut file) = OpenOptions::new()
            .create(true)
            //.append(true)
            .truncate(true)
            .write(true)
            .open(save).log_and_forget(lmode, log::Level::Warning)?
        {
            args.mode.error_mode.handle(hashes.save(&mut file)).log_and_forget(lmode, log::Level::Warning)?;
        }
    }

    Ok(())
}
|