diff --git a/src/ext/hashers.rs b/src/ext/hashers.rs index a64451e..615bd92 100644 --- a/src/ext/hashers.rs +++ b/src/ext/hashers.rs @@ -3,6 +3,17 @@ use std::hash::{BuildHasherDefault, Hasher}; use smallvec::SmallVec; use cryptohelpers::sha256; use ::bytes::Buf; +use cryptohelpers::sha2::{ + Sha256, + Digest, +}; +use std::borrow::BorrowMut; +use tokio::io::AsyncWrite; +use std::{ + pin::Pin, + task::{Context, Poll}, + io, +}; /// A hasher that takes the first 8 bytes from SHA256 hash as its output. /// @@ -36,6 +47,107 @@ impl Hasher for Sha256TopHasher } } +/// An `AsyncWrite` implementor that writes its inputs to a sha256 digest. +#[pin_project] +#[derive(Debug)] +pub struct Sha256Sink = Sha256> +{ + digest: H +} + +impl> Sha256Sink +{ + /// Create a new Sha256-computing `AsyncWrite` sink. + #[inline] pub fn new(digest: H) -> Self + { + Self{digest} + } + /// Consume this sink, returning the inner digest + #[inline] pub fn into_inner(self) -> H + { + self.digest + } + /// Shared reference to the inner digest + #[inline] pub fn inner(&self) -> &H + { + &self.digest + } + /// Mutable reference to the inner digest + #[inline] pub fn inner_mut(&mut self) -> &mut H + { + &mut self.digest + } + + #[inline(always)] pub fn digest(&self) -> &Sha256 + { + self.digest.borrow() + } + #[inline(always)] pub fn digest_mut(&mut self) -> &mut Sha256 + { + self.digest.borrow_mut() + } +} +impl> AsRef for Sha256Sink +{ + fn as_ref(&self) -> &Sha256 { + self.digest.borrow() + } +} +impl> AsMut for Sha256Sink +{ + fn as_mut(&mut self) -> &mut Sha256 { + self.digest.borrow_mut() + } +} +/* +impl<'a, H: BorrowMut> AsRef for &'a Sha256Sink + //where H: 'a +{ + fn as_ref(&self) -> &Sha256 { + self.digest.borrow() + } +} +impl<'a, H: BorrowMut> AsMut for &'a mut Sha256Sink + // where H: 'a +{ + fn as_mut(&mut self) -> &mut Sha256 { + self.digest.borrow_mut() + } +} +*/ +impl> BorrowMut for Sha256Sink +{ + #[inline] fn borrow_mut(&mut self) -> &mut Sha256 { + self.digest.borrow_mut() + } +} + +impl> Borrow for Sha256Sink +{ + 
#[inline] fn borrow(&self) -> &Sha256 { + self.digest.borrow() + } +} + + + +impl> AsyncWrite for Sha256Sink +{ + fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + + this.digest.borrow_mut().update(buf); + Poll::Ready(Ok(buf.len())) + } + #[inline(always)] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + #[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + + } +} + pub trait Sha256HashExt { fn compute_sha256_hash(&self) -> sha256::Sha256Hash; @@ -61,3 +173,4 @@ impl Sha256HashOwnedExt for T sha256::compute_sync(self.reader()).unwrap() } } + diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 196b372..b671333 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -37,7 +37,7 @@ pub mod sync; pub mod plex; pub use plex::MultiplexStreamExt; -// The extension traits are defined in this file, no need to re-export anything from here. +// The extension traits are defined in this `mod.rs` file, no need to re-export anything from here. pub mod chunking; /// How many elements should `precollect` allocate on the stack before spilling to the heap. diff --git a/src/service/cache/mod.rs b/src/service/cache/mod.rs index ec46bd9..25d9368 100644 --- a/src/service/cache/mod.rs +++ b/src/service/cache/mod.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use chrono::DateTime; use uuid::Uuid; use crossbeam_utils::atomic::AtomicCell; +use cryptohelpers::sha256; pub type Timezone = chrono::Utc; @@ -84,6 +85,12 @@ pub struct CacheEntry // // `K` is the key in `entries`. tm_created: DateTime, accesses: AtomicUsize, // Can be mutated when read, hence the atomic. 
+ /// Hash of the memcache + /// + /// Used to ensure integrity of written disk data on an explicit check + /// (implicitly, integrity is checked by comparing the length of the disk stream with the length of the memory stream, since they are write-only in partial entries.) + hash: sha256::Sha256Hash, + memory: Option, // Pathname is computed from `id`. diff --git a/src/service/cache/partial.rs b/src/service/cache/partial.rs index 89b2e58..84f5bef 100644 --- a/src/service/cache/partial.rs +++ b/src/service/cache/partial.rs @@ -5,6 +5,7 @@ use std::path::Path; use tokio::fs::{self, File}; use std::io; use tokio::io::AsyncWrite; +use cryptohelpers::sha2::{Sha256, Digest}; use mem::MemoryMut; @@ -32,12 +33,16 @@ pub struct PartialCacheEntry id: Uuid, key: K, + /// Written to with each write to any instance created by `writer()`. + /// Finalised only when freezing this to a completed entry. + hasher: Sha256, + file: Option, memory: MemoryMut, } /// The writer type for writing to a `PartialCacheEntry`. -pub type PartialCacheEntrySink<'a> = Box; +pub type PartialCacheEntrySink<'a> = plex::MultiplexWrite, Sha256Sink<&'a mut Sha256>>; impl PartialCacheEntry { @@ -49,6 +54,8 @@ impl PartialCacheEntry file: None, memory: MemoryMut::new(), + + hasher: Sha256::new(), key, } @@ -67,12 +74,14 @@ impl PartialCacheEntry /// Create a writer for this entry pub fn writer(&mut self) -> PartialCacheEntrySink<'_> { - if let Some(file) = self.file.as_mut() + let bx: Box = if let Some(file) = self.file.as_mut() { Box::new((&mut self.memory).multiplex_ruled::<_, CacheWriteRule>(file)) } else { Box::new(&mut self.memory) - } + }; + + bx.multiplex(Sha256Sink::new(&mut self.hasher)) } }