Added SIGINT graceful cancellation.

Fortune for leanify-many's current commit: Small curse − 小凶
master
Avril 1 week ago
parent d7b46d7c9a
commit 2b65696512
Signed by: flanchan
GPG Key ID: 284488987C31F630

5
Cargo.lock generated

@ -189,6 +189,7 @@ dependencies = [
"cfg-if",
"futures",
"lazy_static",
"libc",
"num_cpus",
"pin-project",
"recolored",
@ -199,9 +200,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.74"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"

@ -51,6 +51,7 @@ cfg-if = "0.1"
recolored = { version = "1.9", optional = true }
num_cpus = "1.13"
pin-project = {version = "0.4", optional = true}
libc = { version = "0.2.171", features = ["align"] }
[build-dependencies]
rustc_version = "0.2"

@ -59,7 +59,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
}
},
_ => args.max_children,
}, tokio::spawn(tokio::signal::ctrl_c().fuse())).await
}, tokio::signal::ctrl_c().boxed().fuse()).await
}

@ -49,6 +49,26 @@ impl AsRef<Path> for Process
}
}
fn set_process_group(to: i32) -> impl FnMut() -> io::Result<()> + Send + Sync + 'static
{
use libc::{
getpid,
setpgid,
pid_t
};
move || {
//let this = unsafe { getpid() };
// SAFETY: We are setting the PGID of this (child) process to detach it from the group of the parent so signals are not propagated from the parent.
unsafe {
if setpgid(0, to) != 0 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}
/// Spawn the process, and contain its standard output.
///
/// # Notes
@ -69,20 +89,28 @@ where U: IntoIterator<Item=V>,
let stderr = std::process::Stdio::inherit();
}
};
let mut child = match Command::new(process)
let mut child = Command::new(process);
let child = child
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
.args(args.into_iter())
.stdout(std::process::Stdio::piped())
.stderr(stderr)
.stdin(std::process::Stdio::null())
.kill_on_drop(false)
//.process_group(0) //XXX: It's this that is propagating the SIGINT!!!
.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
.kill_on_drop(false);
// SAFETY: This is only calling `setpgid()`.
unsafe {
child
.pre_exec(set_process_group(0)) // NOTE: .process_group() does not exist in this version, so we hack it together outselves and call `setpgid()` directly
};
let mut child = match child.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
let stdout = child.stdout.take().unwrap();
#[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = {

@ -180,8 +180,13 @@ where I: IntoIterator<Item=T>,
let results =
pass_cancel!(join_all(
files
.map(|filename| {
.map(|filename| -> OptionFuture<_> {
let cancel = cancel.clone();
// Check if cancellation has happened before the spawn attempt of the child tasks, and do nothing if it has.
if cancel.is_aborted() {
return None.into();
}
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
@ -189,17 +194,14 @@ where I: IntoIterator<Item=T>,
#[cfg(feature="progress")] let mut progress = progress.clone();
//TODO: Where to put the cancellation check in here?? (XXX: Current
//`AbortHandle` (old tokio version) does not have `is_aborted()`... :/)
(tokio::spawn(async move {
Some((tokio::spawn(async move {
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = {
if cancel.is_aborted() {
return;
}
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
// Check if cancellation has happened while we were awaiting the lock, and return doing nothing if it has.
if cancel.is_aborted() {
return;
}
@ -262,36 +264,44 @@ where I: IntoIterator<Item=T>,
} else {
let _ = progress.bump_min(1).await;
}
}),i+=1).0
}),i+=1).0).into()
}))).await
.map(|x| x
.into_iter()
.filter_map(|x| x.err()));
.filter_map(|x| x?.err()));
let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results {
Err(_) => {
Err(_) => {
let msg = format!("[{}] Requested interrupt! Waiting for existing children...", colour::style(colour!(Color::Yellow),"!"));
#[cfg(feature="progress")] {
progress.eprintln("[!] Child aborting...").await?.await?;
progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
progress.eprintln(msg).await?.await?;
//progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
}
#[cfg(not(feature="progress"))] eprintln!("[!] Child aborting...");
#[cfg(not(feature="progress"))] eprintln!("{}", msg);
//todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)");
// TODO: How to impl: Make a sparse-map of `usize`-keyed `MappedData<[AtomicRefCell<MaybeUninit<T>>]> :: get_async(Process::next_uniq_id()) -> Pin<&mut SparseEntry<'_, T>>`, which keys are stored in a `std::bitset<usize>` analogue
// We do not have the results, return an empty iterator
Box::new(std::iter::empty())
},
Ok(v) => Box::new(v),
};
for failed in results
for failed in results
{
#[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed);
#[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed);
}
#[cfg(feature="progress")] progress.shutdown().await?;
drop(tx);
display.await?;
// Display task has completed, child tasks are complete; we are about to exit.
if cancel.is_aborted() {
eprintln!("[.] Running children complete, ignoring rest due to cancellation.");
}
#[cfg(feature="progress")] prog_handle.await.expect("Child panic")?;
Ok(())
}

Loading…
Cancel
Save