You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
7.4 KiB

use super::*;
use std::{
num::NonZeroUsize,
sync::Arc,
marker::{
Send,
Sync,
},
path::PathBuf,
ffi::OsStr,
};
#[allow(unused_imports)]
use std::iter;
use tokio::{
sync::{
Semaphore,
mpsc,
},
};
#[allow(unused_imports)]
use futures::future::{
self,
join_all,
Future,
OptionFuture,
};
use process::Process;
async fn maybe_await<T>(from: Option<T>) -> Option<<T as Future>::Output>
where T: Future
{
if let Some(v) = from {
Some(v.await)
} else {
None
}
}
#[cfg(feature="progress")]
type ProgressSender = progress::ProgressSender;
#[cfg(not(feature="progress"))]
type ProgressSender = ();
#[cfg(feature="collect_err")]
struct Error
{
pub internal: process::Error,
pub stack: fixed_stack::FixedStack<(bool, String)>,
}
#[cfg(not(feature="collect_err"))]
type Error = process::Error;
#[allow(unused_mut)]
#[allow(unused_variables)]
async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog: ProgressSender) -> Result<fixed_stack::IntoIter<(bool, String)>, Error>
{
let file = file.as_ref();
let (tx, mut rx) = mpsc::channel::<(bool, String)>(16);
#[cfg(feature="progress")] let _opt_await: OptionFuture<_> = prog.println(format!("[p]: Processing {:?}", file)).await.ok().into();
#[cfg(not(feature="progress"))] let _opt_await = println!("[p]: Processing {:?}", file);
let collector = {
let file = file.to_owned();
tokio::spawn(async move {
let mut stack = fixed_stack::FixedStack::new(100);
while let Some((err, value)) = rx.recv().await {
cfg_if! {
if #[cfg(feature="collect_err")] {
stack.push((err, value));
} else {
if err {
cfg_if!{
if #[cfg(feature="progress")] {
let value = format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value));
if let Err(_) = prog.eprintln(&value[..]).await {
eprintln!("\n{}", value);
}
} else {
eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file, colour::style(colour!(Color::Yellow),value));
}
}
} else {
stack.push((false, value));
}
}
}
}
stack
})
};
tokio::task::yield_now().await;
//let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => {
cfg_if! {
if #[cfg(feature="collect_err")] {
Err(Error{
internal: error,
stack: collector.await.expect("Child panic"),
})
} else {
Err(error)
}
}
},
}
}
pub async fn work<I,T,U>(flags: &arg::Flags, process: U, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>>
where I: IntoIterator<Item=T>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
T: AsRef<OsStr> + Send + Sync + 'static + Clone,
U: Into<PathBuf>
{
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(Process::new(process, flags.leanify_flags.clone()));
let files = files.into_iter();
#[cfg(feature="progress")] let (mut progress, prog_handle) = {
if flags.progress {
progress::create_progress::<termprogress::progress::Bar,_>(files.len(), iter::empty())
} else {
progress::create_progress::<termprogress::silent::Silent,_>(files.len(), iter::empty())
}
};
let display = {
#[cfg(feature="progress")] let mut progress = progress.clone();
tokio::spawn(async move {
//let mut last: OptionFuture<_> = None.into();
while let Some((file, values, i)) = rx.recv().await {
cfg_if!{
if #[cfg(feature="progress")] {
let mut builder =progress.builder();
for (err, line) in values.into_iter()
{
if err {
builder.eprintln(format!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line)));
} else {
let line = format!(" -> ({}) {:?}: {}", i, file.as_ref(), line);
builder.println(line);
}
}
let _ = builder.send().await;
} else {
for (err, line) in values.into_iter()
{
if err {
eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::BrightYellow), "!"), file.as_ref(), colour::style(colour!(Color::Yellow),line));
} else {
println!(" -> ({}) {:?}: {}", i, file.as_ref(), line);
}
}
}
}
}
})
};
let mut i=0usize;
let results =
join_all(
files
.map(|filename| {
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
let mut tx = tx.clone();
#[cfg(feature="progress")] let mut progress = progress.clone();
(tokio::spawn(async move {
#[cfg(feature="progress")] type Opt<T> = OptionFuture<T>;
#[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>;
let _task_id: Opt<_> = {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
// (task_id, worker_result)
let worker = {
cfg_if!{
if #[cfg(feature="progress")] {
let worker = do_work(&process, &filename, progress.clone());
let task = progress.add_task(format!("{:?}", filename.as_ref()));
future::join(task, worker).await
} else {
#[cfg(nightly)] type NoReturn = !;
#[cfg(not(nightly))] type NoReturn = ();
(Option::<NoReturn>::None, do_work(&process, &filename, ()).await)
}
}
};
match worker.1 {
Ok(strings) => tx.send((filename, strings, i)).await.map_err(|_| "Child panic").unwrap(),
Err(error) => {
#[cfg(feature="collect_err")] let error = {
let Error{internal, stack} = error;
tx.send((filename.clone(), stack.into_iter(), i)).await.map_err(|_| "Child panic").unwrap();
internal
};
#[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error));
#[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await
.or_else(|e| {
eprintln!("\n{}",e); Err(e)
});
},
}
cfg_if!{
if #[cfg(feature="progress")] {
worker.0.ok().into()
} else {
cfg_if!{
if #[cfg(nightly)] {
std::marker::PhantomData::<!>
} else {
std::marker::PhantomData::<()>
}
}
}
}
};
#[cfg(feature="progress")]
if let Some(Ok(id)) = _task_id.await {
let mut builder = progress.builder();
builder.bump_min(1);
builder.remove_task(id);
let _ = builder.send().await;
} else {
let _ = progress.bump_min(1).await;
}
}),i+=1).0
})).await
.into_iter()
.filter_map(|x| x.err());
#[cfg(feature="progress")] progress.shutdown().await?;
for failed in results
{
#[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
#[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed);
}
drop(tx);
display.await?;
#[cfg(feature="progress")] prog_handle.await.expect("Child panic")?;
Ok(())
}