Compare commits

...

4 Commits

Author SHA1 Message Date
Avril 7fa4004ebf
Version bump 1.2.0: Added SIGINT trapping & graceful cancellation: SIGINT does not terminate already-running child processes, just prevents more from being spawned and waits on the running ones.
1 week ago
Avril 2b65696512
Added SIGINT graceful cancellation.
1 week ago
Avril d7b46d7c9a
Cancelling works when it is NOT a signal because of children PGIDs...
1 week ago
Avril db20ca106f
Added CTRL+C cancel infrastructure into async work flow tree, but haven"t figured out how we can both stop adding new tasks *and* `await` on *all* current child processes before returning in error, which is what we need.
4 months ago

113
Cargo.lock generated

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "arc-swap"
version = "0.4.7"
@ -59,9 +61,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
@ -74,9 +76,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
@ -84,15 +86,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
@ -101,42 +103,38 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
"syn 2.0.100",
]
[[package]]
name = "futures-sink"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
dependencies = [
"once_cell",
]
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
@ -145,10 +143,8 @@ dependencies = [
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-project-lite 0.2.16",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -188,11 +184,12 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "leanify-many"
version = "1.1.1"
version = "1.2.0"
dependencies = [
"cfg-if",
"futures",
"lazy_static",
"libc",
"num_cpus",
"pin-project",
"recolored",
@ -203,9 +200,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.74"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"
@ -307,12 +304,6 @@ dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]]
name = "pin-project"
version = "0.4.23"
@ -330,7 +321,7 @@ checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.38",
]
[[package]]
@ -340,37 +331,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.18"
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "proc-macro-nested"
version = "0.1.6"
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.19"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
dependencies = [
"unicode-xid",
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.7"
version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
dependencies = [
"proc-macro2",
]
@ -455,6 +440,17 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "syn"
version = "2.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "terminal_size"
version = "0.1.13"
@ -491,8 +487,9 @@ dependencies = [
"mio-named-pipes",
"mio-uds",
"num_cpus",
"pin-project-lite",
"pin-project-lite 0.1.7",
"signal-hook-registry",
"slab",
"tokio-macros",
"winapi 0.3.9",
]
@ -505,9 +502,15 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.38",
]
[[package]]
name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "unicode-xid"
version = "0.2.1"

@ -1,6 +1,6 @@
[package]
name = "leanify-many"
version = "1.1.1"
version = "1.2.0"
description = "spawn leanify subprocesses"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
@ -44,13 +44,14 @@ collect_err = []
[dependencies]
lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]}
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]}
futures = "0.3"
termprogress = {version="0.3", optional=true}
cfg-if = "0.1"
recolored = { version = "1.9", optional = true }
num_cpus = "1.13"
pin-project = {version = "0.4", optional = true}
libc = { version = "0.2.171", features = ["align"] }
[build-dependencies]
rustc_version = "0.2"

@ -45,6 +45,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
}
use futures::future::FutureExt;
work::work(&args.flags, leanify, args.files, match args.flags.hard_limit {
Some(hard_limit) => {
// We have hard limit
@ -58,7 +59,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
},
_ => args.max_children,
}).await
}, tokio::signal::ctrl_c().boxed().fuse()).await
}

