cache: added hashing on partial entry, and hash field to full entry

new
Avril 4 years ago
parent 53fd8b89bc
commit 659cb0d835
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -3,6 +3,17 @@ use std::hash::{BuildHasherDefault, Hasher};
use smallvec::SmallVec; use smallvec::SmallVec;
use cryptohelpers::sha256; use cryptohelpers::sha256;
use ::bytes::Buf; use ::bytes::Buf;
use cryptohelpers::sha2::{
Sha256,
Digest,
};
use std::borrow::BorrowMut;
use tokio::io::AsyncWrite;
use std::{
pin::Pin,
task::{Context, Poll},
io,
};
/// A hasher that takes the first 8 bytes from SHA256 hash as its output. /// A hasher that takes the first 8 bytes from SHA256 hash as its output.
/// ///
@ -36,6 +47,107 @@ impl Hasher for Sha256TopHasher
} }
} }
/// An `AsyncWrite` implementor that writes it's inputs to a sha256 digest.
#[pin_project]
#[derive(Debug)]
pub struct Sha256Sink<H: BorrowMut<Sha256> = Sha256>
{
digest: H
}
impl<H: BorrowMut<Sha256>> Sha256Sink<H>
{
/// Create a new Sha256-computing `AsyncWrite` sink.
#[inline] pub fn new(digest: H) -> Self
{
Self{digest}
}
/// Consume into the inner digest
#[inline] pub fn into_inner(self) -> H
{
self.digest
}
/// The inner digest
#[inline] pub fn inner(&self) -> &H
{
&self.digest
}
/// The inner digest (mutable)
#[inline] pub fn inner_mut(&mut self) -> &mut H
{
&mut self.digest
}
#[inline(always)] pub fn digest(&self) -> &Sha256
{
self.digest.borrow()
}
#[inline(always)] pub fn digest_mut(&mut self) -> &mut Sha256
{
self.digest.borrow_mut()
}
}
impl<H: BorrowMut<Sha256>> AsRef<Sha256> for Sha256Sink<H>
{
fn as_ref(&self) -> &Sha256 {
self.digest.borrow()
}
}
impl<H: BorrowMut<Sha256>> AsMut<Sha256> for Sha256Sink<H>
{
fn as_mut(&mut self) -> &mut Sha256 {
self.digest.borrow_mut()
}
}
/*
impl<'a, H: BorrowMut<Sha256>> AsRef<Sha256> for &'a Sha256Sink<H>
//where H: 'a
{
fn as_ref(&self) -> &Sha256 {
self.digest.borrow()
}
}
impl<'a, H: BorrowMut<Sha256>> AsMut<Sha256> for &'a mut Sha256Sink<H>
// where H: 'a
{
fn as_mut(&mut self) -> &mut Sha256 {
self.digest.borrow_mut()
}
}
*/
impl<H: BorrowMut<Sha256>> BorrowMut<Sha256> for Sha256Sink<H>
{
#[inline] fn borrow_mut(&mut self) -> &mut Sha256 {
self.digest.borrow_mut()
}
}
impl<H: BorrowMut<Sha256>> Borrow<Sha256> for Sha256Sink<H>
{
#[inline] fn borrow(&self) -> &Sha256 {
self.digest.borrow()
}
}
impl<H: BorrowMut<Sha256>> AsyncWrite for Sha256Sink<H>
{
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let this = self.project();
this.digest.borrow_mut().update(buf);
Poll::Ready(Ok(buf.len()))
}
#[inline(always)] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
#[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
pub trait Sha256HashExt pub trait Sha256HashExt
{ {
fn compute_sha256_hash(&self) -> sha256::Sha256Hash; fn compute_sha256_hash(&self) -> sha256::Sha256Hash;
@ -61,3 +173,4 @@ impl<T: Buf> Sha256HashOwnedExt for T
sha256::compute_sync(self.reader()).unwrap() sha256::compute_sync(self.reader()).unwrap()
} }
} }

