You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
577 lines
15 KiB
577 lines
15 KiB
#![allow(unused_must_use)] // Suppress `#[pin_project]` spamming warnings on build.
|
|
|
|
use super::*;
|
|
use pin_project::pin_project;
|
|
use std::{
|
|
iter::FromIterator as _,
|
|
borrow::{
|
|
Cow,
|
|
},
|
|
marker::{
|
|
Send,
|
|
},
|
|
task::{
|
|
Context,
|
|
Poll,
|
|
},
|
|
pin::Pin,
|
|
iter::{
|
|
self,
|
|
Once,
|
|
},
|
|
};
|
|
use tokio::{
|
|
sync::{
|
|
oneshot,
|
|
watch,
|
|
mpsc,
|
|
},
|
|
task::{
|
|
self,
|
|
JoinHandle,
|
|
},
|
|
stream::StreamExt as _,
|
|
};
|
|
use futures::{
|
|
future::{
|
|
Future,
|
|
},
|
|
};
|
|
use termprogress::{
|
|
WithTitle,
|
|
ProgressBar,
|
|
};
|
|
|
|
#[derive(Debug)]
|
|
pub struct Task
|
|
{
|
|
name: String,
|
|
idx: oneshot::Sender<usize>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub enum CommandKind
|
|
{
|
|
BumpHigh(usize),
|
|
BumpLow(usize),
|
|
|
|
PrintLine(String),
|
|
PrintLineErr(String),
|
|
|
|
AddTask(Task),
|
|
RemoveTask(usize),
|
|
|
|
RemoveAll {
|
|
lock: bool,
|
|
title: Option<Cow<'static, str>>,
|
|
},
|
|
|
|
Complete,
|
|
|
|
Many(Vec<CommandKind>),
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct Command
|
|
{
|
|
comm: CommandKind,
|
|
comp: oneshot::Sender<()>,
|
|
}
|
|
|
|
/// The type used to communicate with the progress worker
|
|
#[derive(Debug, Clone)]
|
|
pub struct ProgressSender
|
|
{
|
|
shutdown: watch::Receiver<Option<Result<(), Error>>>,
|
|
command: mpsc::Sender<Command>,
|
|
|
|
stat: watch::Receiver<Stat>,
|
|
}
|
|
|
|
fn create_command(kind: CommandKind) -> (Command, oneshot::Receiver<()>)
|
|
{
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
(Command{comm: kind, comp: tx}, rx)
|
|
}
|
|
|
|
/// A future that completes when the worker has processed the command.
|
|
///
|
|
/// Completes to `Err(Error::WorkerDropped)` if worker dropped before reading the command, OR the command has not been sent.
|
|
///
|
|
/// # Panics
|
|
/// Awaiting on this multiple times will cause it to panic
|
|
#[pin_project]
|
|
#[derive(Debug)]
|
|
pub struct CommandWaiter(#[pin] Option<oneshot::Receiver<()>>);
|
|
|
|
impl Future for CommandWaiter
|
|
{
|
|
type Output = Result<(), Error>;
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
|
|
{
|
|
match self.project().0.as_pin_mut() {
|
|
Some(x) => x.poll(cx).map_err(|_| Error::WorkerDropped),
|
|
None => Poll::Ready(Err(Error::WorkerDropped)),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A future that completes when the worker has processed the task adding command, and returned a value.
|
|
///
|
|
/// Completes to `Err(Error::WorkerDropped)` the same way `CommandWaiter` will, and additionally, if the task index is not returned.
|
|
///
|
|
/// # Panics
|
|
/// Awaiting on this multiple times will cause it to panic
|
|
#[pin_project]
|
|
#[derive(Debug)]
|
|
pub struct TaskWaiter(#[pin] Option<oneshot::Receiver<usize>>,CommandWaiter);
|
|
|
|
impl Future for TaskWaiter
|
|
{
|
|
type Output = Result<usize, Error>;
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
|
|
{
|
|
match self.project().0.as_pin_mut() {
|
|
Some(x) => x.poll(cx).map_err(|_| Error::WorkerDropped),
|
|
None => Poll::Ready(Err(Error::WorkerDropped)),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct CommandBuilder<'a>
|
|
{
|
|
sender: &'a mut ProgressSender,
|
|
commands: Option<maybe_single::MaybeSingle<CommandKind>>,
|
|
}
|
|
|
|
impl<'a> CommandBuilder<'a>
|
|
{
|
|
fn send_command(&mut self, comm: CommandKind)
|
|
{
|
|
if let Some(ref mut commands) = self.commands {
|
|
commands.push(comm);
|
|
} else {
|
|
self.commands = Some(maybe_single::MaybeSingle::single(comm));
|
|
}
|
|
}
|
|
|
|
/// Add another Builder to this one
|
|
#[inline] pub fn chain<I: IntoIterator<Item = CommandKind>>(&mut self, other: I) -> &mut Self
|
|
{
|
|
self.extend(other);
|
|
self
|
|
}
|
|
|
|
// Commands
|
|
/// Print line on the worker's progress bar
|
|
pub fn println(&mut self, line: impl Into<String>) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::PrintLine(line.into()));
|
|
self
|
|
}
|
|
|
|
/// Print error line on the worker's progress bar
|
|
pub fn eprintln(&mut self, line: impl Into<String>) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::PrintLineErr(line.into()));
|
|
self
|
|
}
|
|
|
|
/// Increase the worker's max number
|
|
pub fn bump_max(&mut self, by: usize) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::BumpHigh(by));
|
|
self
|
|
}
|
|
|
|
/// Increase the worker's min number
|
|
pub fn bump_min(&mut self, by: usize) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::BumpLow(by));
|
|
self
|
|
}
|
|
|
|
/// Remove a task by ID.
|
|
pub fn remove_task(&mut self, task_idx: usize) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::RemoveTask(task_idx));
|
|
self
|
|
}
|
|
|
|
pub fn clear_tasks(&mut self, lock: bool, title: Option<&'static str>) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Borrowed), lock });
|
|
self
|
|
}
|
|
|
|
/// Signal a shutdown to the worker
|
|
pub fn shutdown(&mut self) -> &mut Self
|
|
{
|
|
self.send_command(CommandKind::Complete);
|
|
self
|
|
}
|
|
|
|
/// Send this as an atom,
|
|
pub async fn send(self) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.sender.send_command(match self.commands {
|
|
Some(maybe_single::MaybeSingle::Single(Some(single))) => {
|
|
single
|
|
},
|
|
Some(maybe_single::MaybeSingle::Many(many)) => {
|
|
CommandKind::Many(many)
|
|
},
|
|
_ => return Err(Error::NoCommands),
|
|
}).await
|
|
}
|
|
}
|
|
|
|
impl<'a> Extend<CommandKind> for CommandBuilder<'a>
|
|
{
|
|
fn extend<I: IntoIterator<Item = CommandKind>>(&mut self, other: I)
|
|
{
|
|
match self.commands {
|
|
Some(ref mut commands) => commands.extend(other),
|
|
_ => self.commands = Some(other.into_iter().collect()),
|
|
};
|
|
}
|
|
}
|
|
|
|
impl<'a> IntoIterator for CommandBuilder<'a>
|
|
{
|
|
type Item= CommandKind;
|
|
type IntoIter = maybe_single::IntoIter<CommandKind>;
|
|
|
|
fn into_iter(self) -> Self::IntoIter
|
|
{
|
|
match self.commands {
|
|
Some(value) => value.into_iter(),
|
|
None => maybe_single::MaybeSingle::from(vec![]).into_iter(),
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
impl ProgressSender
|
|
{
|
|
/// Get the status of this progress
|
|
pub fn status(&self) -> Stat
|
|
{
|
|
self.stat.borrow().clone()
|
|
}
|
|
|
|
/// Wait for shutdown without signalling it
|
|
///
|
|
/// # Notes
|
|
/// Will return `Err(Error::WorkerDropped)` if this is called after the worker has already been dropped.
|
|
pub async fn wait(&mut self) -> Result<(), Error>
|
|
{
|
|
loop {
|
|
match self.shutdown.recv().await {
|
|
Some(Some(res)) => return res,
|
|
None => return Err(Error::WorkerDropped),
|
|
_ => {},
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn send_command(&mut self, comm: CommandKind) -> Result<CommandWaiter, Error>
|
|
{
|
|
let (comm, rx) = create_command(comm);
|
|
|
|
self.command.send(comm).await.map_err(|_| Error::WorkerDropped)?;
|
|
Ok(CommandWaiter(Some(rx)))
|
|
}
|
|
|
|
// Commands
|
|
/// Print line on the worker's progress bar
|
|
pub async fn println(&mut self, line: impl Into<String>) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::PrintLine(line.into())).await
|
|
}
|
|
|
|
/// Print error line on the worker's progress bar
|
|
pub async fn eprintln(&mut self, line: impl Into<String>) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::PrintLineErr(line.into())).await
|
|
}
|
|
|
|
/// Increase the worker's max number
|
|
pub async fn bump_max(&mut self, by: usize) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::BumpHigh(by)).await
|
|
}
|
|
|
|
/// Increase the worker's min number
|
|
pub async fn bump_min(&mut self, by: usize) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::BumpLow(by)).await
|
|
}
|
|
|
|
/// Add a task to the worker's progress bar title line
|
|
///
|
|
/// This function returns a [TaskWaiter]`TaskWaiter` future, upon successful `await`ing will yield the task's ID.
|
|
pub async fn add_task(&mut self, name: impl Into<String>) -> Result<TaskWaiter, Error>
|
|
{
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
let com = self.send_command(CommandKind::AddTask(Task{name: name.into(), idx: tx})).await?;
|
|
|
|
Ok(TaskWaiter(Some(rx), com))
|
|
}
|
|
|
|
/// Remove a task by ID.
|
|
pub async fn remove_task(&mut self, task_idx: usize) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::RemoveTask(task_idx)).await
|
|
}
|
|
|
|
pub async fn clear_tasks(&mut self, lock: bool, title: Option<String>) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::RemoveAll{ title: title.map(Cow::Owned), lock }).await
|
|
}
|
|
|
|
/// Signal a shutdown to the worker
|
|
pub async fn shutdown(&mut self) -> Result<CommandWaiter, Error>
|
|
{
|
|
self.send_command(CommandKind::Complete).await
|
|
}
|
|
|
|
/// Create a new command builder
|
|
pub fn builder<'a>(&'a mut self) -> CommandBuilder<'a>
|
|
{
|
|
CommandBuilder{
|
|
commands: None,
|
|
sender: self,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Status of the worker
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stat {
    /// The worker's max number (the total being counted towards).
    pub high: usize,
    /// The worker's min number (how much has been counted so far).
    pub low: usize,
}

impl Stat {
    /// Fraction of work done, `low / high`.
    ///
    /// Returns `0.0` while `high` is zero, so the progress bar is never handed the
    /// NaN or infinity that a division by zero would produce.
    #[inline]
    fn get_pct(&self) -> f64 {
        if self.high == 0 {
            return 0.0;
        }
        self.low as f64 / (self.high as f64)
    }
}
|
|
|
|
/// Create the async progress counter and return a sender object and a handle to join the worker task.
|
|
pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static,
|
|
I: IntoIterator<Item=String>>(high: usize, tasks: I) -> (ProgressSender, JoinHandle<Result<(), Error>>)
|
|
{
|
|
let mut list = task_list::TaskList::from_iter(tasks);
|
|
let mut progress = P::with_title(50, format!("(0/0) {}",list.as_str()));
|
|
|
|
let (shutdown_tx, shutdown_rx) = watch::channel(None);
|
|
let (tx, mut rx) = mpsc::channel::<Command>(16);
|
|
|
|
let stat = Stat {
|
|
high: high,
|
|
low: 0,
|
|
};
|
|
let (stat_tx, stat_rx) = watch::channel(stat.clone());
|
|
|
|
|
|
let handle = {
|
|
let handle = task::spawn(async move {
|
|
// To handle worker panics, spawn a seperate task
|
|
|
|
let res = task::spawn(async move {
|
|
//Actual worker
|
|
let mut last_stat = stat;
|
|
while let Some(command) = rx.recv().await {
|
|
let mut stat= Cow::Borrowed(&last_stat);
|
|
let (command, _defer) = {
|
|
let Command{comm, comp} = command;
|
|
let d = defer::Defer::new(|| {
|
|
let _ = comp.send(());
|
|
});
|
|
(comm, d)
|
|
};
|
|
|
|
enum MaybeSingle<T>
|
|
where T: IntoIterator<Item = CommandKind>
|
|
{
|
|
Single(Once<CommandKind>),
|
|
Many(<T as IntoIterator>::IntoIter),
|
|
}
|
|
|
|
impl<T> ExactSizeIterator for MaybeSingle<T>
|
|
where T: IntoIterator<Item = CommandKind>,
|
|
<T as IntoIterator>::IntoIter: ExactSizeIterator{}
|
|
|
|
impl<T> Iterator for MaybeSingle<T>
|
|
where T: IntoIterator<Item = CommandKind>,
|
|
<T as IntoIterator>::IntoIter: ExactSizeIterator
|
|
{
|
|
type Item = <T as IntoIterator>::Item;
|
|
fn next(&mut self) -> Option<Self::Item>
|
|
{
|
|
match self {
|
|
Self::Many(ref mut many) => many.next(),
|
|
Self::Single(ref mut single) => single.next(),
|
|
}
|
|
}
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>)
|
|
{
|
|
match &self {
|
|
Self::Many(many) => (many.len(), Some(many.len())),
|
|
Self::Single(_) => (1, Some(1)),
|
|
}
|
|
}
|
|
}
|
|
|
|
let mut commands: stage::Stage<CommandKind> = {
|
|
let command: MaybeSingle<Vec<CommandKind>> = match command {
|
|
CommandKind::Many(many) => MaybeSingle::Many(many.into_iter()),
|
|
other => MaybeSingle::Single(iter::once(other)),
|
|
};
|
|
stage::Stage::from_iter(command)
|
|
};
|
|
let mut has_blanked = false;
|
|
while let Some(command) = commands.next().await {
|
|
match command {
|
|
CommandKind::BumpHigh(high) => {
|
|
let stat = stat.to_mut();
|
|
stat.high+=high;
|
|
progress.set_progress(stat.get_pct());
|
|
|
|
progress.set_title(format!("({}/{}) {}",stat.low, stat.high, list.as_ref()).as_str());
|
|
},
|
|
CommandKind::BumpLow(low) => {
|
|
let stat = stat.to_mut();
|
|
stat.low+=low;
|
|
progress.set_progress(stat.get_pct());
|
|
|
|
progress.set_title(format!("({}/{}) {}",stat.low, stat.high, list.as_ref()).as_str());
|
|
},
|
|
CommandKind::PrintLine(line) => {
|
|
progress.blank();
|
|
has_blanked = true;
|
|
println!("{}", &line[..]);
|
|
},
|
|
CommandKind::PrintLineErr(line) => {
|
|
progress.blank();
|
|
has_blanked = true;
|
|
eprintln!("{}", &line[..]);
|
|
},
|
|
CommandKind::AddTask(task) => {
|
|
let idx = list.push(task.name);
|
|
if let Err(_) = task.idx.send(idx) {
|
|
list.pop(idx);
|
|
} else {
|
|
progress.set_title(format!("({}/{}) {}",stat.low, stat.high, list.as_ref()).as_str());
|
|
}
|
|
},
|
|
CommandKind::RemoveTask(task) => {
|
|
if list.pop(task) {
|
|
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
|
|
}
|
|
},
|
|
CommandKind::RemoveAll{ title: None, lock } => {
|
|
if lock {
|
|
list.poison();
|
|
}
|
|
list.clear();
|
|
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, list.as_ref()).as_str());
|
|
},
|
|
CommandKind::RemoveAll{ title: Some(title), lock } => {
|
|
if lock {
|
|
list.poison();
|
|
}
|
|
list.clear();
|
|
progress.set_title(format!("({}/{}) {}", stat.low, stat.high, title.as_ref()).as_str());
|
|
},
|
|
CommandKind::Complete => {
|
|
if has_blanked {
|
|
progress.refresh();
|
|
}
|
|
rx.close();
|
|
},
|
|
CommandKind::Many(many) => {
|
|
let _ = commands.sender().send_many(many).await;
|
|
},
|
|
}
|
|
}
|
|
|
|
if has_blanked {
|
|
progress.refresh();
|
|
}
|
|
|
|
if let Cow::Owned(stat) = std::mem::replace(&mut stat, Cow::Borrowed(&last_stat /* wtf? how is this legal? idk*/)) {
|
|
//It's been written to
|
|
let _ = stat_tx.broadcast(stat.clone());
|
|
last_stat = stat;
|
|
}
|
|
|
|
}
|
|
progress.complete();
|
|
}).await.map_err(|_| Error::WorkerPanic);
|
|
shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters");
|
|
|
|
res
|
|
});
|
|
handle
|
|
};
|
|
|
|
let prog = ProgressSender {
|
|
shutdown: shutdown_rx,
|
|
command: tx,
|
|
stat: stat_rx,
|
|
};
|
|
|
|
(prog, handle)
|
|
}
|
|
|
|
/// Errors produced when communicating with the progress worker.
#[derive(Debug, Clone)]
pub enum Error {
    /// A command builder was sent without any commands queued.
    NoCommands,
    /// The worker task panicked.
    WorkerPanic,
    /// The worker is gone, so it could not be communicated with.
    WorkerDropped,
    /// Any other failure.
    Unknown,
}

impl Error {
    /// If this error comes from an attempt to communicate with the worker that failed but can be ignored.
    ///
    /// # Usage
    /// If the worker has been gracefully shutdown, and background tasks are still running that want to update the progress, an error will be returned when those already extant tasks attempt to send the commands to do so.
    /// Those tasks can ignore those errors entirely if this function returns `true`.
    pub fn can_ignore_on_com_send_failure(&self) -> bool {
        matches!(self, Self::WorkerDropped)
    }
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    /// Write the human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::NoCommands => "an attempt was made to send 0 commands",
            Self::WorkerPanic => "worker panicked",
            Self::WorkerDropped => "tried to communicate with dropped worker",
            Self::Unknown => "unknown error",
        };
        f.write_str(message)
    }
}
|