@ -49,6 +49,26 @@ impl AsRef<Path> for Process
}
}
fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static
{
use libc::{
getpid,
setpgid,
pid_t
};
move || {
//let this = unsafe { getpid() };
// SAFETY: We are setting the PGID of this (child) process to detach it from the group of the parent so signals are not propagated from the parent.
unsafe {
if setpgid(0, to) != 0 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}
/// Spawn the process, and contain its standard output.
///
/// # Notes
@ -69,19 +89,28 @@ where U: IntoIterator<Item=V>,
let stderr = std::process::Stdio::inherit();
}
};
let mut child = match Command::new(process)
let mut child = Command::new(process);
let child = child
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
.args(args)
.args(args.into_iter())
.stdout(std::process::Stdio::piped())
.stderr(stderr)
.stdin(std::process::Stdio::null())
.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
.kill_on_drop(false);
// SAFETY: This is only calling `setpgid()`.
unsafe {
child
.pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly
};
let mut child = match child.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
let stdout = child.stdout.take().unwrap();
#[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = {
@ -107,7 +136,7 @@ where U: IntoIterator<Item=V>,
}
});
let _ = task::yield_now().await;
match child.await {
match tokio::spawn(child).await.expect("Child supervisor panic") {
Ok(exit) => {
if exit.success() {
cfg_if!{

@ -59,6 +59,11 @@ pub enum CommandKind
AddTask(Task),
RemoveTask(usize),
RemoveAll {
lock: bool,
title: Option<Cow<'static, str>>,
},
Complete,
Many(Vec<CommandKind>),
@ -194,6 +199,12 @@ impl<'a> CommandBuilder<'a>
self
}
pub fn clear_tasks(&mut self, lock: bool, title: Option<&'static str>) -> &mut Self
{
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Borrowed), lock });
self
}
/// Signal a shutdown to the worker
pub fn shutdown(&mut self) -> &mut Self
{
@ -317,6 +328,11 @@ impl ProgressSender
self.send_command(CommandKind::RemoveTask(task_idx)).await
}
pub async fn clear_tasks(&mut self, lock: bool, title: Option<String>) -> Result<CommandWaiter, Error>
{
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Owned), lock }).await
}
/// Signal a shutdown to the worker
pub async fn shutdown(&mut self) -> Result<CommandWaiter, Error>
{
@ -463,6 +479,20 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
}
},
CommandKind::RemoveAll{ title: None, lock } => {
if lock {
list.poison();
}
list.clear();
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
},
CommandKind::RemoveAll{ title: Some(title), lock } => {
if lock {
list.poison();
}
list.clear();
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, title.as_ref()).as_str());
},
CommandKind::Complete => {
if has_blanked {
progress.refresh();
@ -489,6 +519,7 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.complete();
}).await.map_err(|_| Error::WorkerPanic);
shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters");
res
});
handle
@ -512,6 +543,22 @@ pub enum Error
Unknown,
}
impl Error
{
/// If this error comes from an attempt to communicate with the worker that failed but can be ignored.
///
/// # Usage
/// If the worker has been gracefully shutdown, and background tasks are still running that want to update the progress, an error will be returned when those already extant tasks attempt to send the commands to do so.
/// Those tasks can ignore those errors entirely if this function returns `true`.
pub fn can_ignore_on_com_send_failure(&self) -> bool
{
match self {
Self::WorkerDropped => true,
_ => false,
}
}
}
impl std::error::Error for Error{}
impl std::fmt::Display for Error
{

@ -1,5 +1,9 @@
use super::*;
use std::iter::FromIterator;
use std::sync::atomic::{
AtomicBool,
Ordering,
};
#[cfg(nightly)] type ListAbs<T> = std::collections::LinkedList<T>;
#[cfg(not(nightly))] type ListAbs<T> = Vec<T>;
@ -9,6 +13,8 @@ pub struct TaskList {
list: ListAbs<(usize, String)>,
buffer: String,
index: usize,
poisoned: AtomicBool,
}
fn find<T,F>(list: &ListAbs<T>, mut fun: F) -> Option<usize>
@ -32,6 +38,8 @@ impl TaskList
list: ListAbs::new(),
buffer: String::new(),
index: 0,
poisoned: AtomicBool::new(false),
}
}
@ -44,10 +52,15 @@ impl TaskList
/// Push a new task string, and return its ID.
pub fn push(&mut self, string: impl Into<String>) -> usize
{
if self.poisoned.load(Ordering::SeqCst) {
return 0;
}
let idx = {
self.index+=1;
self.index
};
#[cfg(nightly)] self.list.push_back((idx,string.into()));
#[cfg(not(nightly))] self.list.push((idx,string.into()));
@ -58,6 +71,10 @@ impl TaskList
/// Pop a task off the string, returns `true` if successful, `false` if wasn't found.
pub fn pop(&mut self, idx: usize) -> bool
{
if idx == 0 && self.poisoned.load(Ordering::Relaxed) {
return false;
}
if let Some(idx) = find(&self.list, |&(i, _)| idx == i)
{
self.list.remove(idx);
@ -68,6 +85,10 @@ impl TaskList
}
}
pub fn poison(&self) {
self.poisoned.store(true, Ordering::SeqCst);
}
/// Clear the `TaskList`
pub fn clear(&mut self)
{
@ -110,6 +131,8 @@ impl<T> FromIterator<T> for TaskList
list: iter.into_iter().map(|x| ((i, x.into()), i+=1).0).collect(),
buffer: Default::default(),
index: 0,
poisoned: AtomicBool::new(false),
};
this.index = i;
this.recalc();

@ -91,6 +91,7 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
};
let _ = tokio::task::yield_now().await;
//let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => {
@ -108,12 +109,28 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
}
}
pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>>
pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>, cancel: impl Future + Send + Unpin + 'static) -> Result<(), Box<dyn std::error::Error>>
where I: IntoIterator<Item=T>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
T: AsRef<OsStr> + Send + Sync + 'static + Clone,
U: Into<PathBuf>
{
let (cancel, cancel_register) = {
use futures::future::AbortHandle;
let (handle, reg) = AbortHandle::new_pair();
let rh = handle.clone();
tokio::spawn(async move {
let _ = cancel.await;
handle.abort();
});
(rh, reg)
};
/// Make a future cancellable by the passed `cancel` token.
macro_rules! pass_cancel {
($a:expr) => (::futures::future::Abortable::new($a, cancel_register));
}
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(Process::new(process, flags.leanify_flags.clone()));
@ -161,25 +178,38 @@ where I: IntoIterator<Item=T>,
};
let mut i=0usize;
let results =
join_all(
pass_cancel!(join_all(
files
.map(|filename| {
.map(|filename| -> OptionFuture<_> {
let cancel = cancel.clone();
// Check if cancellation has happened before the spawn attempt of the child tasks, and do nothing if it has.
if cancel.is_aborted() {
return None.into();
}
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
let mut tx = tx.clone();
#[cfg(feature="progress")] let mut progress = progress.clone();
(tokio::spawn(async move {
Some((tokio::spawn(async move {
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
// Check if cancellation has happened while we were awaiting the lock, and return doing nothing if it has.
if cancel.is_aborted() {
return;
}
// (task_id, worker_result)
let worker = {
cfg_if!{
if #[cfg(feature="progress")] {
if #[cfg(feature="progress")] {
let worker = do_work(&process, &filename, progress.clone());
let task = progress.add_task(format!("{:?}", filename.as_ref()));
future::join(task, worker).await
@ -204,7 +234,10 @@ where I: IntoIterator<Item=T>,
#[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error));
#[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await
.or_else(|e| {
eprintln!("\n{}",e); Err(e)
if !e.can_ignore_on_com_send_failure() {
eprintln!("\n{}",e);
}
Err(e)
});
},
}
@ -231,20 +264,44 @@ where I: IntoIterator<Item=T>,
} else {
let _ = progress.bump_min(1).await;
}
}),i+=1).0
})).await
.into_iter()
.filter_map(|x| x.err());
}),i+=1).0).into()
}))).await
.map(|x| x
.into_iter()
.filter_map(|x| x?.err()));
let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results {
Err(_) => {
let msg = format!("[{}] Requested interrupt! Waiting for existing children...", colour::style(colour!(Color::Yellow),"!"));
#[cfg(feature="progress")] {
progress.eprintln(msg).await?.await?;
//progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
}
#[cfg(not(feature="progress"))] eprintln!("{}", msg);
#[cfg(feature="progress")] progress.shutdown().await?;
for failed in results
// We do not have the results, return an empty iterator
Box::new(std::iter::empty())
},
Ok(v) => Box::new(v),
};
for failed in results
{
#[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed);
#[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed);
}
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx);
display.await?;
// Display task has completed, child tasks are complete; we are about to exit.
if cancel.is_aborted() {
eprintln!("[.] Running children complete, ignoring rest due to cancellation.");
}
#[cfg(feature="progress")] prog_handle.await.expect("Child panic")?;
Ok(())
}

Loading…
Cancel
Save