You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

555 lines
18 KiB

use super::*;
use std::marker::Unpin;
use std::{
io::{
self,
Read,
},
fs,
fmt,
};
use bytes::{
BytesMut,
BufMut,
};
use tokio::io::AsyncRead;
use tokio::{
task,
sync::{
mpsc,
oneshot,
},
};
use futures::prelude::*;
/// An open fd that has been memory mapped.
#[derive(Debug)]
pub struct OpenMMap
{
file: File,//note: this layout matters for destruction ordering.
map: Mmap,
}
impl AsRef<[u8]> for OpenMMap
{
#[inline(always)] fn as_ref(&self) -> &[u8]
{
&self.map[..]
}
}
impl OpenMMap
{
async fn new_file(file: tokio::fs::File) -> io::Result<Self>
{
let file = file.into_std().await;
let map = unsafe { Mmap::map(&file)? };
Ok(Self {
file,
map
})
}
fn new_file_sync(file: File) -> io::Result<Self>
{
let map = unsafe { Mmap::map(&file)? };
Ok(Self {
file,
map
})
}
fn new_sync(file: impl AsRef<Path>) -> io::Result<Self>
{
let file = fs::OpenOptions::new().read(true).open(file)?;
let map = unsafe { Mmap::map(&file)? };
Ok(Self {
file,
map
})
}
async fn new(file: impl AsRef<Path>) -> io::Result<Self>
{
let file = tokio::fs::OpenOptions::new().read(true).open(file).await?.into_std().await;
let map = unsafe { Mmap::map(&file)? };
Ok(Self {
file,
map
})
}
}
/// How aggressively should we cache a specific item.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
#[repr(u32)]
pub enum Level
{
/// No cacheing
///
/// # Usage
/// Best used for cold files, or files that are not accessed much, or when running low on memory and/or fds.
///
/// Corresponds to `DataCacheState::None`
None,
/// Open the file and cache the FD as std `File`
/// This can avoid the syscall overhead of needing to open the file next time it is accessed.
///
/// # Usage
/// Best used for frequently acessed files.
///
/// Corresponds to `DataCacheState::Open`
Low,
/// Open the file, cache the FD *and* map the file in memory.
/// This can provide efficient random-access of the file without the need to preload it.
///
/// # Usage
/// Best used for frequently read large files.
///
/// Corresponds to `DataCacheState::Mapped`
High,
/// Load the whole contents of the file into memory, without keeping it open.
/// This provides the most efficient acesss of the file, as well as not contributing to the process' fd limit, but at the cost of:
/// * Loading the whole file into a memory buffer, which may be a slow operation and/or take up massive memory space
/// * Allowing a potential desync if the file is changed on disk while the cache buffer is loaded.
///
/// ## Desync
/// While `mtfse` already assumes it has exclusive access to all files in its db root, generally a file being modified by an external program will cause an error to be returned within a `mtfse` operation eventually as file hashes are updated and/or files are encrypted/decrypted or even read (in the case of a file being deleted.)
///
/// When an item is cached at this level however, any of these kinds of changes made will not be visible. The store can then be put into an invalid state without realising it.
///
/// ## Tradeoffs
/// The caching operation itself is expensive, the memory requirements is expensive, and in some cases this can even *slow* reads compared to the other levels when used on large files, as we cannot take advantage of the kernel's internal file data caching for mapped random-acesss reads and we are bound to causing CPU cache misses.
///
/// # Usage
/// Best used for frequently read small files.
///
/// Corresponds to `DataCacheState::Memory`
Extreme,
}
/// Provides immutable caching of a file in a data entry.
#[derive(Debug)]
pub enum DataCacheState
{
/// There is no file cache for this item.
None,
/// The file is open, we have an fd.
Open(File),
/// The file is open and memory mapped.
Mapped(OpenMMap),
/// The file is not open, but its whole contents have been loaded into memory.
Memory(Bytes), // load from file via `BytesMut` buffer, then call `.freeze()`.
}
impl Default for DataCacheState
{
#[inline]
fn default() -> Self
{
Self::None
}
}
impl DataCacheState
{
/// Spawn a task to generate a cache for the file provided by `from`.
///
/// # Notes
/// This is redundant for all cache levels except `Extreme`.
pub fn background(from: impl Into<PathBuf>, level: Level) -> impl Future<Output = io::Result<Self>> + 'static//tokio::task::JoinHandle<io::Result<Self>>
{
let from = from.into();
async move {
if from.exists() && from.is_file() {
tokio::spawn(Self::new(from, level)).await.expect("Background loader panicked")
} else {
Err(io_err!(NotFound, "Path either not existant or not a file."))
}
}
}
/// Spawn a task to generate a cache for the already opened file `from`.
///
/// # Notes
/// This is redundant for all cache levels except `Extreme`.
pub fn background_from_file(from: tokio::fs::File, level: Level) -> impl Future<Output = io::Result<Self>> + 'static // tokio::task::JoinHandle<io::Result<Self>>
{
async move {
let file = from.into_std().await;
tokio::spawn(Self::new_fd(file, level)).await.expect("Background loader panicked")
}
}
}
impl DataCacheState
{
/// Attempt to upgrade the cache.
///
/// # Returns
/// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
pub async fn into_upgrade(self) -> io::Result<Self>
{
Ok(match self {
Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
tokio::task::spawn_blocking(move || {
map.as_ref().copy_to_bytes(map.as_ref().len())
}).await.expect("Copying map into memory failed")
}),
x => x,
})
}
/// Attempt to upgrade the cache, blocking the current thread.
///
/// # Returns
/// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
pub fn into_upgrade_sync(self) -> io::Result<Self>
{
Ok(match self {
Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
map.as_ref().copy_to_bytes(map.as_ref().len())
}),
x => x,
})
}
/// Attempt to upgrade the cache in-place.
///
/// # Panics
/// If `self` is `None` or `Memory`.
pub async fn upgrade(&mut self) -> io::Result<()>
{
*self = match std::mem::replace(self, Self::None) {
Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
tokio::task::spawn_blocking(move || {
map.as_ref().copy_to_bytes(map.as_ref().len())
}).await.expect("Copying map into memory failed")
}),
x => panic!("Cannot upgrade from {:?}", x),
};
Ok(())
}
/// Attempt to upgrade the cache in-place, blocking the current thread.
///
/// # Panics
/// If `self` is `None` or `Memory`.
pub fn upgrade_sync(&mut self) -> io::Result<()>
{
*self = match std::mem::replace(self, Self::None) {
Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
map.as_ref().copy_to_bytes(map.as_ref().len())
}),
x => panic!("Cannot upgrade from {:?}", x),
};
Ok(())
}
/// Read from the cache at `offset` into the provided buffer, and return the number of bytes read.
///
/// # Performance
///
/// When the cache has random access, this method completes without yielding. If not, it performs async seek & read operations to fill the buffer as much as possible from the offset.
///
/// # Returns
/// If `EOF` is encountered within the read, then it is terminated early and the number of bytes successfully read is returned (and will be less than the length of the buffer), otherwise, the full buffer was filled, and the full buffer's length will be returned.
///
/// ## Errors
///
/// If this cache is not active, it will return an `io::Error` with `io::ErrorKind::NotConnected`.
/// Any other error in I/O operations is propagated.
pub async fn read_at(&mut self, offset: usize, into: &mut [u8]) -> io::Result<usize> // this takes `&mut self` only to ensure it cannot be called on different threads at the same time, as any file operations need to be atomic.
{
if let Some(ar) = self.random_access()
{
return Ok(slice::copy_bytes(&ar[offset..], into));
}
if let Some(file) = self.file()
{
use tokio::{
fs, prelude::*,
};
let mut file = fs::File::from_std(file.try_clone()?); // this is what requires we take `&mut(ex) self`.
file.seek(io::SeekFrom::Start(u64::try_from(offset).expect("Arch size integer was out of bounds of u64 (this should never happen)"))).await?;
let mut read =0;
let mut cur;
while {cur =file.read(&mut into[read..]).await?; cur != 0 && read<into.len()} {
read+=cur;
}
return Ok(read);
}
Err(io_err!(NotConnected, "Operation not supported (no cache is available)"))
}
/// Attempt to get a reference to an fd
pub fn file(&self) -> Option<&File>
{
match self
{
Self::Mapped(map) => Some(&map.file),
Self::Open(file) => Some(file),
_ => None,
}
}
/// Attempt to get a random access buffer of this cache, if one exists.
pub fn random_access(&self) -> Option<& [u8]>
{
match self {
Self::Mapped(map) => Some(map.as_ref()),
Self::Memory(mem) => Some(&mem[..]),
_ => None,
}
}
/// Drop the whole cache (if there is one).
#[inline(never)] pub fn clear(&mut self)
{
*self = Self::None;
}
/// Attempt to asynchronously create a cache state for file provided by already loaded `file` at this level.
pub async fn new_fd(file: File, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(file),
Level::High => Self::Mapped(OpenMMap::new_file(file.into()).await?),
Level::Extreme => {
let file = tokio::fs::File::from(file);
let (mut bytes,expect) = {
if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to asynchronously create a cache state for file provided by `file` at this level.
pub async fn new(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(tokio::fs::OpenOptions::new().read(true).open(file).await?.into_std().await),
Level::High => Self::Mapped(OpenMMap::new(file).await?),
Level::Extreme => {
let file = tokio::fs::OpenOptions::new().read(true).open(file).await?;
let (mut bytes,expect) = {
if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to synchronously create a cache state for file provided by `file` at this level.
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_sync(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(fs::OpenOptions::new().read(true).open(file)?),
Level::High => Self::Mapped(OpenMMap::new_sync(file)?),
Level::Extreme => {
let file = fs::OpenOptions::new().read(true).open(file)?;
let (mut bytes,expect) = {
if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to synchronously create a cache state for file provided by already-loaded `file` at this level.
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_fd_sync(file: fs::File, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(file),
Level::High => Self::Mapped(OpenMMap::new_file_sync(file)?),
Level::Extreme => {
let (mut bytes,expect) = {
if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
}
const BUFSIZE: usize = 4096;
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Does not block the current task.
async fn read_whole_into_buffer<R,W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where R: AsyncRead + Unpin,
W: BufMut + ?Sized,
{
use tokio::prelude::*;
let mut buf = [0u8; BUFSIZE];
let mut whole=0;
Ok(loop {
let read = match input.read(&mut buf[..]).await? {
0 => break whole,
x => (whole += x, x).1,
};
(&mut output).put(&buf[..read]);
})
}
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Blocks the current thread.
fn read_whole_into_buffer_sync<R,W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where R: Read,
W: BufMut + ?Sized,
{
let mut buf = [0u8; BUFSIZE];
let mut whole=0;
Ok(loop {
let read = match input.read(&mut buf[..])? {
0 => break whole,
x => (whole += x, x).1,
};
(&mut output).put(&buf[..read]);
})
}
/// A request to send to a background worker spawned by a `service` function, for each instance of this there will be a corresponding `CacheResponse` you can `await` to receive the value.
#[derive(Debug)]
pub struct CacheRequest(tokio::fs::File, Level, oneshot::Sender<io::Result<DataCacheState>>);
/// A handle to a pending `DataCacheState` being constructed from a `CacheRequest` sent to a background worker from a `service` function.
///
/// `await`ing this response will yield until the worker task has completed and can send the cache state back, upon which it yields a `io::Result<DataCacheState>`.
#[derive(Debug)]
pub struct CacheResponse(oneshot::Receiver<io::Result<DataCacheState>>);
//TODO: impl Future for CacheResponse ...
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
#[inline] pub fn service() -> (impl Future<Output = ()> + 'static, mpsc::Sender<CacheRequest>)
{
use std::{
task::{Poll, Context},
pin::Pin,
};
/// A future that never completes and never reschedules itself.
struct ShimNever;
impl Future for ShimNever
{
type Output = !;
#[inline(always)] fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output>
{
Poll::Pending
}
}
let (rt, tx) = service_with_shutdown(ShimNever);
(rt.map(|_| ()), tx)
}
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service can take a `Future` which, when completes, will drop the service task and stop replying to responses.
/// The service also shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
/// The output of the future is how the service was terminated, by the `cancel` future completing, or by all producers being dropped.
pub fn service_with_shutdown(cancel: impl Future + 'static) -> (impl Future<Output = ServiceResult> + 'static, mpsc::Sender<CacheRequest>)
{
let (tx, mut rx) = mpsc::channel(32);
(async move {
let ren = async {
while let Some(CacheRequest(file, level, send_to)) = rx.recv().await {
tokio::spawn(async move {
let _ = send_to.send(DataCacheState::new_fd(file.into_std().await, level).await);
});
}
};
tokio::select! {
_ = ren => {
// all senders dropped
ServiceResult::NoProducers
}
_ = cancel => {
// explicit cancel
rx.close();
ServiceResult::Cancelled
}
}
}, tx)
}
/// The reason a `service` task has terminated.
#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[non_exhaustive]
pub enum ServiceResult
{
/// All mpsc senders had been dropped, there were no more possible requests to respond to.
NoProducers,
/// The service was explicitly cancelled by a shutdown future.
Cancelled,
}
impl fmt::Display for ServiceResult
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::NoProducers => write!(f, "All mpsc senders had been dropped, there were no more possible requests to respond to."),
Self::Cancelled => write!(f, "The service was explicitly cancelled by a shutdown future."),
}
}
}