From 5cb807a107043dfa3ac66ab9e4769ff7d8f4797f Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 12 Sep 2020 03:01:50 +0100 Subject: [PATCH] karada --- Cargo.lock | 128 +++++++++++++++++++++ Cargo.toml | 1 + src/cache.rs | 95 ++++++++++++++++ src/main.rs | 6 + src/post/mod.rs | 2 +- src/state/local.rs | 94 ---------------- src/state/local/delta.rs | 230 ++++++++++++++++++++++++++++++++++++++ src/state/local/error.rs | 26 +++++ src/state/local/karada.rs | 91 +++++++++++++++ src/state/local/mod.rs | 136 ++++++++++++++++++++++ src/state/local/work.rs | 113 +++++++++++++++++++ src/suspend.rs | 29 ++--- 12 files changed, 843 insertions(+), 108 deletions(-) create mode 100644 src/cache.rs delete mode 100644 src/state/local.rs create mode 100644 src/state/local/delta.rs create mode 100644 src/state/local/error.rs create mode 100644 src/state/local/karada.rs create mode 100644 src/state/local/mod.rs create mode 100644 src/state/local/work.rs diff --git a/Cargo.lock b/Cargo.lock index e4cd791..4bfb340 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,12 +118,101 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" + +[[package]] +name = "futures-task" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -322,18 +411,56 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "pin-project" +version = "0.4.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca4433fff2ae79342e497d9f8ee990d174071408f28f726d6d83af93e58e48aa" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "ppv-lite86" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +[[package]] +name = "proc-macro-hack" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + [[package]] name = "proc-macro2" version = "1.0.20" @@ -643,6 +770,7 @@ dependencies = [ "byteorder", "chrono", "cryptohelpers", + "futures", "libc", "once_cell", "rustc_version", diff --git a/Cargo.toml b/Cargo.toml index 94537c1..d9c1bd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ libc = "0.2.76" byteorder = "1.3.4" serde_cbor = "0.11.1" serde = {version = "1.0.115", features= ["derive"]} +futures = "0.3.5" [build-dependencies] rustc_version = "0.2" diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..a64fd78 --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,95 @@ +//! Caching interfaces +use super::*; +use std::{ + borrow::{ + Borrow, + ToOwned, + }, +}; + +pub trait Cache +{ + fn cap(&self) -> Option; + fn len(&self) -> usize; + + fn insert(&mut self, value: T) -> &T; + fn get>(&self, value: &Q) -> Option<&T>; +} + +pub struct MemCache(Vec); +pub struct UnlimitedMemCache(Vec); + +impl MemCache +{ + /// Create a new cache with a max size + pub fn new(cap: usize) -> Self + { + Self(Vec::with_capacity(cap)) + } +} + +impl Cache for MemCache +{ + fn cap(&self) -> Option + { + Some(self.0.capacity()) + } + fn len(&self) -> usize + { + self.0.len() + } + + fn insert(&mut self, value: T) -> &T + { + if self.0.len() == self.0.capacity() { + self.0.remove(self.0.len()-1); + } + self.0.insert(0, value); + &self.0[0] + } + + fn get>(&self, value: &Q) -> Option<&T> + { + for x in self.0.iter() { + if value.eq(x) { + return Some(x); + } + } + None + } +} + +impl UnlimitedMemCache +{ + /// Create a new cache + pub fn new() -> Self + { + Self(Vec::new()) + } +} + +// extension + +pub trait CacheExt +{ + /// Insert into the cache if borrowed value is not present, + fn clone_insert<'a, Q>(&'a mut self, refer: &Q) -> &'a T + where Q: ToOwned + ?Sized + PartialEq, + T: Borrow; +} + +impl CacheExt for S +where S: Cache, + T: Clone +{ + fn clone_insert<'a, Q>(&'a mut self, refer: &Q) -> &'a T + where Q: ToOwned + ?Sized + PartialEq, + T: Borrow + { + if let Some(get) = self.get(refer) { + return get; + } + + self.insert(refer.to_owned()) + } +} diff --git a/src/main.rs b/src/main.rs index 370119a..dfa7d50 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,12 @@ use serde::{ mod bytes; mod suspend; +mod cache; +use cache::{ + Cache, + CacheExt as _, +}; + mod config; mod tripcode; mod identity; diff --git a/src/post/mod.rs b/src/post/mod.rs index 649d891..2a92b5e 100644 --- a/src/post/mod.rs +++ b/src/post/mod.rs @@ -123,7 +123,7 @@ mod tests #[tokio::test] async fn static_post_ser() { - let mut output = suspend::MemorySsuspendStream::new(); + let mut output = suspend::MemorySuspendStream::new(); let post1 = Static { id: identity::PostID::new(), user: Default::default(), diff --git a/src/state/local.rs b/src/state/local.rs deleted file mode 100644 index 418fd8e..0000000 --- a/src/state/local.rs +++ /dev/null @@ -1,94 +0,0 @@ -//! Handles updating posts -use super::*; -use std::{ - sync::Arc, -}; -use tokio::{ - sync::{ - RwLock, - watch, - }, -}; - -const MAX_SINGLE_DELTA_SIZE: usize = 16; - -/// Information about the delta to be applied in `Delta`. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum DeltaKind -{ - /// Append to the end of body. Equivilant to `Insert` with a location at `karada.scape.len()` (the end of the buffer). Might be removed idk - Append{ - span: [char; MAX_SINGLE_DELTA_SIZE], - span_len: u8, - }, - /// Insert `span_len` chars from `span` into body starting ahead of `location` and moving ahead - Insert{ - span: [char; MAX_SINGLE_DELTA_SIZE], - span_len: u8, - }, - /// Remove `span_len` chars ahead of `location`. (INCLUSIVE) - RemoveAhead{ - span_len: usize, - }, - /// Remove `span_len` chars behind this `location`. (EXCLUSIVE) - RemoveBehind{ - span_len: usize, - }, - /// Remove char at `location` - RemoveSingle, - /// Remove entire post body - Clear, -} - -/// A delta to apply to `Karada`. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Delta -{ - /// Location to insert into. This is the INclusive range of: 0..=(karada.scape.len()). - /// - /// Insertions off the end of the buffer are to be appened instead. - location: usize, - /// The kind of delta t oinsert - kind: DeltaKind, -} - -/// Static assertion: `MAX_SINGLE_DELTA_SIZE` can fit into `u8`. -const _: [u8;(MAX_SINGLE_DELTA_SIZE < (!0u8 as usize)) as usize] = [0]; - -/// Contains post deltas and an intermediate representation of the still-open post body. -/// Created and modified with `Kokoro` worker instances. -/// -/// This should not be created by itself, instead `Kokoro` should create instances of this, so that it can retain the `watch::Sender` and other such things. -#[derive(Debug)] -pub struct Karada -{ - /// The post body so far as a vector of `char`s. - scape: Arc>>, - /// All applied deltas so far. Last applied one is at the end. - deltas: Arc>>, - - /// the latest render of the whole body string. Updated whenever a delta(s) are applied atomically. - current_body: watch::Receiver, -} - -/// Handles working on `Karada` instances. -pub struct Kokoro -{ -// ...//TODO -} - -/// An open, as yet unfinied post -#[derive(Debug)] -pub struct Imouto -{ - id: identity::PostID, - - user: identity::User, - - title: Option, - karada: Karada, - - /// Hash of the current post data - hash: crypto::sha256::Sha256Hash, - timestamp: post::PostTimestamp, //Note that `closed` should always be `None` in `Imouto`. We use this for post lifetimes and such -} diff --git a/src/state/local/delta.rs b/src/state/local/delta.rs new file mode 100644 index 0000000..3b3ab67 --- /dev/null +++ b/src/state/local/delta.rs @@ -0,0 +1,230 @@ +//! Deltas and applying them +use super::*; + +const MAX_SINGLE_DELTA_SIZE: usize = 16; + +/// Create a delta span from an input iterator. +/// +/// This function can take no more than 255 chars from the input. The number of chars inserted is also returned as `u8`. +pub(super) fn delta_span(from: I) -> ([char; MAX_SINGLE_DELTA_SIZE], u8) +where I: IntoIterator +{ + let mut output: [char; MAX_SINGLE_DELTA_SIZE] = Default::default(); + let mut sz: u8 = 0; + + for (d, s) in output.iter_mut().zip(from.into_iter().take(usize::from(u8::MAX))) + { + *d = s; + sz += 1; + } + + (output, sz) +} + +/// Information about the delta to be applied in `Delta`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] +pub enum DeltaKind +{ + /// Append to the end of body. Equivilant to `Insert` with a location at `karada.scape.len()` (the end of the buffer). Might be removed idk + Append{ + span: [char; MAX_SINGLE_DELTA_SIZE], + span_len: u8, + }, + /// Insert `span_len` chars from `span` into body starting *at* `location` and moving ahead + Insert{ + span: [char; MAX_SINGLE_DELTA_SIZE], + span_len: u8, + }, + /// Remove `span_len` chars ahead of `location`. (INCLUSIVE) + RemoveAhead{ + span_len: usize, + }, + /// Remove `span_len` chars behind this `location`. (EXCLUSIVE) + RemoveBehind{ + span_len: usize, + }, + /// Remove char at `location` + RemoveSingle, + /// Remove entire post body + Clear, +} + +/// A delta to apply to `Karada`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] +pub struct Delta +{ + /// Location to insert into. This is the INclusive range of: 0..=(karada.scape.len()). + /// + /// Insertions off the end of the buffer are to be appened instead. + location: usize, + /// The kind of delta t oinsert + kind: DeltaKind, +} + +/// Static assertion: `MAX_SINGLE_DELTA_SIZE` can fit into `u8`. +const _: [u8;(MAX_SINGLE_DELTA_SIZE < (!0u8 as usize)) as usize] = [0]; + +impl Delta +{ + pub fn insert(&self, inserter: &mut MessageSpan) + { + match self.kind { + DeltaKind::Append{span, span_len} => { + inserter.extend_from_slice(&span[..usize::from(span_len)]); + }, + DeltaKind::Insert{span, span_len} => { + let span = &span[..usize::from(span_len)]; + if self.location == inserter.len() { + inserter.extend_from_slice(span); + } else { + // reserve the extra space + inserter.reserve(span.len()); + // shift everything across, replacing with the new values + let splice: Vec<_> = inserter.splice(self.location.., span.iter().cloned()).collect(); + // add tail back + inserter.extend(splice); + } + }, + _ => unimplemented!(), + } + } +} + +#[cfg(test)] +mod tests +{ + use super::*; + #[test] + fn insert_body() + { + let mut message: Vec = "126789".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("345".chars()); + Delta { + location: 2, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } + #[test] + fn insert_end() + { + let mut message: Vec = "1289".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("34567".chars()); + Delta { + location: 2, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } + #[test] + fn insert_end_rev() + { + let mut message: Vec = "1234569".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("78".chars()); + Delta { + location: 6, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } + #[test] + fn insert_begin_rev() + { + let mut message: Vec = "1456789".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("23".chars()); + Delta { + location: 1, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } + #[test] + fn insert_begin() + { + let mut message: Vec = "789".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("123456".chars()); + Delta { + location: 0, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } + #[test] + fn insert_end_f() + { + let mut message: Vec = "123".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("456789".chars()); + Delta { + location: 3, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } + #[test] + fn insert_end_f_rev() + { + let mut message: Vec = "1234567".chars().collect(); + + let delta = { + let (span, span_len) = delta_span("89".chars()); + Delta { + location: 7, + kind: DeltaKind::Insert{span, span_len}, + } + }; + + println!("from: {:?}", message); + println!("delta: {:?}", delta); + delta.insert(&mut message); + + assert_eq!(&message.into_iter().collect::()[..], "123456789"); + } +} diff --git a/src/state/local/error.rs b/src/state/local/error.rs new file mode 100644 index 0000000..90d41c3 --- /dev/null +++ b/src/state/local/error.rs @@ -0,0 +1,26 @@ +//! Local state change error +use super::*; +use std::{ + error, + fmt, +}; + +/// Local state updating errors +#[derive(Debug)] +#[non_exhaustive] +pub enum Error { + BroadcastUpdate, + Unknown, +} +impl error::Error for Error{} +impl fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::BroadcastUpdate => write!(f, "failed to broadcast state body update"), + _ => write!(f, "unknown error"), + } + } +} + diff --git a/src/state/local/karada.rs b/src/state/local/karada.rs new file mode 100644 index 0000000..f4fb559 --- /dev/null +++ b/src/state/local/karada.rs @@ -0,0 +1,91 @@ +//! Mutating post body +use super::*; + +/// A message that can be mutated by deltas. +pub type MessageSpan = Vec; + +/// Contains post deltas and an intermediate representation of the still-open post body. +/// Created and modified with `Kokoro` worker instances. +/// +/// This should not be created by itself, instead `Kokoro` should create instances of this, so that it can retain the `watch::Sender` and other such things. +#[derive(Debug)] +pub struct Karada +{ + /// The post body so far as a vector of `char`s. + pub(super) scape: Arc>, + /// All applied deltas so far. Last applied one is at the end. + pub(super) deltas: Arc>>, + + /// the latest render of the whole body string. Updated whenever a delta(s) are applied atomically. + pub(super) current_body: watch::Receiver, +} + +impl Karada +{ + /// Clone the body string + pub fn body(&self) -> String + { + self.current_body.borrow().to_owned() + } + + /// Consume this instance into a suspension + /// + /// This will only acquire locks if needed, but since they might be needed, it must be awaited in case of `Kokoro` instances potentially owning the data. + pub async fn into_suspended(self) -> Suspension + { + let scape: String = { + let scape = self.scape; + match Arc::try_unwrap(scape) { //try to unwrap if possible, to avoid acquiring useless lock + Ok(scape) => scape.into_inner().into_iter().collect(), + Err(scape) => scape.read().await.iter().collect(), + } + }; + let deltas: Vec = { + let deltas = self.deltas; + match Arc::try_unwrap(deltas) { + Ok(deltas) => deltas.into_inner(), + Err(deltas) => deltas.read().await.clone(), + } + }; + + Suspension{scape,deltas} + } + + pub(super) fn from_suspended(susp: Suspension, current_body: watch::Receiver) -> Self + { + Self { + scape: Arc::new(RwLock::new(susp.scape.chars().collect())), + deltas: Arc::new(RwLock::new(susp.deltas)), + current_body, + } + } +} + +/// Suspension of [`Karada`](Karada). +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Suspension +{ + pub(super) scape: String, + pub(super) deltas: Vec, +} + +use suspend::{ + Suspendable, + SuspendStream, +}; + +#[async_trait] +impl Suspendable for Suspension +{ + async fn suspend(self, into: &mut S) -> Result<(), suspend::Error> + { + let mut output = suspend::Object::new(); + output.insert("post-dynamic", self); + into.set_object(output).await + } + async fn load(from: &mut S) -> Result + { + let mut input = from.get_object().await?.ok_or(suspend::Error::BadObject)?; + input.try_get("post-dynamic").ok_or(suspend::Error::BadObject) + } +} diff --git a/src/state/local/mod.rs b/src/state/local/mod.rs new file mode 100644 index 0000000..822ad48 --- /dev/null +++ b/src/state/local/mod.rs @@ -0,0 +1,136 @@ +//! Handles updating posts +use super::*; +use std::{ + sync::Arc, +}; +use tokio::{ + sync::{ + RwLock, + watch, + }, +}; + +mod karada; +pub use karada::*; + +mod delta; +pub use delta::*; + +mod work; +pub use work::*; + +mod error; + +pub use error::Error as LocalError; + +#[derive(Debug)] +enum Worker +{ + Attached(tokio::task::JoinHandle<()>), + /// Worker has not been attached yet + Suspended(Kokoro), +} + +/// An open, as yet unfinied post +#[derive(Debug)] +pub struct Imouto +{ + id: identity::PostID, + + user: identity::User, + + title: Option, + + karada: Karada, + worker: Worker, + + timestamp: post::PostTimestamp, //Note that `closed` should always be `None` in `Imouto`. We use this for post lifetimes and such +} + +use suspend::{ + Suspendable, + SuspendStream, +}; + +#[async_trait] +impl Suspendable for Imouto +{ + async fn suspend(self, into: &mut S) -> Result<(), suspend::Error> + { + let mut output = suspend::Object::new(); + output.insert("id", self.id); + output.insert("user", self.user); + output.insert("title", self.title); + output.insert("body", match self.worker { + Worker::Suspended(kokoro) => kokoro.into_suspended().await, // consume worker if possible + Worker::Attached(_) => self.karada.into_suspended().await, // consume body instead + }); + //output.insert("hash", self.hash); + output.insert("timestamp", self.timestamp); + into.set_object(output).await + } + async fn load(from: &mut S) -> Result + { + let mut input = from.get_object().await?.ok_or(suspend::Error::BadObject)?; + macro_rules! get { + ($name:literal) => { + input.try_get($name).ok_or_else(|| suspend::Error::MissingObject(std::borrow::Cow::Borrowed($name)))? + }; + } + let id = get!("id"); + let user = get!("user"); + let title = get!("title"); + let body = get!("body"); + //let hash = get!("hash"); + let timestamp = get!("timestamp"); + + let (karada, worker) = { + let mut kokoro = Kokoro::from_suspended(body); + (kokoro.spawn().unwrap(), Worker::Suspended(kokoro)) + }; + Ok(Self { + id, + user, + title, + karada, + worker, + //hash, + timestamp, + }) + } +} + +#[cfg(test)] +mod tests +{ + use super::*; + #[tokio::test] + async fn basic_ser() + { + let mut output = suspend::MemorySuspendStream::new(); + let mut lolis = std::iter::repeat_with(|| { + let (karada, kokoro) = { + let mut kokoro = Kokoro::new(); + (kokoro.spawn().unwrap(), kokoro) + }; + Imouto { + id: identity::PostID::new(), + user: Default::default(), + title: None, + + karada, + worker: Worker::Suspended(kokoro), + + timestamp: post::PostTimestamp::new() + } + }); + let imouto = lolis.next().unwrap(); + imouto.suspend(&mut output).await.expect("Suspension failed"); + + let imouto2 = lolis.next().unwrap(); + + let imouto3 = Imouto::load(&mut output).await.expect("Load failed"); + + assert_eq!(imouto2.karada.body(), imouto3.karada.body()); + } +} diff --git a/src/state/local/work.rs b/src/state/local/work.rs new file mode 100644 index 0000000..58bec6f --- /dev/null +++ b/src/state/local/work.rs @@ -0,0 +1,113 @@ +//! Worker that mutates `Kokoro`. +use super::*; +use tokio::{ + sync::{ + watch, + }, +}; + +/// Handles working on `Karada` instances. +#[derive(Debug)] +pub struct Kokoro +{ + scape: Arc>, + deltas: Arc>>, + update_body: watch::Sender, + + body_recv: Option>, +} + +impl Kokoro +{ + /// Create a new instance. This instance can spawn a `Karada` instance. + pub fn new() -> Self + { + let (tx, rx) = watch::channel(String::new()); + Self { + scape: Arc::new(RwLock::new(MessageSpan::new())), + deltas: Arc::new(RwLock::new(Vec::new())), + + update_body: tx, + body_recv: Some(rx), + } + } + + /// Create a new worker instance from a suspension of `Karada`. + pub fn from_suspended(susp: Suspension) -> Self + { + let span = susp.scape.chars().collect(); + let (tx, rx) = watch::channel(susp.scape); + Self { + scape: Arc::new(RwLock::new(span)), + deltas: Arc::new(RwLock::new(susp.deltas)), + + update_body: tx, + body_recv: Some(rx), + } + } + + /// Consume this instance into a suspension + /// + /// This will only acquire locks if needed, but since they might be needed, it must be awaited in case of `Kokoro` instances potentially owning the data. + pub async fn into_suspended(self) -> Suspension + { + + let scape: String = { + let scape = self.scape; + match Arc::try_unwrap(scape) { //try to unwrap if possible, to avoid acquiring useless lock + Ok(scape) => scape.into_inner().into_iter().collect(), + Err(scape) => scape.read().await.iter().collect(), + } + }; + let deltas: Vec = { + let deltas = self.deltas; + match Arc::try_unwrap(deltas) { + Ok(deltas) => deltas.into_inner(), + Err(deltas) => deltas.read().await.clone(), + } + }; + + Suspension{scape,deltas} + } + + /// Spawn one `Karada` instance. If this has already been called, it returns `None`. + pub fn spawn(&mut self) -> Option + { + self.body_recv.take() + .map(|current_body| { + Karada { + scape: Arc::clone(&self.scape), + deltas: Arc::clone(&self.deltas), + + current_body, + } + }) + } + + /// Spawn a clone `Karada` into a new instance. This function can be called many times to yield `Karada` instances that are all identical and controlled by this instance. + /// + /// # Panics + /// If `spawn` was previously called. + pub fn spawn_clone(&self) -> Karada + { + Karada { + scape: Arc::clone(&self.scape), + deltas: Arc::clone(&self.deltas), + current_body: self.body_recv.as_ref().unwrap().clone(), + } + } + + /// Apply a delta to this instance. + pub async fn apply(&mut self, delta: Delta) -> Result<(), error::Error> + { + let (mut scape, mut deltas) = tokio::join!{ + self.scape.write(), + self.deltas.write(), + }; + // Only start mutating now that both locks are writable. Is this needed, or can we do the mutation concurrently? + delta.insert(scape.as_mut()); + deltas.push(delta); + + self.update_body.broadcast(scape.iter().collect()).map_err(|_| error::Error::BroadcastUpdate) + } +} diff --git a/src/suspend.rs b/src/suspend.rs index 24ee738..c297d5c 100644 --- a/src/suspend.rs +++ b/src/suspend.rs @@ -64,7 +64,7 @@ impl Object { let len = self.data.len(); for (_, range) in self.data_instances.iter() { - if range.start + range.end > len { + if range.end > len {//range.start + range.end > len { return false; } } @@ -268,6 +268,7 @@ impl Object pub async fn into_stream(&self, output: &mut T) -> io::Result where T: AsyncWrite + Unpin + ?Sized { + //eprintln!("{}: {:?}", self.data.len(), self); debug_assert!(self.validate(), "passing invalid object to serialise"); let mut written=0usize; @@ -323,6 +324,7 @@ pub trait SuspendStream #[non_exhaustive] pub enum Error { BadObject, + MissingObject(Cow<'static, str>), Corruption, IO(io::Error), @@ -343,6 +345,7 @@ impl fmt::Display for Error { match self { Self::BadObject => write!(f, "unexpected object in stream"), + Self::MissingObject(string) => write!(f, "missing object from stream: {}", string), Self::Corruption => write!(f, "data stream corruption"), Self::IO(_) => write!(f, "i/o error"), _ => write!(f, "unknown error"), @@ -369,9 +372,9 @@ pub trait Suspendable: Sized /// An in-memory `SuspendStream`. #[derive(Debug, Clone)] -pub struct MemorySsuspendStream(Vec); +pub struct MemorySuspendStream(Vec); -impl MemorySsuspendStream +impl MemorySuspendStream { /// Create a new empty instance pub fn new() -> Self @@ -411,7 +414,7 @@ impl MemorySsuspendStream } -impl AsRef<[u8]> for MemorySsuspendStream +impl AsRef<[u8]> for MemorySuspendStream { fn as_ref(&self) -> &[u8] { @@ -419,7 +422,7 @@ impl AsRef<[u8]> for MemorySsuspendStream } } -impl AsMut<[u8]> for MemorySsuspendStream +impl AsMut<[u8]> for MemorySuspendStream { fn as_mut(&mut self) -> &mut [u8] { @@ -427,7 +430,7 @@ impl AsMut<[u8]> for MemorySsuspendStream } } -impl From> for MemorySsuspendStream +impl From> for MemorySuspendStream { #[inline] fn from(from: Vec) -> Self { @@ -435,7 +438,7 @@ impl From> for MemorySsuspendStream } } -impl From> for MemorySsuspendStream +impl From> for MemorySuspendStream { fn from(from: Box<[u8]>) -> Self { @@ -443,24 +446,24 @@ impl From> for MemorySsuspendStream } } -impl From for Box<[u8]> +impl From for Box<[u8]> { - fn from(from: MemorySsuspendStream) -> Self + fn from(from: MemorySuspendStream) -> Self { from.0.into() } } -impl From for Vec +impl From for Vec { - #[inline] fn from(from: MemorySsuspendStream) -> Self + #[inline] fn from(from: MemorySuspendStream) -> Self { from.0 } } #[async_trait] -impl SuspendStream for MemorySsuspendStream +impl SuspendStream for MemorySuspendStream { async fn get_object(&mut self) -> Result, Error> { if self.0.len() ==0 { @@ -483,7 +486,7 @@ impl SuspendStream for MemorySsuspendStream /// Suspend a single object to memory pub async fn oneshot(value: T) -> Result, Error> { - let mut output = MemorySsuspendStream::new(); + let mut output = MemorySuspendStream::new(); value.suspend(&mut output).await?; Ok(output.into_bytes()) }