Effectively working interface.

Fortune for phash's current commit: Great blessing − 大吉
master
Avril 2 years ago
parent c05d9f8304
commit 5645ff417c
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -6,9 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
atomic_refcell = "0.1.8"
#atomic_refcell = "0.1.8"
bytes = "1.1.0"
digest = "0.10.3"
parking_lot = { version = "0.12.0", features = ["hardware-lock-elision", "arc_lock"] }
#parking_lot = { version = "0.12.0", features = ["hardware-lock-elision", "arc_lock"] }
rayon = "1.5.2"
sha2 = "0.10.2"
smallmap = "1.3.3"

@ -3,10 +3,7 @@ use std::fmt::{
self,
Write,
};
use smallmap::{
Primitive,
Map,
};
pub trait Encoder
{
fn encode_byte<W: Write + ?Sized>(&self, byte: u8, f: &mut W) -> fmt::Result;

@ -0,0 +1,49 @@
//! Extensions
use bytes::Buf;
use std::io::BufRead;
/// Iterator adapter over the non-contiguous chunks of a [`Buf`],
/// yielding each chunk as an owned `bytes::Bytes`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BufIter<B: ?Sized>(B);

/// Extension trait adding chunk iteration to any [`Buf`].
pub trait BufIterExt
{
    /// Consume this buffer, returning an iterator over its chunks.
    fn chunks_iter(self) -> BufIter<Self>;
}
impl<T: ?Sized + Buf> Iterator for BufIter<T>
{
    type Item = bytes::Bytes;

