From dd4e23104891cc398eab65fc7d2fbd53781b3668 Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 22 Dec 2020 01:37:00 +0000 Subject: [PATCH] mutation operations --- Cargo.lock | 7 ++ Cargo.toml | 1 + src/data/entry.rs | 8 ++ src/data/freeze.rs | 8 +- src/data/mod.rs | 60 +++++++++++-- src/data/mutation.rs | 204 +++++++++++++++++++++++++++++++++++++++++++ src/data/search.rs | 21 +++++ src/ext/mod.rs | 30 +++++++ src/main.rs | 2 + 9 files changed, 331 insertions(+), 10 deletions(-) create mode 100644 src/data/mutation.rs diff --git a/Cargo.lock b/Cargo.lock index 8e6c829..9230587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,11 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "ad-hoc-iter" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5068a429476033d1940f21e21d317afae2fc3a82f412d5d8fe08c13f100a00e8" + [[package]] name = "autocfg" version = "1.0.1" @@ -455,6 +461,7 @@ dependencies = [ name = "mtfse" version = "0.1.0" dependencies = [ + "ad-hoc-iter", "bytes 0.6.0", "cryptohelpers", "futures", diff --git a/Cargo.toml b/Cargo.toml index 828ec43..531a199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +ad-hoc-iter = "0.2.2" bytes = "0.6.0" cryptohelpers = {version = "1.7", features = ["full", "async", "serde"]} futures = "0.3.8" diff --git a/src/data/entry.rs b/src/data/entry.rs index 21a1e16..efd1221 100644 --- a/src/data/entry.rs +++ b/src/data/entry.rs @@ -32,6 +32,14 @@ pub struct Entry cache: DataCacheState, } +impl Entry +{ + pub(super) fn purge_from_host(&self, host: &mut Store) + { + host.tag_search_any(&self.tags[..]).dedup_ref(); //TODO + } +} + impl Entry { /// The sha256 hash of the data in this entry diff --git a/src/data/freeze.rs b/src/data/freeze.rs index 17df4de..a0cbb58 100644 --- a/src/data/freeze.rs +++ b/src/data/freeze.rs @@ -39,10 +39,10 @@ impl Freeze { if let Some(&ti) = new.tags.get(tag) { // This tag has an entry already, append to it - new.tag_mappings.get_mut(ti).unwrap().push(hash_idx); + new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx); } else { // This tag has no entry, create it - let ti = new.tag_mappings.insert(vec![hash_idx]); + let ti = new.tag_mappings.insert(iter![hash_idx].collect()); new.tags.insert(tag.clone(), ti); } } @@ -67,10 +67,10 @@ impl Freeze { if let Some(&ti) = new.tags.get(tag) { // This tag has an entry already, append to it - new.tag_mappings.get_mut(ti).unwrap().push(hash_idx); + new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx); } else { // This tag has no entry, create it - let ti = new.tag_mappings.insert(vec![hash_idx]); + let ti = new.tag_mappings.insert(iter![hash_idx].collect()); new.tags.insert(tag.clone(), ti); } } diff --git a/src/data/mod.rs b/src/data/mod.rs index bc0bfed..ab57413 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -30,23 +30,66 @@ pub use freeze::Freeze; mod search; pub use search::*; +mod mutation; +pub use mutation::*; + /// The key used to look up a single entry in `O(1)` time. +/// +/// # Notes +/// If you change this, make sure to change the `BuildHasher` back to `RandomState`. pub type EntryKey = sha256::Sha256Hash; +/// The hasher used for the entry data set. +/// +/// # Notes +/// Change this back to `RandomState` if you change the type of `EntryKey`. +pub type BuildHasher = Sha256TopBuildHasher; + +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)] +struct PurgeTrack +{ + /// Number of removals since last purge + num_removals: usize, + /// Max number of removals before an arena index purge. + /// + /// If set to 0, purge will be performed on every removal of a tag. + num_removals_max: usize, +} + +impl PurgeTrack +{ + /// Create a new purge tracker with this purge threshold. + #[inline] pub fn new(max: usize) -> Self + { + Self { + num_removals: 0, + num_removals_max: max, + } + } + + /// Should we purge now? + #[inline] pub fn should_purge(&self) -> bool + { + self.num_removals >= self.num_removals_max + } +} + #[derive(Debug)] pub struct Store { metadata: StoreMetadata, - data: HashSet, // The entry sha256 hash is used as the `key` here, as `Entry` both hasshes to, and `Borrow`s to `Sha256Hash`. - data_hashes: Arena, // used to lookup in `data`. + purge_track: PurgeTrack, + + data: HashSet, // The entry sha256 hash is used as the `key` here, as `Entry` both hasshes to, and `Borrow`s to `Sha256Hash`. + data_hashes: Arena, // used to lookup in `data`. - tag_mappings: Arena>, + tag_mappings: Arena>, tags: BTreeMap, // string (tags) -> index (tag_mappings) -> index (data_hashes) -> hash used for lookup (data) } // Comptime asserts (TODO: Unneeded. Remove.) -impl Store +/*impl Store { fn _assert_tag_lookup_works(&self, tag: &str) -> &Entry { @@ -56,7 +99,7 @@ impl Store { self.data.get(self.data_hashes.get(data_idx).unwrap()).unwrap() // yes this works.. } -} +}*/ // Creating impl Store @@ -70,6 +113,9 @@ impl Store assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `new` not existant or not a directory", metadata.root); Self { metadata, + + purge_track: PurgeTrack::new(16), + data: HashSet::with_hasher(Default::default()), data_hashes: Arena::new(), @@ -86,7 +132,9 @@ impl Store assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `with_capacity` not existant or not a directory", metadata.root); Self { metadata, - + + purge_track: PurgeTrack::new(16), + data: HashSet::with_capacity_and_hasher(cap, Default::default()), data_hashes: Arena::with_capacity(cap), diff --git a/src/data/mutation.rs b/src/data/mutation.rs new file mode 100644 index 0000000..221b182 --- /dev/null +++ b/src/data/mutation.rs @@ -0,0 +1,204 @@ +//! Handling store mutation +use super::*; + +impl Store +{ + /// Insert this entry into the data table, overwriting any identically hashed one and returning it. + pub fn insert_overwrite(&mut self, ent: Entry) -> Option + { + let old = self.remove(ent.hash()); + + let hash_idx = self.data_hashes.insert(*ent.hash()); + for tag in ent.tags.iter() { + if let Some(&ti) = self.tags.get(tag) { + // This tag has an entry already, append to it + self.tag_mappings.get_mut(ti).unwrap().insert(hash_idx); + } else { + // This tag has no entry, create it + let ti = self.tag_mappings.insert(iter![hash_idx].collect()); + self.tags.insert(tag.clone(), ti); + } + } + + self.data.insert(ent); + old + } + + /// Insert this entry then return a reference to it. + pub fn insert(&mut self, ent: Entry) -> &Entry + { + let ffd = *ent.hash(); + self.insert_overwrite(ent); + self.data.get(&ffd).unwrap() + } + + /// Mutate this entry in place if it exists. + /// + /// See [`map_entry`]. + pub fn mutate_entry_in_place(&mut self, ent_id: &EntryKey, f: F) -> Option + where F: FnOnce(&mut Entry) -> T + { + if let Some(mut ent) = self.data.take(ent_id) { + let ohash = ent.hash().clone(); + let otags = ent.tags.clone(); + let out = f(&mut ent); + let new = ent; + + let iidx = if new.hash() != &ohash { + // We need to update `data_hashes`. + for (_, hash) in self.data_hashes.iter_mut() + { + if hash == &ohash { + *hash = *new.hash(); + break; + } + } + self.reverse_index_lookup(new.hash()).unwrap() + } else { + self.reverse_index_lookup(&ohash).unwrap() + }; + + if &new.tags[..] != &otags[..] { + // We need to update tag mappings + + + let ntags: HashSet<_> = new.tags.iter().collect(); + let otags: HashSet<_> = otags.iter().collect(); + // Find the ones that were removed and added in parallel. + for (t, u) in ntags.iter().zip(otags.iter()) + { + if !otags.contains(t) { + // It was added + self.insert_tag_for_idx(t, iidx); + } + if !ntags.contains(u) { + // It was removed + self.remove_tag_for_idx(t, iidx); + } + } + + } + self.data.insert(new); + Some(out) + } else { + None + } + } + + /// Map the entry with this function, updating references to it if needed. + /// + /// If the hash of the entry if modified by this map, then the hashes indecies are updated to the new hash. + pub fn map_entry(&mut self, ent_id: &EntryKey, f: F) + where F: FnOnce(Entry) -> Entry + { + if let Some(ent) = self.data.take(ent_id) { + let ohash = ent.hash().clone(); + let new = f(ent); + if new.hash() != &ohash { + // We need to update `data_hashes`. + for (_, hash) in self.data_hashes.iter_mut() + { + if hash == &ohash { + *hash = *new.hash(); + break; + } + } + } + self.data.insert(new); + } + } + + + /// Remove this entry, and return it, if it was set. + pub fn remove(&mut self, key: &EntryKey) -> Option + { + if let Some(entry) = self.data.take(key) { + Some(self.cleanup_remove_entry(entry)) + } else { + None + } + } + + /// Preform cleanup on an entry *already removed* from `data`. + fn cleanup_remove_entry(&mut self, ent: Entry) -> Entry + { + let ent = ent.with_no_cache(); + // Remove any unused tags + for (nm, ti) in precollect!(self.tag_index_lookup(&ent.tags[..]).map(|(nm, idx)| ({ + ent.tags.iter().filter(|y| y.as_str() == nm).next().unwrap() // swap the `nm` reference to the owned reference in `ent`'s tags... There should be a better way that this eh + }, idx))) { + if self.purge_tag_index(ti, ent.hash()) { + // No more mappings, remove this tag + self.tags.remove(nm); + // And its mapping + self.tag_mappings.remove(ti); + } + } + // Remove from data hashes can be deferred + self.purge_if_needed(); + ent + } + + /// Purge this tag index from the mappings for the entry `to_remove`. + /// Returns true if there are no more references to this tag and it can be removed. + #[inline] fn purge_tag_index(&mut self, idx: ArenaIndex, to_remove: &EntryKey) -> bool + { + let data_hashes = &mut self.data_hashes; + if let Some(map) = self.tag_mappings.get_mut(idx) { + map.retain(move |&hash_idx| data_hashes.get(hash_idx).map(|x| x != to_remove).unwrap_or(false)); + map.len() == 0 + } else { + // There is no reference in the tag mapping itself. + false + } + } + + /// Remove dead mappings from `data_hashes` to `data`. + #[inline] fn purge_data_hash_mappings(&mut self) + { + let data = &self.data; + self.data_hashes.retain(move |_, hash| data.get(hash).is_some()); + } + + /// Purge the arena mapping if threshold of dead entries is reached, otherwise defer it. + #[inline] fn purge_if_needed(&mut self) + { + if self.purge_track.should_purge() { + self.purge_data_hash_mappings(); + self.purge_track.num_removals = 0; + } else { + self.purge_track.num_removals += 1; + } + } +} + +// Tag specific stuff +impl Store +{ + /// Remove a mapping for this tag string to this specific hash index, cleaning up the tag mappings if needed. + #[inline] fn remove_tag_for_idx(&mut self, tag: impl AsRef, hash_idx: ArenaIndex) + { + let tag = tag.as_ref(); + if let Some(&ti) = self.tags.get(tag) { + match self.tag_mappings.get_mut(ti).map(|x| {x.remove(&hash_idx); x.len()}) { + Some(0) => no_op!(self.tag_mappings.remove(ti)), // there is only 1 mapping, remove it and then remove the tag (TODO: Should we keep the tag in the btree as cache? TODO: Add this to `PurgeTrack`) + None => (), // there is no mapping, just remove the tag + _ => return, //don't remove the tag, there's other references in the mapping + } + self.tags.remove(tag); + } + } + /// Insert a mapping for this tag string to this single hash index, creating it if needed + #[inline] fn insert_tag_for_idx(&mut self, tag: impl AsRef, hash_idx: ArenaIndex) + { + let tag = tag.as_ref(); + if let Some(&ti) = self.tags.get(tag) { + // This tag has an entry already, append to it + self.tag_mappings.get_mut(ti).unwrap().insert(hash_idx); + } else { + // This tag has no entry, create it + let ti = self.tag_mappings.insert(iter![hash_idx].collect()); + self.tags.insert(tag.to_owned(), ti); + } + } +} diff --git a/src/data/search.rs b/src/data/search.rs index f3617bf..2a5077b 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -28,6 +28,27 @@ pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option(&self, tags: impl IntoIterator) -> impl Iterator + '_ + where String: Borrow + { + let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect(); + sorted.sort(); + match (sorted.first(), sorted.last()) { + (Some(&low), Some(&high)) => Some(self.tags.range::((std::ops::Bound::Included(low), std::ops::Bound::Included(high))).map(|(s, &i)| (s.as_str(), i))), + _ => None + }.map_into_iter() + } + + /// Find the `data_hashes` index for this entry in `data`. + pub(super) fn reverse_index_lookup(&self, hs: &EntryKey) -> Option + { + self.data_hashes.iter().filter_map(|(i, h)| if h == hs { + Some(i) + } else { + None + }).next() + } /// Create a `Stream` that searches for all entries with *all* of these provided tags on background tasks. /// diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 631bbae..2e6f000 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -11,3 +11,33 @@ pub use streams::*; mod hashers; pub use hashers::*; + +pub const PRECOLLECT_STACK_SIZE: usize = 64; + +/// Collect an iterator's output and then drop it to detach the iterator from any references or resources it might have. +#[macro_export] macro_rules! precollect { + ($iter:expr, $num:literal) => { + { + { + let it: ::smallvec::SmallVec<[_; $num]> = $iter.into_iter().collect(); + it.into_iter() + } + } + }; + ($iter:expr) => { + { + { + let it: ::smallvec::SmallVec<[_; $crate::ext::PRECOLLECT_STACK_SIZE]> = $iter.into_iter().collect(); + it.into_iter() + } + } + } +} + +#[macro_export] macro_rules! no_op { + ($expr:expr) => { + { + $expr; + } + }; +} diff --git a/src/main.rs b/src/main.rs index 4e2cba2..66300b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,8 @@ #![allow(dead_code)] +#[macro_use] extern crate ad_hoc_iter; + use serde::{Serialize, Deserialize}; use jemallocator::Jemalloc;