progress works

master
Avril 4 years ago
parent fbe6fa399a
commit 1cc0e38987
Signed by: flanchan
GPG Key ID: 284488987C31F630

1
Cargo.lock generated

@ -179,6 +179,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
name = "leanify-many"
version = "0.1.0"
dependencies = [
"cfg-if",
"futures",
"lazy_static",
"rustc_version",

@ -19,6 +19,7 @@ lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]}
futures = "0.3"
termprogress = {version="0.2", optional=true}
cfg-if = "0.1.10"
[build-dependencies]
rustc_version = "0.2"

@ -77,7 +77,7 @@ impl From<ParseIntError> for Error
fn from(_er: ParseIntError) -> Self
{
#[cfg(nightly)] return Self::BadNumber(_er.kind().clone());
#[cfg(not(nightly))] Self::BadNumber
#[cfg(not(nightly))] Self::BadNumber(())
}
}

@ -114,7 +114,7 @@ impl<T> Iterator for IntoIter<T>
{
match self {
Self::Many(many) => many.size_hint(),
Self::Single(single) => (0, Some(1)),
Self::Single(_) => (0, Some(1)),
}
}
}

@ -17,20 +17,28 @@ use std::{
io,
path::Path,
};
use cfg_if::cfg_if;
/// Spawn the process, and contain its standard output.
///
/// # Notes
/// Standard error is printed immediately instead.
pub async fn contained_spawn<T,U,V>(process: T, args: U, mut output_to: mpsc::Sender<String>) -> Result<(), Error>
pub async fn contained_spawn<T,U,V>(process: T, args: U, mut output_to: mpsc::Sender<(bool, String)>) -> Result<(), Error>
where T: AsRef<Path>,
U: IntoIterator<Item=V>,
V: AsRef<std::ffi::OsStr>
{
cfg_if!{
if #[cfg(feature="progress")] {
let stderr = std::process::Stdio::piped();
} else {
let stderr = std::process::Stdio::inherit();
}
};
let mut child = match Command::new(process.as_ref())
.args(args)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.stderr(stderr)
.stdin(std::process::Stdio::null())
.spawn() {
Ok(chi) => chi,
@ -39,11 +47,24 @@ where T: AsRef<Path>,
}
};
let stdout = child.stdout.take().unwrap();
#[cfg(feature="progress")] let stderr_sender = {
let stderr = child.stderr.take().unwrap();
let mut output = output_to.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Some(Ok(line)) = lines.next().await {
if let Err(_) = output.send((true,line)).await {
break;
}
}
})
};
let sender = tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Some(Ok(line)) = lines.next().await {
if let Err(_) = output_to.send(line).await {
if let Err(_) = output_to.send((false, line)).await {
break;
}
}
@ -52,7 +73,19 @@ where T: AsRef<Path>,
match child.await {
Ok(exit) => {
if exit.success() {
sender.await.expect("Child panic");
cfg_if!{
if #[cfg(feature="progress")] {
let (o1, o2) = futures::future::join(
sender,
stderr_sender
).await;
o1.expect("Child (stdout) panic");
o2.expect("Child (stderr) panic");
}
else {
sender.await.expect("Child panic");
}
};
Ok(())
} else {
Err(Error::Process(exit.code()))

@ -105,11 +105,15 @@ impl Future for CommandWaiter
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
{
let future = async {
self.0.take().unwrap().await.map_err(|_| Error::WorkerDropped)
};
tokio::pin!(future);
future.poll(cx)
if let Some(value) = self.0.take() {
let future = async move {
value.await.map_err(|_| Error::WorkerDropped)
};
tokio::pin!(future);
future.poll(cx)
} else {
Poll::Ready(Err(Error::WorkerDropped))
}
}
}
@ -128,15 +132,12 @@ impl Future for TaskWaiter
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
{
let future = async {
tokio::select! {
idx = self.0.take().unwrap() => {
return idx.map_err(|_| Error::WorkerDropped);
}
_ = &mut self.1 => {
return Err(Error::WorkerDropped);
}
}
let value = self.0.take().unwrap();
let one = &mut self.1;
let future = async {
let val = value.await.map_err(|_| Error::WorkerDropped)?;
one.await.map_err(|_| Error::WorkerDropped)?;
Ok(val)
};
tokio::pin!(future);
future.poll(cx)
@ -161,7 +162,7 @@ impl<'a> CommandBuilder<'a>
}
/// Add another Builder to this one
#[inline] pub fn chain<I: IntoIterator<Item = CommandKind>>(mut self, other: I) -> Self
#[inline] pub fn chain<I: IntoIterator<Item = CommandKind>>(&mut self, other: I) -> &mut Self
{
self.extend(other);
self
@ -169,42 +170,42 @@ impl<'a> CommandBuilder<'a>
// Commands
/// Print line on the worker's progress bar
pub fn println(mut self, line: impl Into<String>) -> Self
pub fn println(&mut self, line: impl Into<String>) -> &mut Self
{
self.send_command(CommandKind::PrintLine(line.into()));
self
}
/// Print error line on the worker's progress bar
pub fn eprintln(mut self, line: impl Into<String>) -> Self
pub fn eprintln(&mut self, line: impl Into<String>) -> &mut Self
{
self.send_command(CommandKind::PrintLineErr(line.into()));
self
}
/// Increase the worker's max number
pub fn bump_max(mut self, by: usize) -> Self
pub fn bump_max(&mut self, by: usize) -> &mut Self
{
self.send_command(CommandKind::BumpHigh(by));
self
}
/// Increase the worker's min number
pub fn bump_min(mut self, by: usize) -> Self
pub fn bump_min(&mut self, by: usize) -> &mut Self
{
self.send_command(CommandKind::BumpLow(by));
self
}
/// Remove a task by ID.
pub fn remove_task(mut self, task_idx: usize) -> Self
pub fn remove_task(&mut self, task_idx: usize) -> &mut Self
{
self.send_command(CommandKind::RemoveTask(task_idx));
self
}
/// Signal a shutdown to the worker
pub fn shutdown(mut self) -> Self
pub fn shutdown(&mut self) -> &mut Self
{
self.send_command(CommandKind::Complete);
self
@ -363,7 +364,7 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
I: IntoIterator<Item=String>>(high: usize, tasks: I) -> (ProgressSender, JoinHandle<Result<(), Error>>)
{
let mut list = task_list::TaskList::from_iter(tasks);
let mut progress = P::with_title(50, &list);
let mut progress = P::with_title(50, format!("(0/0) {}",list.as_str()));
let (shutdown_tx, shutdown_rx) = watch::channel(None);
let (tx, mut rx) = mpsc::channel::<Command>(16);
@ -387,11 +388,11 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
let (command, _defer) = {
let Command{comm, comp} = command;
let d = defer::Defer::new(|| {
let _ =comp.send(());
let _ = comp.send(());
});
(comm, d)
};
enum MaybeSingle<T>
where T: IntoIterator<Item = CommandKind>
{
@ -432,39 +433,51 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
};
stage::Stage::from_iter(command)
};
let mut has_blanked = false;
while let Some(command) = commands.next().await {
match command {
CommandKind::BumpHigh(high) => {
let stat = stat.to_mut();
stat.high+=high;
progress.set_progress(stat.get_pct());
progress.set_title(format!("({}/{}) {}",stat.low, stat.high, list.as_ref()).as_str());
},
CommandKind::BumpLow(low) => {
let stat = stat.to_mut();
stat.low+=low;
progress.set_progress(stat.get_pct());
progress.set_title(format!("({}/{}) {}",stat.low, stat.high, list.as_ref()).as_str());
},
CommandKind::PrintLine(line) => {
progress.println(&line[..]);
progress.blank();
has_blanked = true;
println!("{}", &line[..]);
},
CommandKind::PrintLineErr(line) => {
progress.eprintln(&line[..]);
progress.blank();
has_blanked = true;
eprintln!("{}", &line[..]);
},
CommandKind::AddTask(task) => {
let idx = list.push(task.name);
if let Err(_) = task.idx.send(idx) {
list.pop(idx);
} else {
progress.set_title(list.as_ref());
progress.set_title(format!("({}/{}) {}",stat.low, stat.high, list.as_ref()).as_str());
}
},
CommandKind::RemoveTask(task) => {
if list.pop(task) {
progress.set_title(list.as_ref());
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
}
},
CommandKind::Complete => {
break;
if has_blanked {
progress.refresh();
}
rx.close();
},
CommandKind::Many(many) => {
let _ = commands.sender().send_many(many).await;
@ -472,6 +485,10 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
}
}
if has_blanked {
progress.refresh();
}
if let Cow::Owned(stat) = std::mem::replace(&mut stat, Cow::Borrowed(&last_stat /* wtf? how is this legal? idk*/)) {
//It's been written to
let _ = stat_tx.broadcast(stat.clone());
@ -479,6 +496,7 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
}
}
progress.complete();
}).await.map_err(|_| Error::WorkerPanic);
shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters");
res

@ -48,7 +48,7 @@ impl TaskList
self.index+=1;
self.index
};
#[cfg(nightly)] self.list.push_front((idx,string.into()));
#[cfg(nightly)] self.list.push_back((idx,string.into()));
#[cfg(not(nightly))] self.list.push((idx,string.into()));
self.recalc();
@ -87,7 +87,7 @@ impl TaskList
self.buffer = self.list.iter().map(|(_, s)| s.as_str()).join(", ");
}
#[cfg(not(nightly))] {
self.buffer = self.list.iter().rev().map(|(_, s)| s.as_str()).join(", ");
self.buffer = self.list.iter().map(|(_, s)| s.as_str()).join(", ");
}
}
}

