progress okay for top-level

progress
Avril 4 years ago
parent 950bf6db88
commit 72bfc293cd
Signed by: flanchan
GPG Key ID: 284488987C31F630

23
Cargo.lock generated

@ -39,6 +39,17 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.0" version = "1.0.0"
@ -605,6 +616,17 @@ dependencies = [
"rand_core", "rand_core",
] ]
[[package]]
name = "recolored"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1584c92dd8a87686229f766bb3a62d263a90c47c81e45a49f1a6d684a1b7968d"
dependencies = [
"atty",
"lazy_static",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.1.57" version = "0.1.57"
@ -875,6 +897,7 @@ dependencies = [
"futures", "futures",
"lazy_format", "lazy_format",
"lazy_static", "lazy_static",
"recolored",
"rustc_version", "rustc_version",
"sha2", "sha2",
"smallmap", "smallmap",

@ -31,6 +31,7 @@ smallmap = "^1.1.6"
termprogress = "0.3.4" termprogress = "0.3.4"
uuid = {version = "0.8.1", features=["v4"]} uuid = {version = "0.8.1", features=["v4"]}
lazy_format = "1.8.3" lazy_format = "1.8.3"
recolored = "1.9.3"
[build-dependencies] [build-dependencies]
rustc_version = "0.2" rustc_version = "0.2"

@ -24,7 +24,7 @@ pub fn program_name() -> &'static str
/// Process program args in parallel spawning the `callback` closure of the argument in a new task for each. /// Process program args in parallel spawning the `callback` closure of the argument in a new task for each.
/// ///
/// The returned future can be awaited to wait for all tasks to complete. If one or more tasks are cancelled or panic, this future will immediately output `Err()`, if they all complete successfully, it will output an aggregate `Vec` of the output of each argument in order. /// The returned future can be awaited to wait for all tasks to complete. If one or more tasks are cancelled or panic, this future will immediately output `Err()`, if they all complete successfully, it will output an aggregate `Vec` of the output of each argument in order.
pub fn process<F, T>(mut callback: F) -> impl Future<Output= eyre::Result<Vec<T::Output>>> pub fn process<F, T>(mut callback: F) -> (usize, impl Future<Output= eyre::Result<Vec<T::Output>>>)
where F: FnMut(String) -> T, where F: FnMut(String) -> T,
T: Future + Send + 'static, T: Future + Send + 'static,
T::Output: Send, T::Output: Send,
@ -32,7 +32,7 @@ where F: FnMut(String) -> T,
let args = std::env::args(); let args = std::env::args();
let output: Vec<_> = args.skip(1).dedup().map(|arg| tokio::spawn(callback(arg))).collect(); let output: Vec<_> = args.skip(1).dedup().map(|arg| tokio::spawn(callback(arg))).collect();
let mut real_output = Vec::with_capacity(output.len()); let mut real_output = Vec::with_capacity(output.len());
async move { (output.len(), async move {
let mut j=0; let mut j=0;
for x in futures::future::try_join_all(output).await for x in futures::future::try_join_all(output).await
.wrap_err(eyre!("Child panic or cancel.")) .wrap_err(eyre!("Child panic or cancel."))
@ -43,5 +43,5 @@ where F: FnMut(String) -> T,
j+=1; j+=1;
} }
Ok(real_output) Ok(real_output)
} })
} }

