Compare commits

...

14 Commits

Author SHA1 Message Date
Avril  d7421a1583  Go: Added --always  3 years ago
Avril  d37532960c  Remove debugging prints  3 years ago
Avril  5115e3b032  Preserve extensions  3 years ago
Avril  40bfa1d01a  Worker written  3 years ago
Avril  24c0fcb584  Concurrent object Pool<T>  3 years ago
Avril  01d842f72f  Very janky hex string iter.  3 years ago
Avril  6e9c5484f0  Fucking disastrous  3 years ago
Avril  2362a3d2d3  Added graceful shutdown to background hashing task.  3 years ago
Avril  2440cdeebb  handle: Added `spawn`, `Options`.  3 years ago
Avril  884b58d1f4  Error results now wait for the mpsc channel to have space instead of deferring themselves.  3 years ago
Avril  45b55b85a8  Added semaphore to control max concurrent operations.  3 years ago
Avril  37e7270b76  On io error, `file_handler()` now sends both the path and the error to output channel, instead of ignoring failed attempts.  3 years ago
Avril  fce66309d1  Rework `file_handle()` to return a stream of `(path, hash)` instead of taking a oneshot for return  3 years ago
Avril  d5b7f870db  Started Rust version: raw io_uring async file hashing loop in `handle::file_handler()`  3 years ago

.gitignore (vendored)

@@ -1,2 +1,4 @@
test/
lazy-rebuild
Cargo.lock
target/

Cargo.toml

@@ -0,0 +1,15 @@
[package]
name = "lazy-rebuild"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
atomic_refcell = "0.1.7"
crossbeam-queue = "0.3.2"
cryptohelpers = { version = "1.8.2", features = ["full", "async"] }
futures = "0.3.17"
tokio = { version = "1.11.0", features = ["full"] }
tokio-stream = "0.1.7"
tokio-uring = "0.1.0"

Makefile

@@ -1,5 +1,7 @@
all: clean lazy-rebuild test
.PHONY: all
all: | clean
$(MAKE) lazy-rebuild test
clean:
rm -rf test

go.mod

@@ -0,0 +1,3 @@
module lazy-rebuild
go 1.17

@@ -1,4 +1,3 @@
//go:binary-only-package
package main
import (
@@ -12,6 +11,7 @@ import (
"os"
"sync"
"strconv"
"regexp"
)
var keepLongExt bool = false
@@ -42,6 +42,12 @@ func hash(file string) []byte {
return h.Sum(nil)
}
var isHashRE *regexp.Regexp = regexp.MustCompile("^[0-9a-f]{64}$")
func is_hash(name string) bool {
name = strings.Split(name, ".")[0]
return isHashRE.MatchString(name)
}
func extractExt(x string) string {
if (keepLongExt) {
return last(strings.SplitN(x, ".", 2)[1:])
@@ -50,8 +56,13 @@ func extractExt(x string) string {
}
}
var alwaysCheck bool = false
func dofile(i int, y string) {
loc, x := filepath.Split(y)
if !alwaysCheck && is_hash(x) {
fmt.Printf("E: %s already a hash, skipping.\n", x)
return
}
sha := hash(y)
str := fmt.Sprintf("%x", sha)
newname := fmt.Sprintf("%s%s%s", loc, str, extractExt(x))
@@ -72,7 +83,7 @@ func main() {
var wg sync.WaitGroup
if (len(ar) < 1) {
fmt.Printf("Usage: %s [--long] [--fake] [--recurse] [-quiet] [--threads <number>] <file ...>\n\t--long\tKeep long file extensions\n\t--fake\tDo not rename files.\n\t--recurse\tWalk path recursively.\n\t--quiet\tDo not show (some) errors.\n", os.Args[0])
fmt.Printf("Usage: %s [--long] [--fake] [--recurse] [-quiet] [--threads <number>] [--always] <file ...>\n\t--long\tKeep long file extensions\n\t--fake\tDo not rename files.\n\t--recurse\tWalk path recursively.\n\t--quiet\tDo not show (some) errors.\n\t--always\tDo not skip filenames which are already hashes.\n", os.Args[0])
return;
}
var ignore = false
@@ -91,6 +102,8 @@ func main() {
recurse = !recurse
case "quiet":
noErr = !noErr
case "always":
alwaysCheck = true
case "threads":
if i<len(ar)-1 {
if th, err := strconv.Atoi(ar[i+1]); err == nil {

ext.rs

@@ -0,0 +1,93 @@
use super::*;
use std::iter::{
FusedIterator,
Fuse,
};
pub struct HexStringIter<I>
{
ar: Fuse<I>,
hex1: u8,
}
impl<I> From<HexStringIter<I>> for String
where I: Iterator,
I::Item: Into<u8>
{
#[inline] fn from(from: HexStringIter<I>) -> Self
{
from.collect()
}
}
const fn gen_hex_table() -> [(u8, u8); 256]
{
let mut res = [(0, 0); 256];
let mut i = 0;
const HEX: &'static [u8] = b"0123456789abcdef";
while i < 256 {
let by = i as u8;
res[i] = (HEX[(by >> 4) as usize], HEX[(by & 0xf) as usize]);
i += 1;
}
res
}
static HEX_TABLE: [(u8, u8); 256] = gen_hex_table();
impl<I> Iterator for HexStringIter<I>
where I: Iterator,
I::Item: Into<u8>
{
type Item = char;
fn next(&mut self) -> Option<Self::Item>
{
if self.hex1 != 0 {
return Some(std::mem::replace(&mut self.hex1, 0) as char);
}
let by = self.ar.next()?.into();
let (h0, h1) = HEX_TABLE[by as usize];
self.hex1 = h1;
Some(h0 as char)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let (s, l) = self.ar.size_hint();
(s * 2, l.map(|x| x*2))
}
}
impl<I> ExactSizeIterator for HexStringIter<I>
where I: Iterator + ExactSizeIterator,
I::Item: Into<u8>{}
impl<I> FusedIterator for HexStringIter<I>
where I: Iterator,
I::Item: Into<u8>{}
pub trait HexStringIterExt<I>: Sized
{
fn hex_string(self) -> HexStringIter<I>;
}
impl<I> HexStringIterExt<I::IntoIter> for I
where I: IntoIterator,
I::Item: Into<u8>
{
#[inline] fn hex_string(self) -> HexStringIter<I::IntoIter> {
HexStringIter {
ar: self.into_iter().fuse(),
hex1: 0,
}
}
}
pub trait HexStringSliceExt
{
fn to_hex_string(&self) -> String;
}
impl<T> HexStringSliceExt for T
where T: AsRef<[u8]>
{
fn to_hex_string(&self) -> String {
self.as_ref().iter().copied().hex_string().collect()
}
}
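
Example (editor's sketch, not part of the diff): how the two extension traits above are meant to be used, assuming `HexStringIterExt` and `HexStringSliceExt` are in scope (e.g. via `use ext::*`). The function name is illustrative only.

fn hex_demo() {
    // Iterator adapter: lazily yields two lowercase hex chars per input byte.
    let s: String = vec![0xdeu8, 0xad, 0xbe, 0xef].hex_string().collect();
    assert_eq!(s, "deadbeef");

    // Slice helper: one-shot conversion of anything that is AsRef<[u8]>.
    assert_eq!([0x00u8, 0xff].to_hex_string(), "00ff");
}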

handle.rs

@@ -0,0 +1,231 @@
use super::*;
use std::io;
use cryptohelpers::sha256;
use cryptohelpers::sha2::{
Digest, Sha256,
};
use tokio_uring::fs::{
File, OpenOptions,
};
use tokio::sync::{
mpsc,
oneshot,
Semaphore,
};
use std::num::NonZeroUsize;
use tokio_stream::wrappers::ReceiverStream;
use futures::prelude::*;
use futures::future::OptionFuture;
async fn uring_read<F>(file: &mut File, mut to: F) -> io::Result<usize>
where F: FnMut(&[u8]) -> io::Result<()>
{
let mut full_buffer = vec![0u8; 4096]; // we need to allocate this so &buffer[0] is always the same.
let mut read = 0usize;
loop {
let buffer = {
let (res, n_full_buffer) = file.read_at(full_buffer, read as u64).await;
full_buffer = n_full_buffer;
&full_buffer[..(match res {
Ok(0) => break,
Ok(n) => n,
Err(e) => return Err(e),
})]
};
to(buffer)?;
read += buffer.len();
}
Ok(read)
}
/// Options for the spawned file hasher.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Options
{
/// Maximum number of operations allowed to be running at a time.
/// Or `None` for unlimited.
///
/// The hasher uses a single thread.
pub max_operations: Option<NonZeroUsize>,
/// The maximum buffer size of the **output** stream.
pub back_pressure: NonZeroUsize,
/// The maximum buffer size of the **input** stream.
pub forward_pressure: NonZeroUsize,
}
impl Options
{
pub const DEFAULT: Self = Self::new();
pub const fn new() -> Self
{
Self {
max_operations: NonZeroUsize::new(32),
back_pressure: unsafe { NonZeroUsize::new_unchecked(64) },
forward_pressure: unsafe { NonZeroUsize::new_unchecked(64) },
}
}
}
impl Default for Options
{
#[inline]
fn default() -> Self
{
Self::new()
}
}
impl From<()> for Options
{
#[inline] fn from(_: ()) -> Self
{
Self::new()
}
}
/// Spawn a thread with an `io_uring` file hasher using these options.
///
/// When the `cancel` future completes, the operation shuts down gracefully. Otherwise it continues until all senders to this handle are dropped.
///
/// # Returns
/// The input stream and output stream `(tx, rx)`.
pub fn spawn_with_cancel(opt: Options, cancel: impl Future<Output = ()> + 'static + Send) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{
let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
let rx = file_handler(rx, cancel, match opt.max_operations {
Some(n) => n.into(),
None => 0,
}, opt.back_pressure.into());
(tx, rx)
}
/// Spawn a thread with an `io_uring` file hasher using these options.
///
/// # Returns
/// The input stream and output stream `(tx, rx)`.
pub fn spawn(opt: Options) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
{
#[derive(Debug)]
struct NeverFuture;
use std::task::Poll;
impl Future for NeverFuture
{
type Output = super::Infallible;
fn poll(self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
spawn_with_cancel(opt, NeverFuture.map(|_| ()))
}
/// Raw handler for io_uring file hashing.
///
/// # Parameters
/// * `recv` - Takes the incoming file path to hash
/// * `max_ops` - The maximum number of allowed concurrent operations. (0 for unlimited.)
/// * `backpressure` - The maximum buffer size of the output stream. Operations wait for space in the stream before sending their results, so if results are not consumed, hashing tasks will block.
///
/// # Returns
/// A stream yielding, for each input file path, a tuple of that path and either the file's hash or the I/O error responsible for the failure.
fn file_handler<C>(mut recv: mpsc::Receiver<PathBuf>, cancel: C, max_ops: usize, backpressure: usize) -> impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Unpin + Send + Sync + 'static
where C: Future<Output = ()> + 'static + Send
{
let (r_tx, r_rx) = mpsc::channel(backpressure);
let (h_tx, h_rx) = oneshot::channel::<super::Infallible>();
std::thread::spawn(move || {
tokio_uring::start(async move {
tokio::pin!(cancel);
// No need for Arc, this is single threaded.
let sem = (max_ops > 0).then(|| std::rc::Rc::new(Semaphore::new(max_ops))); // 0 = unlimited: skip the semaphore instead of creating one with no permits.
let h_tx = std::rc::Rc::new(h_tx);
while let Some(path) = tokio::select!{
n = recv.recv() => n,
_ = &mut cancel => None,
} {
let ret = r_tx.clone();
let sem = sem.clone();
let h_tx = h_tx.clone();
tokio_uring::spawn(async move {
let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await {
Some(Err(_e)) => return, // Semaphore has been closed.
Some(Ok(v)) => Some(v),
None => None,
};
let _h_tx = h_tx;
let mut file = match OpenOptions::new()
.read(true)
.open(&path).await {
Ok(v) => v,
Err(e) => {
let _ = ret.send((path, Err(e))).await;
return;
},
};
let mut hasher = Sha256::new();
let ring_res = uring_read(&mut file, |buffer| {
if ret.is_closed() {
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "return channel dropped, stopping operation"));
}
hasher.update(buffer);
Ok(())
}).await;
let _ = tokio::join![
file.close(), // We are in a unique task per file, so awaiting this here concurrently with the returning async block is fine.
async move {
match ring_res {
Ok(_n) => {
let _ = ret.send((path, Ok(hasher.into()))).await;
},
Err(e) => {
// To prevent DOSing this task, we do not defer the writing of failed results like we used to. If the stream is full, we wait regardless of the result.
let _ = ret.send((path, Err(e))).await;
/*
match ret.try_send((path, Err(e))) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(val)) => { tokio_uring::spawn(async move { let _ = ret.send(val).await; }); },
_ => return,
}*/
}
}
}
];
});
}
//Yield the current task to allow the newly spawned one to run.
//XXX: Is this a safe way of passing the semaphore to the task?
tokio::task::yield_now().await;
// --- End of new inputs
//XXX: FUUUUUUUCK why can't i just acquire_owned() without using Arc? Fucking hell...
//let _sem = sem.as_ref().map(|x| x.try_acquire_many(x.available_permits() as u32).unwrap());
// Drop the master refcount of `h_tx`.
drop(h_tx);
// Drop sender that we're cloning from
drop(r_tx);
// When the sender is dropped (i.e. after the thread completes), this will stop waiting and return an error.
let _ = h_rx.await;
// Close and drop the semaphore source refcount.
if let Some(sem) = &sem {
sem.close();
}
drop(sem);
});
});
ReceiverStream::new(r_rx)
}
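
Example (editor's sketch, not part of the diff): one way the `spawn` API above could be driven from async code. The function name `hash_some_files` is hypothetical; it assumes `handle`, `futures`, and the `HexStringSliceExt` helper from ext.rs are in scope (as they are in work.rs).

async fn hash_some_files(paths: Vec<std::path::PathBuf>) {
    use futures::StreamExt;

    let (tx, rx) = handle::spawn(handle::Options::default());

    // The hasher thread runs until every sender is dropped.
    for p in paths {
        tx.send(p).await.expect("hasher thread stopped");
    }
    drop(tx);

    // Each item is (path, Ok(hash)) or (path, Err(io_error)).
    futures::pin_mut!(rx);
    while let Some((path, res)) = rx.next().await {
        match res {
            Ok(hash) => println!("[.] {:?} {}", path, hash.to_hex_string()),
            Err(e) => eprintln!("[!] {:?}: {}", path, e),
        }
    }
}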

main.rs

@@ -0,0 +1,17 @@
use std::convert::Infallible;
use std::path::PathBuf;
mod ext; use ext::*;
mod pool;
mod handle;
mod work;
#[tokio::main]
async fn main() -> std::io::Result<()> {
//TODO: Is one hashing handler enough? Or should we have a pool of handlers, with a capacity of the number of threads or something? If we do that, we should disable tokio's threaded scheduler by default.
use futures::prelude::*;
work::start(std::env::args().skip(1), tokio::signal::ctrl_c().map(|_| ())).await
}

pool.rs

@@ -0,0 +1,138 @@
use super::*;
use std::sync::{Arc,Weak};
use std::mem::ManuallyDrop;
use crossbeam_queue::ArrayQueue;
use std::ops::{
Drop,
Deref, DerefMut,
};
use std::borrow::Borrow;
/// An owned handle to a rented value in a pool.
///
/// When the handle is dropped, the value is placed back in the pool, unless the pool has already been filled by replacements or has been dropped.
#[derive(Debug)]
pub struct Handle<T>(ManuallyDrop<T>, Weak<ArrayQueue<T>>);
impl<T> Handle<T>
{
/// Detach this instance from its owning pool.
///
/// The value will not be replaced when this handle is dropped.
pub fn detach(&mut self)
{
self.1 = Weak::new();
}
/// Detach this instance, and insert a new value into the pool in its place.
///
/// # Returns
/// `true` if the replacement succeeded.
/// `false` if there was no room in the pool, or if the pool has been dropped.
pub fn replace(&mut self) -> bool
where T: Default
{
match std::mem::replace(&mut self.1, Weak::new()).upgrade()
{
Some(v) => v.push(T::default()).is_ok(),
_ => false,
}
}
/// Remove the value from the pool.
pub fn into_inner(mut self) -> T
{
self.1 = Weak::new();
let val = unsafe {ManuallyDrop::take(&mut self.0)};
std::mem::forget(self);
val
}
/// Is the pool still alive?
pub fn has_owner(&self) -> bool
{
self.1.strong_count() > 0
}
}
impl<T> Deref for Handle<T>
{
type Target = T;
#[inline] fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for Handle<T>
{
#[inline] fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> Borrow<T> for Handle<T>
{
#[inline] fn borrow(&self) -> &T
{
&self.0
}
}
/// A self-replacing concurrent pool of rentable objects.
///
/// # State
/// Objects are ephemeral and may be dropped and replaced whenever.
/// Do not rely on the state of objects in the pool to remain consistent.
#[derive(Debug)]
pub struct Pool<T>
{
objects: Arc<ArrayQueue<T>>,
}
impl<T> Drop for Handle<T>
{
fn drop(&mut self) {
if let Some(owner) = self.1.upgrade() {
let value = unsafe {ManuallyDrop::take(&mut self.0)};
drop(owner.push(value));
} else {
unsafe {
ManuallyDrop::drop(&mut self.0);
}
}
}
}
impl<T: Default> Pool<T>
{
/// Create a new pool with a specific number of objects.
/// This number is also the pool's maximum capacity and does not change.
pub fn with_capacity(cap: usize) -> Self
{
let objects = ArrayQueue::new(cap);
for x in
std::iter::repeat_with(T::default).take(cap)
{
assert!(objects.push(x).is_ok());
}
Self {
objects: Arc::new(objects),
}
}
/// Create a new pool constructed with the default number of objects.
#[inline] pub fn new() -> Self
{
Self::with_capacity(32)
}
/// Rent an object from the pool of objects.
/// If one is not available, a new one is constructed.
///
/// The object is moved from the pool to a handle.
/// It is therefore recommended to box large objects that are in a pool.
pub fn rent(&self) -> Handle<T>
{
if let Some(last) = self.objects.pop()
{
Handle(ManuallyDrop::new(last), Arc::downgrade(&self.objects))
} else {
Handle(ManuallyDrop::new(T::default()), Arc::downgrade(&self.objects))
}
}
}
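
Example (editor's sketch, not part of the diff): the rent/return cycle of the `Pool<T>` and `Handle<T>` defined above. Names are illustrative only.

fn pool_demo() {
    let pool: Pool<Vec<u8>> = Pool::with_capacity(4);

    {
        // `Handle` derefs to the pooled value.
        let mut buf = pool.rent();
        buf.clear(); // objects are ephemeral; never rely on their previous state
        buf.extend_from_slice(b"scratch");
        // Dropping the handle puts the Vec back into the pool.
    }

    // `into_inner()` removes the value from the pool for good; `detach()` would
    // keep the handle but likewise skip returning the value on drop.
    let owned: Vec<u8> = pool.rent().into_inner();
    drop(owned);
}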

work.rs

@@ -0,0 +1,119 @@
use super::*;
use futures::{
Future,
Stream, StreamExt,
};
use tokio::io::{
AsyncReadExt,
AsyncWriteExt,
};
use tokio::sync::{
mpsc,
};
use std::io;
use std::path::{Path, PathBuf};
use tokio::fs;
#[derive(Debug, Clone)]
struct State
{
handle_file: mpsc::Sender<PathBuf>,
}
async fn handle_file(state: State, file: PathBuf) -> io::Result<()>
{
debug_assert!(file.is_file());
state.handle_file.send(file).await.unwrap();
Ok(())
}
#[inline(always)] fn handle_dir2(state: State, dir: PathBuf) -> futures::future::BoxFuture<'static, io::Result<()>>
{
use futures::prelude::*;
handle_dir(state, dir).boxed()
}
async fn handle_dir(state: State, dir: impl AsRef<Path>) -> io::Result<()>
{
debug_assert!(dir.as_ref().is_dir());
let mut read = fs::read_dir(dir).await?;
while let Some(item) = read.next_entry().await?
{
let path = item.path();
if path.is_file() {
tokio::spawn(handle_file(state.clone(), path));
} else {
tokio::spawn(handle_dir2(state.clone(), path));
}
}
Ok(())
}
pub async fn start<I, T>(bases: I, cancel: impl Future<Output=()> + Send + 'static) -> io::Result<()>
where I: IntoIterator<Item = T>,
T: AsRef<Path>,
{
let (tx, rx) = handle::spawn_with_cancel(Default::default(), cancel);
let renamer = tokio::spawn(async move {
use futures::prelude::*;
// XXX: Renaming concurrently is dangerous
rx.for_each(|(path, hash)| async move
{
match hash {
Ok(hash) => {
let new_name = {
let name = path.parent().unwrap().join(hash.to_hex_string());
if let Some(ext) = path.extension() {
name.with_extension(ext)
} else {
name
}
};
if let Err(_) = tokio::spawn(async move {
if !new_name.exists() {
match fs::rename(&path, &new_name).await
{
Ok(_) => println!("[.] {:?} -> {:?}", path, new_name),
Err(err) => println!("[!] {:?}: {}", path, err),
}
} else {
eprintln!("[w] file exists {:?}", new_name);
}
}).await { //XXX: No need to spawn here at all.
eprintln!("[!] panic: rename");
}
},
Err(err) => {
eprintln!("[!] {:?}: {}", path, err);
},
}
}).await;
});
let res = 'result: loop {
let state = State { handle_file: tx};
let res = futures::future::join_all(bases.into_iter().map(|base| {
use futures::prelude::*;
if base.as_ref().is_dir() {
handle_dir(state.clone(), base).boxed_local()
} else {
handle_file(state.clone(), base.as_ref().to_owned()).boxed_local()
}
})).await;
for res in res {
match res {
Ok(_) => (),
Err(err) => break 'result Err(err),
}
}
break Ok(());
};
assert!(renamer.await.is_ok(), "[!] fatal: renamer task panic");
res
}
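
Example (editor's sketch, not part of the diff): `work::start` accepts any cancellation future, not just the Ctrl-C handler wired up in main.rs. A hypothetical variant that shuts the hasher down gracefully after a fixed deadline (the 30-second value is illustrative):

async fn run_with_deadline(paths: Vec<String>) -> std::io::Result<()> {
    // Completing this future triggers the graceful shutdown path in handle::spawn_with_cancel.
    let cancel = async {
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
    };
    work::start(paths, cancel).await
}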