Compare commits

...

2 Commits

2
Cargo.lock generated

@ -184,7 +184,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "leanify-many"
version = "1.1.1"
version = "1.2.0+1"
dependencies = [
"cfg-if",
"futures",

@ -1,6 +1,6 @@
[package]
name = "leanify-many"
version = "1.1.1"
version = "1.2.0+1"
description = "spawn leanify subprocesses"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
@ -16,7 +16,7 @@ lto = "fat"
codegen-units = 1
[features]
default = ["splash", "progress", "colour", "collect_err"]
default = ["splash", "progress", "colour", "collect_err", "shutdown"]
# Enable progress bar
progress = ["termprogress", "pin-project"]
@ -42,6 +42,13 @@ checked_pass = []
# Note: Disabling this can cause weird deadlock bugs sometimes
collect_err = []
# Enables the `SIGINT` trapping & graceful shutdown of the hypervisor.
#
# This causes the queue to stop being processes, and any already running `leanify` subprocesses to be awaited on before the shutdown.
# Without this feature enabled, `leanify` subprocesses will receive terminating `SIGINT`s as normal.
shutdown = ["libc"]
[dependencies]
lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]}
@ -51,7 +58,7 @@ cfg-if = "0.1"
recolored = { version = "1.9", optional = true }
num_cpus = "1.13"
pin-project = {version = "0.4", optional = true}
libc = { version = "0.2.171", features = ["align"] }
libc = { version = "0.2.171", features = ["align"], optional = true }
[build-dependencies]
rustc_version = "0.2"

@ -29,7 +29,8 @@ mod extra {
{
#[cfg(feature="progress")] writeln!(output, " --no-progress Do not display progress bar")?;
#[cfg(feature="colour")] writeln!(output, " --no-colour Do not display terminal colours")?;
#[cfg(feature="colour")] writeln!(output, " --colour Always display terminal colour, even if env flags tell us not to")?;
#[cfg(feature="colour")] writeln!(output, " --colour Always display terminal colour, even if env flags tell us not to")?;
#[cfg(feature="shutdown")] writeln!(output, " -n, --no-cancel Do not capture `SIGINT`s for graceful shutdown.")?;
Ok(())
}
@ -165,6 +166,7 @@ fn comp_flags()
check!(on "collect_err", "Collect the output of children's stderr instead of printing immediately");
check!(off "threads", "Enable threaded scheduler (usually not needed)");
check!(off "checked_pass", "Check the arguments passed with `--passthrough` to leanify. By default they are passed as is");
check!(on "shutdown", "Enables the trapping of `SIGINT` for graceful shutdowns, allowing already running children to complete before exiting.");
}
pub fn usage() -> !
@ -262,6 +264,8 @@ pub struct Flags
pub hard_limit: Option<NonZeroUsize>,
/// Other flags to pass to the `leanify` subprocesses
pub leanify_flags: flags::LeanifyFlags,
/// Enable graceful-shutdown on `SIGINT`.
#[cfg(feature="shutdown")] pub graceful_shutdown: bool,
}
impl Default for Flags
@ -274,6 +278,7 @@ impl Default for Flags
#[cfg(feature="colour")] coloured: None,
hard_limit: None,
leanify_flags: Default::default(),
#[cfg(feature="shutdown")] graceful_shutdown: true,
}
}
}
@ -335,6 +340,11 @@ where I: IntoIterator<Item=T>,
first=false;
match lw_arg.as_str() {
#[cfg(feature="shutdown")]
"--no-cancel" | "-n" => {
cfg.flags.graceful_shutdown = false;
continue;
},
"--max-children" => {
if let Some(nzi) = args.next() {
cfg.max_children = Some(nzi.parse()?);

@ -46,6 +46,32 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
use futures::future::FutureExt;
/// Gets the graceful shutdown future if there is one both allowed by the feature flags (see `shutdown` feature) and not disabled by user-provided argument(s).
///
/// __NOTE__: This is extracted from the cancel future expression in the call to `work::work()` due to the requirement of the use of `cfg_if!` in a non-expression context only.
#[inline(always)]
fn get_shutdown_future(_flags: &arg::Flags) -> impl futures::future::Future + Send + Unpin + 'static
{
cfg_if! {
if #[cfg(feature="shutdown")] {
use futures::future::{
self,
// OptionFuture, // NOTE: Not used here as `OptionFuture` returns *immediately* on `None`, instead of never on `None`, which is what we want.
Either,
};
return _flags.graceful_shutdown
.then(|| Either::Left(tokio::signal::ctrl_c().boxed())) //TODO: Inside `ctrl_c()` handler, make a 2nd interrupt send `SIGINT` to the children maybe?
.unwrap_or(Either::Right(future::pending()));
} else if #[cfg(nightly)] {
return futures::future::pending::<!>();
} else {
return futures::future::pending::<std::convert::Infallible>();
}
}
}
work::work(&args.flags, leanify, args.files, match args.flags.hard_limit {
Some(hard_limit) => {
// We have hard limit
@ -59,7 +85,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
},
_ => args.max_children,
}, tokio::signal::ctrl_c().boxed().fuse()).await
}, get_shutdown_future(&args.flags).fuse()).await
}

