From d7b46d7c9a97eb8f0cd9b63065c643d8c7980c6b Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 22 Mar 2025 13:07:19 +0000 Subject: [PATCH] Cancelling works when it is NOT a signal because of children PGIDs... MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Future blessing − 末吉 --- Cargo.lock | 106 ++++++++++++++++++++++++------------------------ Cargo.toml | 2 +- src/main.rs | 2 +- src/process.rs | 7 ++-- src/progress.rs | 17 ++++++++ src/work.rs | 36 +++++++++++----- 6 files changed, 102 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 809c7b2..e316571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 4 + [[package]] name = "arc-swap" version = "0.4.7" @@ -59,9 +61,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -74,9 +76,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -84,15 +86,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -101,42 +103,38 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ - "proc-macro-hack", "proc-macro2", "quote", - "syn", + "syn 2.0.100", ] [[package]] name = "futures-sink" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" -dependencies = [ - "once_cell", -] +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -145,10 +143,8 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project", + "pin-project-lite 0.2.16", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] @@ -307,12 +303,6 @@ dependencies = [ "libc", ] -[[package]] -name = "once_cell" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" - [[package]] name = "pin-project" version = "0.4.23" @@ -330,7 +320,7 @@ checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.38", ] [[package]] @@ -340,37 +330,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" [[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - -[[package]] -name = "proc-macro-hack" -version = "0.5.18" +name = "pin-project-lite" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] -name = "proc-macro-nested" -version = "0.1.6" +name = "pin-utils" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "proc-macro2" -version = "1.0.19" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] name = "quote" -version = "1.0.7" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] @@ -455,6 +439,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "syn" +version = "2.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "terminal_size" version = "0.1.13" @@ -491,8 +486,9 @@ dependencies = [ "mio-named-pipes", "mio-uds", "num_cpus", - "pin-project-lite", + "pin-project-lite 0.1.7", "signal-hook-registry", + "slab", "tokio-macros", "winapi 0.3.9", ] @@ -505,9 +501,15 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.38", ] +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + [[package]] name = "unicode-xid" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index c7c3daa..d594ebd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ collect_err = [] [dependencies] lazy_static = "1.4" -tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} +tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]} futures = "0.3" termprogress = {version="0.3", optional=true} cfg-if = "0.1" diff --git a/src/main.rs b/src/main.rs index 1f68a92..40ce465 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,7 +59,7 @@ async fn work() -> Result<(), Box> } }, _ => args.max_children, - }, tokio::signal::ctrl_c().boxed()).await + }, tokio::spawn(tokio::signal::ctrl_c().fuse())).await } diff --git a/src/process.rs b/src/process.rs index 9b74a3c..09b6001 100644 --- a/src/process.rs +++ b/src/process.rs @@ -69,13 +69,14 @@ where U: IntoIterator, let stderr = std::process::Stdio::inherit(); } }; - let mut child = match Command::new(process) .args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here? - .args(args) + .args(args.into_iter()) .stdout(std::process::Stdio::piped()) .stderr(stderr) .stdin(std::process::Stdio::null()) + .kill_on_drop(false) +//.process_group(0) //XXX: It's this that is propagating the SIGINT!!! .spawn() { Ok(chi) => chi, Err(sp) => { @@ -107,7 +108,7 @@ where U: IntoIterator, } }); let _ = task::yield_now().await; - match child.await { + match tokio::spawn(child).await.expect("Child supervisor panic") { Ok(exit) => { if exit.success() { cfg_if!{ diff --git a/src/progress.rs b/src/progress.rs index 510f7e5..3a741ff 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -519,6 +519,7 @@ pub fn create_progress bool + { + match self { + Self::WorkerDropped => true, + _ => false, + } + } +} + impl std::error::Error for Error{} impl std::fmt::Display for Error { diff --git a/src/work.rs b/src/work.rs index 55d31fb..57c8781 100644 --- a/src/work.rs +++ b/src/work.rs @@ -91,6 +91,7 @@ async fn do_work(process: impl AsRef, file: impl AsRef, mut prog }; let _ = tokio::task::yield_now().await; //let _ = opt_await.await; + match process::contained_spawn(process, std::iter::once(file), tx).await { Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Err(error) => { @@ -127,9 +128,9 @@ where I: IntoIterator, }; /// Make a future cancellable by the passed `cancel` token. macro_rules! pass_cancel { - ($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); + ($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); } - + let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); @@ -179,11 +180,12 @@ where I: IntoIterator, let results = pass_cancel!(join_all( files - .map(|filename| { + .map(|filename| { + let cancel = cancel.clone(); + let semaphore = semaphore.clone(); let process = Arc::clone(&process); let mut tx = tx.clone(); - let cancel = cancel.clone(); #[cfg(feature="progress")] let mut progress = progress.clone(); @@ -193,7 +195,14 @@ where I: IntoIterator, #[cfg(feature="progress")] type Opt = OptionFuture; #[cfg(not(feature="progress"))] type Opt = std::marker::PhantomData; let _task_id: Opt<_> = { + + if cancel.is_aborted() { + return; + } let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; + if cancel.is_aborted() { + return; + } // (task_id, worker_result) let worker = { @@ -223,7 +232,10 @@ where I: IntoIterator, #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await .or_else(|e| { - eprintln!("\n{}",e); Err(e) + if !e.can_ignore_on_com_send_failure() { + eprintln!("\n{}",e); + } + Err(e) }); }, } @@ -253,10 +265,10 @@ where I: IntoIterator, }),i+=1).0 }))).await .map(|x| x - .into_iter() - .filter_map(|x| x.err())); + .into_iter() + .filter_map(|x| x.err())); - let results = match results { + let results: Box> = match results { Err(_) => { #[cfg(feature="progress")] { progress.eprintln("[!] Child aborting...").await?.await?; @@ -264,17 +276,19 @@ where I: IntoIterator, } #[cfg(not(feature="progress"))] eprintln!("[!] Child aborting..."); - todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); + //todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); + // TODO: How to impl: Make a sparse-map of `usize`-keyed `MappedData<[AtomicRefCell>]> :: get_async(Process::next_uniq_id()) -> Pin<&mut SparseEntry<'_, T>>`, which keys are stored in a `std::bitset` analogue + Box::new(std::iter::empty()) }, - Ok(v) => v, + Ok(v) => Box::new(v), }; - #[cfg(feature="progress")] progress.shutdown().await?; for failed in results { #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed); } + #[cfg(feature="progress")] progress.shutdown().await?; drop(tx); display.await?;