Freeze read-write and from-into

new-idea
Avril 4 years ago
parent 6973bddf41
commit b753923bf0
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -54,6 +54,7 @@ fn install() -> eyre::Result<()>
Ok(()) Ok(())
} }
//pub use defaults::VERSION;
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {

@ -12,7 +12,7 @@ use tokio::prelude::*;
struct FreezeInner struct FreezeInner
{ {
users: HashSet<User>, users: HashSet<User>,
posts: HashSet<Post>, posts: HashSet<(UserID, Post)>,
} }
const FREEZE_CHK: &[u8; 4] = b"REI\0"; const FREEZE_CHK: &[u8; 4] = b"REI\0";
@ -32,6 +32,20 @@ struct FreezeMetadata
impl FreezeMetadata impl FreezeMetadata
{ {
#[inline] pub fn len(&self) -> usize
{
self.body_size.try_into().expect("Length exceeded limit of `usize`")
}
#[inline] fn new(from: &[u8]) -> Self
{
Self {
chk: *FREEZE_CHK,
version: defaults::VERSION,
body_size: from.len().try_into().unwrap(), //this should never fail, right?
compressed: false,
body_hash: sha256::compute_slice(from),
}
}
/// Write this metadata to an async stream, return the number of bytes written. /// Write this metadata to an async stream, return the number of bytes written.
pub async fn write_to(&self, mut to: impl tokio::io::AsyncWrite + Unpin) -> io::Result<usize> pub async fn write_to(&self, mut to: impl tokio::io::AsyncWrite + Unpin) -> io::Result<usize>
{ {
@ -153,185 +167,182 @@ pub struct Freeze
impl Freeze impl Freeze
{ {
/// Generate the output to write /// Generate the output to write.
fn gen_output(&self) -> (FreezeMetadata, Box<u8>) ///
/// This might take a while, and is done synchronously. Recommended to run on blocking task.
fn gen_output(&self) -> serde_cbor::Result<(FreezeMetadata, Vec<u8>)>
{ {
todo!() let body = serde_cbor::to_vec(&self.inner)?;
Ok((FreezeMetadata::new(&body[..]), body))
} }
}
/* /// Read `Freeze` from an input stream
/// Reading and writing state pub async fn read_async(mut input: impl tokio::io::AsyncRead + Unpin) -> eyre::Result<Self>
//TODO: Compression {
impl Freeze let inner: FreezeInner = {
{ let unchecked_meta = FreezeMetadata::read_from(&mut input)
/// Serialise this instance into an output synchronously .await
pub fn write_sync(&self, output: impl io::Write) -> eyre::Result<()> .wrap_err(eyre!("Failed to read metadata from stream"))?;
{ let mut body: Vec<u8> = vec![0; unchecked_meta.len()];
serde_cbor::to_writer(output, self) input.read_exact(&mut body[..]).await
.wrap_err(eyre!("Failed to write (sync) to output")) .wrap_err(eyre!("Failed to read body from stream"))
.with_section(|| format!("{:?}", unchecked_meta).header("Metadata was"))?;
tokio::task::spawn_blocking(move || {
if unchecked_meta.verify_hash_blocking(&body) {
Err(eyre!("Failed to verify metadata hash"))
} else {
serde_cbor::from_slice(&body)
.wrap_err(eyre!("Failed to deserialise body"))
}
}).await
.wrap_err("Background task panic")?
.with_section(|| format!("{:?}", unchecked_meta).header("Metadata was"))?
};
Ok(Self {
inner,
})
} }
/// Serialise this instance into an output asynchronously
/// Write `Freeze` into this output stream on the current task.
/// ///
/// # Notes /// This function runs the serialisation and hashing on a the current task, which is synchronous. Recommended to use `into_write_async` instead.
/// This function serialises to a heap buffer first
pub async fn write_async(&self, mut output: impl tokio::io::AsyncWrite + Unpin) -> eyre::Result<()> pub async fn write_async(&self, mut output: impl tokio::io::AsyncWrite + Unpin) -> eyre::Result<()>
{ {
let buf = serde_cbor::to_vec(self) let (meta, body) = self.gen_output()
.wrap_err(eyre!("Failed to serialise to buffer"))?; .wrap_err(eyre!("Failed to generate write body"))?;
meta.write_to(&mut output).await
output.write_all(&buf[..]).await .wrap_err(eyre!("Failed to write metadata to output stream"))
.wrap_err(eyre!("Failed to write buffer to output stream"))?; .with_section(|| format!("{:?}", meta).header("Metadata was"))?;
output.write_all(&body[..]).await
.wrap_err(eyre!("Failed to write whole body to output stream"))?;
Ok(()) Ok(())
} }
/// Deserialise an instance from an input synchronously /// Consume this `Freeze` into this output stream.
pub fn read_sync(input: impl io::Read) -> eyre::Result<Self> ///
/// This function runs the serialisation and hashing on a background blocking task instead of on the current one.
pub async fn into_write_async(self, mut output: impl tokio::io::AsyncWrite + Unpin) -> eyre::Result<()>
{ {
serde_cbor::from_reader(input) let (meta, body) = tokio::task::spawn_blocking(move || self.gen_output())
.wrap_err(eyre!("Failed to read (sync) from input")) .await
.wrap_err(eyre!("Background task panic"))?
.wrap_err(eyre!("Failed to generate write body"))?;
meta.write_to(&mut output).await
.wrap_err(eyre!("Failed to write metadata to output stream"))
.with_section(|| format!("{:?}", meta).header("Metadata was"))?;
output.write_all(&body[..]).await
.wrap_err(eyre!("Failed to write whole body to output stream"))?;
Ok(())
} }
}
/// Deserialise an instance from an input asynchronously
impl State
{
/// Create an image from the state.
/// ///
/// # Notes /// # Locks
/// This function reads the entire input stream into a heap allocated buffer first /// This method holds the read lock of Oneesan, it also holds the read lock of all posts and users.
/// This will prevent any writes while the `Freeze` is being created, and will also yield the current task until all write operations on `State` are completed.
/// ///
/// # Panics /// # Panics
/// If the buffer size exceeds `defaults::MAX_IMAGE_READ_SIZE`. /// If the internal state is incorrect.
pub async fn read_async(mut input: impl tokio::io::AsyncRead + Unpin) -> eyre::Result<Self> pub async fn freeze(&self) -> Freeze
{ {
let whole = { let onee = self.0.oneesan.read().await;
let mut whole = Vec::new(); use std::ops::Deref;
let mut buffer = [0u8; 4096];
let mut read; // this might be kinda expensive. should we offload this?
let mut done =0; let post_owner_reverse_lookup: HashMap<Index, UserID> = onee.posts_user_map.iter()
while { read = input.read(&mut buffer[..]).await.wrap_err(eyre!("Failed to read from input"))?; read > 0 } .map(|(&y,x)| x.iter().map(move |idx| (*idx,y)))
{ .flatten()
whole.extend_from_slice(&buffer[..read]); .collect();
done+=read; let (posts, users) = tokio::join!(
if defaults::MAX_IMAGE_READ_SIZE > 0 { //constant fn should stream::iter(onee.posts.iter()).then(|(post_idx, shared)| {
if done > defaults::MAX_IMAGE_READ_SIZE { let owner_id = *post_owner_reverse_lookup.get(&post_idx).unwrap();
Err::<!, _>(eyre!("Image read exceeded max read size")) async move { (owner_id, Post::clone(shared.read().await.deref())) }
.with_section(|| done.to_string().header("Current size is")) }).collect(),
.with_note(|| defaults::MAX_IMAGE_READ_SIZE.to_string().header("Max size is")) stream::iter(onee.users.iter()).then(|(_, shared)| async move { User::clone(shared.read().await.deref()) }).collect()
.expect("Image read fatal"); //panic with this instance );
let inner = FreezeInner {
} posts, users
}
}
whole
}; };
Freeze {
serde_cbor::from_slice(&whole) inner,
.wrap_err(eyre!("Failed to deserialise buffer")) }
} }
} }
impl Freeze impl Freeze
{ {
/// Create a `State` from this freeze /// Create a working `State` from this image.
/// ///
/// # Notes /// This clones all posts and users in the image. Use `into_state` to move into a state.
/// This clones all post and user data, to perform this operation without cloning, use `into_state`.
pub fn unfreeze(&self) -> State pub fn unfreeze(&self) -> State
{ {
let users: HashMap<UserID, RwLock<User>> = self.users let mut users = Arena::with_capacity(self.inner.users.len());
.iter() let mut posts = Arena::with_capacity(self.inner.posts.len());
.map(|x| (*x.id(), RwLock::new(x.clone()))).collect(); let mut posts_map = HashMap::with_capacity(self.inner.posts.len());
let mut posts: HashMap<UserID, MaybeVec<RwLock<Post>>> = HashMap::with_capacity(users.len()); let mut users_map = HashMap::with_capacity(self.inner.users.len());
let mut posts_user_map = HashMap::with_capacity(self.inner.users.len());
for (owner_id, post) in self.inner.posts.iter()
{
let idx = posts.insert(Arc::new(RwLock::new(post.clone())));
posts_user_map.entry(*owner_id).or_insert_with(|| MaybeVec::new()).push(idx);
posts_map.insert(*post.post_id(), idx);
}
for (id, post) in self.posts.iter() for user in self.inner.users.iter()
{ {
posts.entry(*id).or_insert_with(move || MaybeVec::new()).push(RwLock::new(post.clone())); let idx = users.insert(Arc::new(RwLock::new(user.clone())));
users_map.insert(*user.id(), idx);
} }
State(Arc::new(Inner{posts: RwLock::new( Posts { State(Arc::new(Inner {
users, oneesan: RwLock::new(Oneesan {
posts users,
})})) posts,
posts_map,
users_map,
posts_user_map
})
}))
} }
/// Consume into a new `State`. /// Convert this image into a new `State`.
pub fn into_state(self) -> State pub fn into_state(self) -> State
{ {
State::from_freeze(self) let mut users = Arena::with_capacity(self.inner.users.len());
} let mut posts = Arena::with_capacity(self.inner.posts.len());
let mut posts_map = HashMap::with_capacity(self.inner.posts.len());
/// Try to consume this `State` into a `Freeze`. let mut users_map = HashMap::with_capacity(self.inner.users.len());
/// let mut posts_user_map = HashMap::with_capacity(self.inner.users.len());
/// # Fails
/// See `State::try_into_freeze()`.
#[inline] pub fn try_from_state(state: State) -> Result<Self, State>
{
state.try_into_freeze()
}
}
impl State
{
/// Create a serialisable image from this state
pub async fn freeze(&self) -> Freeze
{
let posts = self.0.posts.read().await;
let users: HashSet<User> = {
stream::iter(posts.users.iter())
.then(|(_, x)| async move { x.read().await })
.map(|x| x.clone()).collect().await
};
let posts: Vec<_> = {
stream::iter(posts.posts.iter()
.map(|(x, y)| y.into_iter().map(move |z| (x, z))).flatten())
.then(|x| async move { (x.0, x.1.read().await) })
.map(|(&id, post)| (id, post.clone())).collect().await
};
Freeze {users, posts} for (owner_id, post) in self.inner.posts.into_iter()
} {
let post_id = *post.post_id();
/// Try to consume into a `Freeze`. let idx = posts.insert(Arc::new(RwLock::new(post)));
/// posts_user_map.entry(owner_id).or_insert_with(|| MaybeVec::new()).push(idx);
/// # Result posts_map.insert(post_id, idx);
/// This method fails if there are more than one shared reference to the state
pub fn try_into_freeze(self) -> Result<Freeze, Self>
{
match Arc::try_unwrap(self.0) {
Ok(freeze) => {
let posts = freeze.posts.into_inner();
let users: HashSet<User> = {
posts.users.into_iter()
.map(|(_, x)| x.into_inner()).collect()
};
let posts: Vec<_> = {
posts.posts.into_iter()
.map(|(x, y)| y.into_iter().map(move |z| (x, z.into_inner()))).flatten()
//.map(|x| (x.0, x.1.into_inner()) )
.map(|(id, post)| (id, post)).collect()
};
Ok(Freeze {users, posts})
},
Err(e) => Err(Self(e))
} }
}
/// Create `State` from this image for user in self.inner.users.into_iter()
pub fn from_freeze(freeze: Freeze) -> Self
{
let users: HashMap<UserID, RwLock<User>> = freeze.users
.into_iter()
.map(|x| (*x.id(), RwLock::new(x))).collect();
let mut posts: HashMap<UserID, MaybeVec<RwLock<Post>>> = HashMap::with_capacity(users.len());
for (id, post) in freeze.posts.into_iter()
{ {
posts.entry(id).or_insert_with(move || MaybeVec::new()).push(RwLock::new(post)); let user_id = *user.id();
let idx = users.insert(Arc::new(RwLock::new(user)));
users_map.insert(user_id, idx);
} }
Self(Arc::new(Inner{posts: RwLock::new( Posts { State(Arc::new(Inner {
users, oneesan: RwLock::new(Oneesan {
posts users,
})})) posts,
posts_map,
users_map,
posts_user_map
})
}))
} }
} }
@ -339,17 +350,6 @@ impl From<Freeze> for State
{ {
#[inline] fn from(from: Freeze) -> Self #[inline] fn from(from: Freeze) -> Self
{ {
Self::from_freeze(from) from.into_state()
}
}
impl TryFrom<State> for Freeze
{
type Error = State;
#[inline] fn try_from(from: State) -> Result<Self, Self::Error>
{
from.try_into_freeze()
} }
} }
*/

Loading…
Cancel
Save