start async progress

progress
Avril 4 years ago
parent 583b5fd0e6
commit 6a2570a0c4
Signed by: flanchan
GPG Key ID: 284488987C31F630

21
Cargo.lock generated

@ -732,6 +732,26 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "terminal_size"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a14cd9f8c72704232f0bfc8455c0e861f0ad4eb60cc9ec8a170e231414c1e13"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "termprogress"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14"
dependencies = [
"rustc_version",
"terminal_size",
]
[[package]] [[package]]
name = "thread_local" name = "thread_local"
version = "1.0.1" version = "1.0.1"
@ -871,6 +891,7 @@ dependencies = [
"rustc_version", "rustc_version",
"sha2", "sha2",
"smallmap", "smallmap",
"termprogress",
"tokio", "tokio",
] ]

@ -30,6 +30,7 @@ futures = "0.3.6"
smallmap = "^1.1.6" smallmap = "^1.1.6"
log = "0.4.11" log = "0.4.11"
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
termprogress = "0.3.4"
[build-dependencies] [build-dependencies]
rustc_version = "0.2" rustc_version = "0.2"

@ -175,3 +175,16 @@ where I: Iterator<Item = u8>
} }
} }
} }
pub trait IgnoreResultExt
{
fn ignore(self);
}
impl<T,E> IgnoreResultExt for Result<T,E>
{
#[inline(always)] fn ignore(self)
{
//Do nothing
}
}

@ -36,6 +36,7 @@ mod database;
mod args; mod args;
mod config; mod config;
mod progress;
mod state; mod state;
mod delete; mod delete;
mod restore; mod restore;