@ -25,13 +25,13 @@ use futures::{
/// Process a single file. /// Process a single file.
/// ///
/// `path` is known to be a file at this point. /// `path` is known to be a file at this point.
async fn process_single(state: Arc<state::State>, path: impl AsRef<Path>) -> eyre::Result<()> async fn process_single(state: Arc<state::State>, logger: &mut progress::logging::Logger, path: impl AsRef<Path>) -> eyre::Result<()>
{ {
let path = path.as_ref(); let path = path.as_ref();
debug_assert!(path.is_file(), "process_single() expected a file, but {:?} is not one.", path); debug_assert!(path.is_file(), "process_single() expected a file, but {:?} is not one.", path);
let _g = state.lock().await; let _g = state.lock().await;
debug!(state.logger_output() => "{:?} Processing", path); trace!(logger => "{:?} Processing", path);
Ok(()) Ok(())
} }
@ -39,30 +39,41 @@ async fn process_single(state: Arc<state::State>, path: impl AsRef<Path>) -> eyr
/// Process this path /// Process this path
/// ///
/// This will not return until all its children finish too (if any) /// This will not return until all its children finish too (if any)
pub async fn process<'a, P>(state: Arc<state::State>, path: P) -> eyre::Result<()> pub async fn process<'a, P, L>(state: Arc<state::State>, mut logger: L, path: P) -> eyre::Result<()>
where P: 'a + Send + AsRef<Path> where P: 'a + Send + AsRef<Path>,
L: std::borrow::BorrowMut<progress::logging::Logger>
{ {
let path = path.as_ref(); let path = path.as_ref();
if path.is_dir() { if path.is_dir() {
let read = fs::read_dir(path).await?; let read = fs::read_dir(path).await?;
fn proc_dir(state: Arc<state::State>, mut read: fs::ReadDir) -> BoxFuture<'static, JoinHandle<Result<(), eyre::Report>>> fn proc_dir(state: Arc<state::State>, logger: &mut progress::logging::Logger, mut read: fs::ReadDir) -> BoxFuture<'static, JoinHandle<Result<(), eyre::Report>>>
{ {
let mut logger = logger.clone();
async move { async move {
tokio::spawn(async move { tokio::spawn(async move {
while let Some(entry) = read.next_entry().await? while let Some(entry) = read.next_entry().await?
{ {
process(Arc::clone(&state), entry.path()).await?; let path = entry.path();
if let Err(error) = process(Arc::clone(&state), &mut logger, &path).await {
{
let err = &error;
let path = &path; //TODO: Get these macros to stop moving
error!(logger => "{:?} child failed: {}", path, err);
}
debug!(logger => "Error for {:?}: {:?}", path, error);
}
} }
Ok::<_, eyre::Report>(()) Ok::<_, eyre::Report>(())
}) })
}.boxed() }.boxed()
} }
let handle = proc_dir(state, read).await; let handle = proc_dir(state, logger.borrow_mut(), read).await;
let res = handle.await let res = handle.await
.wrap_err(eyre!("Child exited abnormally"))?; .wrap_err(eyre!("Child exited abnormally"))?;
res.wrap_err(eyre!("Failed to process children")) res.wrap_err(eyre!("Failed to process children"))
} else if path.is_file() { } else if path.is_file() {
process_single(state, path).await process_single(state, logger.borrow_mut(), path).await
} else { } else {
Err(eyre!("Invalid/unsupported FSO")) Err(eyre!("Invalid/unsupported FSO"))
}.with_section(|| format!("{:?}", path).header("Path was")) }.with_section(|| format!("{:?}", path).header("Path was"))

@ -91,15 +91,17 @@ async fn process(state: Arc<state::State>, file: String) -> eyre::Result<()>
let path = std::path::Path::new(&file); let path = std::path::Path::new(&file);
let mut progress = state.progress().clone(); let mut progress = state.progress().clone();
let task_id_fut = progress.send_command(progress::CommandKind::AddTask(file.clone())).await?; let task_id_fut = progress.send_command(progress::CommandKind::AddTask(file.clone())).await?;
let mut logger= state.logger_output();
let res = if !path.exists() { let res = if !path.exists() {
error!(yield state.logger_output() => "{:?} does not exist, skipping", path); error!(yield logger => "{:?} does not exist, skipping", path);
Ok(()) Ok(())
} else { } else {
info!(state.logger_output() => "{:?} Processing", path); info!(logger => "{:?} Processing", path);
delete::process(state, path).await delete::process(state, &mut logger, path).await
.wrap_err(eyre!("Processing failed")) .wrap_err(eyre!("Processing failed"))
.with_section(move || file.header("Root path was")) .with_section(move || file.header("Root path was"))
}; };
progress.send_command_and_detach(progress::CommandKind::Bump(1)).await?;
progress.send_command_and_wait(progress::CommandKind:: progress.send_command_and_wait(progress::CommandKind::
RemoveTask(task_id_fut.await? RemoveTask(task_id_fut.await?
.map(|x| x.downcast() .map(|x| x.downcast()
@ -152,11 +154,15 @@ async fn begin() -> eyre::Result<i32>
state state
}; };
if args::process(|file| { if {
let state = Arc::clone(&state); let (sz, pro) = args::process(|file| {
use futures::future::TryFutureExt; let state = Arc::clone(&state);
process(state, file).inspect_err(|err| eprintln!("{:?}", err)) use futures::future::TryFutureExt;
}).await process(state, file).inspect_err(|err| eprintln!("{:?}", err))
});
state.progress().clone().send_command_and_wait(progress::CommandKind::BumpHigh(sz as isize)).await?;
pro
}.await
.wrap_err(eyre!("One or more child workers failed to complete successfully"))? .wrap_err(eyre!("One or more child workers failed to complete successfully"))?
.len() == 0 .len() == 0
{ {

@ -5,6 +5,7 @@ use std::{
fmt, fmt,
error, error,
}; };
use recolored::Colorize;
/// The logging level /// The logging level
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@ -23,12 +24,12 @@ impl fmt::Display for Level
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{ {
write!(f, "{}", match self { write!(f, "{}", match self {
Level::Trace => "TRACE", Level::Trace => "TRACE".purple(),
Level::Debug => "DEBUG", Level::Debug => "DEBUG".blue(),
Level::Info => "INFO", Level::Info => "INFO".green(),
Level::Warning => "WARNING", Level::Warning => "WARNING".yellow(),
Level::Error => "ERROR", Level::Error => "ERROR".red(),
Level::Fatal => "FATAL", Level::Fatal => "FATAL".bright_red(),
}) })
} }
} }

Loading…
Cancel
Save