From 40bfa1d01a0efa1b1421910eec38142b7a057094 Mon Sep 17 00:00:00 2001
From: Avril
Date: Wed, 22 Sep 2021 20:24:33 +0100
Subject: [PATCH] Worker written TODO: Preserve file ext
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fortune for lazy-rebuild's current commit: Curse − 凶
---
 src/ext.rs    |  13 ++++++
 src/handle.rs |   4 +-
 src/main.rs   |  24 +++--------
 src/work.rs   | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 128 insertions(+), 20 deletions(-)
 create mode 100644 src/work.rs

diff --git a/src/ext.rs b/src/ext.rs
index 63f971d..b35c530 100644
--- a/src/ext.rs
+++ b/src/ext.rs
@@ -78,3 +78,16 @@ where I: IntoIterator,
         }
     }
 }
+
+pub trait HexStringSliceExt
+{
+    fn to_hex_string(&self) -> String;
+}
+
+impl<T> HexStringSliceExt for T
+where T: AsRef<[u8]>
+{
+    fn to_hex_string(&self) -> String {
+        self.as_ref().iter().copied().hex_string().collect()
+    }
+}
diff --git a/src/handle.rs b/src/handle.rs
index e8d119e..aaf8e84 100644
--- a/src/handle.rs
+++ b/src/handle.rs
@@ -92,7 +92,7 @@ impl From<()> for Options
 ///
 /// # Returns
 /// The input stream and output stream `(tx, rx)`.
-pub fn spawn_with_cancel(opt: Options, cancel: impl Future + 'static + Send + Unpin) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static)
+pub fn spawn_with_cancel(opt: Options, cancel: impl Future + 'static + Send) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static)
 {
     let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
     let rx = file_handler(rx,cancel , match opt.max_operations {
@@ -132,7 +132,7 @@ pub fn spawn(opt: Options) -> (mpsc::Sender, impl Stream
 fn file_handler<C>(mut recv: mpsc::Receiver, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream)> + Unpin + Send + Sync + 'static
-where C: Future + 'static + Unpin + Send
+where C: Future + 'static + Send
 {
     let (r_tx, r_rx) = mpsc::channel(backpressure);
 
diff --git a/src/main.rs b/src/main.rs
index 16b3611..fd81cb4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,24 +6,12 @@ mod ext;
 use ext::*;
 mod pool;
 mod handle;
+mod work;
+
 #[tokio::main]
-async fn main() {
+async fn main() -> std::io::Result<()> {
     //TODO: Is one hashing handler enough? Or should we have a pool of handlers, with a capacity of the number of threads or something? If we do that, we should disable tokio's threaded scheduler by default.
-    let (tx, mut rx) = handle::spawn(Default::default());
-    let _res = tokio::join![
-        tokio::spawn(async move {
-            for file in ["./src/main.rs", "./src/handle.rs", "./target", "./tet", "./"]
-            {
-                eprintln!("Sending {}", file);
-                tx.send(std::path::Path::new(file).into()).await.unwrap();
-            }
-        }),
-        tokio::spawn(async move {
-            use futures::prelude::*;
-            while let Some((file, hash)) = rx.next().await {
-                println!("Got file hash for {:?}: {:?}", file, hash.map(|x| x.as_ref().iter().copied().hex_string().collect::<String>()));
-            }
-        }),
-    ];
-    eprintln!("Done: {:?}", _res);
+
+    use futures::prelude::*;
+    work::start(std::env::args().skip(1), tokio::signal::ctrl_c().map(|_| ())).await
 }
diff --git a/src/work.rs b/src/work.rs
new file mode 100644
index 0000000..efc55f1
--- /dev/null
+++ b/src/work.rs
@@ -0,0 +1,107 @@
+use super::*;
+
+use futures::{
+    Future,
+    Stream, StreamExt,
+};
+use tokio::io::{
+    AsyncReadExt,
+    AsyncWriteExt,
+};
+use tokio::sync::{
+    mpsc,
+};
+use std::io;
+use std::path::{Path, PathBuf};
+use tokio::fs;
+
+#[derive(Debug, Clone)]
+struct State
+{
+    handle_file: mpsc::Sender<PathBuf>,
+}
+
+async fn handle_file(state: State, file: PathBuf) -> io::Result<()>
+{
+    debug_assert!(file.is_file());
+
+    state.handle_file.send(file).await.unwrap();
+
+    Ok(())
+}
+
+#[inline(always)] fn handle_dir2(state: State, dir: PathBuf) -> futures::future::BoxFuture<'static, io::Result<()>>
+{
+    use futures::prelude::*;
+
+    handle_dir(state, dir).boxed()
+}
+
+async fn handle_dir(state: State, dir: impl AsRef<Path>) -> io::Result<()>
+{
+    debug_assert!(dir.as_ref().is_dir());
+    let mut read = fs::read_dir(dir).await?;
+    while let Some(item) = read.next_entry().await?
+    {
+        let path = item.path();
+        if path.is_file() {
+            tokio::spawn(handle_file(state.clone(), path));
+        } else {
+            tokio::spawn(handle_dir2(state.clone(), path));
+        }
+    }
+    Ok(())
+}
+
+pub async fn start<I, T>(bases: I, cancel: impl Future + Send + 'static) -> io::Result<()>
+where I: IntoIterator<Item = T>,
+      T: AsRef<Path>,
+{
+    let (tx, rx) = handle::spawn_with_cancel(Default::default(), cancel);
+    let renamer = tokio::spawn(async move {
+        use futures::prelude::*;
+        rx.for_each_concurrent(4, |(path, hash)| async move
+        {
+            match hash {
+                Ok(hash) => {
+                    let new_name = path.parent().unwrap().join(hash.to_hex_string());
+                    if let Err(_) = tokio::spawn(async move {
+                        match fs::rename(&path, &new_name).await
+                        {
+                            Ok(_) => println!("[.] {:?} -> {:?}", path, new_name),
+                            Err(err) => println!("[!] {:?}: {}", path, err),
+                        }
+                    }).await {
+                        eprintln!("[!] panic: rename");
+                    }
+                },
+                Err(err) => {
+                    eprintln!("[!] {:?}: {}", path, err);
+                },
+            }
+        }).await;
+    });
+
+    let res = 'result: loop {
+        let state = State { handle_file: tx};
+        let res = futures::future::join_all(bases.into_iter().map(|base| {
+            use futures::prelude::*;
+            if base.as_ref().is_dir() {
+                handle_dir(state.clone(), base).boxed_local()
+            } else {
+                handle_file(state.clone(), base.as_ref().to_owned()).boxed_local()
+            }
+        })).await;
+        for res in res {
+            match res {
+                Ok(_) => (),
+                Err(err) => break 'result Err(err),
+            }
+        }
+        break Ok(());
+    };
+
+    assert!(renamer.await.is_ok(), "[!] fatal: renamer task panic");
+
+    res
+}
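
Note on the "Preserve file ext" TODO in the subject: the renamer in src/work.rs builds the new name as path.parent().unwrap().join(hash.to_hex_string()), so any extension on the original path is dropped. Below is a minimal sketch, not part of the patch, of how the extension could be carried over; the helper name hashed_name is hypothetical, and hex stands in for the value of hash.to_hex_string().

use std::path::{Path, PathBuf};

// Hypothetical helper (not in this patch): build "<hex>.<ext>" next to `path`,
// falling back to the bare hex digest when the file has no extension.
fn hashed_name(path: &Path, hex: &str) -> PathBuf {
    let parent = path.parent().unwrap_or_else(|| Path::new(""));
    match path.extension() {
        Some(ext) => parent.join(format!("{}.{}", hex, ext.to_string_lossy())),
        None => parent.join(hex),
    }
}

fn main() {
    assert_eq!(hashed_name(Path::new("src/thing.txt"), "deadbeef"),
               PathBuf::from("src/deadbeef.txt"));
    // Only the final extension component survives: "a.tar.gz" -> "<hex>.gz".
    assert_eq!(hashed_name(Path::new("a.tar.gz"), "deadbeef"),
               PathBuf::from("deadbeef.gz"));
}

In the renamer, such a helper could replace the bare join, e.g. let new_name = hashed_name(&path, &hash.to_hex_string());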
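
On the TODO in src/main.rs about a pool of hashing handlers: one option is to let a fixed number of worker tasks pull paths from a single channel. tokio's mpsc receiver is single-consumer, so the sketch below shares it behind an async Mutex; the worker count, channel capacity and the println! placeholder are illustrative assumptions and none of this reuses handle.rs.

use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
    // Size the pool from the host's parallelism; a fixed constant would do as well.
    let workers = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

    let (tx, rx) = mpsc::channel::<PathBuf>(workers * 2);
    // tokio's mpsc::Receiver is single-consumer, so share it behind an async Mutex.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers).map(|id| {
        let rx = Arc::clone(&rx);
        tokio::spawn(async move {
            loop {
                // The mutex guard is dropped at the end of this statement, before
                // the path is processed, so workers do not serialise on the hashing.
                let next = rx.lock().await.recv().await;
                match next {
                    Some(path) => {
                        // Placeholder for the hashing a real handler would do.
                        println!("[worker {}] would hash {:?}", id, path);
                    }
                    None => break, // all senders dropped: shut this worker down
                }
            }
        })
    }).collect();

    for p in ["./src/main.rs", "./src/work.rs"] {
        tx.send(PathBuf::from(p)).await.unwrap();
    }
    drop(tx); // close the channel so the workers can exit

    for h in handles {
        h.await.expect("worker panicked");
    }
}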