diff --git a/src/service/cache/error.rs b/src/service/cache/error.rs
new file mode 100644
index 0000000..f2414ed
--- /dev/null
+++ b/src/service/cache/error.rs
@@ -0,0 +1,25 @@
+//! Caching errors
+use super::*;
+use std::io;
+use std::{
+	fmt,
+	error,
+};
+
+/// A partial cache entry initialisation error
+#[derive(Debug)]
+pub struct PartialInitError(pub(super) io::Error);
+
+impl error::Error for PartialInitError
+{
+	fn source(&self) -> Option<&(dyn error::Error + 'static)> {
+		Some(&self.0)
+	}
+}
+impl fmt::Display for PartialInitError
+{
+	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
+	{
+		write!(f, "Failed to initialise a partial cache entry")
+	}
+}
diff --git a/src/service/cache/mem.rs b/src/service/cache/mem.rs
new file mode 100644
index 0000000..77f8ece
--- /dev/null
+++ b/src/service/cache/mem.rs
@@ -0,0 +1,59 @@
+//! Memory holding types
+use super::*;
+use ::bytes::{
+	Bytes,
+	BytesMut,
+	BufMut,
+	Buf,
+};
+use std::{
+	pin::Pin,
+	task::{Context, Poll},
+	io,
+};
+use tokio::io::AsyncWrite;
+
+// TODO: For when `MemoryMut` is vectorised, this will be the max size for each allocation
+pub const PAGE_SIZE: usize = 4096;
+
+/// The memory hold of a cache entry.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Memory(pub(super) Bytes);
+
+/// Unfrozen memory hold of a cache entry.
+//TODO: Allow for non-resizing writes by making this `LinkedList` or something, then we can implement `BufMut` for this
+#[derive(Debug, PartialEq, Eq, Hash, Default)]
+pub struct MemoryMut(pub(super) BytesMut); //TODO: Type will be `SmallVec<[BytesMut; 1]>` or `LinkedList` or maybe a `smolset` type (probably linkedlist is best actually...). 
+
+impl MemoryMut
+{
+	/// Freeze this mutable memory into an immutable one
+	pub fn freeze(self) -> Memory
+	{
+		Memory(self.0.freeze())
+	}
+
+	/// Create a new, empty mutable memory pool
+	pub fn new() -> Self
+	{
+		Self(BytesMut::new())
+	}
+}
+
+impl AsyncWrite for MemoryMut
+{
+	#[inline(always)] fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		//TODO: When vectorised (see above TODO), this method will make it a single contiguous `BytesMut`
+		Poll::Ready(Ok(()))
+	}
+	#[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		//TODO: When vectorised (see above TODO), this method will make it a single contiguous `BytesMut`
+		Poll::Ready(Ok(()))
+	}
+	fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+		//TODO: When vectorised, this will either: fill the previous allocation with enough out of `buf`; then create a new allocation and write the rest there; repeat.
+		// This is kinda paging
+		self.get_mut().0.extend_from_slice(buf);
+		Poll::Ready(Ok(buf.len()))
+	}
+}
diff --git a/src/service/cache/mod.rs b/src/service/cache/mod.rs
index 7e166ae..ec46bd9 100644
--- a/src/service/cache/mod.rs
+++ b/src/service/cache/mod.rs
@@ -16,7 +16,6 @@
 use std::path::PathBuf;
 use std::num::NonZeroUsize;
 use std::collections::HashMap;
 use chrono::DateTime;
-use ::bytes::Bytes;
 use uuid::Uuid;
 use crossbeam_utils::atomic::AtomicCell;
@@ -47,6 +46,9 @@
 Send + 'static {}
 
+mod mem;
+use mem::Memory;
+
 basic_enum!(pub PurgeOrder; "How should a cache determine what to purge": Oldest => "Purge the oldest entries first", LeastUsed => "Purge the least accessed entries first", OldestUsed => "Purge the oldest accessed entries first");
 default!(PurgeOrder: Self::LeastUsed);
 
 
@@ -72,10 +74,6 @@
 	pub mem_purge_order: PurgeOrder,
 }
 
-/// The memory hold of a cahce.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
-struct Memory(Bytes);
-
 /// An entry in a `ByteCache`.
 #[derive(Debug)]
 pub struct CacheEntry // `K` is the key in `entries`.
