use super::*; use std::{ num::NonZeroUsize, sync::Arc, marker::{ Send, Sync, }, path::PathBuf, ffi::OsStr, }; #[allow(unused_imports)] use std::iter; use tokio::{ sync::{ Semaphore, mpsc, }, }; #[allow(unused_imports)] use futures::future::{ self, join_all, Future, OptionFuture, }; use process::Process; async fn maybe_await(from: Option) -> Option<::Output> where T: Future { if let Some(v) = from { Some(v.await) } else { None } } #[cfg(feature="progress")] type ProgressSender = progress::ProgressSender; #[cfg(not(feature="progress"))] type ProgressSender = (); #[cfg(feature="collect_err")] struct Error { pub internal: process::Error, pub stack: fixed_stack::FixedStack<(bool, String)>, } #[cfg(not(feature="collect_err"))] type Error = process::Error; #[allow(unused_mut)] #[allow(unused_variables)] async fn do_work(process: impl AsRef, file: impl AsRef, mut prog: ProgressSender) -> Result, Error> { let file = file.as_ref(); let (tx, mut rx) = mpsc::channel::<(bool, String)>(16); #[cfg(feature="progress")] let _opt_await: OptionFuture<_> = prog.println(format!("[p]: Processing {:?}", file)).await.ok().into(); #[cfg(not(feature="progress"))] let _opt_await = println!("[p]: Processing {:?}", file); let collector = { let file = file.to_owned(); tokio::spawn(async move { let mut stack = fixed_stack::FixedStack::new(100); while let Some((err, value)) = rx.recv().await { cfg_if! { if #[cfg(feature="collect_err")] { stack.push((err, value)); } else { if err { cfg_if!{ if #[cfg(feature="progress")] { let value = format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); if let Err(_) = prog.eprintln(&value[..]).await { eprintln!("\n{}", value); } } else { eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value)); } } } else { stack.push((false, value)); } } } } stack }) }; tokio::task::yield_now().await; //let _ = opt_await.await; match process::contained_spawn(process, std::iter::once(file), tx).await { Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Err(error) => { cfg_if! { if #[cfg(feature="collect_err")] { Err(Error{ internal: error, stack: collector.await.expect("Child panic"), }) } else { Err(error) } } }, } } pub async fn work(flags: &arg::Flags, process: U, files: I, children: Option) -> Result<(), Box> where I: IntoIterator, ::IntoIter: ExactSizeIterator, T: AsRef + Send + Sync + 'static + Clone, U: Into { let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); let files = files.into_iter(); #[cfg(feature="progress")] let (mut progress, prog_handle) = { if flags.progress { progress::create_progress::(files.len(), iter::empty()) } else { progress::create_progress::(files.len(), iter::empty()) } }; let display = { #[cfg(feature="progress")] let mut progress = progress.clone(); tokio::spawn(async move { //let mut last: OptionFuture<_> = None.into(); while let Some((file, values, i)) = rx.recv().await { cfg_if!{ if #[cfg(feature="progress")] { let mut builder =progress.builder(); for (err, line) in values.into_iter() { if err { builder.eprintln(format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line))); } else { let line = format!(" -> ({}) {:?}: {}", i, file.as_ref(), line); builder.println(line); } } let _ = builder.send().await; } else { for (err, line) in values.into_iter() { if err { eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line)); } else { println!(" -> ({}) {:?}: {}", i, file.as_ref(), line); } } } } } }) }; let mut i=0usize; let results = join_all( files .map(|filename| { let semaphore = semaphore.clone(); let process = Arc::clone(&process); let mut tx = tx.clone(); #[cfg(feature="progress")] let mut progress = progress.clone(); (tokio::spawn(async move { #[cfg(feature="progress")] type Opt = OptionFuture; #[cfg(not(feature="progress"))] type Opt = std::marker::PhantomData; let _task_id: Opt<_> = { let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; // (task_id, worker_result) let worker = { cfg_if!{ if #[cfg(feature="progress")] { let worker = do_work(&process, &filename, progress.clone()); let task = progress.add_task(format!("{:?}", filename.as_ref())); future::join(task, worker).await } else { #[cfg(nightly)] type NoReturn = !; #[cfg(not(nightly))] type NoReturn = (); (Option::::None, do_work(&process, &filename, ()).await) } } }; match worker.1 { Ok(strings) => tx.send((filename, strings, i)).await.map_err(|_| "Child panic").unwrap(), Err(error) => { #[cfg(feature="collect_err")] let error = { let Error{internal, stack} = error; tx.send((filename.clone(), stack.into_iter(), i)).await.map_err(|_| "Child panic").unwrap(); internal }; #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await .or_else(|e| { eprintln!("\n{}",e); Err(e) }); }, } cfg_if!{ if #[cfg(feature="progress")] { worker.0.ok().into() } else { cfg_if!{ if #[cfg(nightly)] { std::marker::PhantomData:: } else { std::marker::PhantomData::<()> } } } } }; #[cfg(feature="progress")] if let Some(Ok(id)) = _task_id.await { let mut builder = progress.builder(); builder.bump_min(1); builder.remove_task(id); let _ = builder.send().await; } else { let _ = progress.bump_min(1).await; } }),i+=1).0 })).await .into_iter() .filter_map(|x| x.err()); #[cfg(feature="progress")] progress.shutdown().await?; for failed in results { #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed); } drop(tx); display.await?; #[cfg(feature="progress")] prog_handle.await.expect("Child panic")?; Ok(()) }