cache: partial entries

new
Avril 3 years ago
parent 0cf34a8ec6
commit 53fd8b89bc
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -0,0 +1,25 @@
//! Caching errors
use super::*;
use std::io;
use std::{
fmt,
error,
};
/// A partial cache entry initialisation error
#[derive(Debug)]
pub struct PartialInitError(pub(super) io::Error);
impl error::Error for PartialInitError
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(&self.0)
}
}
impl fmt::Display for PartialInitError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "Failed to initialise a partial cache entry")
}
}

@ -0,0 +1,59 @@
//! Memory holding types
use super::*;
use ::bytes::{
Bytes,
BytesMut,
BufMut,
Buf,
};
use std::{
pin::Pin,
task::{Context, Poll},
io,
};
use tokio::io::AsyncWrite;
// TODO: For when `MemoryMut` is vectorised, this will be the max size for each allocation
pub const PAGE_SIZE: usize = 4096;
/// The memory hold of a cahce entry.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Memory(pub(super) Bytes);
/// Unfrozen memory hold of a cache entry.
//TODO: Allow for non-resizing writes by making this `LinkedList<BytesMut>` or something, then we can implement `BufMut` for this
#[derive(Debug, PartialEq, Eq, Hash, Default)]
pub struct MemoryMut(pub(super) BytesMut); //TODO: Type will be `SmallVec<[BytesMut; 1]>` or `LinkedList<BytesMut>` or maybe a `smolset` type (probably linkedlist is best actually...).
impl MemoryMut
{
/// Freeze this mutable memory into an immutable one
pub fn freeze(self) -> Memory
{
Memory(self.0.freeze())
}
/// Create a new, empty mutable memory pool
pub fn new() -> Self
{
Self(BytesMut::new())
}
}
impl AsyncWrite for MemoryMut
{
#[inline(always)] fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
//TODO: When vectorised (see above TODO), this method will make it a single contiguous `BytesMut`
Poll::Ready(Ok(()))
}
#[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
//TODO: When vectorised (see above TODO), this method will make it a single contiguous `BytesMut`
Poll::Ready(Ok(()))
}
fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
//TODO: When vectorised, this will either: fill the previous allocation with enough out of `buf`; then create a new allocation and write the rest there; repeat.
// This is kinda paging
self.get_mut().0.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
}

@ -16,7 +16,6 @@ use std::path::PathBuf;
use std::num::NonZeroUsize;
use std::collections::HashMap;
use chrono::DateTime;
use ::bytes::Bytes;
use uuid::Uuid;
use crossbeam_utils::atomic::AtomicCell;
@ -47,6 +46,9 @@ Send +
'static
{}
mod mem;
use mem::Memory;
basic_enum!(pub PurgeOrder; "How should a cache determine what to purge": Oldest => "Purge the oldest entries first", LeastUsed => "Purge the least accessed entries first", OldestUsed => "Purge the oldest accessed entries first");
default!(PurgeOrder: Self::LeastUsed);
@ -72,10 +74,6 @@ pub struct Config
pub mem_purge_order: PurgeOrder,
}
/// The memory hold of a cahce.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
struct Memory(Bytes);
/// An entry in a `ByteCache`.
#[derive(Debug)]
pub struct CacheEntry //<K: Key> // `K` is the key in `entries`.
@ -95,9 +93,10 @@ pub struct CacheEntry //<K: Key> // `K` is the key in `entries`.
#[derive(Debug)]
pub struct ByteCache<K: Key>
{
/// How the cache should operate
/// How the cache should operate.
/// This is `Arc`d so Partial entries can access it.
// Config is big, box it.
cfg: Box<Config>,
cfg: Arc<Config>,
/// Frozen entries.
entries: RwLock<HashMap<K, CacheEntry>>,
@ -108,15 +107,33 @@ pub struct ByteCache<K: Key>
// FUCK This shit, don't store it here, do it somewhere fucking else FUCK.
// working: tokio::sync::RwLock<Vec<Weak<AtomicCell<PartialCacheEntryInner<K>>>>>,
}
impl<K: Key> ByteCache<K>
{
/// Create a new empty entry for this cache
pub async fn create_partial(&self, key: K) -> Result<PartialCacheEntry<K>, error::PartialInitError>
{
let mut this = PartialCacheEntry::new_uninit(self, key);
this.init().await.map_err(error::PartialInitError)?;
Ok(this)
}
}
mod partial;
pub use partial::*;
pub mod error;
/*
XXX: Move this to submodule, fuck the Cell BULLSHIT FUCK
#[derive(Debug)]
struct PartialCacheEntryInner<K: Key>
{
key: K,
key: K,
disk: tokio::fs::File,
memory: ::bytes::BytesMut,
disk: tokio::fs::File,
memory: ::bytes::BytesMut,
}

@ -0,0 +1,89 @@
//! Inserting into the persistant cache.
use super::*;
use ::bytes::BytesMut;
use std::path::Path;
use tokio::fs::{self, File};
use std::io;
use tokio::io::AsyncWrite;
use mem::MemoryMut;
/// The write rule used for cache insertion operations.
/// This ensures the number of bytes returned corresponds to the number written into memory.
///
/// This is because we care more about the integrity of memcached data, because we can dump that to disk later if the integrity of the disk copy is incorrect.
#[derive(Debug)]
enum CacheWriteRule{}
impl plex::WriteRule for CacheWriteRule
{
#[inline(always)] fn compare_byte_sizes(a: usize, b: usize) -> Result<usize, Self::CompareFailedError> {
Ok(std::cmp::max(a,b))
}
}
/// A partially formed cache entry that can be mutated.
/// It has not yet been inserted into a persistant `ByteCache` cache, and is write-only.
pub struct PartialCacheEntry<K>
{
cfg: Arc<Config>,
id: Uuid,
key: K,
file: Option<File>,
memory: MemoryMut,
}
/// The writer type for writing to a `PartialCacheEntry`.
pub type PartialCacheEntrySink<'a> = Box<dyn AsyncWrite + Send + Sync + Unpin + 'a>;
impl<K: Key> PartialCacheEntry<K>
{
#[inline(always)] pub(super) fn new_uninit(owner: &ByteCache<K>, key: K) -> Self
{
Self {
cfg: Arc::clone(&owner.cfg),
id: Uuid::new_v4(),
file: None,
memory: MemoryMut::new(),
key,
}
}
#[inline(always)] pub(super) async fn init(&mut self) -> io::Result<()>
{
//self.memory.reserve(PAGE_SIZE);
if let Some(root) = &self.cfg.disk_location {
self.file = Some(fs::OpenOptions::new()
.write(true)
.open(gen_temp_path(root, &self.id)).await?);
}
Ok(())
}
/// Create a writer for this entry
pub fn writer(&mut self) -> PartialCacheEntrySink<'_>
{
if let Some(file) = self.file.as_mut()
{
Box::new((&mut self.memory).multiplex_ruled::<_, CacheWriteRule>(file))
} else {
Box::new(&mut self.memory)
}
}
}
/// Create a path for a **non-completed** entry
pub(super) fn gen_temp_path(root: impl AsRef<Path>, from: &Uuid) -> PathBuf
{
root.as_ref().join(format!("{}.open", from))
}
/// Create a path for a **completed** entry
pub(super) fn gen_path(root: impl AsRef<Path>, from: &Uuid) -> PathBuf
{
root.as_ref().join(format!("{}.entry", from))
}
Loading…
Cancel
Save