You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
351 lines
8.6 KiB
351 lines
8.6 KiB
//! Hashing
|
|
//use super::pool;
|
|
use digest::{
|
|
Digest,
|
|
Output,
|
|
};
|
|
use std::fmt;
|
|
use bytes::Buf;
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct ParallelDigest<D: Digest + Send + Sync>{
|
|
chunk_size: usize,
|
|
completed: ParallelHash<D>,
|
|
last_slice: Vec<u8>,
|
|
}
|
|
|
|
/// The per-chunk hashes of an input, prior to condensing.
///
/// Field 0 holds one `Output<D>` per processed chunk (in input order);
/// field 1 is the chunk size (in bytes) the input was split by.
/// `PartialEq`/`Eq` are implemented manually below to avoid a `D: PartialEq` bound.
#[derive(Debug, Clone, Hash)]
pub struct ParallelHash<D: Digest>(Vec<Output<D>>, usize);
|
|
|
|
|
|
impl<D: Digest> Eq for ParallelHash<D>{}
|
|
impl<D: Digest> PartialEq for ParallelHash<D>
|
|
{
|
|
fn eq(&self, other: &Self) -> bool
|
|
{
|
|
self.1 == other.1 && self.0.iter().map(|x| &x[..]).eq(other.0.iter().map(|x| &x[..]))
|
|
}
|
|
}
|
|
|
|
|
|
/// The final, condensed result of a parallel hash.
///
/// Field 0 is the chunk size the input was split by; field 1 is the single
/// digest produced by hashing the concatenated per-chunk hashes.
/// `PartialEq`/`Eq` are implemented manually below to avoid a `D: PartialEq` bound.
#[derive(Debug, Clone, Hash)]
pub struct ParallelHashOutput<D: Digest>(usize, Output<D>);
|
|
|
|
impl<D: Digest> Eq for ParallelHashOutput<D>{}
|
|
impl<D: Digest> PartialEq for ParallelHashOutput<D>
|
|
{
|
|
fn eq(&self, other: &Self) -> bool
|
|
{
|
|
self.0 == other.0 && &self.1[..] == &other.1[..]
|
|
}
|
|
}
|
|
|
|
impl<D: Digest> ParallelHashOutput<D>
|
|
{
|
|
#[inline]
|
|
pub fn chunk_size(&self) -> usize
|
|
{
|
|
self.0
|
|
}
|
|
#[inline]
|
|
pub fn condensed_hash(&self) -> &Output<D>
|
|
{
|
|
&self.1
|
|
}
|
|
pub fn into_bytes(self) -> bytes::Bytes
|
|
{
|
|
let mut bytes = bytes::BytesMut::with_capacity(self.1.len() + std::mem::size_of::<u64>());
|
|
use bytes::BufMut;
|
|
bytes.put_u64(self.0.try_into().unwrap());
|
|
bytes.extend(self.1.into_iter());
|
|
bytes.freeze()
|
|
}
|
|
pub fn from_bytes(mut bytes: impl bytes::Buf) -> Self
|
|
{
|
|
let mut output = Output::<D>::default();
|
|
Self(bytes.get_u64().try_into().unwrap(), {
|
|
bytes.copy_to_slice(&mut output[..]);
|
|
output
|
|
})
|
|
}
|
|
}
|
|
|
|
impl<D: Digest> From<ParallelHashOutput<D>> for bytes::Bytes
|
|
{
|
|
#[inline]
|
|
fn from(from: ParallelHashOutput<D>) -> Self
|
|
{
|
|
from.into_bytes()
|
|
}
|
|
}
|
|
|
|
|
|
impl<D: Digest> From<ParallelHash<D>> for ParallelHashOutput<D>
|
|
{
|
|
#[inline]
|
|
fn from(from: ParallelHash<D>) -> Self
|
|
{
|
|
from.finalize()
|
|
}
|
|
}
|
|
|
|
impl<D: Digest> ParallelHash<D>
|
|
{
|
|
pub fn finalize(self) -> ParallelHashOutput<D>
|
|
{
|
|
let mut digest = D::new();
|
|
for buf in self.0.into_iter() {
|
|
digest.update(buf);
|
|
}
|
|
ParallelHashOutput(self.1, digest.finalize())
|
|
}
|
|
|
|
#[inline(always)]
|
|
fn digest_single_chunk(slice: impl AsRef<[u8]>) -> Output<D>
|
|
{
|
|
D::digest(slice)
|
|
}
|
|
|
|
/// # Safety
|
|
/// `last` must either be:
|
|
/// * the last slice written to `self`
|
|
/// * a full chunk
|
|
#[inline]
|
|
unsafe fn append_slice_unchecked(&mut self, last: impl AsRef<[u8]>)
|
|
{
|
|
self.0.push(Self::digest_single_chunk(last));
|
|
}
|
|
|
|
#[inline]
|
|
pub(crate) fn append_chunks(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item=Output<D>>) -> usize
|
|
{
|
|
let mut temp = Vec::new();
|
|
chunks.collect_into_vec(&mut temp);
|
|
self.0.extend_from_slice(&temp[..]);
|
|
temp.len()
|
|
}
|
|
|
|
fn append_chunks_unzipped<E: Send>(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item= (Option<Output<D>>, Option<E>)>) -> (usize, impl std::iter::Iterator<Item=E>)
|
|
{
|
|
let (mut ok, mut err) = (Vec::new(), Vec::new());
|
|
chunks.unzip_into_vecs(&mut ok, &mut err);
|
|
let l = self.0.len();
|
|
self.0.extend(ok.into_iter().filter_map(std::convert::identity));
|
|
(self.0.len() - l, err.into_iter().filter_map(std::convert::identity))
|
|
}
|
|
}
|
|
|
|
|
|
impl<D: Digest> fmt::Display for ParallelHashOutput<D>
|
|
{
|
|
#[inline]
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
{
|
|
use crate::encode::*;
|
|
write!(f, "{}:", self.0)?;
|
|
HexEncoder::encode_slice(&HexEncoder, &self.1, f)
|
|
}
|
|
}
|
|
|
|
impl<D: Digest> fmt::Display for ParallelHash<D>
|
|
{
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
{
|
|
use crate::encode::*;
|
|
let mut d = D::new();
|
|
write!(f, "{}:", self.1)?;
|
|
|
|
for output in self.0.iter() {
|
|
d.update(&output[..]);
|
|
}
|
|
HexEncoder::encode_slice(&HexEncoder, &d.finalize()[..], f)
|
|
}
|
|
}
|
|
|
|
|
|
impl<D: Digest + Send + Sync> ParallelDigest<D>
{
	/// Create a new digest that hashes input in `chunk_size`-byte chunks.
	pub fn new(chunk_size: usize) -> Self
	{
		Self{
			chunk_size,
			completed: ParallelHash(Vec::new(), chunk_size),
			last_slice: Vec::with_capacity(chunk_size),
		}
	}

