From 5b975333f1b352e212d6005d25b523e2b6002ab9 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 13 Jul 2020 01:26:09 +0100 Subject: [PATCH] shit just werks --- TODO | 1 + src/main.rs | 1 + src/work_async.rs | 53 +++++--- src/work_async/progress.rs | 225 +++++++++++++++++++++++++++++++ src/work_async/progress/error.rs | 41 ++++++ src/work_async/tasklist.rs | 83 ++++++++++++ 6 files changed, 384 insertions(+), 20 deletions(-) create mode 100644 TODO create mode 100644 src/work_async/progress.rs create mode 100644 src/work_async/progress/error.rs create mode 100644 src/work_async/tasklist.rs diff --git a/TODO b/TODO new file mode 100644 index 0000000..3a0de40 --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +Remove Cargo.lock when termprogress is uploaded and Cargo.toml is properly formatted diff --git a/src/main.rs b/src/main.rs index 72f4c54..1f8db96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![feature(linked_list_remove)] #![allow(dead_code)] use termprogress::{ diff --git a/src/work_async.rs b/src/work_async.rs index 6903700..f6aaf4b 100644 --- a/src/work_async.rs +++ b/src/work_async.rs @@ -9,57 +9,66 @@ use tokio::{ prelude::*, stream::StreamExt, }; -use termprogress::{ - Display, ProgressBar, Spinner, progress, spinner, -}; + +mod tasklist; +#[macro_use] +mod progress; //TODO: Create a progress task module for atomically printing infos and stuff //TODO: Create a module for temp files, pass the temp file to `perform` and do the regex fixing after `perform` /// Download a loli async -pub async fn perform(url: impl AsRef, path: impl AsRef) -> Result +pub async fn perform(url: impl AsRef, path: impl AsRef, mut progress: progress::CommandSender) -> Result<(), error::Error> { let url = url.as_ref(); let path = path.as_ref(); - let mut progress = spinner::Spin::with_title("Starting request...", Default::default()); - progress.refresh(); + let task = url.to_owned(); //TODO: Real task name + progress.push_task(&task).await; + let mut resp = reqwest::get(url).await?; - progress.bump(); - progress.set_title(&format!("Starting download to {:?}...", path)); + let len = resp.content_length(); + + let mut file = OpenOptions::new() .create(true) .truncate(true) .write(true) .open(path).await?; - let len = resp.content_length(); - let mut version_inc = || { - progress.bump(); - }; - version_inc(); - + progress.println(format!("req ok {:?}, file ok ({:?}), starting download", len, path)).await; let mut bytes = resp.bytes_stream(); while let Some(buffer) = bytes.next().await { file.write(buffer?.as_ref()).await?; - version_inc(); + progress.bump().await; } - progress.complete_with("OK"); + progress.println(format!("done for {}", url)).await; + progress.bump().await; + + progress.pop_task(task).await; - loop{} + Ok(()) } pub async fn work(conf: config::Config) -> Result<(), Box> { let rating = conf.rating; let mut children = Vec::new(); + + let prog = progress::AsyncProgressCounter::new("Initialising..."); + let mut prog_writer = prog.writer(); + + let prog = prog.host(); for path in conf.output.into_iter() { let url = url::parse(&rating); + let mut prog = prog_writer.clone(); children.push(tokio::task::spawn(async move { - println!("Starting download ({})...", url); + + //println!("Starting download ({})...", url); + prog.println(format!("Starting download ({})...", url)).await.expect("fatal"); let path = match path { config::OutputType::File(file) => file, config::OutputType::Directory(dir) => { @@ -67,13 +76,14 @@ pub async fn work(conf: config::Config) -> Result<(), Box unimplemented!(); }, }; - match perform(&url, &path).await { - Err(e) => panic!("Failed downloading {} -> {:?}: {}", url, path, e), + match perform(&url, &path, prog).await { + Err(e) => panic!("Failed downloading {} -> {:?}: {}", url, path, e), //TODO: Make real error handler Ok(v) => v, } })); } + prog_writer.println("Waiting for children...").await; for child in children.into_iter() { match child.await { @@ -84,5 +94,8 @@ pub async fn work(conf: config::Config) -> Result<(), Box } } + prog_send!(try link prog_writer.kill()); + prog.await.expect("fatal"); + Ok(()) } diff --git a/src/work_async/progress.rs b/src/work_async/progress.rs new file mode 100644 index 0000000..b54d403 --- /dev/null +++ b/src/work_async/progress.rs @@ -0,0 +1,225 @@ +use super::*; +use tokio::{ + sync::{ + mpsc::{ + channel, + unbounded_channel, + Sender, + Receiver, + UnboundedReceiver, + UnboundedSender + }, + Mutex, + }, + task::{ + self, + JoinHandle, + }, +}; + +use termprogress::{ + spinner, + Display, + Spinner, +}; + +use std::{ + sync::Arc, + fmt, +}; + +enum CommandInternal +{ + PrintLine(String), + Bump, + Kill(Option), + PushTask(String), + PopTask(String), +} + +struct Command +{ + internal: CommandInternal, + callback: UnboundedSender<()>, +} + +impl std::ops::Drop for Command +{ + fn drop(&mut self) + { + let _ = self.callback.send(()); + } +} + +pub struct AsyncProgressCounter +{ + writer: Sender, + reader: Receiver, + title: String, +} + +#[derive(Clone)] +pub struct CommandSender(Sender); + +pub struct CommandCallback(UnboundedReceiver<()>); + +pub mod error; + +impl Command +{ + pub fn new(internal: CommandInternal) -> (Self, CommandCallback) + { + let (tx, rx) = unbounded_channel(); + (Self { + internal, + callback: tx, + }, CommandCallback(rx)) + } +} + +macro_rules! prog_send { + (try link unwind $expression:expr) => { + { + $expression.await.expect("fatal").wait().await.expect("fatal") + } + }; + (try link $expression:expr) => { + { + $expression.await?.wait().await? + } + }; + (try unind $expression:expr) => { + { + let _ = $expression.await.expect("fatal"); + } + }; + (try $expression:expr) => { + { + let _ =$expression.await?; + } + }; + ($expression:expr) => { + { + let _ = $expression.await; + } + }; +} + +impl CommandSender +{ + /// Print a line + pub async fn println(&mut self, string: impl Into) -> Result + { + let (com, call) = Command::new(CommandInternal::PrintLine(string.into())); + self.0.send(com).await?; + Ok(call) + } + + /// Finalise the progress counter + pub async fn bump(&mut self) -> Result + { + let (com, call) = Command::new(CommandInternal::Bump); + self.0.send(com).await?; + Ok(call) + } + + /// Finalise the progress counter + pub async fn kill(&mut self) -> Result + { + let (com, call) = Command::new(CommandInternal::Kill(None)); + self.0.send(com).await?; + Ok(call) + } + + /// Finalise the progress counter and print a line + pub async fn kill_with(&mut self, string: String) -> Result + { + let (com, call) = Command::new(CommandInternal::Kill(Some(string))); + self.0.send(com).await?; + Ok(call) + } + + + /// Remove a task + pub async fn pop_task(&mut self, string: impl Into) -> Result + { + let (com, call) = Command::new(CommandInternal::PopTask(string.into())); + self.0.send(com).await?; + Ok(call) + } + + /// Add a task + pub async fn push_task(&mut self, string: impl Into) -> Result + { + let (com, call) = Command::new(CommandInternal::PushTask(string.into())); + self.0.send(com).await?; + Ok(call) + } +} + +impl CommandCallback +{ + /// Wait until the command has been processed. + pub async fn wait(mut self) -> Result<(), error::Error> + { + self.0.recv().await.ok_or(error::Error::RecvError) + } +} + +impl AsyncProgressCounter +{ + /// Create a new `AsyncProgressCounter` + pub fn new(title: impl Into) -> Self + { + let (tx, rx) = channel(16); + + Self { + reader: rx, + writer: tx, + title: title.into(), + } + } + + /// Create a new writer + pub fn writer(&self) -> CommandSender + { + CommandSender(self.writer.clone()) + } + + /// Consume the instance and host it's receiver + pub fn host(mut self) -> JoinHandle<()> + { + let mut spin = spinner::Spin::with_title(&self.title[..], Default::default()); + + let mut task = tasklist::TaskList::new(); + + task::spawn(async move { + while let Some(com) = self.reader.recv().await { + match &com.internal { + CommandInternal::PrintLine(line) => spin.println(&line[..]), + CommandInternal::Bump => { + spin.bump(); //TODO: We'll have to change this when we change to real progress + }, + CommandInternal::Kill(Some(line)) => { + spin.complete_with(line.as_str()); + self.reader.close(); + break; + }, + CommandInternal::Kill(_) => { + spin.complete(); + self.reader.close(); + break; + }, + CommandInternal::PushTask(tstr) => { + task.push(tstr); + spin.set_title(task.as_str()); + }, + CommandInternal::PopTask(tstr) => { + task.pop_value(tstr); + spin.set_title(task.as_str()); + }, + } + } + }) + } +} diff --git a/src/work_async/progress/error.rs b/src/work_async/progress/error.rs new file mode 100644 index 0000000..88049ed --- /dev/null +++ b/src/work_async/progress/error.rs @@ -0,0 +1,41 @@ +use super::*; +use tokio::{ + sync::{ + mpsc::{ + error::SendError, + }, + }, +}; +use std::{ + fmt, +}; + +#[derive(Debug)] +pub enum Error +{ + SendError, + RecvError, + Unknown, +} + +impl std::error::Error for Error{} +impl fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "sync (fatal): ")?; + match self { + Error::SendError => write!(f, "there was an mpsc send error"), + Error::RecvError => write!(f, "there was an mpsc recv error"), + _ => write!(f, "unknown error"), + } + } +} + +impl From> for Error +{ + fn from(_er: SendError) -> Self + { + Error::SendError + } +} diff --git a/src/work_async/tasklist.rs b/src/work_async/tasklist.rs new file mode 100644 index 0000000..0daba12 --- /dev/null +++ b/src/work_async/tasklist.rs @@ -0,0 +1,83 @@ + +use super::*; +use std::{ + collections::LinkedList, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct TaskList(LinkedList<(usize, String)>, String, usize); + +fn find(list: &LinkedList, mut fun: F) -> Option + where F: FnMut(&T) -> bool +{ + for (i, x) in (0..).zip(list.iter()) + { + if fun(x) { + return Some(i); + } + } + None +} + +impl TaskList +{ + /// Create a new tasklist + pub fn new() -> Self + { + Self(LinkedList::new(), String::new(), 0) + } + + pub fn push(&mut self, string: impl Into) -> usize + { + let idx = { + self.2 += 1; + self.2 + }; + self.0.push_front((idx, string.into())); + self.recalc_buffer(); + idx + } + + pub fn pop(&mut self, idx: usize) -> bool + { + if let Some(idx) = find(&self.0, |&(i, _)| idx == i) + { + self.0.remove(idx); + self.recalc_buffer(); + true + } else { + false + } + } + + pub fn pop_value(&mut self, string: impl AsRef) -> bool + { + let string = string.as_ref(); + if let Some(idx) = find(&self.0, |(_, s)| s.as_str() == string) + { + self.0.remove(idx); + self.recalc_buffer(); + true + } else { + false + } + } + + pub fn recalc_buffer(&mut self) + { + self.1 = self.0.iter().map(|(_, s)| s.as_str()).collect(); + } + + pub fn as_str(&self) -> &str + { + &self.1[..] + } +} + +impl AsRef for TaskList +{ + fn as_ref(&self) -> &str + { + self.as_str() + } +}