@ -49,12 +49,13 @@ impl AsRef<Path> for Process
}
}
#[cfg(feature="shutdown")]
fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static
{
use libc::{
getpid,
//getpid, // Not required: `setpgid(0, ...)` works on the current (*child* in this case) process.
setpgid,
pid_t
//pid_t, // Not required: Result & arguments for `getpgid()`, deduced implicitly.
};
move || {
//let this = unsafe { getpid() };
@ -69,11 +70,61 @@ fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync +
}
}
/// Options passed to `contained_spawn()` to control how the process spawning intricicies are handled.
///
/// Usage: `contained_spawn(Some(&config.into()), ...)`
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub struct SpawnOptions<'cli> {
cli_flags: &'cli crate::arg::Flags,
}
impl<'cli> From<&'cli crate::arg::Config> for SpawnOptions<'cli>
{
#[inline]
fn from(from: &'cli crate::arg::Config) -> Self
{
Self::with_flags(&from.flags)
}
}
impl<'cli> From<&'cli crate::arg::Flags> for SpawnOptions<'cli>
{
#[inline]
fn from(from: &'cli crate::arg::Flags) -> Self
{
Self::with_flags(from)
}
}
impl<'cli> SpawnOptions<'cli>
{
/// Create a new `SpawnOptions` instance using the specified command-line flags.
#[inline(always)]
pub const fn with_flags(cli_flags: &'cli crate::arg::Flags) -> Self
{
Self { cli_flags }
}
/// Create a new `SpawnOptions` instance using the optionally specified command-line flags.
#[inline(always)]
pub fn new(config: Option<&'cli crate::arg::Config>) -> Option<Self>
{
config.map(Into::into)
}
/// Create a new `SpawnOptions` instance using the optionally specified command-line flags.
#[inline(always)]
pub fn try_with_flags(config: Option<&'cli crate::arg::Flags>) -> Option<Self>
{
config.map(Self::with_flags)
}
}
/// Spawn the process, and contain its standard output.
///
/// # Notes
/// Standard error is printed immediately instead.
pub async fn contained_spawn<T,U,V>(process: T, args: U, mut output_to: mpsc::Sender<(bool, String)>) -> Result<(), Error>
pub async fn contained_spawn<T,U,V>(_cli: Option<SpawnOptions<'_>>, process: T, args: U, mut output_to: mpsc::Sender<(bool, String)>) -> Result<(), Error>
where U: IntoIterator<Item=V>,
V: AsRef<std::ffi::OsStr>,
T: AsRef<Process>
@ -89,6 +140,47 @@ where U: IntoIterator<Item=V>,
let stderr = std::process::Stdio::inherit();
}
};
/// Retrieve a command-line flag if command-line flags have been provided.
///
/// # Usage
/// Retrieve value-or-default (**eager**): `cli_flag!([ref] name_of_field ?? default_value)`.
/// If `ref` is provided, `default_value` must be able to coerce to *a reference of* the same type as `name_of_field`.
///
/// Retrieve value-or-default lazily: `cli_flag!([ref] name_of_field ?? do default_value_thunk)`.
/// If `ref` is provided, the thunk/function must return a reference pointing to the same type as `name_of_field` with a compatible lifetime.
///
/// Retrieve value inside an `Option`: `cli_flag!([ref] name_of_field ?)`.
/// If `ref` is provided, the returned Option will be `Option<&'_ T>` where `T` is the type of the field, otherwise, it will return `Option<T>`.
#[allow(unused_macros)] // NOTE: This macro might not be used if certain feature flags are not set.
macro_rules! cli_flag {
($name:ident ?? do $default:expr) => {
_cli.as_ref().map(|opt| opt.cli_flags.$name).unwrap_or_else($default)
};
($name:ident ?? $default:expr) => {
_cli.as_ref().map(|opt| opt.cli_flags.$name).unwrap_or(From::from($default))
};
($name:ident ?) => {
_cli.as_ref().map(|opt| opt.cli_flags.$name)
};
($name:ident) => {
cli_flag!($name ?? Default::default())
};
(ref $name:ident ?? do $default:expr) => {
_cli.as_ref().map(|opt| &opt.cli_flags.$name).unwrap_or_else($default)
};
(ref $name:ident ?? $default:expr) => {
_cli.as_ref().map(|opt| &opt.cli_flags.$name).unwrap_or($default)
};
(ref $name:ident ?) => {
_cli.as_ref().map(|opt| &opt.cli_flags.$name)
};
(ref $name:ident) => {
cli_flag!(ref $name ?? Default::default())
};
}
let mut child = Command::new(process);
let child = child
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
@ -98,10 +190,14 @@ where U: IntoIterator<Item=V>,
.stdin(std::process::Stdio::null())
.kill_on_drop(false);
// SAFETY: This is only calling `setpgid()`.
unsafe {
child
.pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly
#[cfg(feature="shutdown")]
if cli_flag!(graceful_shutdown ?? true) {
// SAFETY: This is only calling the `setpgid()` syscall.
unsafe {
child
.pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly
};
};
let mut child = match child.spawn() {

@ -1,3 +1,5 @@
#![allow(unused_must_use)] // Suppress `#[pin_project]` spamming warnings on build.
use super::*;
use pin_project::pin_project;
use std::{

@ -52,7 +52,7 @@ type Error = process::Error;
#[allow(unused_mut)]
#[allow(unused_variables)]
async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog: ProgressSender) -> Result<fixed_stack::IntoIter<(bool, String)>, Error>
async fn do_work<'cli>(flags: Option<&'cli arg::Flags>, process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog: ProgressSender) -> Result<fixed_stack::IntoIter<(bool, String)>, Error>
{
let file = file.as_ref();
@ -92,7 +92,7 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
let _ = tokio::task::yield_now().await;
//let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await {
match process::contained_spawn(flags.map(Into::into), process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => {
cfg_if! {
@ -187,10 +187,11 @@ where I: IntoIterator<Item=T>,
if cancel.is_aborted() {
return None.into();
}
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
let mut tx = tx.clone();
let flags = flags.clone();
#[cfg(feature="progress")] let mut progress = progress.clone();
@ -208,15 +209,16 @@ where I: IntoIterator<Item=T>,
// (task_id, worker_result)
let worker = {
let flags = &flags;
cfg_if!{
if #[cfg(feature="progress")] {
let worker = do_work(&process, &filename, progress.clone());
let worker = do_work(Some(flags), &process, &filename, progress.clone());
let task = progress.add_task(format!("{:?}", filename.as_ref()));
future::join(task, worker).await
} else {
#[cfg(nightly)] type NoReturn = !;
#[cfg(not(nightly))] type NoReturn = ();
(Option::<NoReturn>::None, do_work(&process, &filename, ()).await)
(Option::<NoReturn>::None, do_work(Some(flags), &process, &filename, ()).await)
}
}
};

Loading…
Cancel
Save