mutation operations

master
Avril 4 years ago
parent a1c9f4cd39
commit dd4e231048
Signed by: flanchan
GPG Key ID: 284488987C31F630

Cargo.lock generated

@@ -1,5 +1,11 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
+[[package]]
+name = "ad-hoc-iter"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5068a429476033d1940f21e21d317afae2fc3a82f412d5d8fe08c13f100a00e8"
+
 [[package]]
 name = "autocfg"
 version = "1.0.1"
@@ -455,6 +461,7 @@ dependencies = [
 name = "mtfse"
 version = "0.1.0"
 dependencies = [
+ "ad-hoc-iter",
  "bytes 0.6.0",
  "cryptohelpers",
  "futures",

Cargo.toml
@@ -8,6 +8,7 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+ad-hoc-iter = "0.2.2"
 bytes = "0.6.0"
 cryptohelpers = {version = "1.7", features = ["full", "async", "serde"]}
 futures = "0.3.8"

@@ -32,6 +32,14 @@ pub struct Entry
     cache: DataCacheState,
 }
+
+impl Entry
+{
+    pub(super) fn purge_from_host(&self, host: &mut Store)
+    {
+        host.tag_search_any(&self.tags[..]).dedup_ref(); //TODO
+    }
+}
 
 impl Entry
 {
     /// The sha256 hash of the data in this entry

@@ -39,10 +39,10 @@ impl Freeze
 {
     if let Some(&ti) = new.tags.get(tag) {
         // This tag has an entry already, append to it
-        new.tag_mappings.get_mut(ti).unwrap().push(hash_idx);
+        new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
     } else {
         // This tag has no entry, create it
-        let ti = new.tag_mappings.insert(vec![hash_idx]);
+        let ti = new.tag_mappings.insert(iter![hash_idx].collect());
         new.tags.insert(tag.clone(), ti);
     }
 }
@@ -67,10 +67,10 @@ impl Freeze
 {
     if let Some(&ti) = new.tags.get(tag) {
         // This tag has an entry already, append to it
-        new.tag_mappings.get_mut(ti).unwrap().push(hash_idx);
+        new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
     } else {
         // This tag has no entry, create it
-        let ti = new.tag_mappings.insert(vec![hash_idx]);
+        let ti = new.tag_mappings.insert(iter![hash_idx].collect());
         new.tags.insert(tag.clone(), ti);
     }
 }
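Both hunks above are the same change: the per-tag mapping moves from `Vec<ArenaIndex>` to a set (`iter![...]` comes from the newly added `ad-hoc-iter` crate, and `.collect()` builds the set), so mapping the same entry index twice becomes idempotent. A minimal illustration of the difference, with `usize` standing in for `ArenaIndex`:

```rust
use std::collections::HashSet;

fn main() {
    // Vec semantics: pushing the same index twice stores a duplicate mapping.
    let mut as_vec: Vec<usize> = Vec::new();
    as_vec.push(3);
    as_vec.push(3);
    assert_eq!(as_vec.len(), 2);

    // HashSet semantics: the second insert is a no-op.
    let mut as_set: HashSet<usize> = HashSet::new();
    as_set.insert(3);
    as_set.insert(3);
    assert_eq!(as_set.len(), 1);
}
```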

@@ -30,23 +30,66 @@ pub use freeze::Freeze;
 mod search;
 pub use search::*;
+mod mutation;
+pub use mutation::*;
 
 /// The key used to look up a single entry in `O(1)` time.
+///
+/// # Notes
+/// If you change this, make sure to change the `BuildHasher` back to `RandomState`.
 pub type EntryKey = sha256::Sha256Hash;
+
+/// The hasher used for the entry data set.
+///
+/// # Notes
+/// Change this back to `RandomState` if you change the type of `EntryKey`.
+pub type BuildHasher = Sha256TopBuildHasher;
+
+#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)]
+struct PurgeTrack
+{
+    /// Number of removals since last purge
+    num_removals: usize,
+    /// Max number of removals before an arena index purge.
+    ///
+    /// If set to 0, purge will be performed on every removal of a tag.
+    num_removals_max: usize,
+}
+
+impl PurgeTrack
+{
+    /// Create a new purge tracker with this purge threshold.
+    #[inline] pub fn new(max: usize) -> Self
+    {
+        Self {
+            num_removals: 0,
+            num_removals_max: max,
+        }
+    }
+
+    /// Should we purge now?
+    #[inline] pub fn should_purge(&self) -> bool
+    {
+        self.num_removals >= self.num_removals_max
+    }
+}
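How this counter drives deferred purging is spelled out in `purge_if_needed` in the new mutation module below. A self-contained sketch of the accounting (the loop is illustrative, not the crate's code):

```rust
// Stand-in copy of the struct above; a real `Store` would purge
// `data_hashes` where this sketch increments `purges`.
#[derive(Debug, Copy, Clone, Default)]
struct PurgeTrack { num_removals: usize, num_removals_max: usize }

impl PurgeTrack {
    fn new(max: usize) -> Self { Self { num_removals: 0, num_removals_max: max } }
    fn should_purge(&self) -> bool { self.num_removals >= self.num_removals_max }
}

fn main() {
    let mut pt = PurgeTrack::new(2);
    let mut purges = 0;
    for _ in 0..6 { // six removals
        if pt.should_purge() {
            purges += 1;          // purge, then reset the counter
            pt.num_removals = 0;
        } else {
            pt.num_removals += 1; // defer: just count the removal
        }
    }
    assert_eq!(purges, 2); // the 3rd and 6th removals triggered purges
}
```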
 #[derive(Debug)]
 pub struct Store
 {
     metadata: StoreMetadata,
-    data: HashSet<Entry, Sha256TopBuildHasher>, // The entry sha256 hash is used as the `key` here, as `Entry` both hashes to, and `Borrow`s to, `Sha256Hash`.
-    data_hashes: Arena<sha256::Sha256Hash>, // used to lookup in `data`.
-    tag_mappings: Arena<Vec<ArenaIndex>>,
+    purge_track: PurgeTrack,
+
+    data: HashSet<Entry, BuildHasher>, // The entry sha256 hash is used as the `key` here, as `Entry` both hashes to, and `Borrow`s to, `Sha256Hash`.
+    data_hashes: Arena<EntryKey>, // used to lookup in `data`.
+    tag_mappings: Arena<HashSet<ArenaIndex>>,
     tags: BTreeMap<String, ArenaIndex>, // string (tags) -> index (tag_mappings) -> index (data_hashes) -> hash used for lookup (data)
 }
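The comment on `tags` describes the whole lookup chain: tag string, then `tag_mappings` index, then a set of `data_hashes` indices, then a hash used to query `data`. A self-contained sketch of that walk, with plain `Vec`s standing in for the `Arena`s and short strings standing in for real sha256 hashes:

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

fn main() {
    // hash -> entry (stand-in for `data`, keyed by sha256 in the real store)
    let data: HashMap<&str, &str> = [("hash-a", "entry-a")].into();
    // arena of hashes (stand-in for `data_hashes`)
    let data_hashes: Vec<&str> = vec!["hash-a"];
    // arena of hash-index sets (stand-in for `tag_mappings`)
    let tag_mappings: Vec<HashSet<usize>> = vec![[0].into()];
    // tag string -> `tag_mappings` index
    let tags: BTreeMap<&str, usize> = [("cat-pics", 0)].into();

    let ti = tags["cat-pics"];        // tag -> mapping index
    for &hi in &tag_mappings[ti] {    // mapping -> hash indices
        let hash = data_hashes[hi];   // hash index -> hash
        println!("{}", data[hash]);   // hash -> entry
    }
}
```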
 // Comptime asserts (TODO: Unneeded. Remove.)
-impl Store
+/*impl Store
 {
     fn _assert_tag_lookup_works(&self, tag: &str) -> &Entry
     {
@@ -56,7 +99,7 @@ impl Store
     {
         self.data.get(self.data_hashes.get(data_idx).unwrap()).unwrap() // yes this works..
     }
-}
+}*/
 // Creating
 impl Store
@@ -70,6 +113,9 @@ impl Store
     assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `new` not existent or not a directory", metadata.root);
     Self {
         metadata,
+        purge_track: PurgeTrack::new(16),
+
         data: HashSet::with_hasher(Default::default()),
         data_hashes: Arena::new(),
@@ -87,6 +133,8 @@ impl Store
     Self {
         metadata,
+        purge_track: PurgeTrack::new(16),
+
         data: HashSet::with_capacity_and_hasher(cap, Default::default()),
         data_hashes: Arena::with_capacity(cap),

@@ -0,0 +1,204 @@
+//! Handling store mutation
+use super::*;
+
+impl Store
+{
+    /// Insert this entry into the data table, overwriting and returning any identically hashed entry.
+    pub fn insert_overwrite(&mut self, ent: Entry) -> Option<Entry>
+    {
+        let old = self.remove(ent.hash());
+
+        let hash_idx = self.data_hashes.insert(*ent.hash());
+        for tag in ent.tags.iter() {
+            if let Some(&ti) = self.tags.get(tag) {
+                // This tag has an entry already, append to it
+                self.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
+            } else {
+                // This tag has no entry, create it
+                let ti = self.tag_mappings.insert(iter![hash_idx].collect());
+                self.tags.insert(tag.clone(), ti);
+            }
+        }
+
+        self.data.insert(ent);
+        old
+    }
+
+    /// Insert this entry, then return a reference to it.
+    pub fn insert(&mut self, ent: Entry) -> &Entry
+    {
+        let ffd = *ent.hash();
+        self.insert_overwrite(ent);
+        self.data.get(&ffd).unwrap()
+    }
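`insert` can return a reference straight out of the `HashSet` by key because, as the struct comment notes, `Entry` hashes like, and `Borrow`s to, its sha256 hash. A sketch of that lookup trick with stand-in types (a 4-byte `Key` instead of a real `Sha256Hash`):

```rust
use std::borrow::Borrow;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Stand-in key type: 4 bytes instead of `sha256::Sha256Hash`.
type Key = [u8; 4];

struct Entry { hash: Key, data: String }

// `Entry` compares, hashes, and borrows as its key, so a `HashSet<Entry>`
// can be queried with the key alone: the trick `insert` relies on.
impl PartialEq for Entry { fn eq(&self, o: &Self) -> bool { self.hash == o.hash } }
impl Eq for Entry {}
impl Hash for Entry { fn hash<H: Hasher>(&self, h: &mut H) { self.hash.hash(h) } }
impl Borrow<Key> for Entry { fn borrow(&self) -> &Key { &self.hash } }

fn main() {
    let mut data: HashSet<Entry> = HashSet::new();
    let key: Key = *b"abcd";
    data.insert(Entry { hash: key, data: "hello".into() });
    // Mirrors `insert` above: after inserting, look the entry back up by key.
    assert_eq!(data.get(&key).map(|e| e.data.as_str()), Some("hello"));
}
```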
+
+    /// Mutate this entry in place if it exists.
+    ///
+    /// See [`map_entry`].
+    pub fn mutate_entry_in_place<T, F>(&mut self, ent_id: &EntryKey, f: F) -> Option<T>
+    where F: FnOnce(&mut Entry) -> T
+    {
+        if let Some(mut ent) = self.data.take(ent_id) {
+            let ohash = ent.hash().clone();
+            let otags = ent.tags.clone();
+            let out = f(&mut ent);
+            let new = ent;
+            let iidx = if new.hash() != &ohash {
+                // We need to update `data_hashes`.
+                for (_, hash) in self.data_hashes.iter_mut()
+                {
+                    if hash == &ohash {
+                        *hash = *new.hash();
+                        break;
+                    }
+                }
+                self.reverse_index_lookup(new.hash()).unwrap()
+            } else {
+                self.reverse_index_lookup(&ohash).unwrap()
+            };
+            if &new.tags[..] != &otags[..] {
+                // We need to update tag mappings
+                let ntags: HashSet<_> = new.tags.iter().collect();
+                let otags: HashSet<_> = otags.iter().collect();
+
+                // Insert mappings for the tags that were added...
+                for t in ntags.difference(&otags) {
+                    self.insert_tag_for_idx(t, iidx);
+                }
+                // ...and drop the mappings for the tags that were removed.
+                for u in otags.difference(&ntags) {
+                    self.remove_tag_for_idx(u, iidx);
+                }
+            }
+            self.data.insert(new);
+            Some(out)
+        } else {
+            None
+        }
+    }
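The tag-diff step above uses set difference to find added and removed tags; zipping the two sets instead would pair unrelated tags and stop at the shorter set. The semantics, in miniature:

```rust
use std::collections::HashSet;

fn main() {
    let otags: HashSet<&str> = ["a", "b"].into();
    let ntags: HashSet<&str> = ["b", "c", "d"].into();

    // Added: in the new tag set but not the old one.
    let mut added: Vec<&str> = ntags.difference(&otags).copied().collect();
    added.sort();
    assert_eq!(added, vec!["c", "d"]);

    // Removed: in the old tag set but not the new one.
    let removed: Vec<&str> = otags.difference(&ntags).copied().collect();
    assert_eq!(removed, vec!["a"]);
}
```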
+
+    /// Map the entry with this function, updating references to it if needed.
+    ///
+    /// If the hash of the entry is modified by the map, then the hash indices are updated to point at the new hash.
+    pub fn map_entry<F>(&mut self, ent_id: &EntryKey, f: F)
+    where F: FnOnce(Entry) -> Entry
+    {
+        if let Some(ent) = self.data.take(ent_id) {
+            let ohash = ent.hash().clone();
+            let new = f(ent);
+            if new.hash() != &ohash {
+                // We need to update `data_hashes`.
+                for (_, hash) in self.data_hashes.iter_mut()
+                {
+                    if hash == &ohash {
+                        *hash = *new.hash();
+                        break;
+                    }
+                }
+            }
+            self.data.insert(new);
+        }
+    }
+
+    /// Remove this entry, and return it, if it was set.
+    pub fn remove(&mut self, key: &EntryKey) -> Option<Entry>
+    {
+        if let Some(entry) = self.data.take(key) {
+            Some(self.cleanup_remove_entry(entry))
+        } else {
+            None
+        }
+    }
+
+    /// Perform cleanup on an entry *already removed* from `data`.
+    fn cleanup_remove_entry(&mut self, ent: Entry) -> Entry
+    {
+        let ent = ent.with_no_cache();
+        // Remove any unused tags
+        for (nm, ti) in precollect!(self.tag_index_lookup(&ent.tags[..]).map(|(nm, idx)| ({
+            ent.tags.iter().filter(|y| y.as_str() == nm).next().unwrap() // swap the `nm` reference to the owned reference in `ent`'s tags... There should be a better way than this, eh
+        }, idx))) {
+            if self.purge_tag_index(ti, ent.hash()) {
+                // No more mappings, remove this tag
+                self.tags.remove(nm);
+                // And its mapping
+                self.tag_mappings.remove(ti);
+            }
+        }
+        // Removal from `data_hashes` can be deferred
+        self.purge_if_needed();
+        ent
+    }
+
+    /// Purge this tag index from the mappings for the entry `to_remove`.
+    /// Returns true if there are no more references to this tag and it can be removed.
+    #[inline] fn purge_tag_index(&mut self, idx: ArenaIndex, to_remove: &EntryKey) -> bool
+    {
+        let data_hashes = &mut self.data_hashes;
+        if let Some(map) = self.tag_mappings.get_mut(idx) {
+            map.retain(move |&hash_idx| data_hashes.get(hash_idx).map(|x| x != to_remove).unwrap_or(false));
+            map.len() == 0
+        } else {
+            // There is no reference in the tag mapping itself.
+            false
+        }
+    }
+
+    /// Remove dead mappings from `data_hashes` to `data`.
+    #[inline] fn purge_data_hash_mappings(&mut self)
+    {
+        let data = &self.data;
+        self.data_hashes.retain(move |_, hash| data.get(hash).is_some());
+    }
+
+    /// Purge the arena mapping if the threshold of dead entries has been reached; otherwise, defer it.
+    #[inline] fn purge_if_needed(&mut self)
+    {
+        if self.purge_track.should_purge() {
+            self.purge_data_hash_mappings();
+            self.purge_track.num_removals = 0;
+        } else {
+            self.purge_track.num_removals += 1;
+        }
+    }
+}
+
+// Tag specific stuff
+impl Store
+{
+    /// Remove a mapping for this tag string to this specific hash index, cleaning up the tag mappings if needed.
+    #[inline] fn remove_tag_for_idx(&mut self, tag: impl AsRef<str>, hash_idx: ArenaIndex)
+    {
+        let tag = tag.as_ref();
+        if let Some(&ti) = self.tags.get(tag) {
+            match self.tag_mappings.get_mut(ti).map(|x| {x.remove(&hash_idx); x.len()}) {
+                Some(0) => no_op!(self.tag_mappings.remove(ti)), // the mapping is now empty; remove it, then remove the tag (TODO: Should we keep the tag in the btree as cache? TODO: Add this to `PurgeTrack`)
+                None => (), // there is no mapping, just remove the tag
+                _ => return, // don't remove the tag, there are other references in the mapping
+            }
+            self.tags.remove(tag);
+        }
+    }
+
+    /// Insert a mapping for this tag string to this single hash index, creating it if needed.
+    #[inline] fn insert_tag_for_idx(&mut self, tag: impl AsRef<str>, hash_idx: ArenaIndex)
+    {
+        let tag = tag.as_ref();
+        if let Some(&ti) = self.tags.get(tag) {
+            // This tag has an entry already, append to it
+            self.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
+        } else {
+            // This tag has no entry, create it
+            let ti = self.tag_mappings.insert(iter![hash_idx].collect());
+            self.tags.insert(tag.to_owned(), ti);
+        }
+    }
+}

@@ -28,6 +28,27 @@ pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String,
 // Searching by tags
 impl Store
 {
+    /// Lookup tag indices for this iterator of tags
+    pub(super) fn tag_index_lookup<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item = &'a T>) -> impl Iterator<Item = (&str, ArenaIndex)> + '_
+    where String: Borrow<T>
+    {
+        let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
+        sorted.sort();
+        match (sorted.first(), sorted.last()) {
+            (Some(&low), Some(&high)) => Some(self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))).map(|(s, &i)| (s.as_str(), i))),
+            _ => None
+        }.map_into_iter()
+    }
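One caveat worth noting (an observation about `BTreeMap::range`, not something the commit states): the scan from the smallest to the largest requested tag also visits tags that merely sort between them, so callers presumably filter the yielded pairs. For example:

```rust
use std::collections::BTreeMap;

fn main() {
    let tags: BTreeMap<&str, usize> = [("ai", 0), ("art", 1), ("cats", 2)].into();
    // Querying for {"ai", "cats"} scans the whole inclusive range...
    let hits: Vec<_> = tags.range("ai"..="cats").map(|(&s, &i)| (s, i)).collect();
    // ...so "art" shows up too and must be filtered out downstream.
    assert_eq!(hits, vec![("ai", 0), ("art", 1), ("cats", 2)]);
}
```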
+
+    /// Find the `data_hashes` index for this entry in `data`.
+    pub(super) fn reverse_index_lookup(&self, hs: &EntryKey) -> Option<ArenaIndex>
+    {
+        self.data_hashes.iter().filter_map(|(i, h)| if h == hs {
+            Some(i)
+        } else {
+            None
+        }).next()
+    }
+
 /// Create a `Stream` that searches for all entries with *all* of these provided tags on background tasks.
 ///

@@ -11,3 +11,33 @@ pub use streams::*;
 mod hashers;
 pub use hashers::*;
+
+pub const PRECOLLECT_STACK_SIZE: usize = 64;
+
+/// Collect an iterator's output, then drop the iterator, detaching it from any references or resources it might hold.
+#[macro_export] macro_rules! precollect {
+    ($iter:expr, $num:literal) => {
+        {
+            let it: ::smallvec::SmallVec<[_; $num]> = $iter.into_iter().collect();
+            it.into_iter()
+        }
+    };
+    ($iter:expr) => {
+        {
+            let it: ::smallvec::SmallVec<[_; $crate::ext::PRECOLLECT_STACK_SIZE]> = $iter.into_iter().collect();
+            it.into_iter()
+        }
+    }
+}
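`precollect!` exists so that an iterator borrowing a structure can be drained up front, letting the structure be mutated while the results are consumed (as `cleanup_remove_entry` does above). The same idea with a plain `Vec` in place of `SmallVec`:

```rust
fn main() {
    let mut words = vec!["tag-a".to_string(), "tag-b".to_string()];
    // Collecting eagerly ends the borrow of `words` the iterator held...
    let lens: Vec<usize> = words.iter().map(|w| w.len()).collect();
    for len in lens {
        // ...so `words` can be mutated while the results are consumed.
        words.push(format!("len-{}", len));
    }
    assert_eq!(words.len(), 4);
}
```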
+
+#[macro_export] macro_rules! no_op {
+    ($expr:expr) => {
+        {
+            $expr;
+        }
+    };
+}

@@ -2,6 +2,8 @@
 #![allow(dead_code)]
+
+#[macro_use] extern crate ad_hoc_iter;
 
 use serde::{Serialize, Deserialize};
 use jemallocator::Jemalloc;
