//! Handles suspending and reloading posts use super::*; use std::{ marker::{ PhantomData, Unpin, Send, Sync, }, io, collections::HashMap, ops::Range, borrow::{ Cow, Borrow, }, error, fmt, }; use tokio::{ prelude::*, io::{ AsyncRead, AsyncWrite, }, }; /// Represents an opaque bytes stream of serialised data to insert into `SuspendStream`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Object { data: Vec, data_instances: HashMap, Range>, } fn calc_len(from: I) -> U where I: IntoIterator, T: Borrow>, U: std::cmp::Ord + Default + Copy, { let mut max = Default::default(); for i in from.into_iter() { let i = i.borrow().end; if i > max { max = i } } max } impl Object { /// Create a new empty `Object`. #[inline] pub fn new() -> Self { Self{data: Vec::new(), data_instances: HashMap::new()} } /// Are the internal mappings of this object valid? pub fn validate(&self) -> bool { let len = self.data.len(); for (_, range) in self.data_instances.iter() { if range.end > len {//range.start + range.end > len { return false; } } true } /// Try to get a value of type `T` from `name`. pub fn get<'a, T>(&'a mut self, name: impl Borrow) -> Option where T: Deserialize<'a> { if let Some(bytes) = self.get_bytes(name) { serde_cbor::from_slice(&bytes[..]).expect("Failed to deserialize CBOR value") } else { None } } /// Try to get a value of type `T` from `name`. pub fn try_get<'a, T>(&'a mut self, name: impl Borrow) -> Option where T: Deserialize<'a> { if let Some(bytes) = self.get_bytes(name) { serde_cbor::from_slice(&bytes[..]).ok() } else { None } } /// Serialize and insert `value` into the stream with `name`. pub fn insert(&mut self, name: impl Into>, value: T) where T: Serialize { let len = self.data.len(); match serde_cbor::to_writer(&mut self.data, &value) { Ok(()) => { let nlen = self.data.len(); self.data_instances.insert(name.into(), len..nlen).unwrap_none(); }, Err(err) => { self.data.resize(len, 0); //Roll back result panic!("Failed inserting CBOR object: {}", err) //TODO: Return Err instead of panic }, } } /// Insert bytes directly with this name pub fn insert_bytes(&mut self, name: impl Into>, bytes: impl AsRef<[u8]>) { let bytes= bytes.as_ref(); let start = self.data.len(); self.data.extend_from_slice(bytes); let end = self.data.len(); self.data_instances.insert(name.into(), start..end).unwrap_none(); } /// Insert a value's bytes directly with this name pub unsafe fn insert_value_raw(&mut self, name: U, value: &T) where T: ?Sized, U: Into> { self.insert_bytes(name, bytes::refer(value)) } /// Try to get the bytes specified by name pub fn get_bytes(&self, name: impl Borrow) -> Option<&[u8]> { if let Some(range) = self.data_instances.get(name.borrow()) { Some(&self.data[range.clone()]) } else { None } } /// Try to get the value spcified by name /// /// # Panics /// If `T` cannot fit into the size of the range pub unsafe fn get_value_raw(&self, name: impl Borrow) -> Option<&T> { self.get_bytes(name).map(|x| bytes::derefer(x)) } /// Try to get the value specified by name. Will return `None` if `T` cannot fit in the returned bytes. pub unsafe fn try_get_value_raw(&self, name: impl Borrow) -> Option<&T> { if let Some(bytes) = self.get_bytes(name) { if bytes.len() >= std::mem::size_of::() { Some(bytes::derefer(bytes)) } else { #[cfg(debug_assertions)] eprintln!("Warning! Likely data corruption as {} (size {}) cannot fit into {} bytes",std::any::type_name::(), std::mem::size_of::(), bytes.len()); None } } else { None } } /// Consume into the data pub fn into_bytes(self) -> Box<[u8]> { let mut output = Vec::new(); //TOOO: Get cap debug_assert!(self.validate(), "passing invalid object to serialise"); macro_rules! bin { ($bytes:expr) => { { let bytes = $bytes; output.extend_from_slice(bytes.as_ref()); } }; (usize $value:expr) => { { use std::convert::TryInto; use byteorder::{ WriteBytesExt, LittleEndian, }; let val: u64 = $value.try_into().expect("Value could not fit into `u64`"); WriteBytesExt::write_u64::(&mut output,val).expect("Failed to append `u64` to output buffer"); } }; } bin!(usize self.data_instances.len()); for (name, &Range{start, end}) in self.data_instances.iter() { let name = name.as_bytes(); bin!(usize name.len()); bin!(name); bin!(usize start); bin!(usize end); } bin!(usize self.data.len()); //for additional checks output.extend(self.data); output.into_boxed_slice() } /// Read an object from a stream pub async fn from_stream(input: &mut T) -> io::Result where T: AsyncRead + Unpin + ?Sized { let mut ext_buf = Vec::new(); macro_rules! bin { (usize) => { { use std::convert::TryFrom; let value: u64 = input.read_u64().await?; usize::try_from(value).map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "u64 cannot fit in usize"))? } }; ($size:expr) => { { let sz = $size; ext_buf.resize(sz, 0); assert_eq!(input.read_exact(&mut ext_buf[..sz]).await?, ext_buf.len()); &ext_buf[..sz] } } } let entries = bin!(usize); let mut instances = HashMap::with_capacity(entries); for _i in 0..entries { let name_len = bin!(usize); let name_bytes = bin!(name_len); let name_str = std::str::from_utf8(name_bytes).map_err(|_| io::Error::new(io::ErrorKind::InvalidData,"item name was corrupted"))?; let start = bin!(usize); let end = bin!(usize); instances.insert(Cow::Owned(name_str.to_owned()), start..end); } let expected_len = bin!(usize); if expected_len != calc_len(instances.iter().map(|(_, v)| v)) { return Err(io::Error::new(io::ErrorKind::InvalidData, "expected and read sizes differ")); } let mut data = vec![0; expected_len]; assert_eq!(input.read_exact(&mut data[..]).await?, expected_len); Ok(Self { data, data_instances: instances, }) } /// Write this instance into an async stream pub async fn into_stream(&self, output: &mut T) -> io::Result where T: AsyncWrite + Unpin + ?Sized { //eprintln!("{}: {:?}", self.data.len(), self); debug_assert!(self.validate(), "passing invalid object to serialise"); let mut written=0usize; macro_rules! bin { ($bytes:expr) => { { let bytes = $bytes; output.write_all(bytes).await?; written+=bytes.len(); } }; (usize $value:expr) => { { use std::convert::TryInto; let val: u64 = $value.try_into().map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "size cannot fit in u64"))?; output.write_u64(val).await?; written += std::mem::size_of::(); } }; } unsafe { bin!(usize self.data_instances.len()); for (name, &Range{start, end}) in self.data_instances.iter() { let name = name.as_bytes(); bin!(usize name.len()); bin!(name); bin!(usize start); bin!(usize end); } } bin!(usize self.data.len()); //for additional checks bin!(&self.data); Ok(written) } } /// A suspend stream represents a stream of objects of _the same type_. Can be any number of them, but they all must be for the same type. #[async_trait] pub trait SuspendStream { /// Write an object into the opaque stream. async fn set_object(&mut self, obj: Object) -> Result<(), Error>; /// Read an object from the opaque stream. async fn get_object(&mut self) -> Result, Error>; } /// An error that occoured in a suspend operation #[derive(Debug)] #[non_exhaustive] pub enum Error { BadObject, MissingObject(Cow<'static, str>), Corruption, IO(io::Error), Other(eyre::Report), Unknown, } impl error::Error for Error { fn source(&self) -> Option<&(dyn error::Error + 'static)> { Some(match &self { Self::IO(io) => io, _ => return None, }) } } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::BadObject => write!(f, "unexpected object in stream"), Self::MissingObject(string) => write!(f, "missing object from stream: {}", string), Self::Corruption => write!(f, "data stream corruption"), Self::IO(_) => write!(f, "i/o error"), Self::Other(report) => write!(f, "internal: {}", report), _ => write!(f, "unknown error"), } } } impl From for Error { #[inline] fn from(from: io::Error) -> Self { Self::IO(from) } } impl From for Error { fn from(from: eyre::Report) -> Self { Self::Other(from) } } /// A suspendable type, that can save and reload its data atomically #[async_trait] pub trait Suspendable: Sized { async fn suspend(self, into: &mut S) -> Result<(), Error>; async fn load(from: &mut S) -> Result; } /// An in-memory `SuspendStream`. #[derive(Debug, Clone)] pub struct MemorySuspendStream(Vec); impl MemorySuspendStream { /// Create a new empty instance pub fn new() -> Self { Self(Vec::new()) } /// Create from a vector of bytes pub fn from_bytes(from: impl Into>) -> Self { Self(from.into()) } /// Create from a slice of bytes pub fn from_slice(from: impl AsRef<[u8]>) -> Self { Self(Vec::from(from.as_ref())) } /// Return the internal bytes pub fn into_bytes(self) -> Vec { self.0 } /// The internal buffer pub fn buffer(&self) -> &Vec { &self.0 } /// The internal buffer pub fn buffer_mut(&mut self) -> &mut Vec { &mut self.0 } } impl AsRef<[u8]> for MemorySuspendStream { fn as_ref(&self) -> &[u8] { &self.0[..] } } impl AsMut<[u8]> for MemorySuspendStream { fn as_mut(&mut self) -> &mut [u8] { &mut self.0[..] } } impl From> for MemorySuspendStream { #[inline] fn from(from: Vec) -> Self { Self(from.into()) } } impl From> for MemorySuspendStream { fn from(from: Box<[u8]>) -> Self { Self::from_bytes(from) } } impl From for Box<[u8]> { fn from(from: MemorySuspendStream) -> Self { from.0.into() } } impl From for Vec { #[inline] fn from(from: MemorySuspendStream) -> Self { from.0 } } #[async_trait] impl SuspendStream for MemorySuspendStream { async fn get_object(&mut self) -> Result, Error> { if self.0.len() ==0 { return Ok(None); } let mut ptr = &self.0[..]; let vl = Object::from_stream(&mut ptr).await?; let diff = (ptr.as_ptr() as usize) - ((&self.0[..]).as_ptr() as usize); self.0.drain(0..diff); Ok(Some(vl)) } async fn set_object(&mut self, obj: Object) -> Result<(), Error> { obj.into_stream(&mut self.0).await?; Ok(()) } } /// Suspend a single object to memory pub async fn oneshot(value: T) -> Result, Error> { let mut output = MemorySuspendStream::new(); value.suspend(&mut output).await?; Ok(output.into_bytes()) } /// Load a single value from memory pub async fn single(from: impl AsRef<[u8]>) -> Result { struct BorrowedStream<'a>(&'a [u8]); #[async_trait] impl<'a> SuspendStream for BorrowedStream<'a> { async fn get_object(&mut self) -> Result, Error> { if self.0.len() ==0 { return Ok(None); } let mut ptr = &self.0[..]; let vl = Object::from_stream(&mut ptr).await?; let diff = (ptr.as_ptr() as usize) - ((&self.0[..]).as_ptr() as usize); self.0 = &self.0[diff..]; Ok(Some(vl)) } async fn set_object(&mut self, _: Object) -> Result<(), Error> { panic!("Cannot write to borrowed stream") } } let bytes = from.as_ref(); let mut stream = BorrowedStream(bytes); T::load(&mut stream).await }