From 5645ff417c3ea4e10163525db721de787eb1cc5a Mon Sep 17 00:00:00 2001 From: Avril Date: Fri, 15 Apr 2022 19:21:37 +0100 Subject: [PATCH] Effectively working interface. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for phash's current commit: Great blessing − 大吉 --- Cargo.toml | 6 +- src/encode.rs | 5 +- src/ext.rs | 49 ++++++++++ src/hash.rs | 260 +++++++++++++++++++++++++++++++++++++++++++++++--- src/lib.rs | 119 ++++++++++++++++++++++- 5 files changed, 414 insertions(+), 25 deletions(-) create mode 100644 src/ext.rs diff --git a/Cargo.toml b/Cargo.toml index 075e3f6..ca6c267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -atomic_refcell = "0.1.8" +#atomic_refcell = "0.1.8" +bytes = "1.1.0" digest = "0.10.3" -parking_lot = { version = "0.12.0", features = ["hardware-lock-elision", "arc_lock"] } +#parking_lot = { version = "0.12.0", features = ["hardware-lock-elision", "arc_lock"] } rayon = "1.5.2" sha2 = "0.10.2" -smallmap = "1.3.3" diff --git a/src/encode.rs b/src/encode.rs index 02aaf14..a4d01c7 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -3,10 +3,7 @@ use std::fmt::{ self, Write, }; -use smallmap::{ - Primitive, - Map, -}; + pub trait Encoder { fn encode_byte(&self, byte: u8, f: &mut W) -> fmt::Result; diff --git a/src/ext.rs b/src/ext.rs new file mode 100644 index 0000000..674e771 --- /dev/null +++ b/src/ext.rs @@ -0,0 +1,49 @@ +//! Extensions +use bytes::Buf; +use std::io::BufRead; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BufIter(B); + +pub trait BufIterExt +{ + fn chunks_iter(self) -> BufIter; +} + +impl Iterator for BufIter +{ + type Item = bytes::Bytes; + fn next(&mut self) -> Option + { + if self.0.has_remaining() { + Some(self.0.copy_to_bytes(self.0.chunk().len())) + } else { + None + } + } +} + +impl BufIterExt for B +{ + fn chunks_iter(self) -> BufIter { + BufIter(self) + } +} + +/* +pub struct BufReadIter(usize, B); + +pub trait BufReadExt +{ + fn chunks_iter(self, sz: usize) -> BufReadIter; +} + +impl Iterator for BufReadIter +{ + type Item = bytes::Bytes; + fn next(&mut self) -> Option + { + self.1.read_exact(buf) + } +} +*/ diff --git a/src/hash.rs b/src/hash.rs index 605c02b..2422a54 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -2,18 +2,150 @@ //use super::pool; use digest::{ Digest, - DynDigest, Output, }; -use std::marker::PhantomData; -use std::{ - fmt, -}; +use std::fmt; +use bytes::Buf; #[derive(Debug, Clone, PartialEq, Eq)] -pub struct ParallelDigest(usize, PhantomData D>); +pub struct ParallelDigest{ + chunk_size: usize, + completed: ParallelHash, + last_slice: Vec, +} + +#[derive(Debug, Clone, Hash)] +pub struct ParallelHash(Vec>, usize); + + +impl Eq for ParallelHash{} +impl PartialEq for ParallelHash +{ + fn eq(&self, other: &Self) -> bool + { + self.1 == other.1 && self.0.iter().map(|x| &x[..]).eq(other.0.iter().map(|x| &x[..])) + } +} + + +#[derive(Debug, Clone, Hash)] +pub struct ParallelHashOutput(usize, Output); + +impl Eq for ParallelHashOutput{} +impl PartialEq for ParallelHashOutput +{ + fn eq(&self, other: &Self) -> bool + { + self.0 == other.0 && &self.1[..] == &other.1[..] + } +} + +impl ParallelHashOutput +{ + #[inline] + pub fn chunk_size(&self) -> usize + { + self.0 + } + #[inline] + pub fn condensed_hash(&self) -> &Output + { + &self.1 + } + pub fn into_bytes(self) -> bytes::Bytes + { + let mut bytes = bytes::BytesMut::with_capacity(self.1.len() + std::mem::size_of::()); + use bytes::BufMut; + bytes.put_u64(self.0.try_into().unwrap()); + bytes.extend(self.1.into_iter()); + bytes.freeze() + } + pub fn from_bytes(mut bytes: impl bytes::Buf) -> Self + { + let mut output = Output::::default(); + Self(bytes.get_u64().try_into().unwrap(), { + bytes.copy_to_slice(&mut output[..]); + output + }) + } +} + +impl From> for bytes::Bytes +{ + #[inline] + fn from(from: ParallelHashOutput) -> Self + { + from.into_bytes() + } +} -pub struct ParallelHash(Vec>); + +impl From> for ParallelHashOutput +{ + #[inline] + fn from(from: ParallelHash) -> Self + { + from.finalize() + } +} + +impl ParallelHash +{ + pub fn finalize(self) -> ParallelHashOutput + { + let mut digest = D::new(); + for buf in self.0.into_iter() { + digest.update(buf); + } + ParallelHashOutput(self.1, digest.finalize()) + } + + #[inline(always)] + fn digest_single_chunk(slice: impl AsRef<[u8]>) -> Output + { + D::digest(slice) + } + + /// # Safety + /// `last` must either be: + /// * the last slice written to `self` + /// * a full chunk + #[inline] + unsafe fn append_slice_unchecked(&mut self, last: impl AsRef<[u8]>) + { + self.0.push(Self::digest_single_chunk(last)); + } + + #[inline] + pub(crate) fn append_chunks(&mut self, chunks: impl rayon::iter::IndexedParallelIterator>) -> usize + { + let mut temp = Vec::new(); + chunks.collect_into_vec(&mut temp); + self.0.extend_from_slice(&temp[..]); + temp.len() + } + + fn append_chunks_unzipped(&mut self, chunks: impl rayon::iter::IndexedParallelIterator>, Option)>) -> (usize, impl std::iter::Iterator) + { + let (mut ok, mut err) = (Vec::new(), Vec::new()); + chunks.unzip_into_vecs(&mut ok, &mut err); + let l = self.0.len(); + self.0.extend(ok.into_iter().filter_map(std::convert::identity)); + (self.0.len() - l, err.into_iter().filter_map(std::convert::identity)) + } +} + + +impl fmt::Display for ParallelHashOutput +{ + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + use crate::encode::*; + write!(f, "{}:", self.0)?; + HexEncoder::encode_slice(&HexEncoder, &self.1, f) + } +} impl fmt::Display for ParallelHash { @@ -21,7 +153,7 @@ impl fmt::Display for ParallelHash { use crate::encode::*; let mut d = D::new(); - write!(f, "{}:", self.0.len())?; + write!(f, "{}:", self.1)?; for output in self.0.iter() { d.update(&output[..]); @@ -34,21 +166,107 @@ impl fmt::Display for ParallelHash impl ParallelDigest { /// Create a new digest - pub const fn new(chunk: usize) -> Self + pub fn new(chunk_size: usize) -> Self { - Self(chunk, PhantomData) + Self{ + chunk_size, + completed: ParallelHash(Vec::new(), chunk_size), + last_slice: Vec::with_capacity(chunk_size), + } + } + + fn append_slice_raw<'s>(&mut self, slice: &'s [u8]) -> Option<&'s [u8]> + { + use rayon::prelude::*; + let (_, mut rest) = self + .completed + .append_chunks_unzipped(slice + .par_chunks(self.chunk_size) + .map(|slice| if slice.len() == self.chunk_size { + (Some(ParallelHash::::digest_single_chunk(slice)), None) + } else { + (None, Some(slice)) + })); + rest.next() // We know there will only be one of these + } + + fn fill_and_take_buf(&mut self, slice: &mut &[u8]) -> Option + { + if !self.last_slice.is_empty() { + let rest = self.chunk_size - self.last_slice.len(); + let rest = std::cmp::min(slice.len(), rest); + self.last_slice.extend_from_slice(&slice[..rest]); //XXX: This is probably quite slow... + if slice.len() != rest { + *slice = &slice[rest..]; + } else { + *slice = &[]; + } + + if self.last_slice.len() == self.chunk_size { + let output = (&mut &self.last_slice[..]).copy_to_bytes(self.chunk_size); + self.last_slice.clear(); + Some(output) + } else { + None + } + } else { + None + } + } + + pub fn append_buf(&mut self, buf: &mut (impl Buf + ?Sized)) + { + while buf.has_remaining() { + let cs = { + let chunk = buf.chunk(); + self.append_slice(chunk); + chunk.len() + }; + buf.advance(cs); + } + } + + pub fn append_slice(&mut self, slice: impl AsRef<[u8]>) + { + let mut slice = slice.as_ref(); + if let Some(chunk0) = self.fill_and_take_buf(&mut slice) { + debug_assert_eq!(chunk0.len(), self.chunk_size, "internal buffer release not chunk_size long"); + // SAFETY: we know `chunk0` is a full chunk. + unsafe { + self.completed.append_slice_unchecked(chunk0) + } + } + if slice.len() > 0 { + if let Some(rest) = self.append_slice_raw(slice) { + self.last_slice.extend_from_slice(rest); + } + } + } + + pub fn complete(self) -> ParallelHash + { + if self.last_slice.is_empty() { + self.completed + } else { + let mut output = self.completed; + // SAFETY: This is the final slice written to `output`. + unsafe { + output.append_slice_unchecked(self.last_slice); + } + output + } } - /// Compute a parallel digest from `data`. + /// Compute a parallel digest from `data` only. #[inline] - pub fn digest(&self, data: impl AsRef<[u8]>) -> ParallelHash + pub fn digest_slice(chunk_size: usize, data: impl AsRef<[u8]>) -> ParallelHash { - ParallelHash(phash_digest_into::(data.as_ref(), self.0)) + ParallelHash(phash_digest_into::(data.as_ref(), chunk_size), chunk_size) } } #[inline(always)] -fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator + 'a +pub(crate) fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator + 'a where F: Fn(&[u8]) -> T + Send + Sync { use rayon::prelude::*; @@ -57,7 +275,7 @@ where F: Fn(&[u8]) -> T + Send + Sync } #[inline(always)] -fn into_output<'a, T,F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec +pub(crate) fn into_output<'a, T,F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec where F: FnOnce(&'a [u8], usize) -> U, U: rayon::iter::IndexedParallelIterator + 'a { @@ -118,3 +336,15 @@ fn phash(data: impl AsRef<[u8]>, chunk: usize) -> Paral { }*/ + +#[cfg(test)] +mod tests +{ + use super::*; + #[test] + fn paralell_hashing() + { + let mut zeros = [0u8; 512]; + //ParallelDigest::new(10).digest(data) + } +} diff --git a/src/lib.rs b/src/lib.rs index f829e2a..522685b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,117 @@ -//mod pool; A)JR AOISJOAIJOIA JODIJAOSI JDFUCK THIS DOESN@T WORK FUCK ATOMICREFCELL FUCK YOU FFUCK EVERYTHING AAAAAAAAAAAAA -pub mod encode; -pub mod hash; +//mod ext; use ext::*; +mod encode; +mod hash; + +pub use digest; + +pub use hash::{ + ParallelHashOutput, + ParallelHash, + ParallelDigest, +}; + +pub type DefaultDigest = sha2::Sha256; + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct Builder +{ + digest: std::marker::PhantomData, + chunk_size: Option, +} + +impl Builder +{ + #[inline] + pub fn create_default() -> ParallelDigest + { + Self::new().create() + } + pub const fn new_default() -> Self + { + Self { + digest: std::marker::PhantomData, + chunk_size: None + } + } +} + +impl Builder +{ + //pub type Digest = D; + pub const fn new() -> Self + { + Self { + digest: std::marker::PhantomData, + chunk_size: None + } + } + pub const fn with_digest(self) -> Builder + { + Builder { + chunk_size: self.chunk_size, + digest: std::marker::PhantomData, + } + } + pub const fn with_chunk_size(self, chunk_size: usize) -> Self + { + Self { + chunk_size: std::num::NonZeroUsize::new(chunk_size), + ..self + } + } +} +impl Builder +{ + #[inline] + pub fn chunk_size(&self) -> usize + { + extern "C" { + fn getpagesize() -> std::os::raw::c_int; + } + self.chunk_size + .or_else(|| unsafe { std::num::NonZeroUsize::new(getpagesize().try_into().expect("Failed to get page size")) }) + .expect("Failed to get page size: cannot be zero") + .get() + } + #[inline] + pub fn create(self) -> ParallelDigest + { + ParallelDigest::::new(self.chunk_size()) + } + #[inline] + pub fn digest_slice(self, slice: impl AsRef<[u8]>) -> ParallelHash + { + ParallelDigest::::digest_slice(self.chunk_size(), slice) + } +} + +impl From> for ParallelDigest +{ + #[inline] + fn from(from: Builder) -> Self + { + from.create() + } +} + +#[cfg(test)] +mod tests +{ + use super::*; + #[test] + fn parallel_hash_specific_cs() + { + const CHUNK_SIZE: usize = 0; + let slice = [10u8; 4096 << 2];//b"One two three for fize size seven eight."; + let mut digest = Builder::new().with_chunk_size(CHUNK_SIZE).create(); + digest.append_slice(slice); + let digest= digest.complete().finalize(); + println!("digest: {digest}"); + + assert_eq!(digest, Builder::new_default().with_chunk_size(CHUNK_SIZE).digest_slice(slice).finalize(), "phash of same input produced different output"); + + assert_ne!(digest, Builder::new_default().with_chunk_size(CHUNK_SIZE).digest_slice(&slice[..8]).finalize(), "phash of differing input produced same output"); + } +} +