use super::*; use std::{ num::NonZeroUsize, sync::Arc, marker::{ Send, Sync, }, path::PathBuf, ffi::OsStr, }; #[allow(unused_imports)] use std::iter; use tokio::{ sync::{ Semaphore, mpsc, }, }; #[allow(unused_imports)] use futures::future::{ self, join_all, Future, OptionFuture, }; use process::Process; async fn maybe_await<T>(from: Option<T>) -> Option<<T as Future>::Output> where T: Future { if let Some(v) = from { Some(v.await) } else { None } } #[cfg(feature="progress")] type ProgressSender = progress::ProgressSender; #[cfg(not(feature="progress"))] type ProgressSender = (); #[cfg(feature="collect_err")] struct Error { pub internal: process::Error, pub stack: fixed_stack::FixedStack<(bool, String)>, } #[cfg(not(feature="collect_err"))] type Error = process::Error; #[allow(unused_mut)] #[allow(unused_variables)] async fn do_work<'cli>(flags: Option<&'cli arg::Flags>, process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog: ProgressSender) -> Result<fixed_stack::IntoIter<(bool, String)>, Error> { let file = file.as_ref(); let (tx, mut rx) = mpsc::channel::<(bool, String)>(16); #[cfg(feature="progress")] let _opt_await: OptionFuture<_> = prog.println(format!("[p]: Processing {:?}", file)).await.ok().into(); #[cfg(not(feature="progress"))] let _opt_await = println!("[p]: Processing {:?}", file); let collector = { let file = file.to_owned(); tokio::spawn(async move { let mut stack = fixed_stack::FixedStack::new(100); while let Some((err, value)) = rx.recv().await { cfg_if! { if #[cfg(feature="collect_err")] { stack.push((err, value)); } else { if err { cfg_if!{ if #[cfg(feature="progress")] { let value = format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); if let Err(_) = prog.eprintln(&value[..]).await { eprintln!("\n{}", value); } } else { eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); } } } else { stack.push((false, value)); } } } } stack }) }; let _ = tokio::task::yield_now().await; //let _ = opt_await.await; match process::contained_spawn(flags.map(Into::into), process, std::iter::once(file), tx).await { Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Err(error) => { cfg_if! { if #[cfg(feature="collect_err")] { Err(Error{ internal: error, stack: collector.await.expect("Child panic"), }) } else { Err(error) } } }, } } pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>, cancel: impl Future + Send + Unpin + 'static) -> Result<(), Box<dyn std::error::Error>> where I: IntoIterator<Item=T>, <I as IntoIterator>::IntoIter: ExactSizeIterator, T: AsRef<OsStr> + Send + Sync + 'static + Clone, U: Into<PathBuf> { let (cancel, cancel_register) = { use futures::future::AbortHandle; let (handle, reg) = AbortHandle::new_pair(); let rh = handle.clone(); tokio::spawn(async move { let _ = cancel.await; handle.abort(); }); (rh, reg) }; /// Make a future cancellable by the passed `cancel` token. macro_rules! pass_cancel { ($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); } let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); let files = files.into_iter(); #[cfg(feature="progress")] let (mut progress, prog_handle) = { if flags.progress { progress::create_progress::<termprogress::progress::Bar,_>(files.len(), iter::empty()) } else { progress::create_progress::<termprogress::silent::Silent,_>(files.len(), iter::empty()) } }; let display = { #[cfg(feature="progress")] let mut progress = progress.clone(); tokio::spawn(async move { //let mut last: OptionFuture<_> = None.into(); while let Some((file, values, i)) = rx.recv().await { cfg_if!{ if #[cfg(feature="progress")] { let mut builder =progress.builder(); for (err, line) in values.into_iter() { if err { builder.eprintln(format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line))); } else { let line = format!(" -> ({}) {:?}: {}", i, file.as_ref(), line); builder.println(line); } } let _ = builder.send().await; } else { for (err, line) in values.into_iter() { if err { eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line)); } else { println!(" -> ({}) {:?}: {}", i, file.as_ref(), line); } } } } } }) }; let mut i=0usize; let results = pass_cancel!(join_all( files .map(|filename| -> OptionFuture<_> { let cancel = cancel.clone(); // Check if cancellation has happened before the spawn attempt of the child tasks, and do nothing if it has. if cancel.is_aborted() { return None.into(); } let semaphore = semaphore.clone(); let process = Arc::clone(&process); let mut tx = tx.clone(); let flags = flags.clone(); #[cfg(feature="progress")] let mut progress = progress.clone(); Some((tokio::spawn(async move { #[cfg(feature="progress")] type Opt<T> = OptionFuture<T>; #[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>; let _task_id: Opt<_> = { let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; // Check if cancellation has happened while we were awaiting the lock, and return doing nothing if it has. if cancel.is_aborted() { return; } // (task_id, worker_result) let worker = { let flags = &flags; cfg_if!{ if #[cfg(feature="progress")] { let worker = do_work(Some(flags), &process, &filename, progress.clone()); let task = progress.add_task(format!("{:?}", filename.as_ref())); future::join(task, worker).await } else { #[cfg(nightly)] type NoReturn = !; #[cfg(not(nightly))] type NoReturn = (); (Option::<NoReturn>::None, do_work(Some(flags), &process, &filename, ()).await) } } }; match worker.1 { Ok(strings) => tx.send((filename, strings, i)).await.map_err(|_| "Child panic").unwrap(), Err(error) => { #[cfg(feature="collect_err")] let error = { let Error{internal, stack} = error; tx.send((filename.clone(), stack.into_iter(), i)).await.map_err(|_| "Child panic").unwrap(); internal }; #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await .or_else(|e| { if !e.can_ignore_on_com_send_failure() { eprintln!("\n{}",e); } Err(e) }); }, } cfg_if!{ if #[cfg(feature="progress")] { worker.0.ok().into() } else { cfg_if!{ if #[cfg(nightly)] { std::marker::PhantomData::<!> } else { std::marker::PhantomData::<()> } } } } }; #[cfg(feature="progress")] if let Some(Ok(id)) = _task_id.await { let mut builder = progress.builder(); builder.bump_min(1); builder.remove_task(id); let _ = builder.send().await; } else { let _ = progress.bump_min(1).await; } }),i+=1).0).into() }))).await .map(|x| x .into_iter() .filter_map(|x| x?.err())); let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results { Err(_) => { let msg = format!("[{}] Requested interrupt! Waiting for existing children...", colour::style(colour!(Color::Yellow),"!")); #[cfg(feature="progress")] { progress.eprintln(msg).await?.await?; //progress.clear_tasks(true, Some("Waiting for existing...".to_string())).await?; } #[cfg(not(feature="progress"))] eprintln!("{}", msg); // We do not have the results, return an empty iterator Box::new(std::iter::empty()) }, Ok(v) => Box::new(v), }; for failed in results { #[cfg(feature="progress")] progress.eprintln(format!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[{}] Child panic {:?}", colour::style(colour!(Color::BrightRed), "e"), failed); } #[cfg(feature="progress")] progress.shutdown().await?; drop(tx); display.await?; // Display task has completed, child tasks are complete; we are about to exit. if cancel.is_aborted() { eprintln!("[.] Running children complete, ignoring rest due to cancellation."); } #[cfg(feature="progress")] prog_handle.await.expect("Child panic")?; Ok(()) }