//! Hashing
//use super::pool;
use digest::{Digest, Output};
use std::fmt;
use bytes::Buf;

/// An in-progress parallel digest: hashes of the completed whole chunks plus
/// a partial trailing chunk that has not been hashed yet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelDigest<D: Digest> {
    chunk_size: usize,
    completed: ParallelHash<D>,
    last_slice: Vec<u8>,
}

/// The per-chunk hashes of the input, along with the chunk size used.
#[derive(Debug, Clone, Hash)]
pub struct ParallelHash<D: Digest>(Vec<Output<D>>, usize);

impl<D: Digest> Eq for ParallelHash<D> {}
impl<D: Digest> PartialEq for ParallelHash<D> {
    fn eq(&self, other: &Self) -> bool {
        self.1 == other.1
            && self.0.iter().map(|x| &x[..]).eq(other.0.iter().map(|x| &x[..]))
    }
}

/// The condensed (hash-of-hashes) output, along with the chunk size used.
#[derive(Debug, Clone, Hash)]
pub struct ParallelHashOutput<D: Digest>(usize, Output<D>);

impl<D: Digest> Eq for ParallelHashOutput<D> {}
impl<D: Digest> PartialEq for ParallelHashOutput<D> {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0 && &self.1[..] == &other.1[..]
    }
}

impl<D: Digest> ParallelHashOutput<D> {
    #[inline]
    pub fn chunk_size(&self) -> usize {
        self.0
    }

    #[inline]
    pub fn condensed_hash(&self) -> &Output<D> {
        &self.1
    }

    /// Serialize as the chunk size (big-endian `u64`) followed by the
    /// condensed hash bytes.
    pub fn into_bytes(self) -> bytes::Bytes {
        use bytes::BufMut;
        let mut bytes = bytes::BytesMut::with_capacity(self.1.len() + std::mem::size_of::<u64>());
        bytes.put_u64(self.0.try_into().unwrap());
        bytes.extend(self.1.into_iter());
        bytes.freeze()
    }

    /// Inverse of [`into_bytes`](Self::into_bytes).
    pub fn from_bytes(mut bytes: impl bytes::Buf) -> Self {
        let mut output = Output::<D>::default();
        Self(bytes.get_u64().try_into().unwrap(), {
            bytes.copy_to_slice(&mut output[..]);
            output
        })
    }
}

impl<D: Digest> From<ParallelHashOutput<D>> for bytes::Bytes {
    #[inline]
    fn from(from: ParallelHashOutput<D>) -> Self {
        from.into_bytes()
    }
}

impl<D: Digest> From<ParallelHash<D>> for ParallelHashOutput<D> {
    #[inline]
    fn from(from: ParallelHash<D>) -> Self {
        from.finalize()
    }
}

impl<D: Digest> ParallelHash<D> {
    /// Condense the per-chunk hashes into a single hash-of-hashes.
    pub fn finalize(self) -> ParallelHashOutput<D> {
        let mut digest = D::new();
        for buf in self.0.into_iter() {
            digest.update(buf);
        }
        ParallelHashOutput(self.1, digest.finalize())
    }

    #[inline(always)]
    fn digest_single_chunk(slice: impl AsRef<[u8]>) -> Output<D> {
        D::digest(slice)
    }

    /// # Safety
    /// `last` must either be:
    /// * the last slice written to `self`
    /// * a full chunk
    #[inline]
    unsafe fn append_slice_unchecked(&mut self, last: impl AsRef<[u8]>) {
        self.0.push(Self::digest_single_chunk(last));
    }

    #[inline]
    pub(crate) fn append_chunks(&mut self, chunks: impl rayon::iter::IndexedParallelIterator<Item = Output<D>>) -> usize {
        let mut temp = Vec::new();
        chunks.collect_into_vec(&mut temp);
        self.0.extend_from_slice(&temp[..]);
        temp.len()
    }

    /// Append the already-hashed full chunks and hand any unhashed
    /// remainders back to the caller.
    fn append_chunks_unzipped<T: Send>(
        &mut self,
        chunks: impl rayon::iter::IndexedParallelIterator<Item = (Option<Output<D>>, Option<T>)>,
    ) -> (usize, impl std::iter::Iterator<Item = T>)
    where Output<D>: Send
    {
        let (mut ok, mut err) = (Vec::new(), Vec::new());
        chunks.unzip_into_vecs(&mut ok, &mut err);
        let l = self.0.len();
        self.0.extend(ok.into_iter().filter_map(std::convert::identity));
        (self.0.len() - l, err.into_iter().filter_map(std::convert::identity))
    }
}

impl<D: Digest> fmt::Display for ParallelHashOutput<D> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use crate::encode::*;
        write!(f, "{}:", self.0)?;
        HexEncoder::encode_slice(&HexEncoder, &self.1, f)
    }
}

impl<D: Digest> fmt::Display for ParallelHash<D> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use crate::encode::*;
        let mut d = D::new();
        write!(f, "{}:", self.1)?;
        for output in self.0.iter() {
            d.update(&output[..]);
        }
        HexEncoder::encode_slice(&HexEncoder, &d.finalize()[..], f)
    }
}
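/// Streaming construction of a [`ParallelHash`].
///
/// A minimal usage sketch (marked `ignore` since it assumes a concrete
/// digest backend such as `sha2::Sha256` from the `sha2` crate):
///
/// ```ignore
/// use sha2::Sha256;
///
/// let mut digest = ParallelDigest::<Sha256>::new(4096);
/// digest.append_slice(b"streamed ");
/// digest.append_slice(b"input");
/// let hash = digest.complete();
/// // Prints `4096:<hex of the condensed hash>`.
/// println!("{}", hash.finalize());
/// ```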
impl<D: Digest> ParallelDigest<D>
where Output<D>: Send
{
    /// Create a new digest
    pub fn new(chunk_size: usize) -> Self {
        Self {
            chunk_size,
            completed: ParallelHash(Vec::new(), chunk_size),
            last_slice: Vec::with_capacity(chunk_size),
        }
    }

    /// Hash every full chunk of `slice` in parallel, returning the trailing
    /// partial chunk (if any) for buffering.
    fn append_slice_raw<'s>(&mut self, slice: &'s [u8]) -> Option<&'s [u8]> {
        use rayon::prelude::*;
        let chunk_size = self.chunk_size;
        let (_, mut rest) = self
            .completed
            .append_chunks_unzipped(slice
                .par_chunks(chunk_size)
                .map(move |slice| if slice.len() == chunk_size {
                    (Some(ParallelHash::<D>::digest_single_chunk(slice)), None)
                } else {
                    (None, Some(slice))
                }));
        rest.next() // We know there will only be one of these
    }

    /// Top up the internal buffer from the front of `slice`; if it reaches a
    /// full chunk, take it out and return it.
    fn fill_and_take_buf(&mut self, slice: &mut &[u8]) -> Option<bytes::Bytes> {
        if !self.last_slice.is_empty() {
            let rest = self.chunk_size - self.last_slice.len();
            let rest = std::cmp::min(slice.len(), rest);
            self.last_slice.extend_from_slice(&slice[..rest]); //XXX: This is probably quite slow...
            *slice = if slice.len() != rest {
                &slice[rest..]
            } else {
                &[]
            };
            if self.last_slice.len() == self.chunk_size {
                let output = (&mut &self.last_slice[..]).copy_to_bytes(self.chunk_size);
                self.last_slice.clear();
                Some(output)
            } else {
                None
            }
        } else {
            None
        }
    }

    pub fn append_buf(&mut self, buf: &mut (impl Buf + ?Sized)) {
        while buf.has_remaining() {
            let cs = {
                let chunk = buf.chunk();
                self.append_slice(chunk);
                chunk.len()
            };
            buf.advance(cs);
        }
    }

    pub fn append_slice(&mut self, slice: impl AsRef<[u8]>) {
        let mut slice = slice.as_ref();
        if let Some(chunk0) = self.fill_and_take_buf(&mut slice) {
            debug_assert_eq!(chunk0.len(), self.chunk_size, "internal buffer release not chunk_size long");
            // SAFETY: we know `chunk0` is a full chunk.
            unsafe { self.completed.append_slice_unchecked(chunk0) }
        }
        if !slice.is_empty() {
            if let Some(rest) = self.append_slice_raw(slice) {
                self.last_slice.extend_from_slice(rest);
            }
        }
    }

    pub fn complete(self) -> ParallelHash<D> {
        if self.last_slice.is_empty() {
            self.completed
        } else {
            let mut output = self.completed;
            // SAFETY: This is the final slice written to `output`.
            unsafe {
                output.append_slice_unchecked(self.last_slice);
            }
            output
        }
    }

    /// Compute a parallel digest from `data` only.
    #[inline]
    pub fn digest_slice(chunk_size: usize, data: impl AsRef<[u8]>) -> ParallelHash<D> {
        ParallelHash(phash_digest_into::<D>(data.as_ref(), chunk_size), chunk_size)
    }
}

#[inline(always)]
pub(crate) fn phash_raw<'a, T: Send, F: 'a>(data: &'a [u8], chunk: usize, digest: F) -> impl rayon::iter::IndexedParallelIterator<Item = T> + 'a
where F: Fn(&[u8]) -> T + Send + Sync
{
    use rayon::prelude::*;
    data.par_chunks(chunk).map(digest)
}

#[inline(always)]
pub(crate) fn into_output<'a, T, F, U>(data: &'a [u8], chunk: usize, func: F) -> Vec<T>
where F: FnOnce(&'a [u8], usize) -> U,
      U: rayon::iter::IndexedParallelIterator<Item = T> + 'a
{
    let mut output = Vec::with_capacity(1 + (data.len() / chunk));
    func(data, chunk).collect_into_vec(&mut output);
    output
}

#[inline(always)]
pub(crate) fn phash_raw_into<T: Send, F>(data: &[u8], chunk: usize, digest: F) -> Vec<T> //Vec<Output<D>>
where F: Fn(&[u8]) -> T + Send + Sync
{
    use rayon::prelude::*;
    let mut output = Vec::with_capacity(1 + (data.len() / chunk));
    phash_raw(data, chunk, digest)
        .collect_into_vec(&mut output);
    output
}

/*
pub(crate) fn phash_pool<'a, D: Digest + digest::FixedOutputReset + Send + Sync + 'a>(data: &'a [u8], chunk: usize) -> impl rayon::iter::IndexedParallelIterator<Item = Output<D>> + 'a
{
    let pool = pool::Pool::new_with(rayon::max_num_threads(), D::new);
    phash_raw(data.as_ref(), chunk, move |range| {
        let mut digest = pool.take_one_or_else(D::new);
        <D as Digest>::update(&mut digest, range);
        digest.finalize_reset()
    })
}

#[inline]
pub(crate) fn phash_pool_into<D: Digest + digest::FixedOutputReset + Send + Sync>(data: &[u8], chunk: usize) -> Vec<Output<D>>
{
    into_output(data, chunk, phash_pool::<D>)
}
*/

pub(crate) fn phash_digest<'a, D: Digest>(data: &'a [u8], chunk: usize) -> impl rayon::iter::IndexedParallelIterator<Item = Output<D>> + 'a
where Output<D>: Send
{
    phash_raw(data, chunk, |range| {
        let mut digest = D::new();
        digest.update(range);
        digest.finalize()
    })
}
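// A cross-check for the helpers above: a minimal sequential reference
// (a hypothetical helper, not part of the crate's API) that the rayon
// pipeline must agree with chunk-for-chunk. Assumes `sha2::Sha256` is
// available as a dev-dependency.
#[cfg(test)]
mod phash_reference {
    use super::*;

    // Hash each `chunk`-sized window independently, exactly like the
    // parallel version, but on the current thread.
    fn phash_digest_sequential<D: Digest>(data: &[u8], chunk: usize) -> Vec<Output<D>> {
        data.chunks(chunk).map(|range| D::digest(range)).collect()
    }

    #[test]
    fn parallel_matches_sequential() {
        let data: Vec<u8> = (0..=255u8).cycle().take(1000).collect();
        let parallel = phash_digest_into::<sha2::Sha256>(&data, 64);
        let sequential = phash_digest_sequential::<sha2::Sha256>(&data, 64);
        assert_eq!(parallel, sequential);
    }
}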
pub(crate) fn phash_digest_into<D: Digest>(data: &[u8], chunk: usize) -> Vec<Output<D>>
where Output<D>: Send
{
    into_output(data, chunk, phash_digest::<D>)
}

/*
fn phash<D: Digest>(data: impl AsRef<[u8]>, chunk: usize) -> ParallelHash<D>
{

}*/

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parallel_hashing() {
        // Note: assumes a concrete digest backend (`sha2::Sha256` here) is
        // available as a dev-dependency.
        let zeros = [0u8; 512];
        let mut digest = ParallelDigest::<sha2::Sha256>::new(10);
        digest.append_slice(&zeros[..]);
        // Feeding the data incrementally must agree with the one-shot helper.
        assert_eq!(digest.complete(), ParallelDigest::<sha2::Sha256>::digest_slice(10, &zeros[..]));
    }
}
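// A sketch of the `into_bytes`/`from_bytes` round-trip, again assuming
// `sha2::Sha256` as a dev-dependency; the wire format is the chunk size
// as a big-endian `u64` followed by the condensed hash bytes.
#[cfg(test)]
mod output_roundtrip {
    use super::*;

    #[test]
    fn output_survives_byte_roundtrip() {
        let hash = ParallelDigest::<sha2::Sha256>::digest_slice(10, b"hello world").finalize();
        let bytes = hash.clone().into_bytes();
        // 8-byte chunk-size prefix + 32-byte SHA-256 condensed hash.
        assert_eq!(bytes.len(), 8 + 32);
        let decoded = ParallelHashOutput::<sha2::Sha256>::from_bytes(bytes);
        assert_eq!(decoded, hash);
    }
}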