From 29d45438eed427984a0ce4a29cf9a190cce4eb33 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 24 Mar 2025 14:36:01 +0000 Subject: [PATCH] Added *optional* SIGINT trapping at compile-time (feature="shutdown") and runtime (`-n`/`--no-cancel` to disable, default is to catch gracefully.) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for leanify-many's current commit: Small curse − 小凶 --- Cargo.lock | 2 +- Cargo.toml | 13 ++++-- src/arg.rs | 12 +++++- src/main.rs | 28 +++++++++++- src/process.rs | 110 +++++++++++++++++++++++++++++++++++++++++++++--- src/progress.rs | 2 + src/work.rs | 12 +++--- 7 files changed, 161 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edbb30f..d9114d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,7 +184,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "leanify-many" -version = "1.2.0" +version = "1.2.0+1" dependencies = [ "cfg-if", "futures", diff --git a/Cargo.toml b/Cargo.toml index 57fa10b..72d58e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "leanify-many" -version = "1.2.0" +version = "1.2.0+1" description = "spawn leanify subprocesses" authors = ["Avril "] edition = "2018" @@ -16,7 +16,7 @@ lto = "fat" codegen-units = 1 [features] -default = ["splash", "progress", "colour", "collect_err"] +default = ["splash", "progress", "colour", "collect_err", "shutdown"] # Enable progress bar progress = ["termprogress", "pin-project"] @@ -42,6 +42,13 @@ checked_pass = [] # Note: Disabling this can cause weird deadlock bugs sometimes collect_err = [] +# Enables the `SIGINT` trapping & graceful shutdown of the hypervisor. +# +# This causes the queue to stop being processes, and any already running `leanify` subprocesses to be awaited on before the shutdown. +# Without this feature enabled, `leanify` subprocesses will receive terminating `SIGINT`s as normal. +shutdown = ["libc"] + + [dependencies] lazy_static = "1.4" tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]} @@ -51,7 +58,7 @@ cfg-if = "0.1" recolored = { version = "1.9", optional = true } num_cpus = "1.13" pin-project = {version = "0.4", optional = true} -libc = { version = "0.2.171", features = ["align"] } +libc = { version = "0.2.171", features = ["align"], optional = true } [build-dependencies] rustc_version = "0.2" diff --git a/src/arg.rs b/src/arg.rs index 3384651..9091459 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -29,7 +29,8 @@ mod extra { { #[cfg(feature="progress")] writeln!(output, " --no-progress Do not display progress bar")?; #[cfg(feature="colour")] writeln!(output, " --no-colour Do not display terminal colours")?; - #[cfg(feature="colour")] writeln!(output, " --colour Always display terminal colour, even if env flags tell us not to")?; + #[cfg(feature="colour")] writeln!(output, " --colour Always display terminal colour, even if env flags tell us not to")?; + #[cfg(feature="shutdown")] writeln!(output, " -n, --no-cancel Do not capture `SIGINT`s for graceful shutdown.")?; Ok(()) } @@ -165,6 +166,7 @@ fn comp_flags() check!(on "collect_err", "Collect the output of children's stderr instead of printing immediately"); check!(off "threads", "Enable threaded scheduler (usually not needed)"); check!(off "checked_pass", "Check the arguments passed with `--passthrough` to leanify. By default they are passed as is"); + check!(on "shutdown", "Enables the trapping of `SIGINT` for graceful shutdowns, allowing already running children to complete before exiting."); } pub fn usage() -> ! @@ -262,6 +264,8 @@ pub struct Flags pub hard_limit: Option, /// Other flags to pass to the `leanify` subprocesses pub leanify_flags: flags::LeanifyFlags, + /// Enable graceful-shutdown on `SIGINT`. + #[cfg(feature="shutdown")] pub graceful_shutdown: bool, } impl Default for Flags @@ -274,6 +278,7 @@ impl Default for Flags #[cfg(feature="colour")] coloured: None, hard_limit: None, leanify_flags: Default::default(), + #[cfg(feature="shutdown")] graceful_shutdown: true, } } } @@ -335,6 +340,11 @@ where I: IntoIterator, first=false; match lw_arg.as_str() { + #[cfg(feature="shutdown")] + "--no-cancel" | "-n" => { + cfg.flags.graceful_shutdown = false; + continue; + }, "--max-children" => { if let Some(nzi) = args.next() { cfg.max_children = Some(nzi.parse()?); diff --git a/src/main.rs b/src/main.rs index 9e3bd23..e09cbd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,6 +46,32 @@ async fn work() -> Result<(), Box> } use futures::future::FutureExt; + + /// Gets the graceful shutdown future if there is one both allowed by the feature flags (see `shutdown` feature) and not disabled by user-provided argument(s). + /// + /// __NOTE__: This is extracted from the cancel future expression in the call to `work::work()` due to the requirement of the use of `cfg_if!` in a non-expression context only. + #[inline(always)] + fn get_shutdown_future(_flags: &arg::Flags) -> impl futures::future::Future + Send + Unpin + 'static + { + cfg_if! { + if #[cfg(feature="shutdown")] { + use futures::future::{ + self, + // OptionFuture, // NOTE: Not used here as `OptionFuture` returns *immediately* on `None`, instead of never on `None`, which is what we want. + Either, + }; + + return _flags.graceful_shutdown + .then(|| Either::Left(tokio::signal::ctrl_c().boxed())) //TODO: Inside `ctrl_c()` handler, make a 2nd interrupt send `SIGINT` to the children maybe? + .unwrap_or(Either::Right(future::pending())); + } else if #[cfg(nightly)] { + return futures::future::pending::(); + } else { + return futures::future::pending::(); + } + } + } + work::work(&args.flags, leanify, args.files, match args.flags.hard_limit { Some(hard_limit) => { // We have hard limit @@ -59,7 +85,7 @@ async fn work() -> Result<(), Box> } }, _ => args.max_children, - }, tokio::signal::ctrl_c().boxed().fuse()).await + }, get_shutdown_future(&args.flags).fuse()).await } diff --git a/src/process.rs b/src/process.rs index d80318e..c5db335 100644 --- a/src/process.rs +++ b/src/process.rs @@ -49,12 +49,13 @@ impl AsRef for Process } } +#[cfg(feature="shutdown")] fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static { use libc::{ - getpid, + //getpid, // Not required: `setpgid(0, ...)` works on the current (*child* in this case) process. setpgid, - pid_t + //pid_t, // Not required: Result & arguments for `getpgid()`, deduced implicitly. }; move || { //let this = unsafe { getpid() }; @@ -69,11 +70,61 @@ fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + } } +/// Options passed to `contained_spawn()` to control how the process spawning intricicies are handled. +/// +/// Usage: `contained_spawn(Some(&config.into()), ...)` +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub struct SpawnOptions<'cli> { + cli_flags: &'cli crate::arg::Flags, +} + +impl<'cli> From<&'cli crate::arg::Config> for SpawnOptions<'cli> +{ + #[inline] + fn from(from: &'cli crate::arg::Config) -> Self + { + Self::with_flags(&from.flags) + } +} + +impl<'cli> From<&'cli crate::arg::Flags> for SpawnOptions<'cli> +{ + #[inline] + fn from(from: &'cli crate::arg::Flags) -> Self + { + Self::with_flags(from) + } +} + +impl<'cli> SpawnOptions<'cli> +{ + /// Create a new `SpawnOptions` instance using the specified command-line flags. + #[inline(always)] + pub const fn with_flags(cli_flags: &'cli crate::arg::Flags) -> Self + { + Self { cli_flags } + } + + /// Create a new `SpawnOptions` instance using the optionally specified command-line flags. + #[inline(always)] + pub fn new(config: Option<&'cli crate::arg::Config>) -> Option + { + config.map(Into::into) + } + + /// Create a new `SpawnOptions` instance using the optionally specified command-line flags. + #[inline(always)] + pub fn try_with_flags(config: Option<&'cli crate::arg::Flags>) -> Option + { + config.map(Self::with_flags) + } +} + /// Spawn the process, and contain its standard output. /// /// # Notes /// Standard error is printed immediately instead. -pub async fn contained_spawn(process: T, args: U, mut output_to: mpsc::Sender<(bool, String)>) -> Result<(), Error> +pub async fn contained_spawn(_cli: Option>, process: T, args: U, mut output_to: mpsc::Sender<(bool, String)>) -> Result<(), Error> where U: IntoIterator, V: AsRef, T: AsRef @@ -89,6 +140,47 @@ where U: IntoIterator, let stderr = std::process::Stdio::inherit(); } }; + + /// Retrieve a command-line flag if command-line flags have been provided. + /// + /// # Usage + /// Retrieve value-or-default (**eager**): `cli_flag!([ref] name_of_field ?? default_value)`. + /// If `ref` is provided, `default_value` must be able to coerce to *a reference of* the same type as `name_of_field`. + /// + /// Retrieve value-or-default lazily: `cli_flag!([ref] name_of_field ?? do default_value_thunk)`. + /// If `ref` is provided, the thunk/function must return a reference pointing to the same type as `name_of_field` with a compatible lifetime. + /// + /// Retrieve value inside an `Option`: `cli_flag!([ref] name_of_field ?)`. + /// If `ref` is provided, the returned Option will be `Option<&'_ T>` where `T` is the type of the field, otherwise, it will return `Option`. + #[allow(unused_macros)] // NOTE: This macro might not be used if certain feature flags are not set. + macro_rules! cli_flag { + ($name:ident ?? do $default:expr) => { + _cli.as_ref().map(|opt| opt.cli_flags.$name).unwrap_or_else($default) + }; + ($name:ident ?? $default:expr) => { + _cli.as_ref().map(|opt| opt.cli_flags.$name).unwrap_or(From::from($default)) + }; + ($name:ident ?) => { + _cli.as_ref().map(|opt| opt.cli_flags.$name) + }; + ($name:ident) => { + cli_flag!($name ?? Default::default()) + }; + + (ref $name:ident ?? do $default:expr) => { + _cli.as_ref().map(|opt| &opt.cli_flags.$name).unwrap_or_else($default) + }; + (ref $name:ident ?? $default:expr) => { + _cli.as_ref().map(|opt| &opt.cli_flags.$name).unwrap_or($default) + }; + (ref $name:ident ?) => { + _cli.as_ref().map(|opt| &opt.cli_flags.$name) + }; + (ref $name:ident) => { + cli_flag!(ref $name ?? Default::default()) + }; + } + let mut child = Command::new(process); let child = child .args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here? @@ -98,10 +190,14 @@ where U: IntoIterator, .stdin(std::process::Stdio::null()) .kill_on_drop(false); - // SAFETY: This is only calling `setpgid()`. - unsafe { - child - .pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly + #[cfg(feature="shutdown")] + if cli_flag!(graceful_shutdown ?? true) { + // SAFETY: This is only calling the `setpgid()` syscall. + unsafe { + child + .pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly + + }; }; let mut child = match child.spawn() { diff --git a/src/progress.rs b/src/progress.rs index 3a741ff..55967bc 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -1,3 +1,5 @@ +#![allow(unused_must_use)] // Suppress `#[pin_project]` spamming warnings on build. + use super::*; use pin_project::pin_project; use std::{ diff --git a/src/work.rs b/src/work.rs index 12d37eb..2a9519d 100644 --- a/src/work.rs +++ b/src/work.rs @@ -52,7 +52,7 @@ type Error = process::Error; #[allow(unused_mut)] #[allow(unused_variables)] -async fn do_work(process: impl AsRef, file: impl AsRef, mut prog: ProgressSender) -> Result, Error> +async fn do_work<'cli>(flags: Option<&'cli arg::Flags>, process: impl AsRef, file: impl AsRef, mut prog: ProgressSender) -> Result, Error> { let file = file.as_ref(); @@ -92,7 +92,7 @@ async fn do_work(process: impl AsRef, file: impl AsRef, mut prog let _ = tokio::task::yield_now().await; //let _ = opt_await.await; - match process::contained_spawn(process, std::iter::once(file), tx).await { + match process::contained_spawn(flags.map(Into::into), process, std::iter::once(file), tx).await { Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Err(error) => { cfg_if! { @@ -187,10 +187,11 @@ where I: IntoIterator, if cancel.is_aborted() { return None.into(); } - let semaphore = semaphore.clone(); let process = Arc::clone(&process); let mut tx = tx.clone(); + + let flags = flags.clone(); #[cfg(feature="progress")] let mut progress = progress.clone(); @@ -208,15 +209,16 @@ where I: IntoIterator, // (task_id, worker_result) let worker = { + let flags = &flags; cfg_if!{ if #[cfg(feature="progress")] { - let worker = do_work(&process, &filename, progress.clone()); + let worker = do_work(Some(flags), &process, &filename, progress.clone()); let task = progress.add_task(format!("{:?}", filename.as_ref())); future::join(task, worker).await } else { #[cfg(nightly)] type NoReturn = !; #[cfg(not(nightly))] type NoReturn = (); - (Option::::None, do_work(&process, &filename, ()).await) + (Option::::None, do_work(Some(flags), &process, &filename, ()).await) } } };