@ -8,6 +8,8 @@ use std::{
},
path::{Path,PathBuf,},
ffi::OsStr,
borrow::BorrowMut,
iter,
};
use tokio::{
prelude::*,
@ -18,8 +20,10 @@ use tokio::{
io,
};
use futures::future::{
self,
join_all,
Future,
OptionFuture,
};
async fn maybe_await<T>(from: Option<T>) -> Option<<T as Future>::Output>
@ -32,21 +36,34 @@ where T: Future
}
}
async fn do_work(process: impl AsRef<Path>, file: impl AsRef<OsStr>) -> Result<fixed_stack::IntoIter<String>, process::Error>
async fn do_work(process: impl AsRef<Path>, file: impl AsRef<OsStr>, mut prog: progress::ProgressSender) -> Result<fixed_stack::IntoIter<String>, process::Error>
{
let process = process.as_ref();
let file = file.as_ref();
let (tx, mut rx) = mpsc::channel(16);
println!("[p]: Processing {:?}", file);
let collector = tokio::spawn(async move {
let mut stack = fixed_stack::FixedStack::new(100);
while let Some(value) = rx.recv().await {
stack.push(value);
}
stack
});
let (tx, mut rx) = mpsc::channel::<(bool, String)>(16);
let _opt_await: OptionFuture<_> = prog.println(format!("[p]: Processing {:?}", file)).await.ok().into();
let collector = {
let mut prog = prog.clone();
let file = file.to_owned();
tokio::spawn(async move {
let mut stack = fixed_stack::FixedStack::new(100);
while let Some((err, value)) = rx.recv().await {
if err {
let value = format!("[!] {:?}: {}", file, value);
if let Err(_) = prog.eprintln(&value[..]).await {
eprintln!("\n{}", value);
}
} else {
stack.push(value);
}
}
stack
})
};
tokio::task::yield_now().await;
//let _ = opt_await.await;
match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter()),
Err(error) => Err(error),
@ -55,44 +72,78 @@ async fn do_work(process: impl AsRef<Path>, file: impl AsRef<OsStr>) -> Result<f
pub async fn work<I,T,U>(process: U, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>>
where I: IntoIterator<Item=T>,
T: AsRef<OsStr> + Send + Sync + 'static,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
T: AsRef<OsStr> + Send + Sync + 'static + Clone,
U: Into<PathBuf>
{
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<String>)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<String>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(process.into());
let display = tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some((file, values)) = rx.recv().await {
for line in values.into_iter()
{
let line = format!("[i] {:?}: {}\n", file.as_ref(), line);
let _ = stdout.write_all(line.as_bytes()).await;
}
}
});
let files = files.into_iter();
#[cfg(feature="progress")] let (mut progress, prog_handle) = progress::create_progress::<termprogress::progress::Bar,_>(files.len(), iter::empty());
for failed in join_all(files.into_iter()
.map(|filename| {
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
let mut tx = tx.clone();
tokio::spawn(async move {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
match do_work(process.as_ref(), &filename).await {
Ok(strings) => tx.send((filename, strings)).await.map_err(|_| "Child panic").unwrap(),
Err(error) => eprintln!("[!] {:?}: {}", filename.as_ref(), error),
}
})
})).await
let display = {
let mut progress = progress.clone();
tokio::spawn(async move {
//let mut last: OptionFuture<_> = None.into();
while let Some((file, values, i)) = rx.recv().await {
let mut builder =progress.builder();
for line in values.into_iter()
{
let line = format!(" -> ({}) {:?}: {}", i, file.as_ref(), line);
builder.println(line);
}
let _ = builder.send().await;
}
})
};
let mut i=0usize;
let results = join_all(files
.map(|filename| {
let semaphore = semaphore.clone();
let process = Arc::clone(&process);
let mut tx = tx.clone();
let mut progress = progress.clone();
(tokio::spawn(async move {
let task_id: OptionFuture<_> = {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
let worker = do_work(process.as_ref(), &filename, progress.clone());
let task = progress.add_task(format!("{:?}", filename.as_ref()));
let worker = future::join(task, worker).await;
match worker.1 {
Ok(strings) => tx.send((filename, strings, i)).await.map_err(|_| "Child panic").unwrap(),
Err(error) => {
//eprintln!("[!] {:?}: {}", filename.as_ref(), error)
let _ = progress.eprintln(format!("[!] ({}) {:?}: {}", i, filename.as_ref(), error)).await.or_else(|e| {eprintln!("\n{}",e); Err(e)});
},
}
worker.0.ok().into()
};
if let Some(Ok(id)) = task_id.await {
let mut builder = progress.builder();
builder.bump_min(1);
builder.remove_task(id);
let _ = builder.send().await;
} else {
let _ = progress.bump_min(1).await;
}
}),i+=1).0
})).await
.into_iter()
.filter_map(|x| x.err())
.filter_map(|x| x.err());
progress.shutdown().await?;
for failed in results
{
eprintln!("Child panic: {}", failed);
}
progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?;
}
drop(tx);
display.await?;
prog_handle.await.expect("Child panic")?;
Ok(())
}

Loading…
Cancel
Save