//! Immutable per-entry file caching: no cache, cached fd, memory-mapped, or
//! fully loaded into memory (see `Level` and `DataCacheState`).
use super::*;
use std::marker::Unpin;
use std::{io::{self,Read,},
fs};
use bytes::{
BytesMut,
BufMut,
};
use tokio::io::AsyncRead;
/// An open fd that has been memory mapped.
///
/// Owns both the `File` and the `Mmap` created from it, so the mapping can
/// never outlive the descriptor it was created from.
#[derive(Debug)]
pub struct OpenMMap
{
	// NOTE: field order is significant — Rust drops fields in declaration
	// order, so `file` is dropped before `map`. The original author flagged
	// this layout as mattering for destruction ordering; do not reorder.
	file: File, //note: this layout matters for destruction ordering.
	map: Mmap,
}
impl AsRef<[u8]> for OpenMMap
{
	/// Borrow the entire mapped region as a byte slice.
	#[inline(always)]
	fn as_ref(&self) -> &[u8]
	{
		// Equivalent to `&self.map[..]`: go through `Mmap`'s `Deref` to `[u8]`.
		&*self.map
	}
}
impl OpenMMap
{
	/// Synchronously open `file` read-only and memory-map it.
	///
	/// Blocks the calling thread on the open and map syscalls.
	fn new_sync(file: impl AsRef<Path>) -> io::Result<Self>
	{
		let fd = fs::OpenOptions::new().read(true).open(file)?;
		// SAFETY: the fd was just opened read-only and is stored alongside the
		// mapping in the returned struct, so it outlives the map. (External
		// truncation is the usual mmap caveat; mtfse assumes exclusive access
		// to its db root — see `Level::Extreme` docs.)
		let map = unsafe { Mmap::map(&fd) }?;
		Ok(Self { file: fd, map })
	}

