Compare commits

...

2 Commits

Author SHA1 Message Date
Avril 2b65696512
Added SIGINT graceful cancellation.
1 week ago
Avril d7b46d7c9a
Cancelling works when it is NOT a signal because of children PGIDs...
1 week ago

111
Cargo.lock generated

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "arc-swap"
version = "0.4.7"
@ -59,9 +61,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
@ -74,9 +76,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
@ -84,15 +86,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
@ -101,42 +103,38 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
"syn 2.0.100",
]
[[package]]
name = "futures-sink"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
dependencies = [
"once_cell",
]
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.5"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
@ -145,10 +143,8 @@ dependencies = [
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-project-lite 0.2.16",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -193,6 +189,7 @@ dependencies = [
"cfg-if",
"futures",
"lazy_static",
"libc",
"num_cpus",
"pin-project",
"recolored",
@ -203,9 +200,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.74"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"
@ -307,12 +304,6 @@ dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]]
name = "pin-project"
version = "0.4.23"
@ -330,7 +321,7 @@ checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.38",
]
[[package]]
@ -340,37 +331,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.18"
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "proc-macro-nested"
version = "0.1.6"
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.19"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
dependencies = [
"unicode-xid",
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.7"
version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
dependencies = [
"proc-macro2",
]
@ -455,6 +440,17 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "syn"
version = "2.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "terminal_size"
version = "0.1.13"
@ -491,8 +487,9 @@ dependencies = [
"mio-named-pipes",
"mio-uds",
"num_cpus",
"pin-project-lite",
"pin-project-lite 0.1.7",
"signal-hook-registry",
"slab",
"tokio-macros",
"winapi 0.3.9",
]
@ -505,9 +502,15 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.38",
]
[[package]]
name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "unicode-xid"
version = "0.2.1"

@ -44,13 +44,14 @@ collect_err = []
[dependencies]
lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]}
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]}
futures = "0.3"
termprogress = {version="0.3", optional=true}
cfg-if = "0.1"
recolored = { version = "1.9", optional = true }
num_cpus = "1.13"
pin-project = {version = "0.4", optional = true}
libc = { version = "0.2.171", features = ["align"] }
[build-dependencies]
rustc_version = "0.2"

@ -59,7 +59,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
},
_ => args.max_children,
}, tokio::signal::ctrl_c().boxed()).await
}, tokio::signal::ctrl_c().boxed().fuse()).await
}

@ -49,6 +49,26 @@ impl AsRef<Path> for Process
}
}
fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static
{
use libc::{
getpid,
setpgid,
pid_t
};
move || {
//let this = unsafe { getpid() };
// SAFETY: We are setting the PGID of this (child) process to detach it from the group of the parent so signals are not propagated from the parent.
unsafe {
if setpgid(0, to) != 0 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}
/// Spawn the process, and contain its standard output.
///
/// # Notes
@ -69,19 +89,28 @@ where U: IntoIterator<Item=V>,
let stderr = std::process::Stdio::inherit();
}
};
let mut child = match Command::new(process)
let mut child = Command::new(process);
let child = child
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
.args(args)
.args(args.into_iter())
.stdout(std::process::Stdio::piped())
.stderr(stderr)
.stdin(std::process::Stdio::null())
.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
.kill_on_drop(false);
// SAFETY: This is only calling `setpgid()`.
unsafe {
child
.pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly
};
let mut child = match child.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
let stdout = child.stdout.take().unwrap();
#[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = {
@ -107,7 +136,7 @@ where U: IntoIterator<Item=V>,
}
});
let _ = task::yield_now().await;
match child.await {
match tokio::spawn(child).await.expect("Child supervisor panic") {
Ok(exit) => {
if exit.success() {
cfg_if!{

@ -519,6 +519,7 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.complete();
}).await.map_err(|_| Error::WorkerPanic);
shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters");
res
});
handle
@ -542,6 +543,22 @@ pub enum Error
Unknown,
}
impl Error
{
/// If this error comes from an attempt to communicate with the worker that failed but can be ignored.
///
/// # Usage
/// If the worker has been gracefully shutdown, and background tasks are still running that want to update the progress, an error will be returned when those already extant tasks attempt to send the commands to do so.
/// Those tasks can ignore those errors entirely if this function returns `true`.
pub fn can_ignore_on_com_send_failure(&self) -> bool
{
match self {
Self::WorkerDropped => true,
_ => false,
}
}
}
impl std::error::Error for Error{}
impl std::fmt::Display for Error
{

@ -91,6 +91,7 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
};
let _ = tokio::task::yield_now().await;
//let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => {
@ -127,9 +128,9 @@ where I: IntoIterator<Item=T>,
};
/// Make a future cancellable by the passed `cancel` token.
macro_rules! pass_cancel {
($a:expr) => (::futures::future::Abortable::new($a, cancel_register));
($a:expr) => (::futures::future::Abortable::new($a, cancel_register));
}
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(Process::new(process, flags.leanify_flags.clone()));
@ -179,22 +180,32 @@ where I: IntoIterator<Item=T>,
let results =
pass_cancel!(join_all(
files
.map(|filename| {
.map(|filename| -> OptionFuture<_> {
let cancel = cancel.clone();
// Check if cancellation has happened before the spawn attempt of the child tasks, and do nothing if it has.
if cancel.is_aborted() {
return None.into();
}
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
let mut tx = tx.clone();
let cancel = cancel.clone();
#[cfg(feature="progress")] let mut progress = progress.clone();
//TODO: Where to put the cancellation check in here?? (XXX: Current
//`AbortHandle` (old tokio version) does not have `is_aborted()`... :/)
(tokio::spawn(async move {
Some((tokio::spawn(async move {
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
// Check if cancellation has happened while we were awaiting the lock, and return doing nothing if it has.
if cancel.is_aborted() {
return;
}
// (task_id, worker_result)
let worker = {
cfg_if!{
@ -223,7 +234,10 @@ where I: IntoIterator<Item=T>,
#[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error));
#[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await
.or_else(|e| {
eprintln!("\n{}",e); Err(e)
if !e.can_ignore_on_com_send_failure() {
eprintln!("\n{}",e);
}
Err(e)
});
},
}
@ -250,34 +264,44 @@ where I: IntoIterator<Item=T>,
} else {
let _ = progress.bump_min(1).await;
}
}),i+=1).0
}),i+=1).0).into()
}))).await
.map(|x| x
.into_iter()
.filter_map(|x| x.err()));
let results = match results {
Err(_) => {
.into_iter()
.filter_map(|x| x?.err()));
let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results {
Err(_) => {
let msg = format!("[{}] Requested interrupt! Waiting for existing children...", colour::style(colour!(Color::Yellow),"!"));
#[cfg(feature="progress")] {
progress.eprintln("[!] Child aborting...").await?.await?;
progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
progress.eprintln(msg).await?.await?;
//progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
}
#[cfg(not(feature="progress"))] eprintln!("[!] Child aborting...");
#[cfg(not(feature="progress"))] eprintln!("{}", msg);
todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)");
// We do not have the results, return an empty iterator
Box::new(std::iter::empty())
},
Ok(v) => v,
Ok(v) => Box::new(v),
};
#[cfg(feature="progress")] progress.shutdown().await?;
for failed in results
for failed in results
{
#[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed);
#[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed);
}
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx);
display.await?;
// Display task has completed, child tasks are complete; we are about to exit.
if cancel.is_aborted() {
eprintln!("[.] Running children complete, ignoring rest due to cancellation.");
}
#[cfg(feature="progress")] prog_handle.await.expect("Child panic")?;
Ok(())
}

Loading…
Cancel
Save