@ -1,12 +1,26 @@
use super ::* ;
use std ::marker ::Unpin ;
use std ::{ io ::{ self , Read , } ,
fs } ;
use std ::{
io ::{
self ,
Read ,
} ,
fs ,
fmt ,
} ;
use bytes ::{
BytesMut ,
BufMut ,
} ;
use tokio ::io ::AsyncRead ;
use tokio ::{
task ,
sync ::{
mpsc ,
oneshot ,
} ,
} ;
use futures ::prelude ::* ;
/// An open fd that has been memory mapped.
#[ derive(Debug) ]
@ -27,6 +41,23 @@ impl AsRef<[u8]> for OpenMMap
impl OpenMMap
{
async fn new_file ( file : tokio ::fs ::File ) -> io ::Result < Self >
{
let file = file . into_std ( ) . await ;
let map = unsafe { Mmap ::map ( & file ) ? } ;
Ok ( Self {
file ,
map
} )
}
fn new_file_sync ( file : File ) -> io ::Result < Self >
{
let map = unsafe { Mmap ::map ( & file ) ? } ;
Ok ( Self {
file ,
map
} )
}
fn new_sync ( file : impl AsRef < Path > ) -> io ::Result < Self >
{
let file = fs ::OpenOptions ::new ( ) . read ( true ) . open ( file ) ? ;
@ -98,7 +129,7 @@ pub enum Level
/// Provides immutable caching of a file in a data entry.
#[ derive(Debug) ]
pub ( super ) enum DataCacheState
pub enum DataCacheState
{
/// There is no file cache for this item.
None ,
@ -121,6 +152,104 @@ impl Default for DataCacheState
impl DataCacheState
{
/// Spawn a task to generate a cache for the file provided by `from`.
///
/// # Notes
/// This is redundant for all cache levels except `Extreme`.
pub fn background ( from : impl Into < PathBuf > , level : Level ) -> impl Future < Output = io ::Result < Self > > + ' static //tokio::task::JoinHandle<io::Result<Self>>
{
let from = from . into ( ) ;
async move {
if from . exists ( ) & & from . is_file ( ) {
tokio ::spawn ( Self ::new ( from , level ) ) . await . expect ( "Background loader panicked" )
} else {
Err ( io_err ! ( NotFound , "Path either not existant or not a file." ) )
}
}
}
/// Spawn a task to generate a cache for the already opened file `from`.
///
/// # Notes
/// This is redundant for all cache levels except `Extreme`.
pub fn background_from_file ( from : tokio ::fs ::File , level : Level ) -> impl Future < Output = io ::Result < Self > > + ' static // tokio::task::JoinHandle<io::Result<Self>>
{
async move {
let file = from . into_std ( ) . await ;
tokio ::spawn ( Self ::new_fd ( file , level ) ) . await . expect ( "Background loader panicked" )
}
}
}
impl DataCacheState
{
/// Attempt to upgrade the cache.
///
/// # Returns
/// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
pub async fn into_upgrade ( self ) -> io ::Result < Self >
{
Ok ( match self {
Self ::Open ( file ) = > Self ::Mapped ( OpenMMap ::new_file ( file . into ( ) ) . await ? ) ,
Self ::Mapped ( map ) = > Self ::Memory ( {
use bytes ::Buf ;
tokio ::task ::spawn_blocking ( move | | {
map . as_ref ( ) . copy_to_bytes ( map . as_ref ( ) . len ( ) )
} ) . await . expect ( "Copying map into memory failed" )
} ) ,
x = > x ,
} )
}
/// Attempt to upgrade the cache, blocking the current thread.
///
/// # Returns
/// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
pub fn into_upgrade_sync ( self ) -> io ::Result < Self >
{
Ok ( match self {
Self ::Open ( file ) = > Self ::Mapped ( OpenMMap ::new_file_sync ( file ) ? ) ,
Self ::Mapped ( map ) = > Self ::Memory ( {
use bytes ::Buf ;
map . as_ref ( ) . copy_to_bytes ( map . as_ref ( ) . len ( ) )
} ) ,
x = > x ,
} )
}
/// Attempt to upgrade the cache in-place.
///
/// # Panics
/// If `self` is `None` or `Memory`.
pub async fn upgrade ( & mut self ) -> io ::Result < ( ) >
{
* self = match std ::mem ::replace ( self , Self ::None ) {
Self ::Open ( file ) = > Self ::Mapped ( OpenMMap ::new_file ( file . into ( ) ) . await ? ) ,
Self ::Mapped ( map ) = > Self ::Memory ( {
use bytes ::Buf ;
tokio ::task ::spawn_blocking ( move | | {
map . as_ref ( ) . copy_to_bytes ( map . as_ref ( ) . len ( ) )
} ) . await . expect ( "Copying map into memory failed" )
} ) ,
x = > panic! ( "Cannot upgrade from {:?}" , x ) ,
} ;
Ok ( ( ) )
}
/// Attempt to upgrade the cache in-place, blocking the current thread.
///
/// # Panics
/// If `self` is `None` or `Memory`.
pub fn upgrade_sync ( & mut self ) -> io ::Result < ( ) >
{
* self = match std ::mem ::replace ( self , Self ::None ) {
Self ::Open ( file ) = > Self ::Mapped ( OpenMMap ::new_file_sync ( file ) ? ) ,
Self ::Mapped ( map ) = > Self ::Memory ( {
use bytes ::Buf ;
map . as_ref ( ) . copy_to_bytes ( map . as_ref ( ) . len ( ) )
} ) ,
x = > panic! ( "Cannot upgrade from {:?}" , x ) ,
} ;
Ok ( ( ) )
}
/// Read from the cache at `offset` into the provided buffer, and return the number of bytes read.
///
/// # Performance
@ -180,11 +309,36 @@ impl DataCacheState
}
/// Drop the whole cache (if there is one).
#[ inline ] pub fn clear ( & mut self )
#[ inline (never) ] pub fn clear ( & mut self )
{
* self = Self ::None ;
}
/// Attempt to asynchronously create a cache state for file provided by already loaded `file` at this level.
pub async fn new_fd ( file : File , level : Level ) -> io ::Result < Self >
{
Ok ( match level {
Level ::None = > Self ::None ,
Level ::Low = > Self ::Open ( file ) ,
Level ::High = > Self ::Mapped ( OpenMMap ::new_file ( file . into ( ) ) . await ? ) ,
Level ::Extreme = > {
let file = tokio ::fs ::File ::from ( file ) ;
let ( mut bytes , expect ) = {
if let Some ( len ) = file . metadata ( ) . await . ok ( ) . map ( | m | usize ::try_from ( m . len ( ) ) . ok ( ) ) . flatten ( ) {
( BytesMut ::with_capacity ( len ) , Some ( len ) )
} else {
( BytesMut ::new ( ) , None )
}
} ;
match ( expect , read_whole_into_buffer ( file , & mut bytes ) . await ? ) {
( Some ( expect ) , len ) if len ! = expect = > return Err ( io_err ! ( UnexpectedEof , "Size mismatch" ) ) ,
_ = > Self ::Memory ( bytes . freeze ( ) ) ,
}
} ,
} )
}
/// Attempt to asynchronously create a cache state for file provided by `file` at this level.
pub async fn new ( file : impl AsRef < Path > , level : Level ) -> io ::Result < Self >
{
@ -214,7 +368,7 @@ impl DataCacheState
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_ blocking ( file : impl AsRef < Path > , level : Level ) -> io ::Result < Self >
pub fn new_ sync ( file : impl AsRef < Path > , level : Level ) -> io ::Result < Self >
{
Ok ( match level {
Level ::None = > Self ::None ,
@ -231,6 +385,32 @@ impl DataCacheState
}
} ;
match ( expect , read_whole_into_buffer_sync ( file , & mut bytes ) ? ) {
( Some ( expect ) , len ) if len ! = expect = > return Err ( io_err ! ( UnexpectedEof , "Size mismatch" ) ) ,
_ = > Self ::Memory ( bytes . freeze ( ) ) ,
}
} ,
} )
}
/// Attempt to synchronously create a cache state for file provided by already-loaded `file` at this level.
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_fd_sync ( file : fs ::File , level : Level ) -> io ::Result < Self >
{
Ok ( match level {
Level ::None = > Self ::None ,
Level ::Low = > Self ::Open ( file ) ,
Level ::High = > Self ::Mapped ( OpenMMap ::new_file_sync ( file ) ? ) ,
Level ::Extreme = > {
let ( mut bytes , expect ) = {
if let Some ( len ) = file . metadata ( ) . ok ( ) . map ( | m | usize ::try_from ( m . len ( ) ) . ok ( ) ) . flatten ( ) {
( BytesMut ::with_capacity ( len ) , Some ( len ) )
} else {
( BytesMut ::new ( ) , None )
}
} ;
match ( expect , read_whole_into_buffer_sync ( file , & mut bytes ) ? ) {
( Some ( expect ) , len ) if len ! = expect = > return Err ( io_err ! ( UnexpectedEof , "Size mismatch" ) ) ,
_ = > Self ::Memory ( bytes . freeze ( ) ) ,
@ -277,3 +457,98 @@ where R: Read,
( & mut output ) . put ( & buf [ .. read ] ) ;
} )
}
/// A request to send to a background worker spawned by a `service` function, for each instance of this there will be a corresponding `CacheResponse` you can `await` to receive the value.
#[ derive(Debug) ]
pub struct CacheRequest ( tokio ::fs ::File , Level , oneshot ::Sender < io ::Result < DataCacheState > > ) ;
/// A handle to a pending `DataCacheState` being constructed from a `CacheRequest` sent to a background worker from a `service` function.
///
/// `await`ing this response will yield until the worker task has completed and can send the cache state back, upon which it yields a `io::Result<DataCacheState>`.
#[ derive(Debug) ]
pub struct CacheResponse ( oneshot ::Receiver < io ::Result < DataCacheState > > ) ;
//TODO: impl Future for CacheResponse ...
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
#[ inline ] pub fn service ( ) -> ( impl Future < Output = ( ) > + ' static , mpsc ::Sender < CacheRequest > )
{
use std ::{
task ::{ Poll , Context } ,
pin ::Pin ,
} ;
/// A future that never completes and never reschedules itself.
struct ShimNever ;
impl Future for ShimNever
{
type Output = ! ;
#[ inline(always) ] fn poll ( self : Pin < & mut Self > , _ctx : & mut Context ) -> Poll < Self ::Output >
{
Poll ::Pending
}
}
let ( rt , tx ) = service_with_shutdown ( ShimNever ) ;
( rt . map ( | _ | ( ) ) , tx )
}
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service can take a `Future` which, when completes, will drop the service task and stop replying to responses.
/// The service also shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
/// The output of the future is how the service was terminated, by the `cancel` future completing, or by all producers being dropped.
pub fn service_with_shutdown ( cancel : impl Future + ' static ) -> ( impl Future < Output = ServiceResult > + ' static , mpsc ::Sender < CacheRequest > )
{
let ( tx , mut rx ) = mpsc ::channel ( 32 ) ;
( async move {
let ren = async {
while let Some ( CacheRequest ( file , level , send_to ) ) = rx . recv ( ) . await {
tokio ::spawn ( async move {
let _ = send_to . send ( DataCacheState ::new_fd ( file . into_std ( ) . await , level ) . await ) ;
} ) ;
}
} ;
tokio ::select ! {
_ = ren = > {
// all senders dropped
ServiceResult ::NoProducers
}
_ = cancel = > {
// explicit cancel
rx . close ( ) ;
ServiceResult ::Cancelled
}
}
} , tx )
}
/// The reason a `service` task has terminated.
#[ derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd) ]
#[ non_exhaustive ]
pub enum ServiceResult
{
/// All mpsc senders had been dropped, there were no more possible requests to respond to.
NoProducers ,
/// The service was explicitly cancelled by a shutdown future.
Cancelled ,
}
impl fmt ::Display for ServiceResult
{
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result
{
match self {
Self ::NoProducers = > write! ( f , "All mpsc senders had been dropped, there were no more possible requests to respond to." ) ,
Self ::Cancelled = > write! ( f , "The service was explicitly cancelled by a shutdown future." ) ,
}
}
}