@ -37,7 +37,7 @@ pub mod sync;
pub mod plex; pub mod plex;
pub use plex::MultiplexStreamExt; pub use plex::MultiplexStreamExt;
// The extension traits are defined in this file, no need to re-export anything from here. // The extension traits are defined in this `mod.rs` file, no need to re-export anything from here.
pub mod chunking; pub mod chunking;
/// How many elements should `precollect` allocate on the stack before spilling to the heap. /// How many elements should `precollect` allocate on the stack before spilling to the heap.

@ -18,6 +18,7 @@ use std::collections::HashMap;
use chrono::DateTime; use chrono::DateTime;
use uuid::Uuid; use uuid::Uuid;
use crossbeam_utils::atomic::AtomicCell; use crossbeam_utils::atomic::AtomicCell;
use cryptohelpers::sha256;
pub type Timezone = chrono::Utc; pub type Timezone = chrono::Utc;
@ -84,6 +85,12 @@ pub struct CacheEntry //<K: Key> // `K` is the key in `entries`.
tm_created: DateTime<Timezone>, tm_created: DateTime<Timezone>,
accesses: AtomicUsize, // Can be mutated when read, hence the atomic. accesses: AtomicUsize, // Can be mutated when read, hence the atomic.
/// Hash of the memcache
///
/// Used to ensure integrity of written disk data on an explicit check
/// (implicitly, integrity is checked by comparing the length of the disk stream with the length of the memory stream, since they are write-only in partial entries.)
hash: sha256::Sha256Hash,
memory: Option<Memory>, memory: Option<Memory>,
// Pathname is computed from `id`. // Pathname is computed from `id`.

@ -5,6 +5,7 @@ use std::path::Path;
use tokio::fs::{self, File}; use tokio::fs::{self, File};
use std::io; use std::io;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
use cryptohelpers::sha2::{Sha256, Digest};
use mem::MemoryMut; use mem::MemoryMut;
@ -32,12 +33,16 @@ pub struct PartialCacheEntry<K>
id: Uuid, id: Uuid,
key: K, key: K,
/// Written to with each write to any instance created by `writer()`.
/// Finalised only when freezing this to a completed entry.
hasher: Sha256,
file: Option<File>, file: Option<File>,
memory: MemoryMut, memory: MemoryMut,
} }
/// The writer type for writing to a `PartialCacheEntry`. /// The writer type for writing to a `PartialCacheEntry`.
pub type PartialCacheEntrySink<'a> = Box<dyn AsyncWrite + Send + Sync + Unpin + 'a>; pub type PartialCacheEntrySink<'a> = plex::MultiplexWrite<Box<dyn AsyncWrite + Send + Sync + Unpin + 'a>, Sha256Sink<&'a mut Sha256>>;
impl<K: Key> PartialCacheEntry<K> impl<K: Key> PartialCacheEntry<K>
{ {
@ -49,6 +54,8 @@ impl<K: Key> PartialCacheEntry<K>
file: None, file: None,
memory: MemoryMut::new(), memory: MemoryMut::new(),
hasher: Sha256::new(),
key, key,
} }
@ -67,12 +74,14 @@ impl<K: Key> PartialCacheEntry<K>
/// Create a writer for this entry /// Create a writer for this entry
pub fn writer(&mut self) -> PartialCacheEntrySink<'_> pub fn writer(&mut self) -> PartialCacheEntrySink<'_>
{ {
if let Some(file) = self.file.as_mut() let bx: Box<dyn AsyncWrite + Send + Sync + Unpin + '_> = if let Some(file) = self.file.as_mut()
{ {
Box::new((&mut self.memory).multiplex_ruled::<_, CacheWriteRule>(file)) Box::new((&mut self.memory).multiplex_ruled::<_, CacheWriteRule>(file))
} else { } else {
Box::new(&mut self.memory) Box::new(&mut self.memory)
} };
bx.multiplex(Sha256Sink::new(&mut self.hasher))
} }
} }

Loading…
Cancel
Save