Worker written

TODO: Preserve file ext

Fortune for lazy-rebuild's current commit: Curse − 凶
rust-version
Avril 3 years ago
parent 24c0fcb584
commit 40bfa1d01a

@ -78,3 +78,16 @@ where I: IntoIterator,
} }
} }
} }
/// Extension trait: render any byte-slice-like value as a hex string.
pub trait HexStringSliceExt
{
    fn to_hex_string(&self) -> String;
}

/// Blanket impl: anything viewable as `&[u8]` gets `to_hex_string()`.
impl<T> HexStringSliceExt for T
where T: AsRef<[u8]>
{
    fn to_hex_string(&self) -> String {
        let bytes: &[u8] = self.as_ref();
        // Delegate to the project's `hex_string()` iterator adaptor and
        // collect its per-byte output into one owned string.
        bytes.iter().copied().hex_string().collect()
    }
}

@ -92,7 +92,7 @@ impl From<()> for Options
/// ///
/// # Returns /// # Returns
/// The input stream and output stream `(tx, rx)`. /// The input stream and output stream `(tx, rx)`.
pub fn spawn_with_cancel(opt: Options, cancel: impl Future<Output = ()> + 'static + Send + Unpin) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static) pub fn spawn_with_cancel(opt: Options, cancel: impl Future<Output = ()> + 'static + Send) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{ {
let (tx, rx) = mpsc::channel(opt.forward_pressure.into()); let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
let rx = file_handler(rx,cancel , match opt.max_operations { let rx = file_handler(rx,cancel , match opt.max_operations {
@ -132,7 +132,7 @@ pub fn spawn(opt: Options) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBu
/// # Returns /// # Returns
/// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure /// A stream yielding a tuple of the input file path and the file's hash, or the IO error responsible for the failure
fn file_handler<C>(mut recv: mpsc::Receiver<PathBuf>, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Unpin + Send + Sync + 'static fn file_handler<C>(mut recv: mpsc::Receiver<PathBuf>, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Unpin + Send + Sync + 'static
where C: Future<Output = ()> + 'static + Unpin + Send where C: Future<Output = ()> + 'static + Send
{ {
let (r_tx, r_rx) = mpsc::channel(backpressure); let (r_tx, r_rx) = mpsc::channel(backpressure);

@ -6,24 +6,12 @@ mod ext; use ext::*;
mod pool; mod pool;
mod handle; mod handle;
mod work;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> std::io::Result<()> {
//TODO: Is one hashing handler enough? Or should we have a pool of handlers, with a capacity of the number of threads or something? If we do that, we should disable tokio's threaded scheduler by default. //TODO: Is one hashing handler enough? Or should we have a pool of handlers, with a capacity of the number of threads or something? If we do that, we should disable tokio's threaded scheduler by default.
let (tx, mut rx) = handle::spawn(Default::default());
let _res = tokio::join![
tokio::spawn(async move {
for file in ["./src/main.rs", "./src/handle.rs", "./target", "./tet", "./"]
{
eprintln!("Sending {}", file);
tx.send(std::path::Path::new(file).into()).await.unwrap();
}
}),
tokio::spawn(async move {
use futures::prelude::*; use futures::prelude::*;
while let Some((file, hash)) = rx.next().await { work::start(std::env::args().skip(1), tokio::signal::ctrl_c().map(|_| ())).await
println!("Got file hash for {:?}: {:?}", file, hash.map(|x| x.as_ref().iter().copied().hex_string().collect::<String>()));
}
}),
];
eprintln!("Done: {:?}", _res);
} }

@ -0,0 +1,107 @@
use super::*;
use futures::{
Future,
Stream, StreamExt,
};
use tokio::io::{
AsyncReadExt,
AsyncWriteExt,
};
use tokio::sync::{
mpsc,
};
use std::io;
use std::path::{Path, PathBuf};
use tokio::fs;
/// Shared handles threaded through the recursive directory walk.
/// Cloned into every spawned task; the worker shuts down once the last
/// clone (and thus the last `Sender`) is dropped.
#[derive(Debug, Clone)]
struct State
{
// Input side of the hashing worker: paths sent here get hashed.
handle_file: mpsc::Sender<PathBuf>,
}
/// Forward a single regular file to the hashing worker.
///
/// # Returns
/// `Err` with `ErrorKind::BrokenPipe` when the worker has already shut down
/// (e.g. after cancellation) — the original `unwrap()` here would panic the
/// task instead of reporting the failure through the `io::Result` it
/// already returns.
async fn handle_file(state: State, file: PathBuf) -> io::Result<()>
{
    debug_assert!(file.is_file());
    state.handle_file.send(file).await
        .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
    Ok(())
}
/// Type-erasing shim that lets `handle_dir` recurse: an `async fn` cannot
/// name its own future type, so the recursive call is boxed behind
/// `BoxFuture` here.
#[inline(always)] fn handle_dir2(state: State, dir: PathBuf) -> futures::future::BoxFuture<'static, io::Result<()>>
{
    // `Pin<Box<impl Future>>` coerces to the `Pin<Box<dyn Future + Send>>`
    // alias `BoxFuture` expands to.
    Box::pin(handle_dir(state, dir))
}
/// Recursively walk `dir`, dispatching every regular file to the hashing
/// worker and every subdirectory to a new detached task.
///
/// Spawned tasks are fire-and-forget: their `JoinHandle`s are dropped, so
/// per-entry errors do not propagate to the caller — results flow back
/// through the worker's output stream instead (same as the original).
///
/// # Returns
/// `Err` only when reading `dir` itself fails.
async fn handle_dir(state: State, dir: impl AsRef<Path>) -> io::Result<()>
{
    debug_assert!(dir.as_ref().is_dir());
    let mut read = fs::read_dir(dir).await?;
    while let Some(item) = read.next_entry().await?
    {
        let path = item.path();
        // Async, symlink-following stat instead of the blocking
        // `Path::is_file()`, which would stall the executor thread.
        // This also stops non-file, non-dir entries (FIFOs, sockets,
        // broken symlinks) from being fed to `read_dir`, where they
        // could only ever produce a silently-dropped error.
        match fs::metadata(&path).await {
            Ok(meta) if meta.is_file() => {
                tokio::spawn(handle_file(state.clone(), path));
            },
            Ok(meta) if meta.is_dir() => {
                tokio::spawn(handle_dir2(state.clone(), path));
            },
            Ok(_) => (), // nothing to hash
            Err(err) => eprintln!("[!] {:?}: {}", path, err),
        }
    }
    Ok(())
}
/// Hash every file under the given base paths and rename each file to its
/// hex-encoded SHA-256 digest, preserving the original extension
/// (implements the commit's "Preserve file ext" TODO).
///
/// # Parameters
/// - `bases`: files and/or directories to process (directories are walked
///   recursively).
/// - `cancel`: future that, when resolved, tells the worker to stop.
///
/// # Returns
/// The first `io::Error` raised while dispatching the base paths, or `Ok(())`.
///
/// # Panics
/// If the renamer task itself panics (a bug, not an I/O condition).
pub async fn start<I, T>(bases: I, cancel: impl Future<Output=()> + Send + 'static) -> io::Result<()>
where I: IntoIterator<Item = T>,
      T: AsRef<Path>,
{
    use futures::prelude::*;
    let (tx, rx) = handle::spawn_with_cancel(Default::default(), cancel);

    // Consumer: rename each successfully hashed file to `<dir>/<hex>.<ext>`.
    let renamer = tokio::spawn(async move {
        rx.for_each_concurrent(4, |(path, hash)| async move
        {
            match hash {
                Ok(hash) => {
                    // `parent()` is None only for roots like "/"; fall back
                    // to the empty path rather than panicking.
                    let dir = path.parent().unwrap_or_else(|| Path::new(""));
                    let mut new_name = dir.join(hash.to_hex_string());
                    // Keep the original extension (file TODO: "Preserve file ext").
                    if let Some(ext) = path.extension() {
                        new_name.set_extension(ext);
                    }
                    // Await the rename directly: the original spawned a task
                    // only to immediately await it, gaining nothing.
                    match fs::rename(&path, &new_name).await
                    {
                        Ok(_) => println!("[.] {:?} -> {:?}", path, new_name),
                        // Errors go to stderr, consistent with the hash-error arm
                        // below (the original used stdout here).
                        Err(err) => eprintln!("[!] {:?}: {}", path, err),
                    }
                },
                Err(err) => {
                    eprintln!("[!] {:?}: {}", path, err);
                },
            }
        }).await;
    });

    let state = State { handle_file: tx };
    let results = futures::future::join_all(bases.into_iter().map(|base| {
        if base.as_ref().is_dir() {
            handle_dir(state.clone(), base).boxed_local()
        } else {
            handle_file(state.clone(), base.as_ref().to_owned()).boxed_local()
        }
    })).await;
    // Drop the last sender so the worker's output stream terminates and the
    // renamer task can finish.
    drop(state);

    // First dispatch error wins, matching the original labeled-loop logic.
    let res = results.into_iter().find_map(Result::err).map_or(Ok(()), Err);
    assert!(renamer.await.is_ok(), "[!] fatal: renamer task panic");
    res
}
Loading…
Cancel
Save