added max threads

master
Avril 4 years ago
parent 7c7125dcd9
commit 408c214145
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -24,3 +24,4 @@ futures = { version = "0.3", optional = true }
lazy_static = "1.4" lazy_static = "1.4"
chrono = "0.4" chrono = "0.4"
shellexpand = "1.1" shellexpand = "1.1"
# cfg_if = "0.1"

@ -37,6 +37,12 @@ pub fn usage() -> !
println!(" --cancel -w\t\tAlias for `--error-mode CANCEL`"); println!(" --cancel -w\t\tAlias for `--error-mode CANCEL`");
println!(" --error -W\t\tAlias for `--error-mode TERMINATE`"); println!(" --error -W\t\tAlias for `--error-mode TERMINATE`");
println!(" --recurse <max>|inf\tRecursive mode, give max depth or infinite."); println!(" --recurse <max>|inf\tRecursive mode, give max depth or infinite.");
#[cfg(feature="threads")]
println!(" --threads <max>|inf\tMax number of threads to run at once.");
#[cfg(feature="threads")]
println!(" -U\t\tUlimited max threads.");
#[cfg(feature="threads")]
println!(" --sync -S\t\tRun only one file at a time.");
println!(" --\t\t\tStop reading args"); println!(" --\t\t\tStop reading args");
println!("Other:"); println!("Other:");
println!(" --help -h:\t\tPrint this message"); println!(" --help -h:\t\tPrint this message");
@ -120,6 +126,9 @@ where I: IntoIterator<Item=String>
Ok(config::Rebase{ Ok(config::Rebase{
save: files.clone(), //TODO: Seperate save+loads save: files.clone(), //TODO: Seperate save+loads
load: files, load: files,
#[cfg(feature="threads")]
max_threads: config::MAX_THREADS,
}) })
} }
@ -138,6 +147,9 @@ where I: IntoIterator<Item=String>
let mut mode_rec = config::RecursionMode::None; let mut mode_rec = config::RecursionMode::None;
let mut mode_log = log::Mode::Warn; let mut mode_log = log::Mode::Warn;
#[cfg(feature="threads")]
let mut threads = config::MAX_THREADS;
macro_rules! push { macro_rules! push {
($arg:expr) => { ($arg:expr) => {
{ {
@ -167,6 +179,16 @@ where I: IntoIterator<Item=String>
"--" => reading = false, "--" => reading = false,
#[cfg(feature="threads")]
"--threads" if take_one!() => {
if one.to_lowercase().trim() == "inf" {
threads = None;
} else {
threads = Some(one.as_str().parse::<std::num::NonZeroUsize>().or_else(|e| Err(Error::BadNumber(e)))?);
}
},
#[cfg(feature="threads")]
"--sync" => threads = Some(unsafe{std::num::NonZeroUsize::new_unchecked(1)}),
"--load" => { "--load" => {
load.push(validate_path(config::DEFAULT_HASHNAME.to_string(), Ensure::File, false)?.to_owned()); load.push(validate_path(config::DEFAULT_HASHNAME.to_string(), Ensure::File, false)?.to_owned());
}, },
@ -226,6 +248,11 @@ where I: IntoIterator<Item=String>
'h' => return Ok(Output::Help), 'h' => return Ok(Output::Help),
#[cfg(feature="threads")]
'U' => threads = None,
#[cfg(feature="threads")]
'S' => threads = Some(unsafe{std::num::NonZeroUsize::new_unchecked(1)}),
'r' => mode_rec = config::RecursionMode::All, 'r' => mode_rec = config::RecursionMode::All,
_ => return Err(Error::UnknownArgChar(argchar)), _ => return Err(Error::UnknownArgChar(argchar)),
} }
@ -257,6 +284,8 @@ where I: IntoIterator<Item=String>
}, },
save, save,
load, load,
#[cfg(feature="threads")]
max_threads: threads,
})) }))
} }
@ -271,6 +300,7 @@ pub enum Error
ExpectedFile(PathBuf), ExpectedFile(PathBuf),
ExpectedDirectory(PathBuf), ExpectedDirectory(PathBuf),
UnknownErrorMode(String), UnknownErrorMode(String),
BadNumber(std::num::ParseIntError),
Unknown, Unknown,
} }
@ -281,6 +311,7 @@ impl fmt::Display for Error
{ {
write!(f, "failed to parse args: ")?; write!(f, "failed to parse args: ")?;
match self { match self {
Error::BadNumber(num) => write!(f, "{}", num),
Error::NoInput => write!(f, "need at least one input"), Error::NoInput => write!(f, "need at least one input"),
Error::Parse(value, typ) => write!(f, "expected a {}, got `{}'", typ, value), Error::Parse(value, typ) => write!(f, "expected a {}, got `{}'", typ, value),
Error::UnknownArg(arg) => write!(f, "i don't know how to `{}'", arg), Error::UnknownArg(arg) => write!(f, "i don't know how to `{}'", arg),

@ -1,6 +1,9 @@
use super::*; use super::*;
use lazy_static::lazy_static; use lazy_static::lazy_static;
#[cfg(feature="threads")]
use std::num::NonZeroUsize;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RecursionMode pub enum RecursionMode
{ {
@ -53,6 +56,10 @@ fn expand_path(path: impl AsRef<str>) -> String
shellexpand::tilde(path.as_ref()).to_string() shellexpand::tilde(path.as_ref()).to_string()
} }
/// Default max threads, `None` for unlimited.
#[cfg(feature="threads")]
pub const MAX_THREADS: Option<NonZeroUsize> = Some(unsafe{NonZeroUsize::new_unchecked(10)});
#[derive(Debug)] #[derive(Debug)]
pub struct Config pub struct Config
{ {
@ -64,6 +71,9 @@ pub struct Config
pub save: Vec<String>, pub save: Vec<String>,
/// Load hashes from /// Load hashes from
pub load: Vec<String>, pub load: Vec<String>,
/// Max number of threads to spawn
#[cfg(feature="threads")]
pub max_threads: Option<NonZeroUsize>, //TODO: Implement
} }
#[derive(Debug)] #[derive(Debug)]
@ -73,4 +83,7 @@ pub struct Rebase
pub load: Vec<String>, pub load: Vec<String>,
/// Rebase to here /// Rebase to here
pub save: Vec<String>, pub save: Vec<String>,
/// Max number of threads to spawn
#[cfg(feature="threads")]
pub max_threads: Option<NonZeroUsize>, //TODO: Implement
} }

@ -80,7 +80,7 @@ fn absolute(path: impl AsRef<std::path::Path>) -> std::path::PathBuf
} }
#[cfg(feature="threads")] #[cfg(feature="threads")]
async fn rebase_one_async(path: impl AsRef<std::path::Path>, hash: hash::Sha256Hash) -> Result<Option<(std::path::PathBuf, hash::Sha256Hash)>, error::Error> async fn rebase_one_async(path: impl AsRef<std::path::Path>, hash: hash::Sha256Hash, semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>>) -> Result<Option<(std::path::PathBuf, hash::Sha256Hash)>, error::Error>
{ {
use std::{ use std::{
convert::TryInto, convert::TryInto,
@ -91,6 +91,10 @@ async fn rebase_one_async(path: impl AsRef<std::path::Path>, hash: hash::Sha256H
}, },
}; };
let path = path.as_ref(); let path = path.as_ref();
let _lock = match semaphore {
Some(sem) => Some(sem.acquire_owned().await),
None => None,
};
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.read(true) .read(true)
.open(path).await?; .open(path).await?;
@ -113,11 +117,13 @@ async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>
path::{ path::{
Path, Path,
}, },
sync::Arc,
}; };
use tokio::{ use tokio::{
fs::{ fs::{
OpenOptions, OpenOptions,
}, },
sync::Semaphore,
}; };
let mut hashes = container::DupeMap::new(); let mut hashes = container::DupeMap::new();
for (transient, load) in config.load.iter().map(|x| (false, x)).chain(config.save.iter().map(|x| (true, x))) for (transient, load) in config.load.iter().map(|x| (false, x)).chain(config.save.iter().map(|x| (true, x)))
@ -140,6 +146,7 @@ async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>
let mut remove = Vec::new(); let mut remove = Vec::new();
let mut children = Vec::with_capacity(hashes.cache_len()); let mut children = Vec::with_capacity(hashes.cache_len());
let semaphore = config.max_threads.map(|num| Arc::new(Semaphore::new(num.into())));
for (path, (hash, trans)) in hashes.cache_iter() for (path, (hash, trans)) in hashes.cache_iter()
{ {
if !trans { //Don't rebuild transient ones, this is desired I think? Maybe not... Dunno. if !trans { //Don't rebuild transient ones, this is desired I think? Maybe not... Dunno.
@ -147,8 +154,9 @@ async fn rebase(config: config::Rebase) -> Result<(), Box<dyn std::error::Error>
//Getting hash //Getting hash
let path = path.clone(); let path = path.clone();
let hash = *hash; let hash = *hash;
let semaphore = semaphore.as_ref().map(|semaphore| Arc::clone(semaphore));
children.push(tokio::task::spawn(async move { children.push(tokio::task::spawn(async move {
rebase_one_async(path, hash).await rebase_one_async(path, hash, semaphore).await
})); }));
} else { } else {
remove.push(path.clone()); remove.push(path.clone());
@ -266,6 +274,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>>
log!(Debug, lmode => "Loaded hashes: {}", hashes); log!(Debug, lmode => "Loaded hashes: {}", hashes);
log!(Info, lmode => "Starting checks (threaded)"); log!(Info, lmode => "Starting checks (threaded)");
let hashes = Arc::new(Mutex::new(hashes)); let hashes = Arc::new(Mutex::new(hashes));
let semaphore = args.max_threads.map(|num| Arc::new(tokio::sync::Semaphore::new(num.into())));
for path in args.paths.iter() for path in args.paths.iter()
{ {
let path = Path::new(path); let path = Path::new(path);
@ -274,9 +283,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>>
let mode = args.mode.clone(); let mode = args.mode.clone();
let path = absolute(&path); let path = absolute(&path);
let hashes= Arc::clone(&hashes); let hashes= Arc::clone(&hashes);
let semaphore = semaphore.as_ref().map(|sem| Arc::clone(sem));
children.push(tokio::task::spawn(async move { children.push(tokio::task::spawn(async move {
log!(Debug, mode.logging_mode => " + {:?}", path); log!(Debug, mode.logging_mode => " + {:?}", path);
let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone()).await).log_and_forget(&mode.logging_mode, log::Level::Error); let res = mode.error_mode.handle(proc::do_dir_async(path.clone(), 0, hashes, mode.clone(), semaphore).await).log_and_forget(&mode.logging_mode, log::Level::Error);
log!(Info, mode.logging_mode => " - {:?}", path); log!(Info, mode.logging_mode => " - {:?}", path);
res res
})); }));

@ -121,7 +121,7 @@ pub fn process_file<P: AsRef<Path>>(path: P, set: &mut container::DupeMap) -> Re
/// Process a file and add it to the table, returns true if is not a dupe. /// Process a file and add it to the table, returns true if is not a dupe.
#[cfg(feature="threads")] #[cfg(feature="threads")]
pub async fn process_file_async<P: AsRef<Path>>(path: P, set: &std::sync::Arc<tokio::sync::Mutex<container::DupeMap>>) -> Result<bool, error::Error> pub async fn process_file_async<P: AsRef<Path>>(path: P, set: &std::sync::Arc<tokio::sync::Mutex<container::DupeMap>>, sem: Option<std::sync::Arc<tokio::sync::Semaphore>>) -> Result<bool, error::Error>
{ {
use tokio::{ use tokio::{
fs::{ fs::{
@ -136,6 +136,10 @@ pub async fn process_file_async<P: AsRef<Path>>(path: P, set: &std::sync::Arc<to
let mut set = set.lock().await; let mut set = set.lock().await;
Ok(set.try_add(hash)) Ok(set.try_add(hash))
} else { } else {
let _lock = match sem {
Some(sem) => Some(sem.acquire_owned().await),
None => None,
};
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.read(true) .read(true)
.open(path).await?; .open(path).await?;
@ -186,7 +190,7 @@ pub fn do_dir<P: AsRef<Path>>(dir: P, depth: usize, set: &mut container::DupeMap
/// Walk a dir structure and remove all dupes in it /// Walk a dir structure and remove all dupes in it
#[cfg(feature="threads")] #[cfg(feature="threads")]
pub fn do_dir_async<P: AsRef<Path> + std::marker::Send + std::marker::Sync + 'static>(dir: P, depth: usize, set: std::sync::Arc<tokio::sync::Mutex<container::DupeMap>>, mode: config::Mode) -> futures::future::BoxFuture<'static, Result<DupeCount, error::Error>> pub fn do_dir_async<P: AsRef<Path> + std::marker::Send + std::marker::Sync + 'static>(dir: P, depth: usize, set: std::sync::Arc<tokio::sync::Mutex<container::DupeMap>>, mode: config::Mode, semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>>) -> futures::future::BoxFuture<'static, Result<DupeCount, error::Error>>
{ {
use std::sync::Arc; use std::sync::Arc;
use futures::future::{ use futures::future::{
@ -213,9 +217,10 @@ pub fn do_dir_async<P: AsRef<Path> + std::marker::Send + std::marker::Sync + 'st
let set = Arc::clone(&set); let set = Arc::clone(&set);
let cmode = cmode.clone(); let cmode = cmode.clone();
let mode = mode.clone(); let mode = mode.clone();
let semaphore = semaphore.as_ref().map(|sem| Arc::clone(sem));
children.push(tokio::task::spawn(async move { children.push(tokio::task::spawn(async move {
log!(Info, cmode.logging_mode => "OK {:?}", obj); log!(Info, cmode.logging_mode => "OK {:?}", obj);
match mode.handle(do_dir_async(obj, depth+1, set, cmode).await) { match mode.handle(do_dir_async(obj, depth+1, set, cmode, semaphore).await) {
Ok(v) => Ok(v.unwrap_or_default()), Ok(v) => Ok(v.unwrap_or_default()),
Err(v) => Err(v), Err(v) => Err(v),
} }
@ -224,8 +229,9 @@ pub fn do_dir_async<P: AsRef<Path> + std::marker::Send + std::marker::Sync + 'st
let set = Arc::clone(&set); let set = Arc::clone(&set);
let mode = mode.clone(); let mode = mode.clone();
let cmode = cmode.clone(); let cmode = cmode.clone();
let semaphore = semaphore.as_ref().map(|sem| Arc::clone(sem));
workers.push(tokio::task::spawn(async move { workers.push(tokio::task::spawn(async move {
match mode.handle(process_file_async(&obj, &set).await) { match mode.handle(process_file_async(&obj, &set, semaphore).await) {
Ok(v) => { Ok(v) => {
if v.unwrap_or_default() { if v.unwrap_or_default() {
log!(Info, cmode.logging_mode => "OK {:?}", obj); log!(Info, cmode.logging_mode => "OK {:?}", obj);

Loading…
Cancel
Save