@ -0,0 +1,292 @@
//! Async progression
use super::*;
use termprogress::{
prelude::*,
};
use futures::{
prelude::*,
};
use tokio::{
sync::{
mpsc,
RwLock,
watch,
oneshot,
RwLockReadGuard,
RwLockWriteGuard,
},
task::{self, JoinHandle},
};
use std::{
sync::{
Weak,
},
fmt,
error,
};
/// Command to send to worker task.
#[derive(Debug, Clone)]
pub enum CommandKind
{
Line(String),
LineErr(String),
Bump(isize),
BumpHigh(isize),
Shutdown,
Many(Vec<CommandKind>),
}
#[derive(Debug)]
enum CommandIter
{
One(std::iter::Once<CommandKind>),
Many(std::vec::IntoIter<CommandKind>),
}
impl ExactSizeIterator for CommandIter{}
impl Iterator for CommandIter
{
type Item = CommandKind;
fn next(&mut self) -> Option<Self::Item>
{
match self {
Self::One(one) => one.next(),
Self::Many(many) => many.next(),
}
}
fn size_hint(&self) -> (usize, Option<usize>)
{
let sz = match self {
Self::One(_) => 1,
Self::Many(m) => m.len(),
};
(sz, Some(sz))
}
}
impl std::iter::FusedIterator for CommandIter{}
impl CommandKind
{
/// Enumerate all possible commands if this is `Many`.
///
/// The outputs may still contain `Many`.
fn enumerate(self) -> CommandIter
{
match self {
Self::Many(many) => CommandIter::Many(many.into_iter()),
other => CommandIter::One(std::iter::once(other)),
}
}
}
#[derive(Debug, Clone)]
pub struct BarRef<B>(Arc<RwLock<B>>);
#[derive(Debug)]
struct Command(CommandKind, oneshot::Sender<()>);
/// The bar's state
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct State
{
max: usize,
cur: usize,
//TODO: Tasks
}
impl State
{
/// The current progress
pub fn prog(&self) -> f64
{
(self.cur as f64) / (self.max as f64)
}
}
/// A handle to a running async progress bar
#[derive(Debug, Clone)]
pub struct Handle<B = Bar>
where B: ProgressBar,
{
// Channel to send commands to the worker
chan: mpsc::Sender<Command>,
// A weak reference to the worker's bar itself
bar: Weak<RwLock<B>>,
// A strong reference to the bar's state
state: Arc<RwLock<State>>,
// Has the worker shut down?
dead: watch::Receiver<bool>,
}
impl<B: ProgressBar> Handle<B>
{
/// Is the worker alive?
pub fn is_alive(&self) -> bool
{
self.bar.strong_count()>0 && !*self.dead.borrow()
}
/// Yields until the worker shutds down gracefully
pub async fn closed(&mut self) -> Result<(),WorkerCommError>
{
loop {
match self.dead.recv().await {
Some(true) => return Ok(()),
None => return Err(WorkerCommError),
_ => continue,
}
}
}
/// Send a command to the worker.
///
/// Returns a future that completes to `Ok` when the worker successfully processes the command, and `Err` if the worker exits before processing it
pub async fn send_command(&mut self, command: CommandKind) -> Result<impl Future<Output=Result<(), WorkerCommError>>, WorkerCommError>
{
let (tx, rx) = oneshot::channel();
self.chan.send(Command(command, tx)).await.map_err(|_| WorkerCommError)?;
Ok(rx.map(|res| res.map_err(|_| WorkerCommError)))
}
/// Send a command to the worker and then wait for it to be processed
pub async fn send_command_and_wait(&mut self, command: CommandKind) -> Result<(), WorkerCommError>
{
self.send_command(command).await?.await
}
/// Send a command to the worker but do not wait for it to be processed
pub async fn send_command_and_detach(&mut self, command: CommandKind) -> Result<(), WorkerCommError>
{
let _ = self.send_command(command).await?;
Ok(())
}
/// Get a reference to the state
pub async fn state(&self) -> RwLockReadGuard<'_, State>
{
self.state.read().await
}
/// Get an upgraded reference to the bar itself, if it still exists.
///
/// This kinda messes things up... Hiding for now.
async fn bar_ref(&self) -> Result<BarRef<B>, WorkerCommError>
{
Ok(BarRef(self.bar.upgrade().ok_or(WorkerCommError)?))
}
}
/// Error communicating with worker
#[derive(Debug)]
pub struct WorkerCommError;
impl fmt::Display for WorkerCommError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to communicate with worker.")
}
}
/// Host a progress bar detached
pub fn host<B: ProgressBar + Send + Sync + 'static>(bar: B) -> (Handle<B>, JoinHandle<B>)
{
let state = Arc::new(RwLock::new(Default::default()));
let (mut rx, death, bar, handle) = {
let (tx, rx) = mpsc::channel(24);
let (death, dead) = watch::channel(false);
let bar = Arc::new(RwLock::new(bar));
let handle = Handle {
chan: tx,
dead,
bar: Arc::downgrade(&bar),
state: Arc::clone(&state),
};
(rx,death,bar,handle)
};
(handle, tokio::spawn(async move {
({
macro_rules! update_bar {
(to $state:ident $($tt:tt)*) => {
{
let mut bar = bar.write().await;
bar.set_progress($state.prog());
update_bar!($($tt)*);
}
};
(write error $line:ident $($tt:tt)*) => {
{
let bar = bar.read().await;
let string = &$line[..];
bar.eprintln(string);
update_bar!($($tt)*);
}
};
(write $(std)? $line:ident $($tt:tt)*) => {
{
let bar = bar.read().await;
let string = &$line[..];
bar.println(string);
update_bar!($($tt)*);
}
};
() => {};
}
while let Some(Command(command, response)) = rx.recv().await {
let _response = util::defer(move || response.send(()).ignore());
match command {
CommandKind::Shutdown => break,
CommandKind::BumpHigh(sz) if sz >= 0 => {
let mut state = state.write().await;
state.max = state.max.saturating_add(sz as usize);
update_bar!(to state);
},
CommandKind::BumpHigh(sz) => {
debug_assert!(sz <0);
let mut state = state.write().await;
state.max = state.max.saturating_sub(sz.abs() as usize);
update_bar!(to state);
},
CommandKind::Bump(sz) if sz >= 0 => {
let mut state = state.write().await;
state.cur = state.cur.saturating_add(sz as usize);
update_bar!(to state);
},
CommandKind::Bump(sz) => {
debug_assert!(sz <0);
let mut state = state.write().await;
state.cur = state.cur.saturating_sub(sz.abs() as usize);
update_bar!(to state);
},
CommandKind::Line(line) => update_bar!(write line),
CommandKind::LineErr(line) => update_bar!(write error line),
CommandKind::Many(_) => unimplemented!(),
}
}
// Consume the bar and return
{
let mut bar = bar;
loop {
bar = match Arc::try_unwrap(bar) {
Ok(bar) => break bar,
Err(bar) => bar,
};
task::yield_now().await;
}.into_inner()
}
}, death.broadcast(true)).0
}))
}

@ -121,3 +121,17 @@ impl NewWithCap for String
(_, Some(x)) | (x, None) => T::with_capacity(x), (_, Some(x)) | (x, None) => T::with_capacity(x),
} }
} }
/// Defer an action until drop
pub fn defer<F: FnOnce()>(fun: F) -> impl std::ops::Drop
{
struct DropWrap<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> std::ops::Drop for DropWrap<F>
{
fn drop(&mut self)
{
self.0.take().map(|x| x());
}
}
DropWrap(Some(fun))
}

Loading…
Cancel
Save