From 5bc52f8d1c7034ce5b5d4e4144b482149badf391 Mon Sep 17 00:00:00 2001
From: Avril
Date: Sat, 19 Dec 2020 01:58:58 +0000
Subject: [PATCH] more minor cache stuff

---
 Cargo.lock        |   1 +
 Cargo.toml        |   1 +
 src/data/cache.rs | 285 +++++++++++++++++++++++++++++++++++++++++++++-
 src/data/mod.rs   |   4 +-
 src/main.rs       |   1 +
 5 files changed, 285 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ea96bc6..99d11a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -457,6 +457,7 @@ version = "0.1.0"
 dependencies = [
  "bytes 0.6.0",
  "cryptohelpers",
+ "futures",
  "generational-arena",
  "jemallocator",
  "memmap",
diff --git a/Cargo.toml b/Cargo.toml
index af509c7..75a5184 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ edition = "2018"
 [dependencies]
 bytes = "0.6.0"
 cryptohelpers = {version = "1.7", features = ["full", "async", "serde"]}
+futures = "0.3.8"
 generational-arena = "0.2.8"
 jemallocator = "0.3.2"
 memmap = "0.7.0"
diff --git a/src/data/cache.rs b/src/data/cache.rs
index d3fb02b..df9756b 100644
--- a/src/data/cache.rs
+++ b/src/data/cache.rs
@@ -1,12 +1,26 @@
 use super::*;
 use std::marker::Unpin;
-use std::{io::{self,Read,},
-          fs};
+use std::{
+    io::{
+        self,
+        Read,
+    },
+    fs,
+    fmt,
+};
 use bytes::{
     BytesMut,
     BufMut,
 };
 use tokio::io::AsyncRead;
+use tokio::{
+    task,
+    sync::{
+        mpsc,
+        oneshot,
+    },
+};
+use futures::prelude::*;
 
 /// An open fd that has been memory mapped.
 #[derive(Debug)]
impl AsRef<[u8]> for OpenMMap
 impl OpenMMap
 {
+    async fn new_file(file: tokio::fs::File) -> io::Result<Self>
+    {
+        let file = file.into_std().await;
+        let map = unsafe { Mmap::map(&file)? };
+        Ok(Self {
+            file,
+            map
+        })
+    }
+    fn new_file_sync(file: File) -> io::Result<Self>
+    {
+        let map = unsafe { Mmap::map(&file)? };
+        Ok(Self {
+            file,
+            map
+        })
+    }
     fn new_sync(file: impl AsRef<Path>) -> io::Result<Self>
     {
         let file = fs::OpenOptions::new().read(true).open(file)?;
@@ -98,7 +129,7 @@ pub enum Level
 
 /// Provides immutable caching of a file in a data entry.
 #[derive(Debug)]
-pub(super) enum DataCacheState
+pub enum DataCacheState
 {
     /// There is no file cache for this item.
     None,
@@ -121,6 +152,104 @@ impl Default for DataCacheState
 
 impl DataCacheState
 {
+
+    /// Spawn a task to generate a cache for the file provided by `from`.
+    ///
+    /// # Notes
+    /// This is redundant for all cache levels except `Extreme`.
+    pub fn background(from: impl Into<PathBuf>, level: Level) -> impl Future<Output = io::Result<Self>> + 'static //tokio::task::JoinHandle<io::Result<Self>>
+    {
+        let from = from.into();
+        async move {
+            if from.exists() && from.is_file() {
+                tokio::spawn(Self::new(from, level)).await.expect("Background loader panicked")
+            } else {
+                Err(io_err!(NotFound, "Path either not existant or not a file."))
+            }
+        }
+    }
+    /// Spawn a task to generate a cache for the already opened file `from`.
+    ///
+    /// # Notes
+    /// This is redundant for all cache levels except `Extreme`.
+    pub fn background_from_file(from: tokio::fs::File, level: Level) -> impl Future<Output = io::Result<Self>> + 'static // tokio::task::JoinHandle<io::Result<Self>>
+    {
+        async move {
+            let file = from.into_std().await;
+            tokio::spawn(Self::new_fd(file, level)).await.expect("Background loader panicked")
+        }
+    }
+
+}
+
+impl DataCacheState
+{
+    /// Attempt to upgrade the cache.
+    ///
+    /// # Returns
+    /// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
+    pub async fn into_upgrade(self) -> io::Result<Self>
+    {
+        Ok(match self {
+            Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
+            Self::Mapped(map) => Self::Memory({
+                use bytes::Buf;
+                tokio::task::spawn_blocking(move || {
+                    map.as_ref().copy_to_bytes(map.as_ref().len())
+                }).await.expect("Copying map into memory failed")
+            }),
+            x => x,
+        })
+    }
+    /// Attempt to upgrade the cache, blocking the current thread.
+    ///
+    /// # Returns
+    /// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
+    pub fn into_upgrade_sync(self) -> io::Result<Self>
+    {
+        Ok(match self {
+            Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
+            Self::Mapped(map) => Self::Memory({
+                use bytes::Buf;
+                map.as_ref().copy_to_bytes(map.as_ref().len())
+            }),
+            x => x,
+        })
+    }
+    /// Attempt to upgrade the cache in-place.
+    ///
+    /// # Panics
+    /// If `self` is `None` or `Memory`.
+    pub async fn upgrade(&mut self) -> io::Result<()>
+    {
+        *self = match std::mem::replace(self, Self::None) {
+            Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
+            Self::Mapped(map) => Self::Memory({
+                use bytes::Buf;
+                tokio::task::spawn_blocking(move || {
+                    map.as_ref().copy_to_bytes(map.as_ref().len())
+                }).await.expect("Copying map into memory failed")
+            }),
+            x => panic!("Cannot upgrade from {:?}", x),
+        };
+        Ok(())
+    }
+    /// Attempt to upgrade the cache in-place, blocking the current thread.
+    ///
+    /// # Panics
+    /// If `self` is `None` or `Memory`.
+    pub fn upgrade_sync(&mut self) -> io::Result<()>
+    {
+        *self = match std::mem::replace(self, Self::None) {
+            Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
+            Self::Mapped(map) => Self::Memory({
+                use bytes::Buf;
+                map.as_ref().copy_to_bytes(map.as_ref().len())
+            }),
+            x => panic!("Cannot upgrade from {:?}", x),
+        };
+        Ok(())
+    }
     /// Read from the cache at `offset` into the provided buffer, and return the number of bytes read.
     ///
     /// # Performance
@@ -180,11 +309,36 @@
     }
 
     /// Drop the whole cache (if there is one).
-    #[inline] pub fn clear(&mut self)
+    #[inline(never)] pub fn clear(&mut self)
     {
         *self = Self::None;
     }
 
+    /// Attempt to asynchronously create a cache state for file provided by already loaded `file` at this level.
+    pub async fn new_fd(file: File, level: Level) -> io::Result<Self>
+    {
+        Ok(match level {
+            Level::None => Self::None,
+            Level::Low => Self::Open(file),
+            Level::High => Self::Mapped(OpenMMap::new_file(file.into()).await?),
+            Level::Extreme => {
+                let file = tokio::fs::File::from(file);
+
+                let (mut bytes,expect) = {
+                    if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
+                        (BytesMut::with_capacity(len), Some(len))
+                    } else {
+                        (BytesMut::new(), None)
+                    }
+                };
+
+                match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
+                    (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
+                    _ => Self::Memory(bytes.freeze()),
+                }
+            },
+        })
+    }
     /// Attempt to asynchronously create a cache state for file provided by `file` at this level.
     pub async fn new(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
     {
@@ -214,7 +368,7 @@ impl DataCacheState
     ///
     /// # Note
     /// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
-    pub fn new_blocking(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
+    pub fn new_sync(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
     {
         Ok(match level {
             Level::None => Self::None,
@@ -231,6 +385,32 @@ impl DataCacheState
                     }
                 };
+
+                match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
+                    (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
+                    _ => Self::Memory(bytes.freeze()),
+                }
+            },
+        })
+    }
+    /// Attempt to synchronously create a cache state for file provided by already-loaded `file` at this level.
+    ///
+    /// # Note
+    /// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
+    pub fn new_fd_sync(file: fs::File, level: Level) -> io::Result<Self>
+    {
+        Ok(match level {
+            Level::None => Self::None,
+            Level::Low => Self::Open(file),
+            Level::High => Self::Mapped(OpenMMap::new_file_sync(file)?),
+            Level::Extreme => {
+                let (mut bytes,expect) = {
+                    if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
+                        (BytesMut::with_capacity(len), Some(len))
+                    } else {
+                        (BytesMut::new(), None)
+                    }
+                };
+
                 match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
                     (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
                     _ => Self::Memory(bytes.freeze()),
@@ -277,3 +457,98 @@ where R: Read,
         (&mut output).put(&buf[..read]);
     })
 }
+
+/// A request to send to a background worker spawned by a `service` function; for each instance of this there will be a corresponding `CacheResponse` you can `await` to receive the value.
+#[derive(Debug)]
+pub struct CacheRequest(tokio::fs::File, Level, oneshot::Sender<io::Result<DataCacheState>>);
+
+/// A handle to a pending `DataCacheState` being constructed from a `CacheRequest` sent to a background worker from a `service` function.
+///
+/// `await`ing this response will yield until the worker task has completed and can send the cache state back, upon which it yields an `io::Result<DataCacheState>`.
+#[derive(Debug)]
+pub struct CacheResponse(oneshot::Receiver<io::Result<DataCacheState>>);
+
+//TODO: impl Future for CacheResponse ...
+
+/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
+///
+/// The service shuts down when all producers of requests are dropped.
+///
+/// # Returns
+/// A `Future` which, when `await`ed on, will yield until the service shuts down, and a handle to send `CacheRequest`s to the service.
+#[inline] pub fn service() -> (impl Future + 'static, mpsc::Sender<CacheRequest>)
+{
+    use std::{
+        task::{Poll, Context},
+        pin::Pin,
+    };
+    /// A future that never completes and never reschedules itself.
+    struct ShimNever;
+    impl Future for ShimNever
+    {
+        type Output = !;
+        #[inline(always)] fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output>
+        {
+            Poll::Pending
+        }
+    }
+    let (rt, tx) = service_with_shutdown(ShimNever);
+    (rt.map(|_| ()), tx)
+}
+
+/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
+///
+/// The service can take a `Future` which, when it completes, will drop the service task and stop replying to responses.
+/// The service also shuts down when all producers of requests are dropped.
+///
+/// # Returns
+/// A `Future` which, when `await`ed on, will yield until the service shuts down, and a handle to send `CacheRequest`s to the service.
+/// The output of the future is how the service was terminated: either by the `cancel` future completing, or by all producers being dropped.
+pub fn service_with_shutdown(cancel: impl Future + 'static) -> (impl Future + 'static, mpsc::Sender<CacheRequest>)
+{
+    let (tx, mut rx) = mpsc::channel(32);
+    (async move {
+        let ren = async {
+            while let Some(CacheRequest(file, level, send_to)) = rx.recv().await {
+                tokio::spawn(async move {
+                    let _ = send_to.send(DataCacheState::new_fd(file.into_std().await, level).await);
+                });
+            }
+        };
+
+        tokio::select! {
+            _ = ren => {
+                // all senders dropped
+                ServiceResult::NoProducers
+            }
+            _ = cancel => {
+                // explicit cancel
+                rx.close();
+                ServiceResult::Cancelled
+            }
+        }
+    }, tx)
+}
+
+/// The reason a `service` task has terminated.
+#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
+#[non_exhaustive]
+pub enum ServiceResult
+{
+    /// All mpsc senders had been dropped, there were no more possible requests to respond to.
+    NoProducers,
+    /// The service was explicitly cancelled by a shutdown future.
+    Cancelled,
+}
+
+impl fmt::Display for ServiceResult
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
+    {
+        match self {
+            Self::NoProducers => write!(f, "All mpsc senders had been dropped, there were no more possible requests to respond to."),
+            Self::Cancelled => write!(f, "The service was explicitly cancelled by a shutdown future."),
+        }
+    }
+}
+
diff --git a/src/data/mod.rs b/src/data/mod.rs
index ffd355d..ba33894 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -3,7 +3,7 @@ use generational_arena::{
     Arena,
     Index as ArenaIndex,
 };
-use std::convert::{TryFrom, TryInfo};
+use std::convert::{TryFrom, TryInto};
 
 use std::collections::BTreeMap;
 use std::collections::HashSet;
@@ -80,7 +80,7 @@ impl Store
         assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `with_capacity` not existant or not a directory", metadata.root);
         Self {
             metadata,
-+
+
             data: HashSet::with_capacity(cap),
             data_hashes: Arena::with_capacity(cap),
diff --git a/src/main.rs b/src/main.rs
index b58312d..981d466 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
+#![feature(never_type)]
 #![allow(dead_code)]
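
Usage sketch (illustrative, not part of the commit): the snippet below shows one way the new constructors and the in-place upgrade added in src/data/cache.rs might be called from async code elsewhere in the crate. The helper name, path argument, and choice of `Level` are assumptions made for the example only, and it presumes `DataCacheState` and `Level` are in scope under a tokio runtime.

    use std::io;

    // Hypothetical helper: build a memory-mapped cache for a file, then upgrade it
    // in-place to a fully in-memory copy (`Mapped` -> `Memory`).
    async fn load_fully(path: &str) -> io::Result<DataCacheState> {
        let mut cache = DataCacheState::new(path, Level::High).await?;
        cache.upgrade().await?; // panics only when called on a `None`/`Memory` cache
        Ok(cache)
    }

The `CacheRequest`/`CacheResponse` pair and the `service` functions follow a common mpsc-plus-oneshot worker pattern: callers push a `CacheRequest` into the `mpsc::Sender`, and each request carries its own `oneshot::Sender` on which the spawned worker replies with the finished `DataCacheState`.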