|
|
|
use super::*;
|
|
|
|
use std::borrow::Borrow;
|
|
|
|
use std::marker::PhantomData;
|
|
|
|
use std::iter;
|
|
|
|
use std::collections::{
|
|
|
|
btree_map::Range,
|
|
|
|
VecDeque,
|
|
|
|
};
|
|
|
|
use smallvec::SmallVec;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use tokio::{
|
|
|
|
sync::{
|
|
|
|
mpsc,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
use futures::prelude::*;
|
|
|
|
|
|
|
|
/// Inline capacity of the `SmallVec`s used to hold the tags passed to a search.
///
/// Requests with more tags than this still work; the `SmallVec` simply spills to the heap.
const STACK_CACHE_SIZE: usize = 8;
|
|
|
|
|
|
|
|
/// An iterator over entries in a store matching *any* tags
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct StoreSearchAnyIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String, ArenaIndex>>, VecDeque<&'a sha256::Sha256Hash>, PhantomData<T>);
|
|
|
|
|
|
|
|
/// An iterator over entries in a store matching *all* tags
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String, ArenaIndex>>, VecDeque<&'a Entry>, SmallVec<[&'a T; STACK_CACHE_SIZE]>);
|
|
|
|
|
|
|
|
// Searching by tags
|
|
|
|
impl Store
|
|
|
|
{
|
|
|
|
/// Lookup tag indecies for this iterator of tags
|
|
|
|
pub(super) fn tag_index_lookup<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> impl Iterator<Item = (& str, ArenaIndex)> + '_
|
|
|
|
where String: Borrow<T>
|
|
|
|
{
|
|
|
|
let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
|
|
|
|
sorted.sort();
|
|
|
|
match (sorted.first(), sorted.last()) {
|
|
|
|
(Some(&low), Some(&high)) => Some(self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))).map(|(s, &i)| (s.as_str(), i))),
|
|
|
|
_ => None
|
|
|
|
}.map_into_iter()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Find the `data_hashes` index for this entry in `data`.
|
|
|
|
pub(super) fn reverse_index_lookup(&self, hs: &EntryKey) -> Option<ArenaIndex>
|
|
|
|
{
|
|
|
|
self.data_hashes.iter().filter_map(|(i, h)| if h == hs {
|
|
|
|
Some(i)
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}).next()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a `Stream` that searches for all entries with *all* of these provided tags on background tasks.
|
|
|
|
///
|
|
|
|
/// The stream outputs `EntryKey`s, which can be used to look up the `Entry` in the store in `O(1)` time.
|
|
|
|
///
|
|
|
|
/// # Notes
|
|
|
|
/// This is allowed to produce duplicate entries, if either:
|
|
|
|
/// * An entry has multiple of the same tag set
|
|
|
|
/// * An entry has multiple of the tags provided to this function set
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
/// It is not guaranteed that the search will complete before the stream ends, if one of the background tasks panics; although this is unlikely.
|
|
|
|
/// If one of the tasks panics, it is ignored.
|
|
|
|
///
|
|
|
|
/// This is only useful in a multithreaded environment where tasks can be scheduled on different threads, otherwise use `tag_search_all`
|
|
|
|
pub fn tag_search_all_detached<U: Into<String>>(self: Arc<Self>, tags: impl IntoIterator<Item=U>) -> impl Stream<Item = EntryKey> + 'static
|
|
|
|
{
|
|
|
|
let (tx, rx) = mpsc::channel(16);
|
|
|
|
let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
|
|
|
|
sorted.sort();
|
|
|
|
tokio::spawn(async move {
|
|
|
|
let range = match (sorted.first().map(|x| x.as_ref()), sorted.last().map(|x| x.as_ref())) {
|
|
|
|
(Some(low), Some(high)) => self.tags.range::<str, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
|
|
|
|
_ => return,
|
|
|
|
};
|
|
|
|
future::join_all(range.map(|(_, &ti)| {
|
|
|
|
let store = Arc::clone(&self);
|
|
|
|
let mut tx = tx.clone();
|
|
|
|
tokio::spawn(async move {
|
|
|
|
let data_hashes = &store.data_hashes;
|
|
|
|
for x in store.tag_mappings.get(ti)
|
|
|
|
.map_into_iter()
|
|
|
|
.filter_map(move |&idx| data_hashes.get(idx))
|
|
|
|
{
|
|
|
|
if tx.send(*x).await.is_err() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})).await;
|
|
|
|
});
|
|
|
|
rx
|
|
|
|
}
|
|
|
|
/// Search for all entries with *all* of these provided tags.
|
|
|
|
///
|
|
|
|
/// # Notes
|
|
|
|
/// This is allowed to produce duplicate entries, if either:
|
|
|
|
/// * An entry has multiple of the same tag set
|
|
|
|
/// * An entry has multiple of the tags provided to this function set
|
|
|
|
pub fn tag_search_all<'a, T: ?Sized + Ord + 'a>(&'a self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAllIter<'_, T>
|
|
|
|
where String: Borrow<T>
|
|
|
|
{
|
|
|
|
let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
|
|
|
|
sorted.sort();
|
|
|
|
StoreSearchAllIter(self, Some(match (sorted.first(), sorted.last()) {
|
|
|
|
(Some(&low), Some(&high)) => self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
|
|
|
|
_ => return StoreSearchAllIter(self, None, Default::default(), sorted),
|
|
|
|
}), VecDeque::new(), sorted)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Search for all entries with *any* of these provided tags.
|
|
|
|
///
|
|
|
|
/// # Notes
|
|
|
|
/// This is allowed to produce duplicate entries, if either:
|
|
|
|
/// * An entry has multiple of the same tag set
|
|
|
|
/// * An entry has multiple of the tags provided to this function set
|
|
|
|
pub fn tag_search_any<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAnyIter<'_, T>
|
|
|
|
where String: Borrow<T>
|
|
|
|
{
|
|
|
|
let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
|
|
|
|
sorted.sort();
|
|
|
|
StoreSearchAnyIter(self, Some(match (sorted.first(), sorted.last()) {
|
|
|
|
(Some(&low), Some(&high)) => self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
|
|
|
|
_ => return StoreSearchAnyIter(self, None, Default::default(), PhantomData),
|
|
|
|
}), VecDeque::new(), PhantomData)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Search for all items with this provided tag.
|
|
|
|
///
|
|
|
|
/// # Notes
|
|
|
|
/// This is allowed to produce duplicate entries, if an entry has two of the same tags set.
|
|
|
|
pub fn tag_search<'a, T: ?Sized + Ord>(&'a self, tag: &T) -> StoreSearchAnyIter<'a, T>
|
|
|
|
where String: Borrow<T>
|
|
|
|
{
|
|
|
|
let r= (std::ops::Bound::Included(tag), std::ops::Bound::Included(tag));
|
|
|
|
StoreSearchAnyIter(self, Some(self.tags.range::<T, _>(r)), VecDeque::new(), PhantomData)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn _assert_test_search(&self)
|
|
|
|
{
|
|
|
|
let _x: Vec<_> = self.tag_search("hello").dedup_ref().collect();
|
|
|
|
let _x: Vec<_> = self.tag_search_any(vec!["hello", "one", "two"]).dedup_ref().collect();
|
|
|
|
let _x: Vec<_> = self.tag_search_all(vec!["hello", "one", "two"]).dedup_ref().collect();
|
|
|
|
}
|
|
|
|
async fn _assert_test_search_par(self: Arc<Self>)
|
|
|
|
{
|
|
|
|
let _x: Vec<_> = self.tag_search_all_detached(vec!["hi"]).collect().await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a, T: ?Sized> Iterator for StoreSearchAllIter<'a, T>
|
|
|
|
where T: Ord,
|
|
|
|
String: Borrow<T>
|
|
|
|
{
|
|
|
|
type Item = &'a Entry;
|
|
|
|
fn next(&mut self) -> Option<Self::Item>
|
|
|
|
{
|
|
|
|
if let Some(range) = &mut self.1 {
|
|
|
|
if let Some((_, &ti)) = range.next() {
|
|
|
|
// tag index get
|
|
|
|
let data_hashes = &self.0.data_hashes;
|
|
|
|
let data = &self.0.data;
|
|
|
|
let tags = &self.3;
|
|
|
|
|
|
|
|
let iter = self.0.tag_mappings.get(ti)
|
|
|
|
.map_into_iter()
|
|
|
|
.filter_map(move |&idx| data_hashes.get(idx))
|
|
|
|
.filter_map(move |x| data.get(x))
|
|
|
|
// Ensure all our `tags` are present in the entry's tags
|
|
|
|
.filter(move |entry| tags.iter()
|
|
|
|
.filter(move |&x| entry.tags
|
|
|
|
.binary_search_by_key(x, |t: &String| -> &T { t.borrow() })
|
|
|
|
.is_err())
|
|
|
|
.count() == 0);
|
|
|
|
self.2.extend(iter);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
self.2.pop_front()
|
|
|
|
}
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
|
match self.1 {
|
|
|
|
None => (0, Some(0)),
|
|
|
|
Some(_) => (0, None),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
impl<'a, T: ?Sized> Iterator for StoreSearchAnyIter<'a, T>
|
|
|
|
{
|
|
|
|
type Item = &'a Entry;
|
|
|
|
fn next(&mut self) -> Option<Self::Item>
|
|
|
|
{
|
|
|
|
if let Some(range) = &mut self.1 {
|
|
|
|
if let Some((_, &ti)) = range.next() {
|
|
|
|
// tag index get
|
|
|
|
let data_hashes = &self.0.data_hashes;
|
|
|
|
let iter = self.0.tag_mappings.get(ti)
|
|
|
|
.map_into_iter()
|
|
|
|
.filter_map(move |&idx| data_hashes.get(idx));
|
|
|
|
self.2.extend(iter);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
self.2.pop_front().map(|x| self.0.data.get(x)).flatten()
|
|
|
|
}
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
|
match self.1 {
|
|
|
|
None => (0, Some(0)),
|
|
|
|
Some(_) => (0, None),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/// Marks the any-tag search iterator as fused: after the first `None`, callers may rely on `None` forever.
///
/// NOTE(review): this promise only holds if `next` never returns `None` before both the tag range and
/// the pending-hash queue are exhausted — keep that in mind when changing `next`.
impl<T: ?Sized> iter::FusedIterator for StoreSearchAnyIter<'_, T> {}
|