You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
11 KiB

#![allow(dead_code)]
/// Buffer size in bytes shared by the crate (4 KiB).
/// NOTE(review): presumably used for chunked file I/O in the submodules — confirm at use sites.
pub const BUFFER_SIZE: usize = 4 * 1024;
#[macro_use]
mod log;
mod bytes;
mod ext;
pub use ext::*;
mod error;
mod hash;
mod container;
mod config;
mod arg;
mod proc;
//#[cfg(feature="threads")]
//mod pipe; //No longer needed
#[cfg(test)]
mod test {
    use super::*;
    use std::path::Path;

    /// Smoke-test `arg::parse` on a command line that mixes a combined short-flag
    /// group, long options taking values, and positional inputs after `--`.
    #[test]
    pub fn args() -> Result<(), arg::Error>
    {
        let argv: Vec<String> = [
            "-lsd",
            "--load-file", "hello",
            "--load-save", "load-save!",
            "--",
            "test-input", "test-input", "test-input-3", "test-input-2",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect();
        println!("{:?}", arg::parse(argv)?);
        Ok(())
    }

    /// Scan the `test-input` fixture directory synchronously and expect
    /// 4 files in total, 2 of them duplicates.
    #[test]
    pub fn test() -> Result<(), error::Error>
    {
        let mut cache = container::DupeMap::new();
        let mode = config::Mode::default();
        let fixture = Path::new("test-input");
        let expected = proc::DupeCount{total:4, dupes:2};
        let counted = proc::do_dir(fixture, 0, &mut cache, &mode)?;
        assert_eq!(expected, counted);
        Ok(())
    }

    /// Async counterpart of `test`, driving `proc::do_dir_async` with a shared,
    /// mutex-protected cache and no concurrency limit.
    #[cfg(feature="threads")]
    pub async fn _test_async() -> Result<(), error::Error>
    {
        use std::sync::Arc;
        use tokio::sync::Mutex;
        let shared = Arc::new(Mutex::new(container::DupeMap::new()));
        let mode = config::Mode::default();
        let fixture = Path::new("test-input");
        let expected = proc::DupeCount{total:4, dupes:2};
        let counted = proc::do_dir_async(fixture, 0, shared, mode, None).await?;
        assert_eq!(expected, counted);
        Ok(())
    }

    /// Run `_test_async` to completion on a test runtime.
    #[cfg(feature="threads")]
    #[test]
    pub fn test_async() -> Result<(), error::Error>
    {
        tokio_test::block_on(_test_async())
    }
}
/// Resolve `path` to an absolute, canonical path (all symlinks and `.`/`..`
/// components resolved, via `std::fs::canonicalize`).
///
/// # Panics
/// Panics if the path cannot be canonicalized (for example it does not exist or
/// a component is inaccessible). The panic message names the offending path and
/// the underlying I/O error so the failure can be diagnosed; the caller in the
/// threaded `main` checks `is_dir()` just before calling this.
fn absolute(path: impl AsRef<std::path::Path>) -> std::path::PathBuf
{
    let path = path.as_ref();
    std::fs::canonicalize(path).unwrap_or_else(|err| {
        panic!("Invalid path internal: failed to canonicalize {:?}: {}", path, err)
    })
}
/// Re-hash a single file and report whether its hash differs from `hash`.
///
/// When `semaphore` is provided, a permit is acquired before the file is opened
/// and held until this function returns, so callers can bound how many files
/// are hashed concurrently.
///
/// The file length is converted to `usize` (failing with `error::Error::Arch`
/// if it does not fit), the contents are hashed with `hash::compute_async`, and
/// the number of bytes hashed is checked against that length with
/// `error::check_size`.
///
/// Returns `Ok(Some((path, new_hash)))` when the freshly computed hash differs
/// from `hash`, `Ok(None)` when it matches, and `Err` on any I/O or size-check
/// failure.
#[cfg(feature="threads")]
async fn rebase_one_async(path: impl AsRef<std::path::Path>, hash: hash::Sha256Hash, semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>>) -> Result<Option<(std::path::PathBuf, hash::Sha256Hash)>, error::Error>
{
    use std::convert::TryInto;
    use tokio::fs::OpenOptions;

    let target = path.as_ref();

    // Never read; it only lives until the end of the function so the permit
    // is released when we return.
    let _permit = if let Some(limiter) = semaphore {
        Some(limiter.acquire_owned().await)
    } else {
        None
    };

    let mut file = OpenOptions::new().read(true).open(target).await?;
    let length = file.metadata().await?.len();
    let expected: usize = length
        .try_into()
        .map_err(|_| error::Error::Arch(Some("Filesize is too large to be known. you have likely compiled the binary for 32-bit architecture or less. This shouldn't happen on 64-bit systems.")))?;

    let mut computed = hash::Sha256Hash::default();
    let hashed = hash::compute_async(&mut file, &mut computed).await?;
    error::check_size(expected, hashed)?;
    println!("Computed {:?}", target);

    if hash == computed {
        return Ok(None);
    }
    Ok(Some((target.to_owned(), computed)))
}
/// Re-hash the cached entries of the loaded hash files and write the result back out.
///
/// Steps:
/// 1. Load every `config.load` file as non-transient entries and every
///    `config.save` file as transient entries. A parse failure of a
///    non-transient file aborts the rebase; a file that exists but cannot be
///    opened is reported as a warning and skipped.
/// 2. Re-hash every non-transient entry whose path is still a regular file,
///    concurrently (bounded by `config.max_threads` when set). Non-transient
///    entries whose path no longer is a file are scheduled for removal.
/// 3. Apply changed hashes and removals, printing a summary.
/// 4. Write the cache concurrently to every `config.save` path. Output
///    failures (open, write, flush, sync, or a panicking save task) are
///    printed as warnings and do not fail the rebase.
///
/// # Errors
/// Returns an error if a required (`config.load`) hash file fails to load, or
/// if re-hashing any file fails.
///
/// # Panics
/// Panics if a re-hashing task panics.
#[cfg(feature="threads")]
async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>>
{
    use std::{
        path::Path,
        sync::Arc,
    };
    use tokio::{
        fs::OpenOptions,
        // For `flush()`: tokio performs file writes in the background, and a
        // failed write is only reported when the file is flushed.
        io::AsyncWriteExt,
        sync::Semaphore,
    };

    let mut hashes = container::DupeMap::new();
    for (transient, load) in config.load.iter().map(|x| (false, x)).chain(config.save.iter().map(|x| (true, x)))
    {
        let load = Path::new(load);
        if load.exists() && load.is_file() {
            match OpenOptions::new()
                .read(true)
                .open(load).await
            {
                Ok(mut file) => match hashes.load_async(&mut file, transient).await {
                    Err(e) if !transient => return Err(format!("Failed to load required {:?}: {}", load, e).into()),
                    _ => (),
                },
                // Previously ignored silently; report it like the output warnings below.
                Err(e) => println!("Warning: Failed to open input {:?}, ignoring: {}", load, e),
            }
        }
    }

    let mut remove = Vec::new();
    let mut children = Vec::with_capacity(hashes.cache_len());
    let semaphore = config.max_threads.map(|num| Arc::new(Semaphore::new(num.into())));
    for (path, (hash, trans)) in hashes.cache_iter()
    {
        // Transient entries (loaded from save files) are deliberately not
        // rebuilt; the original author was unsure whether that is desirable.
        if !trans {
            if path.exists() && path.is_file() {
                let path = path.clone();
                let hash = *hash;
                let semaphore = semaphore.as_ref().map(Arc::clone);
                children.push(tokio::task::spawn(async move {
                    rebase_one_async(path, hash, semaphore).await
                }));
            } else {
                remove.push(path.clone());
            }
        }
    }

    let (mut changed, mut removed) = (0usize, 0usize);
    for child in children.into_iter()
    {
        if let Some((path, hash)) = child.await.expect("Child panic")?
        {
            println!("Updating {:?} -> {}", path, hash);
            hashes.cache_force(path, hash, false);
            changed += 1;
        }
    }
    for stale in remove.into_iter()
    {
        println!("Removing {:?}", stale);
        hashes.uncache(stale);
        removed += 1;
    }
    println!("Updated. {} changed, {} removed.", changed, removed);

    // Save the hash set to every output file concurrently.
    let hashes = Arc::new(hashes);
    let saves = futures::future::join_all(config.save.iter().map(|save| {
        let hashes = Arc::clone(&hashes);
        let save = save.to_owned();
        tokio::task::spawn(async move {
            let save = Path::new(&save);
            let mut file = match OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(save).await
            {
                Ok(v) => v,
                Err(e) => {
                    println!("Warning: Failed to open output {:?}, ignoring: {}", save, e);
                    return;
                },
            };
            if let Err(e) = hashes.save_async(&mut file).await {
                println!("Warning: Failed to write to output {:?}: ignoring: {}", save, e);
            }
            // `sync_data` waits for tokio's pending write but does not report its
            // error; flushing first surfaces it.
            if let Err(e) = file.flush().await {
                println!("Warning: Failed to flush output {:?}: ignoring: {}", save, e);
            }
            if let Err(e) = file.sync_data().await {
                println!("Warning: Failed to sync output {:?}: ignoring: {}", save, e);
            }
        })
    })).await;
    for joined in saves {
        if let Err(e) = joined {
            println!("Warning: Output task failed: {}", e);
        }
    }
    Ok(())
}
/// Rebase entry point for builds without the `threads` feature.
///
/// Not implemented: only the `threads` build can rebase, so this always
/// panics via `todo!()`.
#[cfg(not(feature="threads"))]
fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>>
{
    // `config` is intentionally unused until a synchronous rebase exists.
    let _ = &config;
    todo!()
}
/// Parse the process command line into an `arg::Output`.
///
/// A help request is routed to `arg::usage()` (presumably prints usage and
/// exits — confirm in `arg`); any other parsed output is returned as-is.
/// Parse errors are propagated.
fn parse_args() -> Result<arg::Output, error::Error>
{
    let output = arg::parse_args()?;
    if let arg::Output::Help = output {
        return arg::usage();
    }
    Ok(output)
}
/// Program entry point for builds with the `threads` feature (tokio runtime).
///
/// Dispatches on the parsed command line:
/// * `Rebase` — runs `rebase` and returns its result.
/// * `Normal` — loads hash caches, scans every directory in `args.paths`
///   concurrently (bounded by `args.max_threads` when set), logs the duplicate
///   counts found, and writes the updated cache to every `args.save` path.
///
/// `parse_args` routes `Help` to `arg::usage()`, so the catch-all arm is
/// treated as unreachable.
#[cfg_attr(feature="threads", tokio::main)]
#[cfg(feature="threads")]
async fn main() -> Result<(), Box<dyn std::error::Error>>
{
    use tokio::{
        fs::{
            OpenOptions,
        },
        sync::{
            Mutex
        },
    };
    use std::{
        path::Path,
        sync::Arc,
    };
    match parse_args().into_string()? {
        arg::Output::Rebase(r) => {
            return rebase(r).await;
        },
        arg::Output::Normal(args) => {
            let lmode = &args.mode.logging_mode;
            log!(Debug, lmode => "Args parsed: {:?}", args);
            let mut children = Vec::new();
            let mut hashes = container::DupeMap::new();
            // Load hashes: `args.load` files as non-transient entries, `args.save`
            // files as transient ones (their load failures are logged at a lower level).
            for (transient, load) in args.load.iter().map(|x| (false, x)).chain(args.save.iter().map(|x| (true, x)))
            {
                let load = Path::new(load);
                if load.exists() {
                    if load.is_file() {
                        if let Some(mut file) = OpenOptions::new()
                            .read(true)
                            .open(load).await.log_and_forget(lmode, log::Level::Warning)?
                        {
                            log!(Info, lmode => "Hashes loading from {:?}", load);
                            args.mode.error_mode.handle(hashes.load_async(&mut file, transient).await).log_and_forget(lmode, if transient {log::Level::Info} else {log::Level::Warning})?;
                        }
                    } else {
                        log!(Warning, lmode => "Excluding directory from load path {:?}", load);
                    }
                } else {
                    log!(Info, lmode => "Ignoring non-existent load path {:?}", load);
                }
            }
            log!(Debug, lmode => "Loaded hashes: {}", hashes);
            log!(Info, lmode => "Starting checks (threaded)");
            let hashes = Arc::new(Mutex::new(hashes));
            let semaphore = args.max_threads.map(|num| Arc::new(tokio::sync::Semaphore::new(num.into())));
            for path in args.paths.iter()
            {
                let path = Path::new(path);
                if path.is_dir() {
                    log!(Debug, lmode => "Spawning for {:?}", path);
                    let mode = args.mode.clone();
                    let path = absolute(&path);
                    let hashes = Arc::clone(&hashes);
                    let semaphore = semaphore.as_ref().map(|sem| Arc::clone(sem));
                    children.push(tokio::task::spawn(async move {
                        log!(Debug, mode.logging_mode => " + {:?}", path);
                        let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone(), semaphore).await).log_and_forget(&mode.logging_mode, log::Level::Error);
                        log!(Info, mode.logging_mode => " - {:?}", path);
                        res
                    }));
                } else {
                    // Previously dropped silently; tell the user their path was not scanned.
                    log!(Warning, lmode => "Skipping non-directory path {:?}", path);
                }
            }
            log!(Info, lmode => "Waiting on children");
            let mut done = proc::DupeCount::default();
            for child in children.into_iter()
            {
                // NOTE(review): the chained `unwrap_or_default()`s collapse the nested
                // optional results of error_mode.handle / log_and_forget into a count of
                // zero for failed or skipped directories — confirm against those helpers.
                done += args.mode.error_mode.handle(child.await?)?.unwrap_or_default().unwrap_or_default().unwrap_or_default();
            }
            log!(Info, lmode => "Found: {:?}", done);
            let hashes = hashes.lock().await;
            log!(Debug, lmode => "New hashes: {}", hashes);
            for save in args.save.iter()
            {
                let save = Path::new(save);
                log!(Info, lmode => "Saving hashes to {:?}", save);
                if let Some(mut file) = OpenOptions::new()
                    .create(true)
                    .truncate(true)
                    .write(true)
                    .open(save).await.log_and_forget(lmode, log::Level::Warning)?
                {
                    use tokio::prelude::*;
                    args.mode.error_mode.handle(hashes.save_async(&mut file).await).log_and_forget(lmode, log::Level::Warning)?;
                    // `shutdown` flushes tokio's background write and reports its error;
                    // `sync_data` then asks the OS to persist the data to disk.
                    file.shutdown().await.log_and_forget(lmode, log::Level::Warning)?;
                    file.sync_data().await.log_and_forget(lmode, log::Level::Warning)?;
                }
            }
        },
        _ => unreachable!(),
    };
    Ok(())
}
#[cfg(not(feature="threads"))]
fn main() -> Result<(), Box<dyn std::error::Error>>
{
use std::{
path::Path,
fs::{
OpenOptions,
},
};
let args = parse_args().into_string()?;
let lmode = &args.mode.logging_mode;
log!(Debug, lmode => "Args parsed: {:?}", args);
let mut hashes = container::DupeMap::new();
// Load hashes
for load in args.load.iter()
{
let load = Path::new(load);
if load.exists() {
if load.is_file() {
if let Some(mut file) = OpenOptions::new()
.read(true)
.open(load).log_and_forget(lmode, log::Level::Warning)?
{
log!(Info, lmode => "Hashes loading from {:?}", load);
args.mode.error_mode.handle(hashes.load(&mut file)).log_and_forget(lmode, log::Level::Warning)?;
}
} else {
log!(Warning, lmode => "Exclusing directory from load path {:?}", load);
}
} else {
log!(Info, lmode => "Ignoring non-existant load path {:?}", load);
}
}
log!(Debug, lmode => "Loaded hashes: {:?}", hashes);
log!(Info, lmode => "Starting checks (threaded)");
let mut done = proc::DupeCount::default();
for path in args.paths.iter()
{
let path = Path::new(path);
if path.is_dir() {
log!(Debug, lmode => " + {:?}", path);
done += args.mode.error_mode.handle(proc::do_dir(path.clone(), 0, &mut hashes, &args.mode)).log_and_forget(lmode, log::Level::Error)?.unwrap_or_default().unwrap_or_default();
log!(Info, lmode => " - {:?}", path);
}
}
log!(Info, lmode => "Found: {:?}", done);
log!(Debug, lmode => "New hashes: {}", hashes);
for save in args.save.iter()
{
let save = Path::new(save);
log!(Info, lmode => "Saving hashes to {:?}", save);
if let Some(mut file) = OpenOptions::new()
.create(true)
//.append(true)
.truncate(true)
.write(true)
.open(save).log_and_forget(lmode, log::Level::Warning)?
{
args.mode.error_mode.handle(hashes.save(&mut file)).log_and_forget(lmode, log::Level::Warning)?;
}
}
Ok(())
}