Added CTRL+C cancel infrastructure into async work flow tree, but haven"t figured out how we can both stop adding new tasks *and* `await` on *all* current child processes before returning in error, which is what we need.

In a seperate branch right now since I need to re-learn this codebase to add this properly i think...

Fortune for leanify-many's current commit: Blessing − 吉
safe-cancel-interrupt
Avril 2 days ago
parent 354cac4a50
commit db20ca106f
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -45,6 +45,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
} }
} }
use futures::future::FutureExt;
work::work(&args.flags, leanify, args.files, match args.flags.hard_limit { work::work(&args.flags, leanify, args.files, match args.flags.hard_limit {
Some(hard_limit) => { Some(hard_limit) => {
// We have hard limit // We have hard limit
@ -58,7 +59,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>>
} }
}, },
_ => args.max_children, _ => args.max_children,
}).await }, tokio::signal::ctrl_c().boxed()).await
} }

@ -59,6 +59,11 @@ pub enum CommandKind
AddTask(Task), AddTask(Task),
RemoveTask(usize), RemoveTask(usize),
RemoveAll {
lock: bool,
title: Option<Cow<'static, str>>,
},
Complete, Complete,
Many(Vec<CommandKind>), Many(Vec<CommandKind>),
@ -194,6 +199,12 @@ impl<'a> CommandBuilder<'a>
self self
} }
pub fn clear_tasks(&mut self, lock: bool, title: Option<&'static str>) -> &mut Self
{
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Borrowed), lock });
self
}
/// Signal a shutdown to the worker /// Signal a shutdown to the worker
pub fn shutdown(&mut self) -> &mut Self pub fn shutdown(&mut self) -> &mut Self
{ {
@ -317,6 +328,11 @@ impl ProgressSender
self.send_command(CommandKind::RemoveTask(task_idx)).await self.send_command(CommandKind::RemoveTask(task_idx)).await
} }
pub async fn clear_tasks(&mut self, lock: bool, title: Option<String>) -> Result<CommandWaiter, Error>
{
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Owned), lock }).await
}
/// Signal a shutdown to the worker /// Signal a shutdown to the worker
pub async fn shutdown(&mut self) -> Result<CommandWaiter, Error> pub async fn shutdown(&mut self) -> Result<CommandWaiter, Error>
{ {
@ -463,6 +479,20 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str()); progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
} }
}, },
CommandKind::RemoveAll{ title: None, lock } => {
if lock {
list.poison();
}
list.clear();
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
},
CommandKind::RemoveAll{ title: Some(title), lock } => {
if lock {
list.poison();
}
list.clear();
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, title.as_ref()).as_str());
},
CommandKind::Complete => { CommandKind::Complete => {
if has_blanked { if has_blanked {
progress.refresh(); progress.refresh();

@ -1,5 +1,9 @@
use super::*; use super::*;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::sync::atomic::{
AtomicBool,
Ordering,
};
#[cfg(nightly)] type ListAbs<T> = std::collections::LinkedList<T>; #[cfg(nightly)] type ListAbs<T> = std::collections::LinkedList<T>;
#[cfg(not(nightly))] type ListAbs<T> = Vec<T>; #[cfg(not(nightly))] type ListAbs<T> = Vec<T>;
@ -9,6 +13,8 @@ pub struct TaskList {
list: ListAbs<(usize, String)>, list: ListAbs<(usize, String)>,
buffer: String, buffer: String,
index: usize, index: usize,
poisoned: AtomicBool,
} }
fn find<T,F>(list: &ListAbs<T>, mut fun: F) -> Option<usize> fn find<T,F>(list: &ListAbs<T>, mut fun: F) -> Option<usize>
@ -32,6 +38,8 @@ impl TaskList
list: ListAbs::new(), list: ListAbs::new(),
buffer: String::new(), buffer: String::new(),
index: 0, index: 0,
poisoned: AtomicBool::new(false),
} }
} }
@ -44,10 +52,15 @@ impl TaskList
/// Push a new task string, and return its ID. /// Push a new task string, and return its ID.
pub fn push(&mut self, string: impl Into<String>) -> usize pub fn push(&mut self, string: impl Into<String>) -> usize
{ {
if self.poisoned.load(Ordering::SeqCst) {
return 0;
}
let idx = { let idx = {
self.index+=1; self.index+=1;
self.index self.index
}; };
#[cfg(nightly)] self.list.push_back((idx,string.into())); #[cfg(nightly)] self.list.push_back((idx,string.into()));
#[cfg(not(nightly))] self.list.push((idx,string.into())); #[cfg(not(nightly))] self.list.push((idx,string.into()));
@ -58,6 +71,10 @@ impl TaskList
/// Pop a task off the string, returns `true` if successful, `false` if wasn't found. /// Pop a task off the string, returns `true` if successful, `false` if wasn't found.
pub fn pop(&mut self, idx: usize) -> bool pub fn pop(&mut self, idx: usize) -> bool
{ {
if idx == 0 && self.poisoned.load(Ordering::Relaxed) {
return false;
}
if let Some(idx) = find(&self.list, |&(i, _)| idx == i) if let Some(idx) = find(&self.list, |&(i, _)| idx == i)
{ {
self.list.remove(idx); self.list.remove(idx);
@ -68,6 +85,10 @@ impl TaskList
} }
} }
pub fn poison(&self) {
self.poisoned.store(true, Ordering::SeqCst);
}
/// Clear the `TaskList` /// Clear the `TaskList`
pub fn clear(&mut self) pub fn clear(&mut self)
{ {
@ -110,6 +131,8 @@ impl<T> FromIterator<T> for TaskList
list: iter.into_iter().map(|x| ((i, x.into()), i+=1).0).collect(), list: iter.into_iter().map(|x| ((i, x.into()), i+=1).0).collect(),
buffer: Default::default(), buffer: Default::default(),
index: 0, index: 0,
poisoned: AtomicBool::new(false),
}; };
this.index = i; this.index = i;
this.recalc(); this.recalc();

@ -108,12 +108,28 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog
} }
} }
pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>> pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>, cancel: impl Future + Send + Unpin + 'static) -> Result<(), Box<dyn std::error::Error>>
where I: IntoIterator<Item=T>, where I: IntoIterator<Item=T>,
<I as IntoIterator>::IntoIter: ExactSizeIterator, <I as IntoIterator>::IntoIter: ExactSizeIterator,
T: AsRef<OsStr> + Send + Sync + 'static + Clone, T: AsRef<OsStr> + Send + Sync + 'static + Clone,
U: Into<PathBuf> U: Into<PathBuf>
{ {
let (cancel, cancel_register) = {
use futures::future::AbortHandle;
let (handle, reg) = AbortHandle::new_pair();
let rh = handle.clone();
tokio::spawn(async move {
let _ = cancel.await;
handle.abort();
});
(rh, reg)
};
/// Make a future cancellable by the passed `cancel` token.
macro_rules! pass_cancel {
($a:expr) => (::futures::future::Abortable::new($a, cancel_register));
}
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); let process = Arc::new(Process::new(process, flags.leanify_flags.clone()));
@ -161,16 +177,19 @@ where I: IntoIterator<Item=T>,
}; };
let mut i=0usize; let mut i=0usize;
let results = let results =
join_all( pass_cancel!(join_all(
files files
.map(|filename| { .map(|filename| {
let semaphore = semaphore.clone(); let semaphore = semaphore.clone();
let process = Arc::clone(&process); let process = Arc::clone(&process);
let mut tx = tx.clone(); let mut tx = tx.clone();
let cancel = cancel.clone();
#[cfg(feature="progress")] let mut progress = progress.clone(); #[cfg(feature="progress")] let mut progress = progress.clone();
(tokio::spawn(async move { //TODO: Where to put the cancellation check in here?? (XXX: Current
//`AbortHandle` (old tokio version) does not have `is_aborted()`... :/)
(tokio::spawn(async move {
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>; #[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>; #[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = { let _task_id: Opt<_> = {
@ -179,7 +198,7 @@ where I: IntoIterator<Item=T>,
// (task_id, worker_result) // (task_id, worker_result)
let worker = { let worker = {
cfg_if!{ cfg_if!{
if #[cfg(feature="progress")] { if #[cfg(feature="progress")] {
let worker = do_work(&process, &filename, progress.clone()); let worker = do_work(&process, &filename, progress.clone());
let task = progress.add_task(format!("{:?}", filename.as_ref())); let task = progress.add_task(format!("{:?}", filename.as_ref()));
future::join(task, worker).await future::join(task, worker).await
@ -232,9 +251,23 @@ where I: IntoIterator<Item=T>,
let _ = progress.bump_min(1).await; let _ = progress.bump_min(1).await;
} }
}),i+=1).0 }),i+=1).0
})).await }))).await
.map(|x| x
.into_iter() .into_iter()
.filter_map(|x| x.err()); .filter_map(|x| x.err()));
let results = match results {
Err(_) => {
#[cfg(feature="progress")] {
progress.eprintln("[!] Child aborting...").await?.await?;
progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?;
}
#[cfg(not(feature="progress"))] eprintln!("[!] Child aborting...");
todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)");
},
Ok(v) => v,
};
#[cfg(feature="progress")] progress.shutdown().await?; #[cfg(feature="progress")] progress.shutdown().await?;
for failed in results for failed in results

Loading…
Cancel
Save