You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
videl/src/progress/mod.rs

427 lines
10 KiB

//! Async progression
use super::*;
use termprogress::{
prelude::*,
};
use futures::{
prelude::*,
};
use tokio::{
sync::{
mpsc,
RwLock,
watch,
oneshot,
RwLockReadGuard,
RwLockWriteGuard,
},
task::{self, JoinHandle},
};
use std::{
sync::{
Weak,
},
fmt,
error,
};
mod tasklist;
pub use tasklist::{
TaskId,
IdNotFoundError,
};
/// Command to send to worker task.
#[derive(Debug)]
pub enum CommandKind
{
Line(String),
LineErr(String),
Bump(isize),
BumpHigh(isize),
Set{low: Option<usize>, high: Option<usize>},
Refresh,
/// Add a task to the tasklist
///
/// # Response
/// Will respond with the task's `TaskId`.
AddTask(String),
/// Remove a task from the tasklist.
///
/// # Response
/// Will respond with `Result<String, IdNotFoundError>` of the removal operation
RemoveTask(TaskId),
Shutdown,
Many(Vec<CommandKind>),
}
/// The type sent in response to a `Command`.
pub type Response = Option<Box<dyn std::any::Any + Send+ Sync + 'static>>;
#[derive(Debug)]
enum CommandIter
{
One(std::iter::Once<CommandKind>),
Many(std::vec::IntoIter<CommandKind>),
}
impl ExactSizeIterator for CommandIter{}
impl Iterator for CommandIter
{
type Item = CommandKind;
fn next(&mut self) -> Option<Self::Item>
{
match self {
Self::One(one) => one.next(),
Self::Many(many) => many.next(),
}
}
fn size_hint(&self) -> (usize, Option<usize>)
{
let sz = match self {
Self::One(_) => 1,
Self::Many(m) => m.len(),
};
(sz, Some(sz))
}
}
impl std::iter::FusedIterator for CommandIter{}
impl CommandKind
{
/// Enumerate all possible commands if this is `Many`.
///
/// The outputs may still contain `Many`.
//TODO: Make this work recursively
fn enumerate(self) -> CommandIter
{
match self {
Self::Many(many) => CommandIter::Many(many.into_iter()),
other => CommandIter::One(std::iter::once(other)),
}
}
}
#[derive(Debug, Clone)]
pub struct BarRef<B>(Arc<RwLock<B>>);
#[derive(Debug)]
struct Command(CommandKind, oneshot::Sender<Response>);
/// The bar's state
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct State
{
max: usize,
cur: usize,
//TODO: Tasks
}
impl State
{
/// The current progress
pub fn prog(&self) -> f64
{
(self.cur as f64) / (self.max as f64)
}
}
/// A handle to a running async progress bar
#[derive(Debug)]
pub struct Handle<B = Bar>
where B: ProgressBar,
{
// Channel to send commands to the worker
chan: mpsc::Sender<Command>,
// A weak reference to the worker's bar itself
bar: Weak<RwLock<B>>,
// A strong reference to the bar's state
state: Arc<RwLock<State>>,
// Has the worker shut down?
dead: watch::Receiver<bool>,
}
impl<B: ProgressBar> Clone for Handle<B>
{
fn clone(&self)->Self
{
Self {
chan: self.chan.clone(),
bar: self.bar.clone(),
state: self.state.clone(),
dead: self.dead.clone(),
}
}
}
impl<B: ProgressBar> Handle<B>
{
/// Is the worker alive?
pub fn is_alive(&self) -> bool
{
self.bar.strong_count()>0 && !*self.dead.borrow()
}
/// Yields until the worker shutds down gracefully
pub async fn closed(&mut self) -> Result<(),WorkerCommError>
{
loop {
match self.dead.recv().await {
Some(true) => return Ok(()),
None => return Err(WorkerCommError),
_ => continue,
}
}
}
/// Send a command to the worker.
///
/// Returns a future that completes to `Ok` when the worker successfully processes the command, and `Err` if the worker exits before processing it
pub async fn send_command(&mut self, command: CommandKind) -> Result<impl Future<Output=Result<Response, WorkerCommError>>, WorkerCommError>
{
let (tx, rx) = oneshot::channel();
self.chan.send(Command(command, tx)).await.map_err(|_| WorkerCommError)?;
Ok(rx.map(|res| res.map_err(|_| WorkerCommError)))
}
/// Send a command to the worker and then wait for it to be processed
pub async fn send_command_and_wait(&mut self, command: CommandKind) -> Result<Response, WorkerCommError>
{
self.send_command(command).await?.await
}
/// Send a command to the worker but do not wait for it to be processed
pub async fn send_command_and_detach(&mut self, command: CommandKind) -> Result<(), WorkerCommError>
{
let _ = self.send_command(command).await?;
Ok(())
}
/// Get a reference to the state
pub async fn state(&self) -> RwLockReadGuard<'_, State>
{
self.state.read().await
}
/// Act on a mutable reference to the bar within this closure
///
/// # Notes
/// Acquiring this will prevent the worker from exiting until the closure finishes.
pub async fn with_bar_mut<F,T>(&self, fun: F) -> Result<T, WorkerCommError>
where F: FnOnce(&'_ mut B) -> T,
{
let handle = self.bar.upgrade().ok_or(WorkerCommError)?;
let mut h = handle.write().await;
use std::ops::DerefMut;
Ok(fun(h.deref_mut()))
}
/// Act on a reference to the bar within this closure
///
/// # Notes
/// Acquiring this will prevent the worker from exiting until the closure finishes.
pub async fn with_bar<F,T>(&self, fun: F) -> Result<T, WorkerCommError>
where F: FnOnce(&'_ B) -> T,
{
let handle = self.bar.upgrade().ok_or(WorkerCommError)?;
let h = handle.read().await;
use std::ops::Deref;
Ok(fun(h.deref()))
}
}
/// Error communicating with worker
#[derive(Debug)]
pub struct WorkerCommError;
impl error::Error for WorkerCommError{}
impl fmt::Display for WorkerCommError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to communicate with worker.")
}
}
/// Host a progress bar and detach it
pub fn host<B: ProgressBar + Send + Sync + 'static>(bar: B) -> (Handle<B>, JoinHandle<B>)
{
let state = Arc::new(RwLock::new(Default::default()));
let (mut rx, death, bar, handle) = {
let (tx, rx) = mpsc::channel(24);
let (death, dead) = watch::channel(false);
let bar = Arc::new(RwLock::new(bar));
let handle = Handle {
chan: tx,
dead,
bar: Arc::downgrade(&bar),
state: Arc::clone(&state),
};
(rx,death,bar,handle)
};
(handle, tokio::spawn(async move {
({
let mut tasks = tasklist::TaskList::new();
macro_rules! update_bar {
(refresh $($tt:tt)*) => {
{
let bar = bar.read().await;
bar.refresh();
update_bar!($($tt)*);
}
};
(to $state:ident $($tt:tt)*) => {
{
let mut bar = bar.write().await;
bar.set_progress($state.prog());
update_bar!($($tt)*);
}
};
(write error $line:ident $($tt:tt)*) => {
{
let bar = bar.read().await;
let string = &$line[..];
bar.eprintln(string);
update_bar!($($tt)*);
}
};
(write $(std)? $line:ident $($tt:tt)*) => {
{
let bar = bar.read().await;
let string = &$line[..];
bar.println(string);
update_bar!($($tt)*);
}
};
(title $($tt:tt)*) => {
{
let mut bar = bar.write().await;
bar.set_title(tasks.as_str());
update_bar!($($tt)*);
}
};
(+task $task:ident $($tt:tt)*) => {
{
let id = tasks.add($task);
update_bar!($($tt)*);
id
}
};
(-task $id:ident $($tt:tt)*) => {
{
let res = tasks.remove(&$id);
update_bar!($($tt)*);
res
}
};
() => {};
}
update_bar!(refresh);
while let Some(Command(command, response)) = rx.recv().await {
let response = Arc::new(std::sync::Mutex::new(Some(response)));
/// Send a response if one has not already been sent.
///
/// # Returns
/// * `Some(Ok(())` - if response was sent okay
/// * `Some(Err(_))` - if response failed to send.
/// * `None` - if response has already been sent
///
/// # Panics
/// If mutex is poisoned (this should be impossible).
macro_rules! send_response {
($value:expr) => (send_response!(@ response => Some(Box::new($value))));
(@ $response:ident => $value:expr) => {
{
let value: Response = $value;
{
if let Some(response) = $response.lock().unwrap().take() {
Some(response.send(value))
} else {
None
}
}
}
};
}
// Guard that ensures a `None` response is sent after this command has been processed, if an explicit response has not yet been sent.
let _resp = {
let response = Arc::clone(&response);
util::defer(move || send_response!(@ response => None).ignore())
};
match command {
CommandKind::Shutdown => break,
CommandKind::BumpHigh(sz) if sz >= 0 => {
let mut state = state.write().await;
state.max = state.max.saturating_add(sz as usize);
update_bar!(to state);
},
CommandKind::BumpHigh(sz) => {
debug_assert!(sz <0);
let mut state = state.write().await;
state.max = state.max.saturating_sub(sz.abs() as usize);
update_bar!(to state);
},
CommandKind::Bump(sz) if sz >= 0 => {
let mut state = state.write().await;
state.cur = state.cur.saturating_add(sz as usize);
update_bar!(to state);
},
CommandKind::Bump(sz) => {
debug_assert!(sz <0);
let mut state = state.write().await;
state.cur = state.cur.saturating_sub(sz.abs() as usize);
update_bar!(to state);
},
CommandKind::Set{low: None, high: None} => (),
CommandKind::Set{low, high} => {
let mut state = state.write().await;
state.cur = low.unwrap_or(state.cur);
state.max = high.unwrap_or(state.max);
update_bar!(to state);
},
CommandKind::Line(line) => update_bar!(write line),
CommandKind::LineErr(line) => update_bar!(write error line),
CommandKind::AddTask(string) => {
send_response!(update_bar!(+task string title));
},
CommandKind::RemoveTask(id) => {
send_response!(update_bar!(-task id title));
},
CommandKind::Refresh => update_bar!(refresh),
CommandKind::Many(_) => unimplemented!(),
}
}
// Consume the bar and return
{
let mut bar = bar;
loop {
bar = match Arc::try_unwrap(bar) {
Ok(bar) => break bar,
Err(bar) => bar,
};
task::yield_now().await;
}.into_inner()
}
}, death.broadcast(true)).0
}))
}