	/// Asynchronously open `file` read-only, then memory-map it.
	///
	/// The open goes through tokio; the map syscall itself runs inline.
	async fn new(file: impl AsRef<Path>) -> io::Result<Self>
	{
		let fd = tokio::fs::OpenOptions::new()
			.read(true)
			.open(file).await?
			.into_std().await;
		// SAFETY: as in `new_sync` — the mapping and fd live and die together.
		let map = unsafe { Mmap::map(&fd) }?;
		Ok(Self { file: fd, map })
	}
}
/// How aggressively should we cache a specific item.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
#[repr(u32)]
pub enum Level
{
	/// No caching
	///
	/// # Usage
	/// Best used for cold files, or files that are not accessed much, or when running low on memory and/or fds.
	///
	/// Corresponds to `DataCacheState::None`
	None,
	/// Open the file and cache the FD as std `File`
	/// This can avoid the syscall overhead of needing to open the file next time it is accessed.
	///
	/// # Usage
	/// Best used for frequently accessed files.
	///
	/// Corresponds to `DataCacheState::Open`
	Low,
	/// Open the file, cache the FD *and* map the file in memory.
	/// This can provide efficient random-access of the file without the need to preload it.
	///
	/// # Usage
	/// Best used for frequently read large files.
	///
	/// Corresponds to `DataCacheState::Mapped`
	High,
	/// Load the whole contents of the file into memory, without keeping it open.
	/// This provides the most efficient access of the file, as well as not contributing to the process' fd limit, but at the cost of:
	/// * Loading the whole file into a memory buffer, which may be a slow operation and/or take up massive memory space
	/// * Allowing a potential desync if the file is changed on disk while the cache buffer is loaded.
	///
	/// ## Desync
	/// While `mtfse` already assumes it has exclusive access to all files in its db root, generally a file being modified by an external program will cause an error to be returned within a `mtfse` operation eventually as file hashes are updated and/or files are encrypted/decrypted or even read (in the case of a file being deleted.)
	///
	/// When an item is cached at this level however, any of these kinds of changes made will not be visible. The store can then be put into an invalid state without realising it.
	///
	/// ## Tradeoffs
	/// The caching operation itself is expensive, the memory requirements are expensive, and in some cases this can even *slow* reads compared to the other levels when used on large files, as we cannot take advantage of the kernel's internal file data caching for mapped random-access reads and we are bound to causing CPU cache misses.
	///
	/// # Usage
	/// Best used for frequently read small files.
	///
	/// Corresponds to `DataCacheState::Memory`
	Extreme,
}
/// Provides immutable caching of a file in a data entry.
///
/// Each variant is the realised form of the corresponding `Level` (see that
/// type's docs for the tradeoffs of each).
#[derive(Debug)]
pub(super) enum DataCacheState
{
	/// There is no file cache for this item. (`Level::None`)
	None,
	/// The file is open, we have an fd. (`Level::Low`)
	Open(File),
	/// The file is open and memory mapped. (`Level::High`)
	Mapped(OpenMMap),
	/// The file is not open, but its whole contents have been loaded into memory. (`Level::Extreme`)
	Memory(Bytes), // load from file via `BytesMut` buffer, then call `.freeze()`.
}
impl Default for DataCacheState
{
#[inline]
fn default() -> Self
{
Self::None
}
}
impl DataCacheState
{
4 years ago
/// Read from the cache at `offset` into the provided buffer, and return the number of bytes read.
///
/// # Performance
///
/// When the cache has random access, this method completes without yielding. If not, it performs async seek & read operations to fill the buffer as much as possible from the offset.
///
/// # Returns
/// If `EOF` is encountered within the read, then it is terminated early and the number of bytes successfully read is returned (and will be less than the length of the buffer), otherwise, the full buffer was filled, and the full buffer's length will be returned.
///
/// ## Errors
///
/// If this cache is not active, it will return an `io::Error` with `io::ErrorKind::NotConnected`.
/// Any other error in I/O operations is propagated.
pub async fn read_at(&mut self, offset: usize, into: &mut [u8]) -> io::Result<usize> // this takes `&mut self` only to ensure it cannot be called on different threads at the same time, as any file operations need to be atomic.
{
if let Some(ar) = self.random_access()
{
return Ok(slice::copy_bytes(&ar[offset..], into));
}
if let Some(file) = self.file()
{
use tokio::{
fs, prelude::*,
};
let mut file = fs::File::from_std(file.try_clone()?); // this is what requires we take `&mut(ex) self`.
file.seek(io::SeekFrom::Start(u64::try_from(offset).expect("Arch size integer was out of bounds of u64 (this should never happen)"))).await?;
let mut read =0;
let mut cur;
while {cur =file.read(&mut into[read..]).await?; cur != 0 && read<into.len()} {
read+=cur;
}
return Ok(read);
}
Err(io_err!(NotConnected, "Operation not supported (no cache is available)"))
}
/// Attempt to get a reference to an fd
pub fn file(&self) -> Option<&File>
{
match self
{
Self::Mapped(map) => Some(&map.file),
Self::Open(file) => Some(file),
_ => None,
}
}
/// Attempt to get a random access buffer of this cache, if one exists.
pub fn random_access(&self) -> Option<& [u8]>
{
match self {
Self::Mapped(map) => Some(map.as_ref()),
Self::Memory(mem) => Some(&mem[..]),
_ => None,
}
}
/// Drop the whole cache (if there is one).
#[inline] pub fn clear(&mut self)
{
*self = Self::None;
}
4 years ago
/// Attempt to asynchronously create a cache state for file provided by `file` at this level.
pub async fn new(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(tokio::fs::OpenOptions::new().read(true).open(file).await?.into_std().await),
Level::High => Self::Mapped(OpenMMap::new(file).await?),
Level::Extreme => {
let file = tokio::fs::OpenOptions::new().read(true).open(file).await?;
let (mut bytes,expect) = {
if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to synchronously create a cache state for file provided by `file` at this level.
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_blocking(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(fs::OpenOptions::new().read(true).open(file)?),
Level::High => Self::Mapped(OpenMMap::new_sync(file)?),
Level::Extreme => {
let file = fs::OpenOptions::new().read(true).open(file)?;
let (mut bytes,expect) = {
if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
}
/// Size of the intermediate stack buffer used when slurping a stream.
const BUFSIZE: usize = 4096;
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Does not block the current task.
async fn read_whole_into_buffer<R,W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where R: AsyncRead + Unpin,
      W: BufMut + ?Sized,
{
	use tokio::prelude::*;
	let mut chunk = [0u8; BUFSIZE];
	let mut total = 0;
	loop {
		let read = input.read(&mut chunk[..]).await?;
		if read == 0 {
			break Ok(total);
		}
		total += read;
		// `BufMut::put` requires `Self: Sized`, so call it through the sized
		// `&mut` reborrow rather than on the unsized `W` directly.
		(&mut output).put(&chunk[..read]);
	}
}
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Blocks the current thread.
fn read_whole_into_buffer_sync<R,W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where R: Read,
      W: BufMut + ?Sized,
{
	let mut chunk = [0u8; BUFSIZE];
	let mut total = 0;
	loop {
		let read = input.read(&mut chunk[..])?;
		if read == 0 {
			break Ok(total);
		}
		total += read;
		// `BufMut::put` requires `Self: Sized`, so call it through the sized
		// `&mut` reborrow rather than on the unsized `W` directly.
		(&mut output).put(&chunk[..read]);
	}
}