You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

351 lines
8.6 KiB

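//! Parallel, chunk-based hashing: input is split into fixed-size chunks, every chunk is hashed
//! on its own (in parallel via `rayon`), and the per-chunk digests are condensed into one digest.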
//use super::pool;
use digest::{
Digest,
Output,
};
use std::fmt;
use bytes::Buf;
/// Streaming state for a chunked hash: splits appended input into `chunk_size`-byte chunks and
/// records one `D` digest per chunk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelDigest<D>
where
    D: Digest + Send + Sync,
{
    /// Number of bytes that make up one full chunk.
    chunk_size: usize,
    /// Digests of the chunks hashed so far, in input order.
    completed: ParallelHash<D>,
    /// Trailing bytes that have not yet filled a whole chunk.
    last_slice: Vec<u8>,
}
/// The ordered per-chunk digests of some input, together with the chunk size used to split it.
#[derive(Debug, Clone, Hash)]
pub struct ParallelHash<D>(
    /// One digest per chunk, in input order.
    Vec<Output<D>>,
    /// Chunk size (in bytes) the input was split by.
    usize,
)
where
    D: Digest;
impl<D: Digest> Eq for ParallelHash<D> {}

impl<D: Digest> PartialEq for ParallelHash<D> {
    /// Two hashes are equal when they share a chunk size and hold the same chunk digests in the
    /// same order. Digests are compared as byte slices.
    fn eq(&self, other: &Self) -> bool {
        if self.1 != other.1 {
            return false;
        }
        let (lhs, rhs) = (&self.0, &other.0);
        lhs.len() == rhs.len() && lhs.iter().zip(rhs.iter()).all(|(a, b)| a[..] == b[..])
    }
}
/// The condensed result of a [`ParallelHash`]: the chunk size and a single digest computed over
/// all of the per-chunk digests.
#[derive(Debug, Clone, Hash)]
pub struct ParallelHashOutput<D>(
    /// Chunk size (in bytes) the input was split by.
    usize,
    /// Digest of the concatenated per-chunk digests.
    Output<D>,
)
where
    D: Digest;
impl<D: Digest> Eq for ParallelHashOutput<D> {}

impl<D: Digest> PartialEq for ParallelHashOutput<D> {
    /// Equal when both the chunk size and the condensed digest bytes match.
    fn eq(&self, other: &Self) -> bool {
        let same_chunk_size = self.0 == other.0;
        same_chunk_size && self.1[..] == other.1[..]
    }
}
impl<D: Digest> ParallelHashOutput<D> {
    /// Chunk size (in bytes) that was used to produce this output.
    #[inline]
    pub fn chunk_size(&self) -> usize {
        let Self(chunk_size, _) = self;
        *chunk_size
    }

    /// The single digest computed over all per-chunk digests.
    #[inline]
    pub fn condensed_hash(&self) -> &Output<D> {
        let Self(_, condensed) = self;
        condensed
    }

    /// Serialize as the chunk size written as a `u64` (via `BufMut::put_u64`), immediately
    /// followed by the raw bytes of the condensed digest.
    ///
    /// # Panics
    /// Panics if the chunk size cannot be represented as a `u64`.
    pub fn into_bytes(self) -> bytes::Bytes {
        use bytes::BufMut;

        let Self(chunk_size, condensed) = self;
        let mut encoded =
            bytes::BytesMut::with_capacity(condensed.len() + std::mem::size_of::<u64>());
        encoded.put_u64(chunk_size.try_into().unwrap());
        encoded.put_slice(&condensed[..]);
        encoded.freeze()
    }

    /// Deserialize from the layout written by [`into_bytes`](Self::into_bytes): a `u64` chunk
    /// size followed by exactly one digest's worth of bytes.
    ///
    /// # Panics
    /// Panics if the stored chunk size does not fit in `usize`.
    /// NOTE(review): `get_u64` / `copy_to_slice` are expected to panic when `bytes` is too
    /// short — confirm against the `bytes` documentation before feeding untrusted input.
    pub fn from_bytes(mut bytes: impl bytes::Buf) -> Self {
        let chunk_size: usize = bytes.get_u64().try_into().unwrap();
        let mut condensed = Output::<D>::default();
        bytes.copy_to_slice(&mut condensed[..]);
        Self(chunk_size, condensed)
    }
}
impl<D: Digest> From<ParallelHashOutput<D>> for bytes::Bytes
{
#[inline]
fn from(from: ParallelHashOutput<D>) -> Self
{
from.into_bytes()
}
}
impl<D: Digest> From<ParallelHash<D>> for ParallelHashOutput<D>
{
#[inline]
fn from(from: ParallelHash<D>) -> Self
{
from.finalize()
}
}
impl<D: Digest> ParallelHash<D> {
    /// Condense the per-chunk digests into a single digest.
    ///
    /// A fresh `D` is fed every chunk digest in order; the result is returned together with the
    /// chunk size.
    pub fn finalize(self) -> ParallelHashOutput<D> {
        let mut digest = D::new();
        for chunk_hash in self.0 {
            digest.update(chunk_hash);
        }
        ParallelHashOutput(self.1, digest.finalize())
    }

    /// Hash one chunk of input with `D`.
    #[inline(always)]
    fn digest_single_chunk(slice: impl AsRef<[u8]>) -> Output<D> {
        D::digest(slice)
    }

    /// Hash `last` and append its digest to the list of chunk digests.
    ///
    /// # Safety
    /// `last` must either be:
    /// * the last slice written to `self`
    /// * a full chunk
    #[inline]
    unsafe fn append_slice_unchecked(&mut self, last: impl AsRef<[u8]>) {
        self.0.push(Self::digest_single_chunk(last));
    }

    /// Append already-computed chunk digests, preserving the iterator's order.
    ///
    /// Returns the number of digests appended.
    #[inline]
    pub(crate) fn append_chunks(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item = Output<D>>) -> usize {
        let mut collected = Vec::new();
        chunks.collect_into_vec(&mut collected);
        let appended = collected.len();
        // Move the digests across instead of cloning each one and dropping the original.
        self.0.append(&mut collected);
        appended
    }

    /// Consume an iterator of `(digest, leftover)` pairs: every `Some` digest is appended in
    /// order, and every `Some` leftover is handed back to the caller.
    ///
    /// Returns the number of digests appended and an iterator over the leftovers.
    fn append_chunks_unzipped<E: Send>(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item = (Option<Output<D>>, Option<E>)>) -> (usize, impl std::iter::Iterator<Item = E>) {
        let (mut hashed, mut leftover) = (Vec::new(), Vec::new());
        chunks.unzip_into_vecs(&mut hashed, &mut leftover);
        let before = self.0.len();
        self.0.extend(hashed.into_iter().flatten());
        (self.0.len() - before, leftover.into_iter().flatten())
    }
}
impl<D: Digest> fmt::Display for ParallelHashOutput<D> {
    /// Writes the chunk size, a `:` separator, then the condensed digest passed through
    /// `HexEncoder`.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use crate::encode::*;

        let chunk_size = self.0;
        write!(f, "{}:", chunk_size)?;
        let encoder = HexEncoder;
        HexEncoder::encode_slice(&encoder, &self.1, f)
    }
}
impl<D: Digest> fmt::Display for ParallelHash<D> {
    /// Writes the chunk size, a `:` separator, then the digest of all chunk digests passed
    /// through `HexEncoder`. The digest is computed on the fly without consuming `self`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use crate::encode::*;

        write!(f, "{}:", self.1)?;

        let mut condensed = D::new();
        for chunk_hash in &self.0 {
            condensed.update(&chunk_hash[..]);
        }
        HexEncoder::encode_slice(&HexEncoder, &condensed.finalize()[..], f)
    }
}
impl<D: Digest + Send + Sync> ParallelDigest<D>
{
    /// Create a new digest that hashes its input in chunks of `chunk_size` bytes.
    ///
    /// The carry-over buffer is pre-allocated with room for one full chunk.
    ///
    /// NOTE(review): a `chunk_size` of 0 looks unusable — appending non-empty data reaches
    /// rayon's `par_chunks`, which is expected to panic on a zero chunk size. Confirm and
    /// consider rejecting it here.
    pub fn new(chunk_size: usize) -> Self
    {
        Self {
            chunk_size,
            completed: ParallelHash(Vec::new(), chunk_size),
            last_slice: Vec::with_capacity(chunk_size),
        }
    }

    /// Hash every full chunk of `slice` in parallel and append the digests, in order, to the
    /// completed list.
    ///
    /// Returns the trailing part of `slice` that is shorter than a chunk, if any.
    fn append_slice_raw<'s>(&mut self, slice: &'s [u8]) -> Option<&'s [u8]>
    {
        use rayon::prelude::*;
        let (_, mut rest) = self
            .completed
            .append_chunks_unzipped(slice
                .par_chunks(self.chunk_size)
                .map(|slice| if slice.len() == self.chunk_size {
                    (Some(ParallelHash::<D>::digest_single_chunk(slice)), None)
                } else {
                    (None, Some(slice))
                }));
        rest.next() // Only the final chunk can be short, so there is at most one of these.
    }

    /// Top up a non-empty carry-over buffer from the front of `slice`, advancing `slice` past
    /// the bytes that were consumed.
    ///
    /// Returns `true` when the buffer now holds exactly one full chunk. An empty buffer is left
    /// alone (and `slice` untouched) so that whole chunks can be hashed straight from the input.
    fn fill_buf(&mut self, slice: &mut &[u8]) -> bool
    {
        if self.last_slice.is_empty() {
            return false;
        }
        let input: &[u8] = *slice;
        let wanted = self.chunk_size - self.last_slice.len();
        let (head, tail) = input.split_at(wanted.min(input.len()));
        self.last_slice.extend_from_slice(head);
        *slice = tail;
        self.last_slice.len() == self.chunk_size
    }

    /// Append every byte remaining in `buf`, consuming it one contiguous `chunk()` at a time.
    pub fn append_buf(&mut self, buf: &mut (impl Buf + ?Sized))
    {
        while buf.has_remaining() {
            let consumed = {
                let chunk = buf.chunk();
                self.append_slice(chunk);
                chunk.len()
            };
            buf.advance(consumed);
        }
    }

    /// Append `slice` to the stream being hashed.
    ///
    /// Bytes left over from a previous call are completed first; the remaining input is then
    /// hashed chunk by chunk, and any trailing partial chunk is kept for the next call.
    pub fn append_slice(&mut self, slice: impl AsRef<[u8]>)
    {
        let mut slice = slice.as_ref();
        if self.fill_buf(&mut slice) {
            debug_assert_eq!(self.last_slice.len(), self.chunk_size, "internal buffer release not chunk_size long");
            // Hash the buffered chunk in place; no intermediate copy is needed.
            // SAFETY: `fill_buf` returned `true`, so the buffer is exactly one full chunk.
            unsafe {
                self.completed.append_slice_unchecked(&self.last_slice);
            }
            self.last_slice.clear();
        }
        if !slice.is_empty() {
            if let Some(rest) = self.append_slice_raw(slice) {
                self.last_slice.extend_from_slice(rest);
            }
        }
    }

    /// Finish the stream, hashing any buffered partial chunk as the final chunk.
    pub fn complete(self) -> ParallelHash<D>
    {
        let Self { completed: mut output, last_slice, .. } = self;
        if !last_slice.is_empty() {
            // SAFETY: This is the final slice written to `output`.
            unsafe {
                output.append_slice_unchecked(last_slice);
            }
        }
        output
    }

    /// Compute a parallel digest from `data` only.
    #[inline]
    pub fn digest_slice(chunk_size: usize, data: impl AsRef<[u8]>) -> ParallelHash<D>
    {
        ParallelHash(phash_digest_into::<D>(data.as_ref(), chunk_size), chunk_size)
    }
}
/// Split `data` into `chunk`-byte pieces and lazily map `digest` over them in parallel.
///
/// The returned iterator is indexed, so results keep the order of the chunks in `data`.
#[inline(always)]
pub(crate) fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator<Item = T> + 'a
where F: Fn(&[u8]) -> T + Send + Sync
{
    use rayon::prelude::*;

    let pieces = data.par_chunks(chunk);
    pieces.map(digest)
}
/// Run `func` over `data` / `chunk` and gather the parallel iterator it returns into a `Vec`,
/// in order.
///
/// The vector is pre-sized to `data.len() / chunk + 1` elements.
#[inline(always)]
pub(crate) fn into_output<'a, T,F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec<T>
where F: FnOnce(&'a [u8], usize) -> U,
      U: rayon::iter::IndexedParallelIterator<Item = T> + 'a
{
    let expected = (data.len() / chunk) + 1;
    let mut collected = Vec::with_capacity(expected);
    let producer = func(data, chunk);
    producer.collect_into_vec(&mut collected);
    collected
}
/// Apply `digest` to every `chunk`-byte piece of `data` in parallel and collect the results,
/// in order, into a `Vec`.
///
/// The vector is pre-sized to `data.len() / chunk + 1` elements.
#[inline(always)]
pub(crate) fn phash_raw_into<T: Send, F>(data: &[u8], chunk: usize, digest: F) -> Vec<T>//Vec<Output<D>>
where F: Fn(&[u8]) -> T + Send + Sync
{
    use rayon::prelude::*;

    let expected = (data.len() / chunk) + 1;
    let mut collected = Vec::with_capacity(expected);
    let hashed = phash_raw(data, chunk, digest);
    hashed.collect_into_vec(&mut collected);
    collected
}
/*
pub(crate) fn phash_pool<'a, D: Digest + digest::FixedOutputReset + Send + Sync + 'a>(data: &'a [u8], chunk: usize) -> impl rayon::iter::IndexedParallelIterator<Item= Output<D>> + 'a
{
let pool = pool::Pool::new_with(rayon::max_num_threads(), D::new);
phash_raw(data.as_ref(), chunk, move |range| {
let mut digest = pool.take_one_or_else(D::new);
<D as Digest>::update(&mut digest, range);
digest.finalize_reset()
})
}
#[inline]
pub(crate) fn phash_pool_into<D: Digest + digest::FixedOutputReset + Send + Sync>(data: &[u8], chunk: usize) -> Vec<Output<D>>
{
into_output(data, chunk, phash_pool::<D>)
}
*/
/// Lazily hash every `chunk`-byte piece of `data` with a fresh `D`, in parallel.
///
/// Yields one digest per piece, in input order.
pub(crate) fn phash_digest<'a, D: Digest>(data: &'a [u8], chunk: usize) -> impl rayon::iter::IndexedParallelIterator<Item = Output<D>> + 'a
where Output<D>: Send
{
    // One-shot hashing of each piece; a new hasher is used for every chunk.
    phash_raw(data, chunk, |piece| D::digest(piece))
}
/// Hash every `chunk`-byte piece of `data` with `D` in parallel and collect the digests, in
/// input order, into a `Vec`.
pub(crate) fn phash_digest_into<'a, D: Digest>(data: &[u8], chunk: usize) -> Vec<Output<D>>
where Output<D>: Send
{
    let hasher = phash_digest::<D>;
    into_output(data, chunk, hasher)
}
/*
fn phash<D: Digest + Send + Sync>(data: impl AsRef<[u8]>, chunk: usize) -> ParallelHash<D>
{
}*/
#[cfg(test)]
mod tests
{
    use super::*;

    /// Placeholder for a parallel-hashing test; it currently asserts nothing.
    // TODO: pick a concrete `Digest` implementation and compare
    // `ParallelDigest::digest_slice` against the streaming `append_slice` + `complete` path.
    #[test]
    fn parallel_hashing()
    {
        // Underscore-prefixed and immutable: the fixture is not used yet.
        let _zeros = [0u8; 512];
        //ParallelDigest::new(10).digest(data)
    }
}