//! Handles suspending and reloading posts
use super ::* ;
use std ::{
marker ::{
PhantomData ,
Unpin ,
Send ,
Sync ,
} ,
io ,
collections ::HashMap ,
ops ::Range ,
borrow ::{
Cow ,
Borrow ,
} ,
error ,
fmt ,
} ;
use tokio ::{
prelude ::* ,
io ::{
AsyncRead ,
AsyncWrite ,
} ,
} ;
/// Represents an opaque bytes stream of serialised data to insert into `SuspendStream`.
#[ derive(Debug, Clone, PartialEq, Eq) ]
pub struct Object
{
data : Vec < u8 > ,
data_instances : HashMap < Cow < ' static , str > , Range < usize > > ,
}
fn calc_len < T , I , U > ( from : I ) -> U
where I : IntoIterator < Item = T > ,
T : Borrow < Range < U > > ,
U : std ::cmp ::Ord + Default + Copy ,
{
let mut max = Default ::default ( ) ;
for i in from . into_iter ( )
{
let i = i . borrow ( ) . end ;
if i > max {
max = i
}
}
max
}
impl Object
{
/// Create a new empty `Object`.
#[ inline ] pub fn new ( ) -> Self
{
Self { data : Vec ::new ( ) , data_instances : HashMap ::new ( ) }
}
/// Are the internal mappings of this object valid?
pub fn validate ( & self ) -> bool
{
let len = self . data . len ( ) ;
for ( _ , range ) in self . data_instances . iter ( ) {
if range . end > len { //range.start + range.end > len {
return false ;
}
}
true
}
/// Try to get a value of type `T` from `name`.
pub fn get < ' a , T > ( & ' a mut self , name : impl Borrow < str > ) -> Option < T >
where T : Deserialize < ' a >
{
if let Some ( bytes ) = self . get_bytes ( name ) {
serde_cbor ::from_slice ( & bytes [ .. ] ) . expect ( "Failed to deserialize CBOR value" )
} else {
None
}
}
/// Try to get a value of type `T` from `name`.
pub fn try_get < ' a , T > ( & ' a mut self , name : impl Borrow < str > ) -> Option < T >
where T : Deserialize < ' a >
{
if let Some ( bytes ) = self . get_bytes ( name ) {
serde_cbor ::from_slice ( & bytes [ .. ] ) . ok ( )
} else {
None
}
}
/// Serialize and insert `value` into the stream with `name`.
pub fn insert < T > ( & mut self , name : impl Into < Cow < ' static , str > > , value : T )
where T : Serialize
{
let len = self . data . len ( ) ;
match serde_cbor ::to_writer ( & mut self . data , & value ) {
Ok ( ( ) ) = > {
let nlen = self . data . len ( ) ;
self . data_instances . insert ( name . into ( ) , len .. nlen ) . unwrap_none ( ) ;
} ,
Err ( err ) = > {
self . data . resize ( len , 0 ) ; //Roll back result
panic! ( "Failed inserting CBOR object: {}" , err ) //TODO: Return Err instead of panic
} ,
}
}
/// Insert bytes directly with this name
pub fn insert_bytes ( & mut self , name : impl Into < Cow < ' static , str > > , bytes : impl AsRef < [ u8 ] > )
{
let bytes = bytes . as_ref ( ) ;
let start = self . data . len ( ) ;
self . data . extend_from_slice ( bytes ) ;
let end = self . data . len ( ) ;
self . data_instances . insert ( name . into ( ) , start .. end ) . unwrap_none ( ) ;
}
/// Insert a value's bytes directly with this name
pub unsafe fn insert_value_raw < T , U > ( & mut self , name : U , value : & T )
where T : ? Sized ,
U : Into < Cow < ' static , str > >
{
self . insert_bytes ( name , bytes ::refer ( value ) )
}
/// Try to get the bytes specified by name
pub fn get_bytes ( & self , name : impl Borrow < str > ) -> Option < & [ u8 ] >
{
if let Some ( range ) = self . data_instances . get ( name . borrow ( ) ) {
Some ( & self . data [ range . clone ( ) ] )
} else {
None
}
}
/// Try to get the value spcified by name
///
/// # Panics
/// If `T` cannot fit into the size of the range
pub unsafe fn get_value_raw < T > ( & self , name : impl Borrow < str > ) -> Option < & T >
{
self . get_bytes ( name ) . map ( | x | bytes ::derefer ( x ) )
}
/// Try to get the value specified by name. Will return `None` if `T` cannot fit in the returned bytes.
pub unsafe fn try_get_value_raw < T > ( & self , name : impl Borrow < str > ) -> Option < & T >
{
if let Some ( bytes ) = self . get_bytes ( name ) {
if bytes . len ( ) > = std ::mem ::size_of ::< T > ( ) {
Some ( bytes ::derefer ( bytes ) )
} else {
#[ cfg(debug_assertions) ] eprintln! ( "Warning! Likely data corruption as {} (size {}) cannot fit into {} bytes" , std ::any ::type_name ::< T > ( ) , std ::mem ::size_of ::< T > ( ) , bytes . len ( ) ) ;
None
}
} else {
None
}
}
/// Consume into the data
pub fn into_bytes ( self ) -> Box < [ u8 ] >
{
let mut output = Vec ::new ( ) ; //TOOO: Get cap
debug_assert! ( self . validate ( ) , "passing invalid object to serialise" ) ;
macro_rules! bin {
( $bytes :expr ) = > {
{
let bytes = $bytes ;
output . extend_from_slice ( bytes . as_ref ( ) ) ;
}
} ;
( usize $value :expr ) = > {
{
use std ::convert ::TryInto ;
use byteorder ::{
WriteBytesExt ,
LittleEndian ,
} ;
let val : u64 = $value . try_into ( ) . expect ( "Value could not fit into `u64`" ) ;
WriteBytesExt ::write_u64 ::< LittleEndian > ( & mut output , val ) . expect ( "Failed to append `u64` to output buffer" ) ;
}
} ;
}
bin ! ( usize self . data_instances . len ( ) ) ;
for ( name , & Range { start , end } ) in self . data_instances . iter ( ) {
let name = name . as_bytes ( ) ;
bin ! ( usize name . len ( ) ) ;
bin ! ( name ) ;
bin ! ( usize start ) ;
bin ! ( usize end ) ;
}
bin ! ( usize self . data . len ( ) ) ; //for additional checks
output . extend ( self . data ) ;
output . into_boxed_slice ( )
}
/// Read an object from a stream
pub async fn from_stream < T > ( input : & mut T ) -> io ::Result < Self >
where T : AsyncRead + Unpin + ? Sized
{
let mut ext_buf = Vec ::new ( ) ;
macro_rules! bin {
( usize ) = > {
{
use std ::convert ::TryFrom ;
let value : u64 = input . read_u64 ( ) . await ? ;
usize ::try_from ( value ) . map_err ( | _ | io ::Error ::new ( io ::ErrorKind ::InvalidData , "u64 cannot fit in usize" ) ) ?
}
} ;
( $size :expr ) = > {
{
let sz = $size ;
ext_buf . resize ( sz , 0 ) ;
assert_eq! ( input . read_exact ( & mut ext_buf [ .. sz ] ) . await ? , ext_buf . len ( ) ) ;
& ext_buf [ .. sz ]
}
}
}
let entries = bin ! ( usize ) ;
let mut instances = HashMap ::with_capacity ( entries ) ;
for _i in 0 .. entries
{
let name_len = bin ! ( usize ) ;
let name_bytes = bin ! ( name_len ) ;
let name_str = std ::str ::from_utf8 ( name_bytes ) . map_err ( | _ | io ::Error ::new ( io ::ErrorKind ::InvalidData , "item name was corrupted" ) ) ? ;
let start = bin ! ( usize ) ;
let end = bin ! ( usize ) ;
instances . insert ( Cow ::Owned ( name_str . to_owned ( ) ) , start .. end ) ;
}
let expected_len = bin ! ( usize ) ;
if expected_len ! = calc_len ( instances . iter ( ) . map ( | ( _ , v ) | v ) )
{
return Err ( io ::Error ::new ( io ::ErrorKind ::InvalidData , "expected and read sizes differ" ) ) ;
}
let mut data = vec! [ 0 ; expected_len ] ;
assert_eq! ( input . read_exact ( & mut data [ .. ] ) . await ? , expected_len ) ;
Ok ( Self {
data ,
data_instances : instances ,
} )
}
/// Write this instance into an async stream
pub async fn into_stream < T > ( & self , output : & mut T ) -> io ::Result < usize >
where T : AsyncWrite + Unpin + ? Sized
{
//eprintln!("{}: {:?}", self.data.len(), self);
debug_assert! ( self . validate ( ) , "passing invalid object to serialise" ) ;
let mut written = 0 usize ;
macro_rules! bin {
( $bytes :expr ) = > {
{
let bytes = $bytes ;
output . write_all ( bytes ) . await ? ;
written + = bytes . len ( ) ;
}
} ;
( usize $value :expr ) = > {
{
use std ::convert ::TryInto ;
let val : u64 = $value . try_into ( ) . map_err ( | _ | io ::Error ::new ( io ::ErrorKind ::InvalidData , "size cannot fit in u64" ) ) ? ;
output . write_u64 ( val ) . await ? ;
written + = std ::mem ::size_of ::< u64 > ( ) ;
}
} ;
}
unsafe {
bin ! ( usize self . data_instances . len ( ) ) ;
for ( name , & Range { start , end } ) in self . data_instances . iter ( ) {
let name = name . as_bytes ( ) ;
bin ! ( usize name . len ( ) ) ;
bin ! ( name ) ;
bin ! ( usize start ) ;
bin ! ( usize end ) ;
}
}
bin ! ( usize self . data . len ( ) ) ; //for additional checks
bin ! ( & self . data ) ;
Ok ( written )
}
}
/// A suspend stream represents a stream of objects of _the same type_. Can be any number of them, but they all must be for the same type.
#[ async_trait ]
pub trait SuspendStream
{
/// Write an object into the opaque stream.
async fn set_object ( & mut self , obj : Object ) -> Result < ( ) , Error > ;
/// Read an object from the opaque stream.
async fn get_object ( & mut self ) -> Result < Option < Object > , Error > ;
}
/// An error that occoured in a suspend operation
#[ derive(Debug) ]
#[ non_exhaustive ]
pub enum Error {
BadObject ,
MissingObject ( Cow < ' static , str > ) ,
Corruption ,
IO ( io ::Error ) ,
Unknown ,
}
impl error ::Error for Error
{
fn source ( & self ) -> Option < & ( dyn error ::Error + ' static ) > {
Some ( match & self {
Self ::IO ( io ) = > io ,
_ = > return None ,
} )
}
}
impl fmt ::Display for Error
{
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result
{
match self {
Self ::BadObject = > write! ( f , "unexpected object in stream" ) ,
Self ::MissingObject ( string ) = > write! ( f , "missing object from stream: {}" , string ) ,
Self ::Corruption = > write! ( f , "data stream corruption" ) ,
Self ::IO ( _ ) = > write! ( f , "i/o error" ) ,
_ = > write! ( f , "unknown error" ) ,
}
}
}
impl From < io ::Error > for Error
{
#[ inline ] fn from ( from : io ::Error ) -> Self
{
Self ::IO ( from )
}
}
/// A suspendable type, that can save and reload its data atomically
#[ async_trait ]
pub trait Suspendable : Sized
{
async fn suspend < S : SuspendStream + Send + Sync + ? Sized > ( self , into : & mut S ) -> Result < ( ) , Error > ;
async fn load < S : SuspendStream + Send + Sync + ? Sized > ( from : & mut S ) -> Result < Self , Error > ;
}
/// An in-memory `SuspendStream`.
#[ derive(Debug, Clone) ]
pub struct MemorySuspendStream ( Vec < u8 > ) ;
impl MemorySuspendStream
{
/// Create a new empty instance
pub fn new ( ) -> Self
{
Self ( Vec ::new ( ) )
}
/// Create from a vector of bytes
pub fn from_bytes ( from : impl Into < Vec < u8 > > ) -> Self
{
Self ( from . into ( ) )
}
/// Create from a slice of bytes
pub fn from_slice ( from : impl AsRef < [ u8 ] > ) -> Self
{
Self ( Vec ::from ( from . as_ref ( ) ) )
}
/// Return the internal bytes
pub fn into_bytes ( self ) -> Vec < u8 >
{
self . 0
}
/// The internal buffer
pub fn buffer ( & self ) -> & Vec < u8 >
{
& self . 0
}
/// The internal buffer
pub fn buffer_mut ( & mut self ) -> & mut Vec < u8 >
{
& mut self . 0
}
}
impl AsRef < [ u8 ] > for MemorySuspendStream
{
fn as_ref ( & self ) -> & [ u8 ]
{
& self . 0 [ .. ]
}
}
impl AsMut < [ u8 ] > for MemorySuspendStream
{
fn as_mut ( & mut self ) -> & mut [ u8 ]
{
& mut self . 0 [ .. ]
}
}
impl From < Vec < u8 > > for MemorySuspendStream
{
#[ inline ] fn from ( from : Vec < u8 > ) -> Self
{
Self ( from . into ( ) )
}
}
impl From < Box < [ u8 ] > > for MemorySuspendStream
{
fn from ( from : Box < [ u8 ] > ) -> Self
{
Self ::from_bytes ( from )
}
}
impl From < MemorySuspendStream > for Box < [ u8 ] >
{
fn from ( from : MemorySuspendStream ) -> Self
{
from . 0. into ( )
}
}
impl From < MemorySuspendStream > for Vec < u8 >
{
#[ inline ] fn from ( from : MemorySuspendStream ) -> Self
{
from . 0
}
}
#[ async_trait ]
impl SuspendStream for MemorySuspendStream
{
async fn get_object ( & mut self ) -> Result < Option < Object > , Error > {
if self . 0. len ( ) = = 0 {
return Ok ( None ) ;
}
let mut ptr = & self . 0 [ .. ] ;
let vl = Object ::from_stream ( & mut ptr ) . await ? ;
let diff = ( ptr . as_ptr ( ) as usize ) - ( ( & self . 0 [ .. ] ) . as_ptr ( ) as usize ) ;
self . 0. drain ( 0 .. diff ) ;
Ok ( Some ( vl ) )
}
async fn set_object ( & mut self , obj : Object ) -> Result < ( ) , Error > {
obj . into_stream ( & mut self . 0 ) . await ? ;
Ok ( ( ) )
}
}
/// Suspend a single object to memory
pub async fn oneshot < T : Suspendable > ( value : T ) -> Result < Vec < u8 > , Error >
{
let mut output = MemorySuspendStream ::new ( ) ;
value . suspend ( & mut output ) . await ? ;
Ok ( output . into_bytes ( ) )
}
/// Load a single value from memory
pub async fn single < T : Suspendable > ( from : impl AsRef < [ u8 ] > ) -> Result < T , Error >
{
struct BorrowedStream < ' a > ( & ' a [ u8 ] ) ;
#[ async_trait ]
impl < ' a > SuspendStream for BorrowedStream < ' a >
{
async fn get_object ( & mut self ) -> Result < Option < Object > , Error > {
if self . 0. len ( ) = = 0 {
return Ok ( None ) ;
}
let mut ptr = & self . 0 [ .. ] ;
let vl = Object ::from_stream ( & mut ptr ) . await ? ;
let diff = ( ptr . as_ptr ( ) as usize ) - ( ( & self . 0 [ .. ] ) . as_ptr ( ) as usize ) ;
self . 0 = & self . 0 [ diff .. ] ;
Ok ( Some ( vl ) )
}
async fn set_object ( & mut self , _ : Object ) -> Result < ( ) , Error > {
panic! ( "Cannot write to borrowed stream" )
}
}
let bytes = from . as_ref ( ) ;
let mut stream = BorrowedStream ( bytes ) ;
T ::load ( & mut stream ) . await
}