From 6a2570a0c480cf049b1bb5e94ed188fa82d2689b Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 5 Nov 2020 15:20:03 +0000 Subject: [PATCH] start async progress --- Cargo.lock | 21 ++++ Cargo.toml | 1 + src/ext.rs | 13 ++ src/main.rs | 1 + src/progress/mod.rs | 292 ++++++++++++++++++++++++++++++++++++++++++++ src/util.rs | 14 +++ 6 files changed, 342 insertions(+) create mode 100644 src/progress/mod.rs diff --git a/Cargo.lock b/Cargo.lock index cd88fac..269d2c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -732,6 +732,26 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a14cd9f8c72704232f0bfc8455c0e861f0ad4eb60cc9ec8a170e231414c1e13" +dependencies = [ + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "termprogress" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14" +dependencies = [ + "rustc_version", + "terminal_size", +] + [[package]] name = "thread_local" version = "1.0.1" @@ -871,6 +891,7 @@ dependencies = [ "rustc_version", "sha2", "smallmap", + "termprogress", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 46e4d71..c536df9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ futures = "0.3.6" smallmap = "^1.1.6" log = "0.4.11" pretty_env_logger = "0.4.0" +termprogress = "0.3.4" [build-dependencies] rustc_version = "0.2" diff --git a/src/ext.rs b/src/ext.rs index 016aaeb..6aa7382 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -175,3 +175,16 @@ where I: Iterator } } } + +pub trait IgnoreResultExt +{ + fn ignore(self); +} + +impl IgnoreResultExt for Result +{ + #[inline(always)] fn ignore(self) + { + //Do nothing + } +} diff --git a/src/main.rs b/src/main.rs index f8ffb84..2d08f98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ mod database; mod args; mod config; +mod progress; mod state; mod delete; mod restore; diff --git a/src/progress/mod.rs b/src/progress/mod.rs new file mode 100644 index 0000000..955b22c --- /dev/null +++ b/src/progress/mod.rs @@ -0,0 +1,292 @@ +//! Async progression +use super::*; +use termprogress::{ + prelude::*, +}; +use futures::{ + prelude::*, +}; +use tokio::{ + sync::{ + mpsc, + RwLock, + watch, + oneshot, + + RwLockReadGuard, + RwLockWriteGuard, + }, + task::{self, JoinHandle}, +}; +use std::{ + sync::{ + Weak, + }, + fmt, + error, +}; + +/// Command to send to worker task. +#[derive(Debug, Clone)] +pub enum CommandKind +{ + Line(String), + LineErr(String), + + Bump(isize), + BumpHigh(isize), + + Shutdown, + + Many(Vec), +} + +#[derive(Debug)] +enum CommandIter +{ + One(std::iter::Once), + Many(std::vec::IntoIter), +} +impl ExactSizeIterator for CommandIter{} + +impl Iterator for CommandIter +{ + type Item = CommandKind; + fn next(&mut self) -> Option + { + match self { + Self::One(one) => one.next(), + Self::Many(many) => many.next(), + } + } + + fn size_hint(&self) -> (usize, Option) + { + let sz = match self { + Self::One(_) => 1, + Self::Many(m) => m.len(), + }; + (sz, Some(sz)) + } +} +impl std::iter::FusedIterator for CommandIter{} + + +impl CommandKind +{ + /// Enumerate all possible commands if this is `Many`. + /// + /// The outputs may still contain `Many`. + fn enumerate(self) -> CommandIter + { + match self { + Self::Many(many) => CommandIter::Many(many.into_iter()), + other => CommandIter::One(std::iter::once(other)), + } + } +} + +#[derive(Debug, Clone)] +pub struct BarRef(Arc>); + +#[derive(Debug)] +struct Command(CommandKind, oneshot::Sender<()>); + +/// The bar's state +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct State +{ + max: usize, + cur: usize, + //TODO: Tasks +} + +impl State +{ + /// The current progress + pub fn prog(&self) -> f64 + { + (self.cur as f64) / (self.max as f64) + } +} + +/// A handle to a running async progress bar +#[derive(Debug, Clone)] +pub struct Handle +where B: ProgressBar, +{ + // Channel to send commands to the worker + chan: mpsc::Sender, + // A weak reference to the worker's bar itself + bar: Weak>, + // A strong reference to the bar's state + state: Arc>, + // Has the worker shut down? + dead: watch::Receiver, +} + +impl Handle +{ + /// Is the worker alive? + pub fn is_alive(&self) -> bool + { + self.bar.strong_count()>0 && !*self.dead.borrow() + } + + /// Yields until the worker shutds down gracefully + pub async fn closed(&mut self) -> Result<(),WorkerCommError> + { + loop { + match self.dead.recv().await { + Some(true) => return Ok(()), + None => return Err(WorkerCommError), + _ => continue, + } + } + } + + /// Send a command to the worker. + /// + /// Returns a future that completes to `Ok` when the worker successfully processes the command, and `Err` if the worker exits before processing it + pub async fn send_command(&mut self, command: CommandKind) -> Result>, WorkerCommError> + { + let (tx, rx) = oneshot::channel(); + self.chan.send(Command(command, tx)).await.map_err(|_| WorkerCommError)?; + + Ok(rx.map(|res| res.map_err(|_| WorkerCommError))) + } + + /// Send a command to the worker and then wait for it to be processed + pub async fn send_command_and_wait(&mut self, command: CommandKind) -> Result<(), WorkerCommError> + { + self.send_command(command).await?.await + } + + /// Send a command to the worker but do not wait for it to be processed + pub async fn send_command_and_detach(&mut self, command: CommandKind) -> Result<(), WorkerCommError> + { + let _ = self.send_command(command).await?; + Ok(()) + } + + /// Get a reference to the state + pub async fn state(&self) -> RwLockReadGuard<'_, State> + { + self.state.read().await + } + + /// Get an upgraded reference to the bar itself, if it still exists. + /// + /// This kinda messes things up... Hiding for now. + async fn bar_ref(&self) -> Result, WorkerCommError> + { + Ok(BarRef(self.bar.upgrade().ok_or(WorkerCommError)?)) + } +} + +/// Error communicating with worker +#[derive(Debug)] +pub struct WorkerCommError; + +impl fmt::Display for WorkerCommError +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "failed to communicate with worker.") + } +} + +/// Host a progress bar detached +pub fn host(bar: B) -> (Handle, JoinHandle) +{ + let state = Arc::new(RwLock::new(Default::default())); + let (mut rx, death, bar, handle) = { + let (tx, rx) = mpsc::channel(24); + let (death, dead) = watch::channel(false); + let bar = Arc::new(RwLock::new(bar)); + let handle = Handle { + chan: tx, + dead, + bar: Arc::downgrade(&bar), + state: Arc::clone(&state), + }; + (rx,death,bar,handle) + }; + (handle, tokio::spawn(async move { + ({ + macro_rules! update_bar { + (to $state:ident $($tt:tt)*) => { + { + let mut bar = bar.write().await; + bar.set_progress($state.prog()); + update_bar!($($tt)*); + } + }; + (write error $line:ident $($tt:tt)*) => { + { + let bar = bar.read().await; + let string = &$line[..]; + bar.eprintln(string); + update_bar!($($tt)*); + } + }; + (write $(std)? $line:ident $($tt:tt)*) => { + { + let bar = bar.read().await; + let string = &$line[..]; + bar.println(string); + update_bar!($($tt)*); + } + }; + () => {}; + } + while let Some(Command(command, response)) = rx.recv().await { + let _response = util::defer(move || response.send(()).ignore()); + match command { + CommandKind::Shutdown => break, + CommandKind::BumpHigh(sz) if sz >= 0 => { + let mut state = state.write().await; + state.max = state.max.saturating_add(sz as usize); + + update_bar!(to state); + }, + CommandKind::BumpHigh(sz) => { + debug_assert!(sz <0); + let mut state = state.write().await; + state.max = state.max.saturating_sub(sz.abs() as usize); + + update_bar!(to state); + }, + CommandKind::Bump(sz) if sz >= 0 => { + let mut state = state.write().await; + state.cur = state.cur.saturating_add(sz as usize); + + update_bar!(to state); + }, + CommandKind::Bump(sz) => { + debug_assert!(sz <0); + let mut state = state.write().await; + state.cur = state.cur.saturating_sub(sz.abs() as usize); + + update_bar!(to state); + }, + CommandKind::Line(line) => update_bar!(write line), + CommandKind::LineErr(line) => update_bar!(write error line), + CommandKind::Many(_) => unimplemented!(), + } + } + + // Consume the bar and return + { + let mut bar = bar; + loop { + bar = match Arc::try_unwrap(bar) { + Ok(bar) => break bar, + Err(bar) => bar, + }; + task::yield_now().await; + }.into_inner() + } + }, death.broadcast(true)).0 + })) +} diff --git a/src/util.rs b/src/util.rs index a847e28..f11b172 100644 --- a/src/util.rs +++ b/src/util.rs @@ -121,3 +121,17 @@ impl NewWithCap for String (_, Some(x)) | (x, None) => T::with_capacity(x), } } + +/// Defer an action until drop +pub fn defer(fun: F) -> impl std::ops::Drop +{ + struct DropWrap(Option); + impl std::ops::Drop for DropWrap + { + fn drop(&mut self) + { + self.0.take().map(|x| x()); + } + } + DropWrap(Some(fun)) +}