	/// Hash every full chunk of `slice` in parallel, appending the results to
	/// `self.completed`; return the trailing partial chunk (if any) for the
	/// caller to buffer.
	fn append_slice_raw<'s>(&mut self, slice: &'s [u8]) -> Option<&'s [u8]>
	{
		use rayon::prelude::*;
		let (_, mut rest) = self
			.completed
			.append_chunks_unzipped(slice
				.par_chunks(self.chunk_size)
				.map(|slice| if slice.len() == self.chunk_size {
					// Full chunk: hash it now, in parallel.
					(Some(ParallelHash::<D>::digest_single_chunk(slice)), None)
				} else {
					// Partial chunk: hand the raw bytes back to the caller.
					(None, Some(slice))
				}));
		// Only the final chunk of `par_chunks` can be partial, so `rest`
		// yields at most one slice.
		rest.next() // We know there will only be one of these
	}

	/// Top up the internal partial-chunk buffer from the front of `*slice`,
	/// advancing `*slice` past the consumed bytes. Returns the buffered chunk
	/// (as owned bytes) only once it reaches exactly `chunk_size`; returns
	/// `None` if the buffer was empty or is still partial.
	fn fill_and_take_buf(&mut self, slice: &mut &[u8]) -> Option<bytes::Bytes>
	{
		if !self.last_slice.is_empty() {
			// How many bytes are still missing from the buffered chunk,
			// capped by what `slice` can provide.
			let rest = self.chunk_size - self.last_slice.len();
			let rest = std::cmp::min(slice.len(), rest);
			self.last_slice.extend_from_slice(&slice[..rest]); //XXX: This is probably quite slow...
			// Advance the caller's slice past the consumed prefix.
			if slice.len() != rest {
				*slice = &slice[rest..];
			} else {
				*slice = &[];
			}

			if self.last_slice.len() == self.chunk_size {
				// Buffer is now a full chunk: copy it out and reset the buffer
				// (clear keeps the allocation for reuse).
				let output = (&mut &self.last_slice[..]).copy_to_bytes(self.chunk_size);
				self.last_slice.clear();
				Some(output)
			} else {
				None
			}
		} else {
			None
		}
	}

	/// Append all remaining data from a `Buf`, draining it chunk-by-chunk
	/// through `append_slice`.
	pub fn append_buf(&mut self, buf: &mut (impl Buf + ?Sized))
	{
		while buf.has_remaining() {
			let cs = {
				let chunk = buf.chunk();
				self.append_slice(chunk);
				chunk.len()
			};
			buf.advance(cs);
		}
	}

	/// Append a slice of input: first complete any buffered partial chunk,
	/// then hash the full chunks of the remainder in parallel, buffering any
	/// new trailing partial chunk.
	pub fn append_slice(&mut self, slice: impl AsRef<[u8]>)
	{
		let mut slice = slice.as_ref();
		if let Some(chunk0) = self.fill_and_take_buf(&mut slice) {
			debug_assert_eq!(chunk0.len(), self.chunk_size, "internal buffer release not chunk_size long");
			// SAFETY: we know `chunk0` is a full chunk.
			unsafe {
				self.completed.append_slice_unchecked(chunk0)
			}
		}
		if slice.len() > 0 {
			if let Some(rest) = self.append_slice_raw(slice) {
				// Buffer the trailing partial chunk; `last_slice` is empty
				// here (either it was empty already or it was just drained).
				self.last_slice.extend_from_slice(rest);
			}
		}
	}

	/// Finish the digest, hashing any buffered partial chunk as the final
	/// (short) chunk.
	pub fn complete(self) -> ParallelHash<D>
	{
		if self.last_slice.is_empty() {
			self.completed
		} else {
			let mut output = self.completed;
			// SAFETY: This is the final slice written to `output`.
			unsafe {
				output.append_slice_unchecked(self.last_slice);
			}
			output
		}
	}

	/// Compute a parallel digest from `data` only.
	#[inline]
	pub fn digest_slice(chunk_size: usize, data: impl AsRef<[u8]>) -> ParallelHash<D>
	{
		ParallelHash(phash_digest_into::<D>(data.as_ref(), chunk_size), chunk_size)
	}
}
|
|
|
|
#[inline(always)]
|
|
pub(crate) fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator<Item = T> + 'a
|
|
where F: Fn(&[u8]) -> T + Send + Sync
|
|
{
|
|
use rayon::prelude::*;
|
|
|
|
data.par_chunks(chunk).map(digest)
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub(crate) fn into_output<'a, T,F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec<T>
|
|
where F: FnOnce(&'a [u8], usize) -> U,
|
|
U: rayon::iter::IndexedParallelIterator<Item = T> + 'a
|
|
{
|
|
let mut output = Vec::with_capacity(1 + (data.len() / chunk));
|
|
func(data, chunk).collect_into_vec(&mut output);
|
|
output
|
|
}
|
|
|
|
|
|
#[inline(always)]
|
|
pub(crate) fn phash_raw_into<T: Send, F>(data: &[u8], chunk: usize, digest: F) -> Vec<T>//Vec<Output<D>>
|
|
where F: Fn(&[u8]) -> T + Send + Sync
|
|
{
|
|
use rayon::prelude::*;
|
|
let mut output = Vec::with_capacity(1 + (data.len() / chunk));
|
|
|
|
phash_raw(data, chunk, digest)
|
|
.collect_into_vec(&mut output);
|
|
output
|
|
}
|
|
/*
|
|
pub(crate) fn phash_pool<'a, D: Digest + digest::FixedOutputReset + Send + Sync + 'a>(data: &'a [u8], chunk: usize) -> impl rayon::iter::IndexedParallelIterator<Item= Output<D>> + 'a
|
|
{
|
|
let pool = pool::Pool::new_with(rayon::max_num_threads(), D::new);
|
|
|
|
phash_raw(data.as_ref(), chunk, move |range| {
|
|
let mut digest = pool.take_one_or_else(D::new);
|
|
<D as Digest>::update(&mut digest, range);
|
|
digest.finalize_reset()
|
|
})
|
|
}
|
|
|
|
#[inline]
|
|
pub(crate) fn phash_pool_into<D: Digest + digest::FixedOutputReset + Send + Sync>(data: &[u8], chunk: usize) -> Vec<Output<D>>
|
|
{
|
|
into_output(data, chunk, phash_pool::<D>)
|
|
}
|
|
*/
|
|
|
|
pub(crate) fn phash_digest<'a, D: Digest>(data: &'a [u8], chunk: usize) -> impl rayon::iter::IndexedParallelIterator<Item = Output<D>> + 'a
|
|
where Output<D>: Send
|
|
{
|
|
phash_raw(data, chunk, |range| {
|
|
let mut digest = D::new();
|
|
digest.update(range);
|
|
digest.finalize()
|
|
})
|
|
}
|
|
|
|
/// Eagerly hash each `chunk`-sized piece of `data` with `D`, collecting the
/// per-chunk outputs in input order.
///
/// NOTE(review): the `'a` lifetime parameter is unused by the signature
/// (`data`'s lifetime is elided) — it is harmless but vestigial.
pub(crate) fn phash_digest_into<'a, D: Digest>(data: &[u8], chunk: usize) -> Vec<Output<D>>
where Output<D>: Send
{
	into_output(data, chunk, phash_digest::<D>)
}
|
|
|
|
/*
|
|
fn phash<D: Digest + Send + Sync>(data: impl AsRef<[u8]>, chunk: usize) -> ParallelHash<D>
|
|
{
|
|
|
|
}*/
|
|
|
|
#[cfg(test)]
mod tests
{
	use super::*;

	/// Placeholder: a real end-to-end test of `ParallelDigest` needs a concrete
	/// `Digest` implementation (e.g. sha2), which is not a dependency here.
	#[test]
	fn paralell_hashing()
	{
		// `mut` removed — the buffer was never mutated or even read, which
		// produced unused-variable and unused-mut warnings.
		let zeros = [0u8; 512];
		assert_eq!(zeros.len(), 512);
		//ParallelDigest::new(10).digest(data)
	}
}
|