From fbe6fa399a138ca7d509d476c2a87fcd4d55b2df Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 10 Aug 2020 17:21:57 +0100 Subject: [PATCH] progress command builder --- src/main.rs | 1 + src/maybe_single.rs | 120 +++++++++++++++++++++++++++++++ src/progress.rs | 172 ++++++++++++++++++++++++++++++++++++++++---- src/stage.rs | 16 +++++ 4 files changed, 294 insertions(+), 15 deletions(-) create mode 100644 src/maybe_single.rs diff --git a/src/main.rs b/src/main.rs index df420bc..6b1187a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ mod fixed_stack; mod process; mod work; +#[cfg(feature="progress")] mod maybe_single; #[cfg(feature="progress")] mod task_list; #[cfg(feature="progress")] mod progress; diff --git a/src/maybe_single.rs b/src/maybe_single.rs new file mode 100644 index 0000000..9ab6700 --- /dev/null +++ b/src/maybe_single.rs @@ -0,0 +1,120 @@ +//! A one-or-more iterator type + +use std::{ + iter::{self, Once,FromIterator,Extend}, +}; + +pub enum IntoIter +{ + Many(std::vec::IntoIter), + Single(Once), +} + +/// A type that might hold once or more values +pub enum MaybeSingle { + Single(Option), + Many(Vec), +} + +impl Extend for MaybeSingle +{ + fn extend>(&mut self, iter: I) + { + match self { + Self::Single(single) => { + *self = Self::Many(iter::once(single.take().unwrap()).chain(iter).collect()); + }, + Self::Many(ref mut many) => { + many.extend(iter); + }, + } + } +} + +impl MaybeSingle { + pub fn is_single(&self) -> bool + { + if let Self::Single(_) = &self { + true + } else { + false + } + } + + pub fn push(&mut self, value: T) + { + match self { + Self::Single(single) => { + let first = single.take().unwrap(); + *self = Self::Many(vec![first, value]); + }, + Self::Many(ref mut many) => { + many.push(value); + } + } + } + + pub const fn single(from: T) -> Self + { + Self::Single(Some(from)) + } +} + +impl FromIterator for MaybeSingle +{ + fn from_iter>(iter: I) -> Self + { + let iter = iter.into_iter(); + let mut vec = match iter.size_hint() { + (0, None) => Vec::new(), + (_, Some(value)) | (value, None) => Vec::with_capacity(value), + }; + vec.extend(iter); + Self::Many(vec) + } +} + +impl ExactSizeIterator for IntoIter{} + +impl From> for MaybeSingle +{ + fn from(from: Vec) -> Self + { + Self::Many(from) + } +} + +impl IntoIterator for MaybeSingle +{ + type Item = T; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter + { + match self { + Self::Single(Some(single)) => IntoIter::Single(iter::once(single)), + Self::Many(many) => IntoIter::Many(many.into_iter()), + _ => panic!("Invalid state"), + } + } +} + +impl Iterator for IntoIter +{ + type Item = T; + fn next(&mut self) -> Option + { + match self { + Self::Many(ref mut many) => many.next(), + Self::Single(ref mut once) => once.next(), + } + } + + fn size_hint(&self) -> (usize, Option) + { + match self { + Self::Many(many) => many.size_hint(), + Self::Single(single) => (0, Some(1)), + } + } +} diff --git a/src/progress.rs b/src/progress.rs index 584eb35..2e5c514 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -29,6 +29,7 @@ use tokio::{ self, JoinHandle, }, + stream::StreamExt as _, }; use futures::{ future::{ @@ -42,14 +43,14 @@ use termprogress::{ }; #[derive(Debug)] -struct Task +pub struct Task { name: String, idx: oneshot::Sender, } #[derive(Debug)] -enum CommandKind +pub enum CommandKind { BumpHigh(usize), BumpLow(usize), @@ -142,6 +143,115 @@ impl Future for TaskWaiter } } +pub struct CommandBuilder<'a> +{ + sender: &'a mut ProgressSender, + commands: Option>, +} + +impl<'a> CommandBuilder<'a> +{ + fn send_command(&mut self, comm: CommandKind) + { + if let Some(ref mut commands) = self.commands { + commands.push(comm); + } else { + self.commands = Some(maybe_single::MaybeSingle::single(comm)); + } + } + + /// Add another Builder to this one + #[inline] pub fn chain>(mut self, other: I) -> Self + { + self.extend(other); + self + } + + // Commands + /// Print line on the worker's progress bar + pub fn println(mut self, line: impl Into) -> Self + { + self.send_command(CommandKind::PrintLine(line.into())); + self + } + + /// Print error line on the worker's progress bar + pub fn eprintln(mut self, line: impl Into) -> Self + { + self.send_command(CommandKind::PrintLineErr(line.into())); + self + } + + /// Increase the worker's max number + pub fn bump_max(mut self, by: usize) -> Self + { + self.send_command(CommandKind::BumpHigh(by)); + self + } + + /// Increase the worker's min number + pub fn bump_min(mut self, by: usize) -> Self + { + self.send_command(CommandKind::BumpLow(by)); + self + } + + /// Remove a task by ID. + pub fn remove_task(mut self, task_idx: usize) -> Self + { + self.send_command(CommandKind::RemoveTask(task_idx)); + self + } + + /// Signal a shutdown to the worker + pub fn shutdown(mut self) -> Self + { + self.send_command(CommandKind::Complete); + self + } + + /// Send this as an atom, + pub async fn send(self) -> Result + { + self.sender.send_command(match self.commands { + Some(maybe_single::MaybeSingle::Single(Some(single))) => { + single + }, + Some(maybe_single::MaybeSingle::Many(many)) => { + CommandKind::Many(many) + }, + _ => return Err(Error::NoCommands), + }).await + } +} + +impl<'a> Extend for CommandBuilder<'a> +{ + fn extend>(&mut self, other: I) + { + match self.commands { + Some(ref mut commands) => commands.extend(other), + _ => self.commands = Some(other.into_iter().collect()), + }; + } +} + +impl<'a> IntoIterator for CommandBuilder<'a> +{ + type Item= CommandKind; + type IntoIter = maybe_single::IntoIter; + + fn into_iter(self) -> Self::IntoIter + { + match self.commands { + Some(value) => value.into_iter(), + None => maybe_single::MaybeSingle::from(vec![]).into_iter(), + } + } +} + + + impl ProgressSender { /// Get the status of this progress @@ -221,6 +331,15 @@ impl ProgressSender { self.send_command(CommandKind::Complete).await } + + /// Create a new command builder + pub fn builder<'a>(&'a mut self) -> CommandBuilder<'a> + { + CommandBuilder{ + commands: None, + sender: self, + } + } } /// Status of the worker @@ -273,29 +392,47 @@ pub fn create_progress + where T: IntoIterator { Single(Once), - Many(Vec), + Many(::IntoIter), } - impl From for MaybeSingle + impl ExactSizeIterator for MaybeSingle + where T: IntoIterator, + ::IntoIter: ExactSizeIterator{} + + impl Iterator for MaybeSingle + where T: IntoIterator, + ::IntoIter: ExactSizeIterator { - fn from(from: CommandKind) -> Self + type Item = ::Item; + fn next(&mut self) -> Option { - match from { - CommandKind::Many(many) => Self::Many(many), - x => Self::Single(iter::once(x)), + match self { + Self::Many(ref mut many) => many.next(), + Self::Single(ref mut single) => single.next(), } } - } - - //TODO: IntoIterator for Maybesingle??? We'll need our own type, I think. Maybe generalise it, and put in ext or util? - let command = MaybeSingle::from(command); + fn size_hint(&self) -> (usize, Option) + { + match &self { + Self::Many(many) => (many.len(), Some(many.len())), + Self::Single(_) => (1, Some(1)), + } + } + } - // Match the command - for command in command.into_iter() { + let mut commands: stage::Stage = { + let command: MaybeSingle> = match command { + CommandKind::Many(many) => MaybeSingle::Many(many.into_iter()), + other => MaybeSingle::Single(iter::once(other)), + }; + stage::Stage::from_iter(command) + }; + while let Some(command) = commands.next().await { match command { CommandKind::BumpHigh(high) => { let stat = stat.to_mut(); @@ -329,6 +466,9 @@ pub fn create_progress { break; }, + CommandKind::Many(many) => { + let _ = commands.sender().send_many(many).await; + }, } } @@ -358,6 +498,7 @@ pub fn create_progress) -> std::fmt::Result { match self { + Error::NoCommands => write!(f, "an attempt was made to send 0 commands"), Error::WorkerPanic => write!(f, "worker panicked"), Error::WorkerDropped => write!(f, "tried to communicate with dropped worker"), _ => write!(f, "unknown error"), diff --git a/src/stage.rs b/src/stage.rs index 78c7a97..bfd2298 100644 --- a/src/stage.rs +++ b/src/stage.rs @@ -38,6 +38,22 @@ impl StageSender } } } + + /// Send many values at once + pub async fn send_many>(&self, values: I) -> Result<(), I> + { + loop { + if let Some(internal) = self.internal.upgrade() { + let mut write = internal.write().await; + for item in values.into_iter() { + write.push_back(item); + } + break Ok(()); + } else { + break Err(values); + } + } + } } impl Stage