@@ -95,9 +93,10 @@ pub struct CacheEntry // `K` is the key in `entries`.
 #[derive(Debug)]
 pub struct ByteCache<K>
 {
-	/// How the cache should operate
+	/// How the cache should operate.
+	/// This is `Arc`d so Partial entries can access it.
 	// Config is big, box it.
-	cfg: Box<Config>,
+	cfg: Arc<Config>,
 
 	/// Frozen entries.
 	entries: RwLock<HashMap<K, CacheEntry>>,
@@ -108,11 +107,29 @@
 // FUCK This shit, don't store it here, do it somewhere fucking else FUCK.
 //	working: tokio::sync::RwLock<HashMap<Uuid, Arc<RwLock<PartialCacheEntryInner>>>>,
 }
+
+
+impl<K> ByteCache<K>
+{
+	/// Create a new empty entry for this cache
+	pub async fn create_partial(&self, key: K) -> Result<PartialCacheEntry<K>, error::PartialInitError>
+	{
+		let mut this = PartialCacheEntry::new_uninit(self, key);
+		this.init().await.map_err(error::PartialInitError)?;
+		Ok(this)
+	}
+}
+
+mod partial;
+pub use partial::*;
+
+pub mod error;
+
 /* XXX: Move this to submodule, fuck the Cell BULLSHIT FUCK
 #[derive(Debug)]
 struct PartialCacheEntryInner
 {
-	key: K,
+key: K,
-	disk: tokio::fs::File,
+disk: tokio::fs::File,
-	memory: ::bytes::BytesMut,
+memory: ::bytes::BytesMut,
 }
diff --git a/src/service/cache/partial.rs b/src/service/cache/partial.rs
new file mode 100644
index 0000000..89b2e58
--- /dev/null
+++ b/src/service/cache/partial.rs
@@ -0,0 +1,89 @@
+//! Inserting into the persistent cache.
+use super::*;
+use ::bytes::BytesMut;
+use std::path::Path;
+use tokio::fs::{self, File};
+use std::io;
+use tokio::io::AsyncWrite;
+
+use mem::MemoryMut;
+
+
+/// The write rule used for cache insertion operations.
+/// This ensures the number of bytes returned corresponds to the number written into memory.
+///
+/// This is because we care more about the integrity of memcached data, because we can dump that to disk later if the integrity of the disk copy is incorrect.
+#[derive(Debug)]
+enum CacheWriteRule{}
+
+impl plex::WriteRule for CacheWriteRule
+{
+	#[inline(always)] fn compare_byte_sizes(a: usize, b: usize) -> Result<usize, io::Error> { // NOTE(review): error type lost in transit — confirm against `plex::WriteRule`
+		Ok(std::cmp::max(a,b))
+	}
+}
+
+/// A partially formed cache entry that can be mutated.
+/// It has not yet been inserted into a persistent `ByteCache` cache, and is write-only.
+pub struct PartialCacheEntry<K>
+{
+	cfg: Arc<Config>,
+
+	id: Uuid,
+	key: K,
+
+	file: Option<File>,
+	memory: MemoryMut,
+}
+
+/// The writer type for writing to a `PartialCacheEntry`.
+pub type PartialCacheEntrySink<'a> = Box<dyn AsyncWrite + Unpin + 'a>; // NOTE(review): trait-object bounds lost in transit — confirm
+
+impl<K> PartialCacheEntry<K>
+{
+	#[inline(always)] pub(super) fn new_uninit(owner: &ByteCache<K>, key: K) -> Self
+	{
+		Self {
+			cfg: Arc::clone(&owner.cfg),
+			id: Uuid::new_v4(),
+
+			file: None,
+			memory: MemoryMut::new(),
+
+			key,
+		}
+	}
+	#[inline(always)] pub(super) async fn init(&mut self) -> io::Result<()>
+	{
+		//self.memory.reserve(PAGE_SIZE);
+		if let Some(root) = &self.cfg.disk_location {
+			self.file = Some(fs::OpenOptions::new()
+					 .write(true).create(true) // BUG FIX: without `create(true)` opening the fresh temp path always fails with `NotFound`
+					 .open(gen_temp_path(root, &self.id)).await?);
+		}
+		Ok(())
+	}
+
+	/// Create a writer for this entry
+	pub fn writer(&mut self) -> PartialCacheEntrySink<'_>
+	{
+		if let Some(file) = self.file.as_mut()
+		{
+			Box::new((&mut self.memory).multiplex_ruled::<_, CacheWriteRule>(file))
+		} else {
+			Box::new(&mut self.memory)
+		}
+	}
+}
+
+/// Create a path for a **non-completed** entry
+pub(super) fn gen_temp_path(root: impl AsRef<Path>, from: &Uuid) -> PathBuf
+{
+	root.as_ref().join(format!("{}.open", from))
+}
+
+/// Create a path for a **completed** entry
+pub(super) fn gen_path(root: impl AsRef<Path>, from: &Uuid) -> PathBuf
+{
+	root.as_ref().join(format!("{}.entry", from))
+}