Compare commits

..

No commits in common. '7fa4004ebf9a9997562f888b8c2af8950e70a6f1' and '354cac4a5002e11e531e3cb32d320cdd3c65849d' have entirely different histories.

113
Cargo.lock generated

@ -1,7 +1,5 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
version = "0.4.7" version = "0.4.7"
@ -61,9 +59,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -76,9 +74,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -86,15 +84,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -103,38 +101,42 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
dependencies = [ dependencies = [
"proc-macro-hack",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.100", "syn",
] ]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
dependencies = [
"once_cell",
]
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.31" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -143,8 +145,10 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project-lite 0.2.16", "pin-project",
"pin-utils", "pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab", "slab",
] ]
@ -184,12 +188,11 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "leanify-many" name = "leanify-many"
version = "1.2.0" version = "1.1.1"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"futures", "futures",
"lazy_static", "lazy_static",
"libc",
"num_cpus", "num_cpus",
"pin-project", "pin-project",
"recolored", "recolored",
@ -200,9 +203,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.171" version = "0.2.74"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10"
[[package]] [[package]]
name = "log" name = "log"
@ -304,6 +307,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "once_cell"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "0.4.23" version = "0.4.23"
@ -321,7 +330,7 @@ checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.38", "syn",
] ]
[[package]] [[package]]
@ -330,32 +339,38 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]] [[package]]
name = "pin-utils" name = "pin-utils"
version = "0.1.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598"
[[package]]
name = "proc-macro-nested"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.94" version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12"
dependencies = [ dependencies = [
"unicode-ident", "unicode-xid",
] ]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.40" version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -440,17 +455,6 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "syn"
version = "2.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]] [[package]]
name = "terminal_size" name = "terminal_size"
version = "0.1.13" version = "0.1.13"
@ -487,9 +491,8 @@ dependencies = [
"mio-named-pipes", "mio-named-pipes",
"mio-uds", "mio-uds",
"num_cpus", "num_cpus",
"pin-project-lite 0.1.7", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"slab",
"tokio-macros", "tokio-macros",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -502,15 +505,9 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.38", "syn",
] ]
[[package]]
name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.1" version = "0.2.1"

@ -1,6 +1,6 @@
[package] [package]
name = "leanify-many" name = "leanify-many"
version = "1.2.0" version = "1.1.1"
description = "spawn leanify subprocesses" description = "spawn leanify subprocesses"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"
@ -44,14 +44,13 @@ collect_err = []
[dependencies] [dependencies]
lazy_static = "1.4" lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]} tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]}
futures = "0.3" futures = "0.3"
termprogress = {version="0.3", optional=true} termprogress = {version="0.3", optional=true}
cfg-if = "0.1" cfg-if = "0.1"
recolored = { version = "1.9", optional = true } recolored = { version = "1.9", optional = true }
num_cpus = "1.13" num_cpus = "1.13"
pin-project = {version = "0.4", optional = true} pin-project = {version = "0.4", optional = true}
libc = { version = "0.2.171", features = ["align"] }
[build-dependencies] [build-dependencies]
rustc_version = "0.2" rustc_version = "0.2"

@ -45,7 +45,6 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
} }
} }
use futures::future::FutureExt;
work::work(&args.flags, leanify, args.files, match args.flags.hard_limit { work::work(&args.flags, leanify, args.files, match args.flags.hard_limit {
Some(hard_limit) => { Some(hard_limit) => {
// We have hard limit // We have hard limit
@ -59,7 +58,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
} }
}, },
_ => args.max_children, _ => args.max_children,
}, tokio::signal::ctrl_c().boxed().fuse()).await }).await
} }

