From 283a2712c3a8d946b5d31e5616db9d3db4e83d33 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 10 Aug 2020 00:46:46 +0100 Subject: [PATCH] added async progress module --- src/.#progress.rs | 1 - src/defer.rs | 39 ++++++ src/main.rs | 2 + src/progress.rs | 313 +++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 348 insertions(+), 7 deletions(-) delete mode 120000 src/.#progress.rs create mode 100644 src/defer.rs diff --git a/src/.#progress.rs b/src/.#progress.rs deleted file mode 120000 index 5d9d145..0000000 --- a/src/.#progress.rs +++ /dev/null @@ -1 +0,0 @@ -avril@flan-laptop.204627:1596751578 \ No newline at end of file diff --git a/src/defer.rs b/src/defer.rs new file mode 100644 index 0000000..aa51a2e --- /dev/null +++ b/src/defer.rs @@ -0,0 +1,39 @@ +//! Defer a close to be ran on drop. + +use std::{ + ops::Drop, +}; + +pub struct Defer(Option) +where F: FnOnce() -> (); + +impl Defer +where F: FnOnce() +{ + #[cfg(nightly)] + #[inline] pub const fn new(from: F) -> Self + { + Self(Some(from)) + } + #[cfg(not(nightly))] + #[inline] pub fn new(from: F) -> Self + { + Self(Some(from)) + } + pub fn forget(mut self) -> F + { + let value = self.0.take(); + std::mem::forget(self); + value.unwrap() + } + #[inline] pub fn now(self) {} +} + +impl Drop for Defer +where F: FnOnce() +{ + fn drop(&mut self) + { + self.0.take().unwrap()(); + } +} diff --git a/src/main.rs b/src/main.rs index 2747854..df420bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,10 @@ #![cfg_attr(nightly, feature(int_error_matching))] #![cfg_attr(nightly, feature(linked_list_remove))] +#![cfg_attr(nightly, feature(const_fn))] #![allow(dead_code)] +mod defer; mod ext; pub use ext::JoinStrsExt as _; diff --git a/src/progress.rs b/src/progress.rs index d0ef75a..82c182c 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -1,48 +1,349 @@ use super::*; use std::{ iter::FromIterator as _, + sync::Arc, + borrow::{ + Cow, + Borrow as _, + }, + marker::{ + Send, + }, + task::{ + Context, + Poll, + }, + pin::Pin, + }; use tokio::{ sync::{ oneshot, watch, + mpsc, }, task::{ self, JoinHandle, }, }; +use futures::{ + future::{ + Future, + }, +}; use termprogress::{ Display as _, WithTitle, ProgressBar, }; +#[derive(Debug)] +struct Task +{ + name: String, + idx: oneshot::Sender, +} + #[derive(Debug)] enum CommandKind { + BumpHigh(usize), + BumpLow(usize), + PrintLine(String), + PrintLineErr(String), + + AddTask(Task), + RemoveTask(usize), + + Complete, } #[derive(Debug)] -pub struct Command +struct Command { comm: CommandKind, comp: oneshot::Sender<()>, } +/// The type used to communicate with the progress worker #[derive(Debug, Clone)] pub struct ProgressSender { - shutdown: watch::Receiver>, + shutdown: watch::Receiver>>, + command: mpsc::Sender, + + stat: watch::Receiver, +} + +fn create_command(kind: CommandKind) -> (Command, oneshot::Receiver<()>) +{ + let (tx, rx) = oneshot::channel(); + + (Command{comm: kind, comp: tx}, rx) +} + +/// A future that completes when the worker has processed the command. +/// +/// Completes to `Err(Error::WorkerDropped)` if worker dropped before reading the command, OR the command has not been sent. +/// +/// # Panics +/// Awaiting on this multiple times will cause it to panic +#[derive(Debug)] +pub struct CommandWaiter(Option>); + +impl Future for CommandWaiter +{ + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll + { + let future = async { + self.0.take().unwrap().await.map_err(|_| Error::WorkerDropped) + }; + tokio::pin!(future); + future.poll(cx) + } +} + +/// A future that completes when the worker has processed the task adding command, and returned a value. +/// +/// Completes to `Err(Error::WorkerDropped)` the same way `CommandWaiter` will, and additionally, if the task index is not returned. +/// +/// # Panics +/// Awaiting on this multiple times will cause it to panic +#[derive(Debug)] +pub struct TaskWaiter(Option>,CommandWaiter); + +impl Future for TaskWaiter +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll + { + let future = async { + tokio::select! { + idx = self.0.take().unwrap() => { + return idx.map_err(|_| Error::WorkerDropped); + } + _ = &mut self.1 => { + return Err(Error::WorkerDropped); + } + } + }; + tokio::pin!(future); + future.poll(cx) + } +} + +impl ProgressSender +{ + /// Get the status of this progress + pub fn status(&self) -> Stat + { + self.stat.borrow().clone() + } + + /// Wait for shutdown without signalling it + /// + /// # Notes + /// Will return `Err(Error::WorkerDropped)` if this is called after the worker has already been dropped. + pub async fn wait(&mut self) -> Result<(), Error> + { + loop { + match self.shutdown.recv().await { + Some(Some(res)) => return res, + None => return Err(Error::WorkerDropped), + _ => {}, + } + } + } + + async fn send_command(&mut self, comm: CommandKind) -> Result + { + let (comm, rx) = create_command(comm); + + self.command.send(comm).await.map_err(|_| Error::WorkerDropped)?; + Ok(CommandWaiter(Some(rx))) + } + + // Commands + /// Print line on the worker's progress bar + pub async fn println(&mut self, line: impl Into) -> Result + { + self.send_command(CommandKind::PrintLine(line.into())).await + } + + /// Print error line on the worker's progress bar + pub async fn eprintln(&mut self, line: impl Into) -> Result + { + self.send_command(CommandKind::PrintLineErr(line.into())).await + } + + /// Increase the worker's max number + pub async fn bump_max(&mut self, by: usize) -> Result + { + self.send_command(CommandKind::BumpHigh(by)).await + } + + /// Increase the worker's min number + pub async fn bump_min(&mut self, by: usize) -> Result + { + self.send_command(CommandKind::BumpLow(by)).await + } + + /// Add a task to the worker's progress bar title line + /// + /// This function returns a [TaskWaiter]`TaskWaiter` future, upon successful `await`ing will yield the task's ID. + pub async fn add_task(&mut self, name: impl Into) -> Result + { + let (tx, rx) = oneshot::channel(); + + let com = self.send_command(CommandKind::AddTask(Task{name: name.into(), idx: tx})).await?; + + Ok(TaskWaiter(Some(rx), com)) + } + + /// Remove a task by ID. + pub async fn remove_task(&mut self, task_idx: usize) -> Result + { + self.send_command(CommandKind::RemoveTask(task_idx)).await + } + + /// Signal a shutdown to the worker + pub async fn shutdown(&mut self) -> Result + { + self.send_command(CommandKind::Complete).await + } +} + +/// Status of the worker +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Stat +{ + pub high: usize, + pub low: usize, +} + +impl Stat +{ + #[inline] fn get_pct(&self) -> f64 + { + self.low as f64 / (self.high as f64) + } } /// Create the async progress counter and return a sender object and a handle to join the worker task. -pub async fn create_progress>(high: usize, tasks: I) -> (ProgressSender, JoinHandle<()>) +pub fn create_progress>(high: usize, tasks: I) -> (ProgressSender, JoinHandle>) { - let list = task_list::TaskList::from_iter(tasks); + let mut list = task_list::TaskList::from_iter(tasks); let mut progress = P::with_title(50, &list); - todo!() + let (shutdown_tx, shutdown_rx) = watch::channel(None); + let (tx, mut rx) = mpsc::channel::(16); + + let stat = Stat { + high: high, + low: 0, + }; + let (stat_tx, stat_rx) = watch::channel(stat.clone()); + + + let handle = { + let handle = task::spawn(async move { + // To handle worker panics, spawn a seperate task + + let res = task::spawn(async move { + //Actual worker + let mut last_stat = stat; + while let Some(command) = rx.recv().await { + let mut stat= Cow::Borrowed(&last_stat); + let (command, _defer) = { + let Command{comm, comp} = command; + let d = defer::Defer::new(|| { + let _ =comp.send(()); + }); + (comm, d) + }; + + // Match the command + match command { + CommandKind::BumpHigh(high) => { + let stat = stat.to_mut(); + stat.high+=high; + progress.set_progress(stat.get_pct()); + }, + CommandKind::BumpLow(low) => { + let stat = stat.to_mut(); + stat.low+=low; + progress.set_progress(stat.get_pct()); + }, + CommandKind::PrintLine(line) => { + progress.println(&line[..]); + }, + CommandKind::PrintLineErr(line) => { + progress.eprintln(&line[..]); + }, + CommandKind::AddTask(task) => { + let idx = list.push(task.name); + if let Err(_) = task.idx.send(idx) { + list.pop(idx); + } else { + progress.set_title(list.as_ref()); + } + }, + CommandKind::RemoveTask(task) => { + if list.pop(task) { + progress.set_title(list.as_ref()); + } + }, + CommandKind::Complete => { + break; + }, + } + + if let Cow::Owned(stat) = std::mem::replace(&mut stat, Cow::Borrowed(&last_stat /* wtf? how is this legal? idk*/)) { + //It's been written to + let _ = stat_tx.broadcast(stat.clone()); + last_stat = stat; + } + + } + }).await.map_err(|_| Error::WorkerPanic); + shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters"); + res + }); + handle + }; + + let prog = ProgressSender { + shutdown: shutdown_rx, + command: tx, + stat: stat_rx, + }; + + (prog, handle) +} + +#[derive(Debug,Clone)] +pub enum Error +{ + WorkerPanic, + WorkerDropped, + Unknown, +} + +impl std::error::Error for Error{} +impl std::fmt::Display for Error +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result + { + match self { + Error::WorkerPanic => write!(f, "worker panicked"), + Error::WorkerDropped => write!(f, "tried to communicate with dropped worker"), + _ => write!(f, "unknown error"), + } + } }