From 2b6569651289d4e8641fc3e9144fd5ed675da696 Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 22 Mar 2025 13:47:51 +0000 Subject: [PATCH] Added SIGINT graceful cancellation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Small curse − 小凶 --- Cargo.lock | 5 +++-- Cargo.toml | 1 + src/main.rs | 2 +- src/process.rs | 46 +++++++++++++++++++++++++++++++++++++--------- src/work.rs | 48 +++++++++++++++++++++++++++++------------------- 5 files changed, 71 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e316571..f6a31f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -189,6 +189,7 @@ dependencies = [ "cfg-if", "futures", "lazy_static", + "libc", "num_cpus", "pin-project", "recolored", @@ -199,9 +200,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.74" +version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "log" diff --git a/Cargo.toml b/Cargo.toml index d594ebd..92e54de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ cfg-if = "0.1" recolored = { version = "1.9", optional = true } num_cpus = "1.13" pin-project = {version = "0.4", optional = true} +libc = { version = "0.2.171", features = ["align"] } [build-dependencies] rustc_version = "0.2" diff --git a/src/main.rs b/src/main.rs index 40ce465..9e3bd23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,7 +59,7 @@ async fn work() -> Result<(), Box> } }, _ => args.max_children, - }, tokio::spawn(tokio::signal::ctrl_c().fuse())).await + }, tokio::signal::ctrl_c().boxed().fuse()).await } diff --git a/src/process.rs b/src/process.rs index 09b6001..d80318e 100644 --- a/src/process.rs +++ b/src/process.rs @@ -49,6 +49,26 @@ impl AsRef for Process } } +fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static +{ + use libc::{ + getpid, + setpgid, + pid_t + }; + move || { + //let this = unsafe { getpid() }; + + // SAFETY: We are setting the PGID of this (child) process to detach it from the group of the parent so signals are not propagated from the parent. + unsafe { + if setpgid(0, to) != 0 { + return Err(io::Error::last_os_error()); + } + } + Ok(()) + } +} + /// Spawn the process, and contain its standard output. /// /// # Notes @@ -69,20 +89,28 @@ where U: IntoIterator, let stderr = std::process::Stdio::inherit(); } }; - let mut child = match Command::new(process) + let mut child = Command::new(process); + let child = child .args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here? .args(args.into_iter()) .stdout(std::process::Stdio::piped()) .stderr(stderr) .stdin(std::process::Stdio::null()) - .kill_on_drop(false) -//.process_group(0) //XXX: It's this that is propagating the SIGINT!!! - .spawn() { - Ok(chi) => chi, - Err(sp) => { - return Err(Error::Spawning(sp)); - } - }; + .kill_on_drop(false); + + // SAFETY: This is only calling `setpgid()`. + unsafe { + child + .pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly + }; + + let mut child = match child.spawn() { + Ok(chi) => chi, + Err(sp) => { + return Err(Error::Spawning(sp)); + } + }; + let stdout = child.stdout.take().unwrap(); #[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = { diff --git a/src/work.rs b/src/work.rs index 57c8781..12d37eb 100644 --- a/src/work.rs +++ b/src/work.rs @@ -180,8 +180,13 @@ where I: IntoIterator, let results = pass_cancel!(join_all( files - .map(|filename| { + .map(|filename| -> OptionFuture<_> { let cancel = cancel.clone(); + + // Check if cancellation has happened before the spawn attempt of the child tasks, and do nothing if it has. + if cancel.is_aborted() { + return None.into(); + } let semaphore = semaphore.clone(); let process = Arc::clone(&process); @@ -189,17 +194,14 @@ where I: IntoIterator, #[cfg(feature="progress")] let mut progress = progress.clone(); - //TODO: Where to put the cancellation check in here?? (XXX: Current - //`AbortHandle` (old tokio version) does not have `is_aborted()`... :/) - (tokio::spawn(async move { + Some((tokio::spawn(async move { #[cfg(feature="progress")] type Opt = OptionFuture; #[cfg(not(feature="progress"))] type Opt = std::marker::PhantomData; let _task_id: Opt<_> = { - if cancel.is_aborted() { - return; - } let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; + + // Check if cancellation has happened while we were awaiting the lock, and return doing nothing if it has. if cancel.is_aborted() { return; } @@ -262,36 +264,44 @@ where I: IntoIterator, } else { let _ = progress.bump_min(1).await; } - }),i+=1).0 + }),i+=1).0).into() }))).await .map(|x| x .into_iter() - .filter_map(|x| x.err())); - + .filter_map(|x| x?.err())); + let results: Box> = match results { - Err(_) => { + Err(_) => { + let msg = format!("[{}] Requested interrupt! Waiting for existing children...", colour::style(colour!(Color::Yellow),"!")); #[cfg(feature="progress")] { - progress.eprintln("[!] Child aborting...").await?.await?; - progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?; + + progress.eprintln(msg).await?.await?; + //progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?; } - #[cfg(not(feature="progress"))] eprintln!("[!] Child aborting..."); + #[cfg(not(feature="progress"))] eprintln!("{}", msg); - //todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); - // TODO: How to impl: Make a sparse-map of `usize`-keyed `MappedData<[AtomicRefCell>]> :: get_async(Process::next_uniq_id()) -> Pin<&mut SparseEntry<'_, T>>`, which keys are stored in a `std::bitset` analogue + // We do not have the results, return an empty iterator Box::new(std::iter::empty()) }, Ok(v) => Box::new(v), }; - for failed in results + for failed in results { - #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?; - #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed); + #[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?; + #[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); } #[cfg(feature="progress")] progress.shutdown().await?; drop(tx); display.await?; + + // Display task has completed, child tasks are complete; we are about to exit. + if cancel.is_aborted() { + eprintln!("[.] Running children complete, ignoring rest due to cancellation."); + } + + #[cfg(feature="progress")] prog_handle.await.expect("Child panic")?; Ok(()) }