Cancelling works when it is NOT a signal because of children PGIDs...

Fortune for leanify-many's current commit: Future blessing − 末吉
master
Avril 2 weeks ago
parent db20ca106f
commit d7b46d7c9a
Signed by: flanchan
GPG Key ID: 284488987C31F630

106
Cargo.lock generated

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
version = "0.4.7" version = "0.4.7"
@ -59,9 +61,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -74,9 +76,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -84,15 +86,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -101,42 +103,38 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [ dependencies = [
"proc-macro-hack",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 2.0.100",
] ]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
dependencies = [
"once_cell",
]
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.5" version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -145,10 +143,8 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project", "pin-project-lite 0.2.16",
"pin-utils", "pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab", "slab",
] ]
@ -307,12 +303,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "once_cell"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "0.4.23" version = "0.4.23"
@ -330,7 +320,7 @@ checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 1.0.38",
] ]
[[package]] [[package]]
@ -340,37 +330,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]] [[package]]
name = "pin-utils" name = "pin-project-lite"
version = "0.1.0" version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]] [[package]]
name = "proc-macro-nested" name = "pin-utils"
version = "0.1.6" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.19" version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12" checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
dependencies = [ dependencies = [
"unicode-xid", "unicode-ident",
] ]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.7" version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -455,6 +439,17 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "syn"
version = "2.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]] [[package]]
name = "terminal_size" name = "terminal_size"
version = "0.1.13" version = "0.1.13"
@ -491,8 +486,9 @@ dependencies = [
"mio-named-pipes", "mio-named-pipes",
"mio-uds", "mio-uds",
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite 0.1.7",
"signal-hook-registry", "signal-hook-registry",
"slab",
"tokio-macros", "tokio-macros",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -505,9 +501,15 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn 1.0.38",
] ]
[[package]]
name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.1" version = "0.2.1"

@ -44,7 +44,7 @@ collect_err = []
[dependencies] [dependencies]
lazy_static = "1.4" lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]}
futures = "0.3" futures = "0.3"
termprogress = {version="0.3", optional=true} termprogress = {version="0.3", optional=true}
cfg-if = "0.1" cfg-if = "0.1"

@ -59,7 +59,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
} }
}, },
_ => args.max_children, _ => args.max_children,
}, tokio::signal::ctrl_c().boxed()).await }, tokio::spawn(tokio::signal::ctrl_c().fuse())).await
} }

