//! Frozen, serialisable state use super::*; use futures::prelude::*; use std::io; use tokio::prelude::*; /// An immutable image of `State`. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Freeze { users: HashSet, posts: Vec<(UserID, Post)>, } /// Reading and writing state //TODO: Compression impl Freeze { /// Serialise this instance into an output synchronously pub fn write_sync(&self, output: impl io::Write) -> eyre::Result<()> { serde_cbor::to_writer(output, self) .wrap_err(eyre!("Failed to write (sync) to output")) } /// Serialise this instance into an output asynchronously /// /// # Notes /// This function serialises to a heap buffer first pub async fn write_async(&self, mut output: impl tokio::io::AsyncWrite + Unpin) -> eyre::Result<()> { let buf = serde_cbor::to_vec(self) .wrap_err(eyre!("Failed to serialise to buffer"))?; output.write_all(&buf[..]).await .wrap_err(eyre!("Failed to write buffer to output stream"))?; Ok(()) } /// Deserialise an instance from an input synchronously pub fn read_sync(input: impl io::Read) -> eyre::Result { serde_cbor::from_reader(input) .wrap_err(eyre!("Failed to read (sync) from input")) } /// Deserialise an instance from an input asynchronously /// /// # Notes /// This function reads the entire input stream into a heap allocated buffer first /// /// # Panics /// If the buffer size exceeds `defaults::MAX_IMAGE_READ_SIZE`. pub async fn read_async(mut input: impl tokio::io::AsyncRead + Unpin) -> eyre::Result { let whole = { let mut whole = Vec::new(); let mut buffer = [0u8; 4096]; let mut read; let mut done =0; while { read = input.read(&mut buffer[..]).await.wrap_err(eyre!("Failed to read from input"))?; read > 0 } { whole.extend_from_slice(&buffer[..read]); done+=read; if defaults::MAX_IMAGE_READ_SIZE > 0 { //constant fn should if done > defaults::MAX_IMAGE_READ_SIZE { Err::(eyre!("Image read exceeded max read size")) .with_section(|| done.to_string().header("Current size is")) .with_note(|| defaults::MAX_IMAGE_READ_SIZE.to_string().header("Max size is")) .expect("Image read fatal"); //panic with this instance } } } whole }; serde_cbor::from_slice(&whole) .wrap_err(eyre!("Failed to deserialise buffer")) } } impl Freeze { /// Create a `State` from this freeze /// /// # Notes /// This clones all post and user data, to perform this operation without cloning, use `into_state`. pub fn unfreeze(&self) -> State { let users: HashMap> = self.users .iter() .map(|x| (*x.id(), RwLock::new(x.clone()))).collect(); let mut posts: HashMap>> = HashMap::with_capacity(users.len()); for (id, post) in self.posts.iter() { posts.entry(*id).or_insert_with(move || MaybeVec::new()).push(RwLock::new(post.clone())); } State(Arc::new(Inner{posts: RwLock::new( Posts { users, posts })})) } /// Consume into a new `State`. pub fn into_state(self) -> State { State::from_freeze(self) } /// Try to consume this `State` into a `Freeze`. /// /// # Fails /// See `State::try_into_freeze()`. #[inline] pub fn try_from_state(state: State) -> Result { state.try_into_freeze() } } impl State { /// Create a serialisable image from this state pub async fn freeze(&self) -> Freeze { let posts = self.0.posts.read().await; let users: HashSet = { stream::iter(posts.users.iter()) .then(|(_, x)| async move { x.read().await }) .map(|x| x.clone()).collect().await }; let posts: Vec<_> = { stream::iter(posts.posts.iter() .map(|(x, y)| y.into_iter().map(move |z| (x, z))).flatten()) .then(|x| async move { (x.0, x.1.read().await) }) .map(|(&id, post)| (id, post.clone())).collect().await }; Freeze {users, posts} } /// Try to consume into a `Freeze`. /// /// # Result /// This method fails if there are more than one shared reference to the state pub fn try_into_freeze(self) -> Result { match Arc::try_unwrap(self.0) { Ok(freeze) => { let posts = freeze.posts.into_inner(); let users: HashSet = { posts.users.into_iter() .map(|(_, x)| x.into_inner()).collect() }; let posts: Vec<_> = { posts.posts.into_iter() .map(|(x, y)| y.into_iter().map(move |z| (x, z.into_inner()))).flatten() //.map(|x| (x.0, x.1.into_inner()) ) .map(|(id, post)| (id, post)).collect() }; Ok(Freeze {users, posts}) }, Err(e) => Err(Self(e)) } } /// Create `State` from this image pub fn from_freeze(freeze: Freeze) -> Self { let users: HashMap> = freeze.users .into_iter() .map(|x| (*x.id(), RwLock::new(x))).collect(); let mut posts: HashMap>> = HashMap::with_capacity(users.len()); for (id, post) in freeze.posts.into_iter() { posts.entry(id).or_insert_with(move || MaybeVec::new()).push(RwLock::new(post)); } Self(Arc::new(Inner{posts: RwLock::new( Posts { users, posts })})) } } impl From for State { #[inline] fn from(from: Freeze) -> Self { Self::from_freeze(from) } } impl TryFrom for Freeze { type Error = State; #[inline] fn try_from(from: State) -> Result { from.try_into_freeze() } }