    /// Copy out the buffer's current contiguous chunk, or `None` once the
    /// buffer is exhausted.
    fn next(&mut self) -> Option<Self::Item>
    {
        if !self.0.has_remaining() {
            return None;
        }
        let chunk_len = self.0.chunk().len();
        Some(self.0.copy_to_bytes(chunk_len))
    }
}
impl<B: Buf> BufIterExt for B
{
    /// Wrap this buffer in a [`BufIter`], consuming it.
    fn chunks_iter(self) -> BufIter<Self> {
        BufIter(self)
    }
}
/*
pub struct BufReadIter<B: ?Sized>(usize, B);
pub trait BufReadExt
{
fn chunks_iter(self, sz: usize) -> BufReadIter<Self>;
}
impl<B: ?Sized + BufRead> Iterator for BufReadIter<B>
{
type Item = bytes::Bytes;
fn next(&mut self) -> Option<Self::Item>
{
self.1.read_exact(buf)
}
}
*/

@ -2,18 +2,150 @@
//use super::pool;
use digest::{
Digest,
DynDigest,
Output,
};
use std::marker::PhantomData;
use std::{
fmt,
};
use std::fmt;
use bytes::Buf;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelDigest<D: Digest + Send + Sync>(usize, PhantomData<fn () -> D>);
pub struct ParallelDigest<D: Digest + Send + Sync>{
chunk_size: usize,
completed: ParallelHash<D>,
last_slice: Vec<u8>,
}
/// Hash state produced by hashing input in fixed-size chunks:
/// the per-chunk digests (`.0`) and the chunk size used (`.1`).
#[derive(Debug, Clone, Hash)]
pub struct ParallelHash<D: Digest>(Vec<Output<D>>, usize);

impl<D: Digest> Eq for ParallelHash<D>{}

impl<D: Digest> PartialEq for ParallelHash<D>
{
    /// Equal when the chunk sizes match and the per-chunk digests match
    /// byte-for-byte. Digests are compared as byte slices (presumably
    /// because `Output<D>` lacks a usable derived `PartialEq` — confirm).
    fn eq(&self, other: &Self) -> bool
    {
        self.1 == other.1 && self.0.iter().map(|x| &x[..]).eq(other.0.iter().map(|x| &x[..]))
    }
}
/// Condensed (finalized) form of a [`ParallelHash`]: the chunk size (`.0`)
/// and a single digest computed over all per-chunk digests (`.1`).
#[derive(Debug, Clone, Hash)]
pub struct ParallelHashOutput<D: Digest>(usize, Output<D>);

impl<D: Digest> Eq for ParallelHashOutput<D>{}

impl<D: Digest> PartialEq for ParallelHashOutput<D>
{
    /// Equal when both the chunk size and the condensed digest bytes match.
    fn eq(&self, other: &Self) -> bool
    {
        self.0 == other.0 && &self.1[..] == &other.1[..]
    }
}
impl<D: Digest> ParallelHashOutput<D>
{
    /// The chunk size (in bytes) the input was split into when hashing.
    #[inline]
    pub fn chunk_size(&self) -> usize
    {
        self.0
    }
    /// The single digest condensed from all per-chunk digests.
    #[inline]
    pub fn condensed_hash(&self) -> &Output<D>
    {
        &self.1
    }
    /// Serialize as `<chunk size as big-endian u64><digest bytes>`.
    ///
    /// # Panics
    /// If the chunk size cannot be converted to `u64` (cannot happen on
    /// platforms where `usize` is at most 64 bits wide).
    pub fn into_bytes(self) -> bytes::Bytes
    {
        let mut bytes = bytes::BytesMut::with_capacity(self.1.len() + std::mem::size_of::<u64>());
        use bytes::BufMut;
        // `put_u64` writes big-endian.
        bytes.put_u64(self.0.try_into().unwrap());
        bytes.extend(self.1.into_iter());
        bytes.freeze()
    }
    /// Inverse of [`Self::into_bytes`].
    ///
    /// # Panics
    /// If `bytes` holds fewer than `8 + <digest length>` remaining bytes,
    /// or the stored chunk size does not fit in `usize`.
    pub fn from_bytes(mut bytes: impl bytes::Buf) -> Self
    {
        let mut output = Output::<D>::default();
        Self(bytes.get_u64().try_into().unwrap(), {
            bytes.copy_to_slice(&mut output[..]);
            output
        })
    }
}
impl<D: Digest> From<ParallelHashOutput<D>> for bytes::Bytes
{
    /// Delegates to [`ParallelHashOutput::into_bytes`].
    #[inline]
    fn from(from: ParallelHashOutput<D>) -> Self
    {
        from.into_bytes()
    }
}
pub struct ParallelHash<D: Digest>(Vec<Output<D>>);
impl<D: Digest> From<ParallelHash<D>> for ParallelHashOutput<D>
{
    /// Delegates to [`ParallelHash::finalize`].
    #[inline]
    fn from(from: ParallelHash<D>) -> Self
    {
        from.finalize()
    }
}
impl<D: Digest> ParallelHash<D>
{
    /// Condense every per-chunk digest into a single [`ParallelHashOutput`].
    pub fn finalize(self) -> ParallelHashOutput<D>
    {
        let Self(chunk_hashes, chunk_size) = self;
        let mut condenser = D::new();
        for chunk_hash in chunk_hashes {
            condenser.update(chunk_hash);
        }
        ParallelHashOutput(chunk_size, condenser.finalize())
    }
    /// Hash a single chunk of input with a fresh instance of `D`.
    #[inline(always)]
    fn digest_single_chunk(slice: impl AsRef<[u8]>) -> Output<D>
    {
        D::digest(slice)
    }
    /// # Safety
    /// `last` must either be:
    /// * the last slice written to `self`
    /// * a full chunk
    #[inline]
    unsafe fn append_slice_unchecked(&mut self, last: impl AsRef<[u8]>)
    {
        let hash = Self::digest_single_chunk(last);
        self.0.push(hash);
    }
    /// Append pre-computed chunk digests, returning how many were added.
    #[inline]
    pub(crate) fn append_chunks(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item=Output<D>>) -> usize
    {
        let mut collected = Vec::new();
        chunks.collect_into_vec(&mut collected);
        let added = collected.len();
        self.0.extend(collected);
        added
    }
    /// Append the `Some` digests from an unzipped parallel iterator.
    ///
    /// Returns the number of digests appended and an iterator over the
    /// `Some` right-hand values (e.g. leftover partial slices).
    fn append_chunks_unzipped<E: Send>(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item= (Option<Output<D>>, Option<E>)>) -> (usize, impl std::iter::Iterator<Item=E>)
    {
        let (mut hashes, mut rests) = (Vec::new(), Vec::new());
        chunks.unzip_into_vecs(&mut hashes, &mut rests);
        let before = self.0.len();
        self.0.extend(hashes.into_iter().flatten());
        (self.0.len() - before, rests.into_iter().flatten())
    }
}
impl<D: Digest> fmt::Display for ParallelHashOutput<D>
{
    /// Render as `<chunk size>:<hex-encoded condensed digest>`.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        use crate::encode::*;
        let Self(chunk_size, hash) = self;
        write!(f, "{}:", chunk_size)?;
        HexEncoder.encode_slice(hash, f)
    }
}
impl<D: Digest> fmt::Display for ParallelHash<D>
{
@ -21,7 +153,7 @@ impl<D: Digest> fmt::Display for ParallelHash<D>
{
use crate::encode::*;
let mut d = D::new();
write!(f, "{}:", self.0.len())?;
write!(f, "{}:", self.1)?;
for output in self.0.iter() {
d.update(&output[..]);
@ -34,21 +166,107 @@ impl<D: Digest> fmt::Display for ParallelHash<D>
impl<D: Digest + Send + Sync> ParallelDigest<D>
{
/// Create a new digest
pub const fn new(chunk: usize) -> Self
pub fn new(chunk_size: usize) -> Self
{
Self(chunk, PhantomData)
Self{
chunk_size,
completed: ParallelHash(Vec::new(), chunk_size),
last_slice: Vec::with_capacity(chunk_size),
}
}
/// Hash every full `chunk_size` chunk of `slice` in parallel into
/// `self.completed`, returning the trailing partial chunk (if any).
///
/// Only the final chunk produced by `par_chunks` can be shorter than
/// `chunk_size`, so the leftover iterator holds at most one element.
fn append_slice_raw<'s>(&mut self, slice: &'s [u8]) -> Option<&'s [u8]>
{
    use rayon::prelude::*;
    let (_, mut rest) = self
        .completed
        .append_chunks_unzipped(slice
            .par_chunks(self.chunk_size)
            .map(|slice| if slice.len() == self.chunk_size {
                (Some(ParallelHash::<D>::digest_single_chunk(slice)), None)
            } else {
                (None, Some(slice))
            }));
    rest.next() // We know there will only be one of these
}
/// Top up the internal partial-chunk buffer from `*slice`, advancing the
/// slice past the consumed bytes. If the buffer reaches a full chunk,
/// return it and reset the buffer; otherwise return `None`.
fn fill_and_take_buf(&mut self, slice: &mut &[u8]) -> Option<bytes::Bytes>
{
    if self.last_slice.is_empty() {
        return None;
    }
    // Bytes still needed to complete the buffered chunk.
    let needed = self.chunk_size - self.last_slice.len();
    let take = std::cmp::min(slice.len(), needed);
    // NOTE: byte-by-byte copy into the Vec; likely a performance hotspot.
    self.last_slice.extend_from_slice(&slice[..take]);
    *slice = &slice[take..];
    if self.last_slice.len() < self.chunk_size {
        return None;
    }
    // Buffered chunk is now complete: hand it out and reset the buffer.
    let full_chunk = (&mut &self.last_slice[..]).copy_to_bytes(self.chunk_size);
    self.last_slice.clear();
    Some(full_chunk)
}
/// Append every byte remaining in `buf` to the running digest.
///
/// Feeds each contiguous chunk of `buf` through `append_slice`, then
/// advances the buffer past it, until nothing remains.
pub fn append_buf(&mut self, buf: &mut (impl Buf + ?Sized))
{
    while buf.has_remaining() {
        let cs = {
            let chunk = buf.chunk();
            self.append_slice(chunk);
            // Record the chunk length before releasing the borrow on `buf`.
            chunk.len()
        };
        buf.advance(cs);
    }
}
/// Append `slice` to the running digest.
///
/// Any buffered partial chunk is completed first; the remaining full
/// chunks are hashed in parallel, and a trailing partial chunk is
/// buffered for the next call (or for `complete()`).
pub fn append_slice(&mut self, slice: impl AsRef<[u8]>)
{
    let mut remaining = slice.as_ref();
    // Complete and flush the buffered partial chunk, if one was pending.
    if let Some(full_chunk) = self.fill_and_take_buf(&mut remaining) {
        debug_assert_eq!(full_chunk.len(), self.chunk_size, "internal buffer release not chunk_size long");
        // SAFETY: `full_chunk` is exactly one full chunk.
        unsafe {
            self.completed.append_slice_unchecked(full_chunk)
        }
    }
    if !remaining.is_empty() {
        // Hash full chunks in parallel; buffer any trailing partial chunk.
        if let Some(partial) = self.append_slice_raw(remaining) {
            self.last_slice.extend_from_slice(partial);
        }
    }
}
pub fn complete(self) -> ParallelHash<D>
{
if self.last_slice.is_empty() {
self.completed
} else {
let mut output = self.completed;
// SAFETY: This is the final slice written to `output`.
unsafe {
output.append_slice_unchecked(self.last_slice);
}
output
}
}
/// Compute a parallel digest from `data`.
/// Compute a parallel digest from `data` only.
#[inline]
pub fn digest(&self, data: impl AsRef<[u8]>) -> ParallelHash<D>
pub fn digest_slice(chunk_size: usize, data: impl AsRef<[u8]>) -> ParallelHash<D>
{
ParallelHash(phash_digest_into::<D>(data.as_ref(), self.0))
ParallelHash(phash_digest_into::<D>(data.as_ref(), chunk_size), chunk_size)
}
}
#[inline(always)]
fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator<Item = T> + 'a
pub(crate) fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator<Item = T> + 'a
where F: Fn(&[u8]) -> T + Send + Sync
{
use rayon::prelude::*;
@ -57,7 +275,7 @@ where F: Fn(&[u8]) -> T + Send + Sync
}
#[inline(always)]
fn into_output<'a, T,F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec<T>
pub(crate) fn into_output<'a, T,F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec<T>
where F: FnOnce(&'a [u8], usize) -> U,
U: rayon::iter::IndexedParallelIterator<Item = T> + 'a
{
@ -118,3 +336,15 @@ fn phash<D: Digest + Send + Sync>(data: impl AsRef<[u8]>, chunk: usize) -> Paral
{
}*/
#[cfg(test)]
mod tests
{
    use super::*;
    /// Placeholder test — fixes the misspelled name (`paralell_hashing`)
    /// and the unused-variable warning; the digest call it should
    /// exercise is still commented out pending the API settling.
    #[test]
    fn parallel_hashing()
    {
        // Underscore-prefixed until the commented-out call below is restored.
        let _zeros = [0u8; 512];
        //ParallelDigest::new(10).digest(data)
    }
}

@ -1,4 +1,117 @@
//mod pool; // NOTE: disabled — the AtomicRefCell-based pool implementation does not work.
pub mod encode;
pub mod hash;
//mod ext; use ext::*;
mod encode;
mod hash;
pub use digest;
pub use hash::{
ParallelHashOutput,
ParallelHash,
ParallelDigest,
};
/// Digest algorithm used when the builder is not given an explicit one.
pub type DefaultDigest = sha2::Sha256;

/// Configuration builder for [`ParallelDigest`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Builder<D= DefaultDigest>
{
    /// Marker for the digest algorithm `D`; no data is stored.
    digest: std::marker::PhantomData<D>,
    /// Chunk-size override; `None` selects the system page size
    /// (see `chunk_size()` on the builder).
    chunk_size: Option<std::num::NonZeroUsize>,
}
impl Builder<DefaultDigest>
{
    /// Build a [`ParallelDigest`] using the default digest and chunk size.
    #[inline]
    pub fn create_default() -> ParallelDigest<DefaultDigest>
    {
        Self::new().create()
    }
    /// Create a builder for the default digest ([`DefaultDigest`]).
    ///
    /// Delegates to [`Builder::new`] so the two constructors cannot
    /// drift apart (previously this duplicated `new`'s body).
    pub const fn new_default() -> Self
    {
        Self::new()
    }
}
impl<D: digest::Digest> Builder<D>
{
    //pub type Digest = D;
    /// Create a builder for digest `D` with no explicit chunk size
    /// (the page-size default will be used).
    pub const fn new() -> Self
    {
        Self {
            digest: std::marker::PhantomData,
            chunk_size: None
        }
    }
    /// Switch the builder to digest algorithm `N`, keeping the
    /// configured chunk size.
    pub const fn with_digest<N: digest::Digest>(self) -> Builder<N>
    {
        Builder {
            chunk_size: self.chunk_size,
            digest: std::marker::PhantomData,
        }
    }
    /// Set the chunk size. Passing `0` clears the override, restoring the
    /// page-size default (`NonZeroUsize::new(0)` is `None`).
    pub const fn with_chunk_size(self, chunk_size: usize) -> Self
    {
        Self {
            chunk_size: std::num::NonZeroUsize::new(chunk_size),
            ..self
        }
    }
}
impl<D: digest::Digest + Send + Sync> Builder<D>
{
    /// The chunk size that will be used: the configured value, or the
    /// system page size when none was set.
    ///
    /// NOTE(review): `getpagesize()` is a POSIX libc function — this
    /// fallback will not link on non-POSIX targets; confirm the intended
    /// platform set.
    ///
    /// # Panics
    /// If the page size cannot be represented as a non-zero `usize`.
    #[inline]
    pub fn chunk_size(&self) -> usize
    {
        extern "C" {
            fn getpagesize() -> std::os::raw::c_int;
        }
        self.chunk_size
            .or_else(|| unsafe { std::num::NonZeroUsize::new(getpagesize().try_into().expect("Failed to get page size")) })
            .expect("Failed to get page size: cannot be zero")
            .get()
    }
    /// Consume the builder, producing a [`ParallelDigest`].
    #[inline]
    pub fn create(self) -> ParallelDigest<D>
    {
        ParallelDigest::<D>::new(self.chunk_size())
    }
    /// Hash `slice` in one shot using this builder's configuration.
    #[inline]
    pub fn digest_slice(self, slice: impl AsRef<[u8]>) -> ParallelHash<D>
    {
        ParallelDigest::<D>::digest_slice(self.chunk_size(), slice)
    }
}
impl<D: digest::Digest + Send + Sync> From<Builder<D>> for ParallelDigest<D>
{
    /// Delegates to [`Builder::create`].
    #[inline]
    fn from(from: Builder<D>) -> Self
    {
        from.create()
    }
}
#[cfg(test)]
mod tests
{
    use super::*;
    /// End-to-end check: incremental hashing (`append_slice` + `complete`)
    /// must agree with one-shot hashing (`digest_slice`), and a different
    /// input must produce a different hash.
    #[test]
    fn parallel_hash_specific_cs()
    {
        // 0 means "no override": `NonZeroUsize::new(0)` is `None`, so the
        // builder falls back to the system page size.
        const CHUNK_SIZE: usize = 0;
        let slice = [10u8; 4096 << 2];//b"One two three for fize size seven eight.";
        let mut digest = Builder::new().with_chunk_size(CHUNK_SIZE).create();
        digest.append_slice(slice);
        let digest= digest.complete().finalize();
        println!("digest: {digest}");
        assert_eq!(digest, Builder::new_default().with_chunk_size(CHUNK_SIZE).digest_slice(slice).finalize(), "phash of same input produced different output");
        assert_ne!(digest, Builder::new_default().with_chunk_size(CHUNK_SIZE).digest_slice(&slice[..8]).finalize(), "phash of differing input produced same output");
    }
}

Loading…
Cancel
Save