@ -69,13 +69,14 @@ where U: IntoIterator<Item=V>,
let stderr = std::process::Stdio::inherit(); let stderr = std::process::Stdio::inherit();
} }
}; };
let mut child = match Command::new(process) let mut child = match Command::new(process)
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here? .args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
.args(args) .args(args.into_iter())
.stdout(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped())
.stderr(stderr) .stderr(stderr)
.stdin(std::process::Stdio::null()) .stdin(std::process::Stdio::null())
.kill_on_drop(false)
//.process_group(0) //XXX: It's this that is propagating the SIGINT!!!
.spawn() { .spawn() {
Ok(chi) => chi, Ok(chi) => chi,
Err(sp) => { Err(sp) => {
@ -107,7 +108,7 @@ where U: IntoIterator<Item=V>,
} }
}); });
let _ = task::yield_now().await; let _ = task::yield_now().await;
match child.await { match tokio::spawn(child).await.expect("Child supervisor panic") {
Ok(exit) => { Ok(exit) => {
if exit.success() { if exit.success() {
cfg_if!{ cfg_if!{

@ -519,6 +519,7 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.complete(); progress.complete();
}).await.map_err(|_| Error::WorkerPanic); }).await.map_err(|_| Error::WorkerPanic);
shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters"); shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters");
res res
}); });
handle handle
@ -542,6 +543,22 @@ pub enum Error
Unknown, Unknown,
} }
impl Error
{
/// If this error comes from an attempt to communicate with the worker that failed but can be ignored.
///
/// # Usage
/// If the worker has been gracefully shutdown, and background tasks are still running that want to update the progress, an error will be returned when those already extant tasks attempt to send the commands to do so.
/// Those tasks can ignore those errors entirely if this function returns `true`.
pub fn can_ignore_on_com_send_failure(&self) -> bool
{
match self {
Self::WorkerDropped => true,
_ => false,
}
}
}
impl std::error::Error for Error{} impl std::error::Error for Error{}
impl std::fmt::Display for Error impl std::fmt::Display for Error
{ {

@ -91,6 +91,7 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
}; };
let _ = tokio::task::yield_now().await; let _ = tokio::task::yield_now().await;
//let _ = opt_await.await; //let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await { match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => { Err(error) => {
@ -127,9 +128,9 @@ where I: IntoIterator<Item=T>,
}; };
/// Make a future cancellable by the passed `cancel` token. /// Make a future cancellable by the passed `cancel` token.
macro_rules! pass_cancel { macro_rules! pass_cancel {
($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); ($a:expr) => (::futures::future::Abortable::new($a, cancel_register));
} }
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); let process = Arc::new(Process::new(process, flags.leanify_flags.clone()));
@ -179,11 +180,12 @@ where I: IntoIterator<Item=T>,
let results = let results =
pass_cancel!(join_all( pass_cancel!(join_all(
files files
.map(|filename| { .map(|filename| {
let cancel = cancel.clone();
let semaphore = semaphore.clone(); let semaphore = semaphore.clone();
let process = Arc::clone(&process); let process = Arc::clone(&process);
let mut tx = tx.clone(); let mut tx = tx.clone();
let cancel = cancel.clone();
#[cfg(feature="progress")] let mut progress = progress.clone(); #[cfg(feature="progress")] let mut progress = progress.clone();
@ -193,7 +195,14 @@ where I: IntoIterator<Item=T>,
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>; #[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>; #[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = { let _task_id: Opt<_> = {
if cancel.is_aborted() {
return;
}
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
if cancel.is_aborted() {
return;
}
// (task_id, worker_result) // (task_id, worker_result)
let worker = { let worker = {
@ -223,7 +232,10 @@ where I: IntoIterator<Item=T>,
#[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error));
#[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await
.or_else(|e| { .or_else(|e| {
eprintln!("\n{}",e); Err(e) if !e.can_ignore_on_com_send_failure() {
eprintln!("\n{}",e);
}
Err(e)
}); });
}, },
} }
@ -253,10 +265,10 @@ where I: IntoIterator<Item=T>,
}),i+=1).0 }),i+=1).0
}))).await }))).await
.map(|x| x .map(|x| x
.into_iter() .into_iter()
.filter_map(|x| x.err())); .filter_map(|x| x.err()));
let results = match results { let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results {
Err(_) => { Err(_) => {
#[cfg(feature="progress")] { #[cfg(feature="progress")] {
progress.eprintln("[!] Child aborting...").await?.await?; progress.eprintln("[!] Child aborting...").await?.await?;
@ -264,17 +276,19 @@ where I: IntoIterator<Item=T>,
} }
#[cfg(not(feature="progress"))] eprintln!("[!] Child aborting..."); #[cfg(not(feature="progress"))] eprintln!("[!] Child aborting...");
todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); //todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)");
// TODO: How to impl: Make a sparse-map of `usize`-keyed `MappedData<[AtomicRefCell<MaybeUninit<T>>]> :: get_async(Process::next_uniq_id()) -> Pin<&mut SparseEntry<'_, T>>`, which keys are stored in a `std::bitset<usize>` analogue
Box::new(std::iter::empty())
}, },
Ok(v) => v, Ok(v) => Box::new(v),
}; };
#[cfg(feature="progress")] progress.shutdown().await?;
for failed in results for failed in results
{ {
#[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?; #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed); #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed);
} }
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx); drop(tx);
display.await?; display.await?;

Loading…
Cancel
Save