From db20ca106f4d21c77541cbd686beb098b96bc575 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 20 Nov 2024 14:54:08 +0000 Subject: [PATCH] Added CTRL+C cancel infrastructure into async work flow tree, but haven"t figured out how we can both stop adding new tasks *and* `await` on *all* current child processes before returning in error, which is what we need. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In a seperate branch right now since I need to re-learn this codebase to add this properly i think... Fortune for leanify-many's current commit: Blessing − 吉 --- src/main.rs | 3 ++- src/progress.rs | 30 ++++++++++++++++++++++++++++++ src/task_list.rs | 23 +++++++++++++++++++++++ src/work.rs | 45 +++++++++++++++++++++++++++++++++++++++------ 4 files changed, 94 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 4a8d5c3..1f68a92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,6 +45,7 @@ async fn work() -> Result<(), Box> } } + use futures::future::FutureExt; work::work(&args.flags, leanify, args.files, match args.flags.hard_limit { Some(hard_limit) => { // We have hard limit @@ -58,7 +59,7 @@ async fn work() -> Result<(), Box> } }, _ => args.max_children, - }).await + }, tokio::signal::ctrl_c().boxed()).await } diff --git a/src/progress.rs b/src/progress.rs index 4a711b4..510f7e5 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -59,6 +59,11 @@ pub enum CommandKind AddTask(Task), RemoveTask(usize), + RemoveAll { + lock: bool, + title: Option>, + }, + Complete, Many(Vec), @@ -194,6 +199,12 @@ impl<'a> CommandBuilder<'a> self } + pub fn clear_tasks(&mut self, lock: bool, title: Option<&'static str>) -> &mut Self + { + self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Borrowed), lock }); + self + } + /// Signal a shutdown to the worker pub fn shutdown(&mut self) -> &mut Self { @@ -317,6 +328,11 @@ impl ProgressSender self.send_command(CommandKind::RemoveTask(task_idx)).await } + pub async fn clear_tasks(&mut self, lock: bool, title: Option) -> Result + { + self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Owned), lock }).await + } + /// Signal a shutdown to the worker pub async fn shutdown(&mut self) -> Result { @@ -463,6 +479,20 @@ pub fn create_progress { + if lock { + list.poison(); + } + list.clear(); + progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str()); + }, + CommandKind::RemoveAll{ title: Some(title), lock } => { + if lock { + list.poison(); + } + list.clear(); + progress.set_title(format!("({}/{}) {}", stat.low, stat.high, title.as_ref()).as_str()); + }, CommandKind::Complete => { if has_blanked { progress.refresh(); diff --git a/src/task_list.rs b/src/task_list.rs index 43cb903..63a9d6b 100644 --- a/src/task_list.rs +++ b/src/task_list.rs @@ -1,5 +1,9 @@ use super::*; use std::iter::FromIterator; +use std::sync::atomic::{ + AtomicBool, + Ordering, +}; #[cfg(nightly)] type ListAbs = std::collections::LinkedList; #[cfg(not(nightly))] type ListAbs = Vec; @@ -9,6 +13,8 @@ pub struct TaskList { list: ListAbs<(usize, String)>, buffer: String, index: usize, + + poisoned: AtomicBool, } fn find(list: &ListAbs, mut fun: F) -> Option @@ -32,6 +38,8 @@ impl TaskList list: ListAbs::new(), buffer: String::new(), index: 0, + + poisoned: AtomicBool::new(false), } } @@ -44,10 +52,15 @@ impl TaskList /// Push a new task string, and return its ID. pub fn push(&mut self, string: impl Into) -> usize { + if self.poisoned.load(Ordering::SeqCst) { + return 0; + } + let idx = { self.index+=1; self.index }; + #[cfg(nightly)] self.list.push_back((idx,string.into())); #[cfg(not(nightly))] self.list.push((idx,string.into())); @@ -58,6 +71,10 @@ impl TaskList /// Pop a task off the string, returns `true` if successful, `false` if wasn't found. pub fn pop(&mut self, idx: usize) -> bool { + if idx == 0 && self.poisoned.load(Ordering::Relaxed) { + return false; + } + if let Some(idx) = find(&self.list, |&(i, _)| idx == i) { self.list.remove(idx); @@ -68,6 +85,10 @@ impl TaskList } } + pub fn poison(&self) { + self.poisoned.store(true, Ordering::SeqCst); + } + /// Clear the `TaskList` pub fn clear(&mut self) { @@ -110,6 +131,8 @@ impl FromIterator for TaskList list: iter.into_iter().map(|x| ((i, x.into()), i+=1).0).collect(), buffer: Default::default(), index: 0, + + poisoned: AtomicBool::new(false), }; this.index = i; this.recalc(); diff --git a/src/work.rs b/src/work.rs index 2a0751d..55d31fb 100644 --- a/src/work.rs +++ b/src/work.rs @@ -108,12 +108,28 @@ async fn do_work(process: impl AsRef, file: impl AsRef, mut prog } } -pub async fn work(flags: &arg::Flags, process: U, files: I, children: Option) -> Result<(), Box> +pub async fn work(flags: &arg::Flags, process: U, files: I, children: Option, cancel: impl Future + Send + Unpin + 'static) -> Result<(), Box> where I: IntoIterator, ::IntoIter: ExactSizeIterator, T: AsRef + Send + Sync + 'static + Clone, U: Into { + let (cancel, cancel_register) = { + use futures::future::AbortHandle; + + let (handle, reg) = AbortHandle::new_pair(); + let rh = handle.clone(); + tokio::spawn(async move { + let _ = cancel.await; + handle.abort(); + }); + (rh, reg) + }; + /// Make a future cancellable by the passed `cancel` token. + macro_rules! pass_cancel { + ($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); + } + let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); @@ -161,16 +177,19 @@ where I: IntoIterator, }; let mut i=0usize; let results = - join_all( + pass_cancel!(join_all( files .map(|filename| { let semaphore = semaphore.clone(); let process = Arc::clone(&process); let mut tx = tx.clone(); + let cancel = cancel.clone(); #[cfg(feature="progress")] let mut progress = progress.clone(); - (tokio::spawn(async move { + //TODO: Where to put the cancellation check in here?? (XXX: Current + //`AbortHandle` (old tokio version) does not have `is_aborted()`... :/) + (tokio::spawn(async move { #[cfg(feature="progress")] type Opt = OptionFuture; #[cfg(not(feature="progress"))] type Opt = std::marker::PhantomData; let _task_id: Opt<_> = { @@ -179,7 +198,7 @@ where I: IntoIterator, // (task_id, worker_result) let worker = { cfg_if!{ - if #[cfg(feature="progress")] { + if #[cfg(feature="progress")] { let worker = do_work(&process, &filename, progress.clone()); let task = progress.add_task(format!("{:?}", filename.as_ref())); future::join(task, worker).await @@ -232,9 +251,23 @@ where I: IntoIterator, let _ = progress.bump_min(1).await; } }),i+=1).0 - })).await + }))).await + .map(|x| x .into_iter() - .filter_map(|x| x.err()); + .filter_map(|x| x.err())); + + let results = match results { + Err(_) => { + #[cfg(feature="progress")] { + progress.eprintln("[!] Child aborting...").await?.await?; + progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?; + } + #[cfg(not(feature="progress"))] eprintln!("[!] Child aborting..."); + + todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); + }, + Ok(v) => v, + }; #[cfg(feature="progress")] progress.shutdown().await?; for failed in results