@ -49,26 +49,6 @@ impl AsRef<Path> for Process
} }
} }
fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static
{
use libc::{
getpid,
setpgid,
pid_t
};
move || {
//let this = unsafe { getpid() };
// SAFETY: We are setting the PGID of this (child) process to detach it from the group of the parent so signals are not propagated from the parent.
unsafe {
if setpgid(0, to) != 0 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}
/// Spawn the process, and contain its standard output. /// Spawn the process, and contain its standard output.
/// ///
/// # Notes /// # Notes
@ -89,28 +69,19 @@ where U: IntoIterator<Item=V>,
let stderr = std::process::Stdio::inherit(); let stderr = std::process::Stdio::inherit();
} }
}; };
let mut child = Command::new(process);
let child = child let mut child = match Command::new(process)
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here? .args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
.args(args.into_iter()) .args(args)
.stdout(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped())
.stderr(stderr) .stderr(stderr)
.stdin(std::process::Stdio::null()) .stdin(std::process::Stdio::null())
.kill_on_drop(false); .spawn() {
Ok(chi) => chi,
// SAFETY: This is only calling `setpgid()`. Err(sp) => {
unsafe { return Err(Error::Spawning(sp));
child }
.pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly };
};
let mut child = match child.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
let stdout = child.stdout.take().unwrap(); let stdout = child.stdout.take().unwrap();
#[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = { #[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = {
@ -136,7 +107,7 @@ where U: IntoIterator<Item=V>,
} }
}); });
let _ = task::yield_now().await; let _ = task::yield_now().await;
match tokio::spawn(child).await.expect("Child supervisor panic") { match child.await {
Ok(exit) => { Ok(exit) => {
if exit.success() { if exit.success() {
cfg_if!{ cfg_if!{

@ -59,11 +59,6 @@ pub enum CommandKind
AddTask(Task), AddTask(Task),
RemoveTask(usize), RemoveTask(usize),
RemoveAll {
lock: bool,
title: Option<Cow<'static, str>>,
},
Complete, Complete,
Many(Vec<CommandKind>), Many(Vec<CommandKind>),
@ -199,12 +194,6 @@ impl<'a> CommandBuilder<'a>
self self
} }
pub fn clear_tasks(&mut self, lock: bool, title: Option<&'static str>) -> &mut Self
{
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Borrowed), lock });
self
}
/// Signal a shutdown to the worker /// Signal a shutdown to the worker
pub fn shutdown(&mut self) -> &mut Self pub fn shutdown(&mut self) -> &mut Self
{ {
@ -328,11 +317,6 @@ impl ProgressSender
self.send_command(CommandKind::RemoveTask(task_idx)).await self.send_command(CommandKind::RemoveTask(task_idx)).await
} }
pub async fn clear_tasks(&mut self, lock: bool, title: Option<String>) -> Result<CommandWaiter, Error>
{
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Owned), lock }).await
}
/// Signal a shutdown to the worker /// Signal a shutdown to the worker
pub async fn shutdown(&mut self) -> Result<CommandWaiter, Error> pub async fn shutdown(&mut self) -> Result<CommandWaiter, Error>
{ {
@ -479,20 +463,6 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str()); progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
} }
}, },
CommandKind::RemoveAll{ title: None, lock } => {
if lock {
list.poison();
}
list.clear();
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
},
CommandKind::RemoveAll{ title: Some(title), lock } => {
if lock {
list.poison();
}
list.clear();
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, title.as_ref()).as_str());
},
CommandKind::Complete => { CommandKind::Complete => {
if has_blanked { if has_blanked {
progress.refresh(); progress.refresh();
@ -519,7 +489,6 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.complete(); progress.complete();
}).await.map_err(|_| Error::WorkerPanic); }).await.map_err(|_| Error::WorkerPanic);
shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters"); shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters");
res res
}); });
handle handle
@ -543,22 +512,6 @@ pub enum Error
Unknown, Unknown,
} }
impl Error
{
/// If this error comes from an attempt to communicate with the worker that failed but can be ignored.
///
/// # Usage
/// If the worker has been gracefully shutdown, and background tasks are still running that want to update the progress, an error will be returned when those already extant tasks attempt to send the commands to do so.
/// Those tasks can ignore those errors entirely if this function returns `true`.
pub fn can_ignore_on_com_send_failure(&self) -> bool
{
match self {
Self::WorkerDropped => true,
_ => false,
}
}
}
impl std::error::Error for Error{} impl std::error::Error for Error{}
impl std::fmt::Display for Error impl std::fmt::Display for Error
{ {

@ -1,9 +1,5 @@
use super::*; use super::*;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::sync::atomic::{
AtomicBool,
Ordering,
};
#[cfg(nightly)] type ListAbs<T> = std::collections::LinkedList<T>; #[cfg(nightly)] type ListAbs<T> = std::collections::LinkedList<T>;
#[cfg(not(nightly))] type ListAbs<T> = Vec<T>; #[cfg(not(nightly))] type ListAbs<T> = Vec<T>;
@ -13,8 +9,6 @@ pub struct TaskList {
list: ListAbs<(usize, String)>, list: ListAbs<(usize, String)>,
buffer: String, buffer: String,
index: usize, index: usize,
poisoned: AtomicBool,
} }
fn find<T,F>(list: &ListAbs<T>, mut fun: F) -> Option<usize> fn find<T,F>(list: &ListAbs<T>, mut fun: F) -> Option<usize>
@ -38,8 +32,6 @@ impl TaskList
list: ListAbs::new(), list: ListAbs::new(),
buffer: String::new(), buffer: String::new(),
index: 0, index: 0,
poisoned: AtomicBool::new(false),
} }
} }
@ -52,15 +44,10 @@ impl TaskList
/// Push a new task string, and return its ID. /// Push a new task string, and return its ID.
pub fn push(&mut self, string: impl Into<String>) -> usize pub fn push(&mut self, string: impl Into<String>) -> usize
{ {
if self.poisoned.load(Ordering::SeqCst) {
return 0;
}
let idx = { let idx = {
self.index+=1; self.index+=1;
self.index self.index
}; };
#[cfg(nightly)] self.list.push_back((idx,string.into())); #[cfg(nightly)] self.list.push_back((idx,string.into()));
#[cfg(not(nightly))] self.list.push((idx,string.into())); #[cfg(not(nightly))] self.list.push((idx,string.into()));
@ -71,10 +58,6 @@ impl TaskList
/// Pop a task off the string, returns `true` if successful, `false` if wasn't found. /// Pop a task off the string, returns `true` if successful, `false` if wasn't found.
pub fn pop(&mut self, idx: usize) -> bool pub fn pop(&mut self, idx: usize) -> bool
{ {
if idx == 0 && self.poisoned.load(Ordering::Relaxed) {
return false;
}
if let Some(idx) = find(&self.list, |&(i, _)| idx == i) if let Some(idx) = find(&self.list, |&(i, _)| idx == i)
{ {
self.list.remove(idx); self.list.remove(idx);
@ -85,10 +68,6 @@ impl TaskList
} }
} }
pub fn poison(&self) {
self.poisoned.store(true, Ordering::SeqCst);
}
/// Clear the `TaskList` /// Clear the `TaskList`
pub fn clear(&mut self) pub fn clear(&mut self)
{ {
@ -131,8 +110,6 @@ impl<T> FromIterator<T> for TaskList
list: iter.into_iter().map(|x| ((i, x.into()), i+=1).0).collect(), list: iter.into_iter().map(|x| ((i, x.into()), i+=1).0).collect(),
buffer: Default::default(), buffer: Default::default(),
index: 0, index: 0,
poisoned: AtomicBool::new(false),
}; };
this.index = i; this.index = i;
this.recalc(); this.recalc();

@ -91,7 +91,6 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
}; };
let _ = tokio::task::yield_now().await; let _ = tokio::task::yield_now().await;
//let _ = opt_await.await; //let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await { match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => { Err(error) => {
@ -109,28 +108,12 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
} }
} }
pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>, cancel: impl Future + Send + Unpin + 'static) -> Result<(), Box<dyn std::error::Error>> pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>>
where I: IntoIterator<Item=T>, where I: IntoIterator<Item=T>,
<I as IntoIterator>::IntoIter: ExactSizeIterator, <I as IntoIterator>::IntoIter: ExactSizeIterator,
T: AsRef<OsStr> + Send + Sync + 'static + Clone, T: AsRef<OsStr> + Send + Sync + 'static + Clone,
U: Into<PathBuf> U: Into<PathBuf>
{ {
let (cancel, cancel_register) = {
use futures::future::AbortHandle;
let (handle, reg) = AbortHandle::new_pair();
let rh = handle.clone();
tokio::spawn(async move {
let _ = cancel.await;
handle.abort();
});
(rh, reg)
};
/// Make a future cancellable by the passed `cancel` token.
macro_rules! pass_cancel {
($a:expr) => (::futures::future::Abortable::new($a, cancel_register));
}
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); let process = Arc::new(Process::new(process, flags.leanify_flags.clone()));
@ -178,38 +161,25 @@ where I: IntoIterator<Item=T>,
}; };
let mut i=0usize; let mut i=0usize;
let results = let results =
pass_cancel!(join_all( join_all(
files files
.map(|filename| -> OptionFuture<_> { .map(|filename| {
let cancel = cancel.clone();
// Check if cancellation has happened before the spawn attempt of the child tasks, and do nothing if it has.
if cancel.is_aborted() {
return None.into();
}
let semaphore = semaphore.clone(); let semaphore = semaphore.clone();
let process = Arc::clone(&process); let process = Arc::clone(&process);
let mut tx = tx.clone(); let mut tx = tx.clone();
#[cfg(feature="progress")] let mut progress = progress.clone(); #[cfg(feature="progress")] let mut progress = progress.clone();
Some((tokio::spawn(async move { (tokio::spawn(async move {
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>; #[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>; #[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = { let _task_id: Opt<_> = {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
// Check if cancellation has happened while we were awaiting the lock, and return doing nothing if it has.
if cancel.is_aborted() {
return;
}
// (task_id, worker_result) // (task_id, worker_result)
let worker = { let worker = {
cfg_if!{ cfg_if!{
if #[cfg(feature="progress")] { if #[cfg(feature="progress")] {
let worker = do_work(&process, &filename, progress.clone()); let worker = do_work(&process, &filename, progress.clone());
let task = progress.add_task(format!("{:?}", filename.as_ref())); let task = progress.add_task(format!("{:?}", filename.as_ref()));
future::join(task, worker).await future::join(task, worker).await
@ -234,10 +204,7 @@ where I: IntoIterator<Item=T>,
#[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error));
#[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await
.or_else(|e| { .or_else(|e| {
if !e.can_ignore_on_com_send_failure() { eprintln!("\n{}",e); Err(e)
eprintln!("\n{}",e);
}
Err(e)
}); });
}, },
} }
@ -264,44 +231,20 @@ where I: IntoIterator<Item=T>,
} else { } else {
let _ = progress.bump_min(1).await; let _ = progress.bump_min(1).await;
} }
}),i+=1).0).into() }),i+=1).0
}))).await })).await
.map(|x| x .into_iter()
.into_iter() .filter_map(|x| x.err());
.filter_map(|x| x?.err()));
let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results {
Err(_) => {
let msg = format!("[{}] Requested interrupt! Waiting for existing children...", colour::style(colour!(Color::Yellow),"!"));
#[cfg(feature="progress")] {
progress.eprintln(msg).await?.await?;
//progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
}
#[cfg(not(feature="progress"))] eprintln!("{}", msg);
// We do not have the results, return an empty iterator #[cfg(feature="progress")] progress.shutdown().await?;
Box::new(std::iter::empty()) for failed in results
},
Ok(v) => Box::new(v),
};
for failed in results
{ {
#[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?; #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed);
} }
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx); drop(tx);
display.await?; display.await?;
// Display task has completed, child tasks are complete; we are about to exit.
if cancel.is_aborted() {
eprintln!("[.] Running children complete, ignoring rest due to cancellation.");
}
#[cfg(feature="progress")] prog_handle.await.expect("Child panic")?; #[cfg(feature="progress")] prog_handle.await.expect("Child panic")?;
Ok(()) Ok(())
} }

Loading…
Cancel
Save