You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

217 lines
7.6 KiB

use super::*;
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::iter;
use std::collections::{
btree_map::Range,
VecDeque,
};
use smallvec::SmallVec;
use std::sync::Arc;
use tokio::{
sync::{
mpsc,
},
};
use futures::prelude::*;
// Inline capacity of the `SmallVec`s in this file that hold the tags of a query.
const STACK_CACHE_SIZE: usize = 8;
/// Iterator returned by the "any tag" searches on a [`Store`].
///
/// Yields `&Entry` values; the same entry may be produced more than once.
#[derive(Debug)]
pub struct StoreSearchAnyIter<'a, T: ?Sized>(
    // The store being searched.
    &'a Store,
    // Tags of the store's tag map still to be visited; `None` when the query had no tags.
    Option<Range<'a, String, ArenaIndex>>,
    // Entry hashes collected from visited tags that have not been yielded yet.
    VecDeque<&'a sha256::Sha256Hash>,
    // Ties the iterator to the tag type `T` without storing any tags.
    PhantomData<T>,
);
/// Iterator returned by the "all tags" search on a [`Store`].
///
/// Yields `&Entry` values; the same entry may be produced more than once.
#[derive(Debug)]
pub struct StoreSearchAllIter<'a, T: ?Sized>(
    // The store being searched.
    &'a Store,
    // Tags of the store's tag map still to be visited; `None` when the query had no tags.
    Option<Range<'a, String, ArenaIndex>>,
    // Matching entries collected from visited tags that have not been yielded yet.
    VecDeque<&'a Entry>,
    // The sorted query tags; every one of them must be present on a yielded entry.
    SmallVec<[&'a T; STACK_CACHE_SIZE]>,
);
// Searching by tags
impl Store
{
/// Look up tag indices for this iterator of tags.
///
/// The requested tags are sorted, and the store's tag map is then scanned over the inclusive
/// range from the smallest to the largest requested tag. Each item is the tag's name paired with
/// its index. An empty `tags` input produces an empty iterator.
///
/// NOTE(review): the range yields *every* stored tag that sorts between the smallest and largest
/// requested tag, including tags that were not requested. Confirm whether callers rely on that
/// or expect only the exact tags.
pub(super) fn tag_index_lookup<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> impl Iterator<Item = (& str, ArenaIndex)> + '_
where String: Borrow<T>
{
// The element type is left to inference; presumably it resolves to `&T` through the `range` call below.
let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
sorted.sort();
// After sorting, the first and last elements are the lower and upper bounds of the scan.
match (sorted.first(), sorted.last()) {
(Some(&low), Some(&high)) => Some(self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))).map(|(s, &i)| (s.as_str(), i))),
_ => None
}.map_into_iter()
}
/// Resolve the `data_hashes` index under which the key `hs` is stored.
///
/// Scans `data_hashes` and returns the index of the first stored key equal to `hs`,
/// or `None` when no stored key matches.
pub(super) fn reverse_index_lookup(&self, hs: &EntryKey) -> Option<ArenaIndex>
{
    self.data_hashes
        .iter()
        .find(|(_, candidate)| *candidate == hs)
        .map(|(slot, _)| slot)
}
/// Create a `Stream` that searches for all entries with *all* of these provided tags on background tasks.
///
/// The stream outputs `EntryKey`s, which can be used to look up the `Entry` in the store.
///
/// One task is spawned per stored tag in the inclusive range spanned by the smallest and largest
/// requested tag. Each task walks the entries mapped to its tag and forwards only the keys of
/// entries that carry every requested tag. An empty `tags` input yields an empty stream.
///
/// # Notes
/// This is allowed to produce duplicate entries, if either:
/// * An entry has multiple of the same tag set
/// * An entry has multiple of the tags provided to this function set
///
/// # Task failure
/// It is not guaranteed that the search will complete before the stream ends, if one of the background tasks panics; although this is unlikely.
/// If one of the tasks panics, it is ignored.
///
/// This is only useful in a multithreaded environment where tasks can be scheduled on different threads, otherwise use `tag_search_all`
pub fn tag_search_all_detached<U: Into<String>>(self: Arc<Self>, tags: impl IntoIterator<Item=U>) -> impl Stream<Item = EntryKey> + 'static
{
    let (tx, rx) = mpsc::channel(16);
    let mut sorted: SmallVec<[String; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
    sorted.sort();
    // Shared read-only with every worker so each can check entries against the full query.
    let sorted = Arc::new(sorted);
    tokio::spawn(async move {
        let range = match (sorted.first().map(String::as_str), sorted.last().map(String::as_str)) {
            (Some(low), Some(high)) => self.tags.range::<str, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
            _ => return,
        };
        // Join results are discarded on purpose: a panicking worker is ignored.
        future::join_all(range.map(|(_, &ti)| {
            let store = Arc::clone(&self);
            let wanted = Arc::clone(&sorted);
            let mut tx = tx.clone();
            tokio::spawn(async move {
                let data_hashes = &store.data_hashes;
                let data = &store.data;
                for x in store.tag_mappings.get(ti)
                    .map_into_iter()
                    .filter_map(move |&idx| data_hashes.get(idx))
                    // Keep only keys whose entry has every requested tag. The binary search
                    // mirrors `StoreSearchAllIter`, which treats `entry.tags` as sorted.
                    .filter(move |&key| data.get(key)
                            .map(|entry| wanted.iter().all(|tag| entry.tags.binary_search(tag).is_ok()))
                            .unwrap_or(false))
                {
                    // The receiver is gone; nobody is listening any more.
                    if tx.send(*x).await.is_err() {
                        return;
                    }
                }
            })
        })).await;
    });
    rx
}
/// Build an iterator over the entries that carry *all* of the provided tags.
///
/// The tags are sorted and the store's tag map is scanned over the inclusive range between the
/// smallest and largest of them; the returned iterator keeps the sorted tags and checks each
/// candidate entry against them. With no tags the iterator is empty.
///
/// # Notes
/// This is allowed to produce duplicate entries, if either:
/// * An entry has multiple of the same tag set
/// * An entry has multiple of the tags provided to this function set
pub fn tag_search_all<'a, T: ?Sized + Ord + 'a>(&'a self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAllIter<'_, T>
where String: Borrow<T>
{
    let mut wanted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
    wanted.sort();
    // Smallest and largest requested tag bound the scan; no tags means no range at all.
    let span = match (wanted.first(), wanted.last()) {
        (Some(&low), Some(&high)) => Some(self.tags.range::<T, _>((
            std::ops::Bound::Included(low),
            std::ops::Bound::Included(high),
        ))),
        _ => None,
    };
    StoreSearchAllIter(self, span, VecDeque::new(), wanted)
}
/// Search for all entries with *any* of these provided tags.
///
/// Each distinct requested tag is resolved exactly in the store's tag map, so entries that only
/// carry some *other* tag sorting between the smallest and largest requested tag are not
/// returned. The candidate keys are gathered up front; the returned iterator then resolves them
/// to entries one at a time. With no tags the iterator is empty.
///
/// # Notes
/// This is allowed to produce duplicate entries, if either:
/// * An entry has multiple of the same tag set
/// * An entry has multiple of the tags provided to this function set
pub fn tag_search_any<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAnyIter<'_, T>
where String: Borrow<T>
{
    let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
    sorted.sort();
    let anchor = match sorted.first() {
        Some(&low) => low,
        None => return StoreSearchAnyIter(self, None, Default::default(), PhantomData),
    };

    let data_hashes = &self.data_hashes;
    let mut pending = VecDeque::new();
    let mut previous: Option<&T> = None;
    for &tag in sorted.iter() {
        // `sorted` is ordered, so repeated query tags are adjacent; visit each only once.
        if previous == Some(tag) {
            continue;
        }
        previous = Some(tag);
        if let Some(&ti) = self.tags.get::<T>(tag) {
            pending.extend(self.tag_mappings.get(ti)
                           .map_into_iter()
                           .filter_map(move |&idx| data_hashes.get(idx)));
        }
    }

    // The iterator type can only walk one contiguous key range, which would also cover tags that
    // were never requested. Hand it an empty range (`[anchor, anchor)`) so that it only drains
    // the exact matches queued above.
    let drained = self.tags.range::<T, _>((std::ops::Bound::Included(anchor), std::ops::Bound::Excluded(anchor)));
    StoreSearchAnyIter(self, Some(drained), pending, PhantomData)
}
/// Build an iterator over the entries that carry the single tag `tag`.
///
/// The store's tag map is scanned over a range whose lower and upper bound are both `tag`
/// (inclusive), so only that exact tag can be visited.
///
/// # Notes
/// This is allowed to produce duplicate entries, if an entry has two of the same tags set.
pub fn tag_search<'a, T: ?Sized + Ord>(&'a self, tag: &T) -> StoreSearchAnyIter<'a, T>
where String: Borrow<T>
{
    let exact = self.tags.range::<T, _>((
        std::ops::Bound::Included(tag),
        std::ops::Bound::Included(tag),
    ));
    StoreSearchAnyIter(self, Some(exact), VecDeque::new(), PhantomData)
}
// Exercises each synchronous search method with string tags and collects the de-duplicated
// results. Appears to exist only so the compiler type-checks these call shapes.
fn _assert_test_search(&self)
{
    let _single: Vec<_> = self.tag_search("hello").dedup_ref().collect();
    let _any: Vec<_> = self.tag_search_any(vec!["hello", "one", "two"]).dedup_ref().collect();
    let _all: Vec<_> = self.tag_search_all(vec!["hello", "one", "two"]).dedup_ref().collect();
}
// Exercises the background-task search and collects its stream into a `Vec`.
// Appears to exist only so the compiler type-checks this call shape.
async fn _assert_test_search_par(self: Arc<Self>)
{
    let stream = self.tag_search_all_detached(vec!["hi"]);
    let _keys: Vec<_> = stream.collect().await;
}
}
impl<'a, T: ?Sized> Iterator for StoreSearchAllIter<'a, T>
where T: Ord,
String: Borrow<T>
{
type Item = &'a Entry;
fn next(&mut self) -> Option<Self::Item>
{
if let Some(range) = &mut self.1 {
if let Some((_, &ti)) = range.next() {
// tag index get
let data_hashes = &self.0.data_hashes;
let data = &self.0.data;
let tags = &self.3;
let iter = self.0.tag_mappings.get(ti)
.map_into_iter()
.filter_map(move |&idx| data_hashes.get(idx))
.filter_map(move |x| data.get(x))
// Ensure all our `tags` are present in the entry's tags
.filter(move |entry| tags.iter()
.filter(move |&x| entry.tags
.binary_search_by_key(x, |t: &String| -> &T { t.borrow() })
.is_err())
.count() == 0);
self.2.extend(iter);
}
} else {
return None;
}
self.2.pop_front()
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.1 {
None => (0, Some(0)),
Some(_) => (0, None),
}
}
}
impl<'a, T: ?Sized> Iterator for StoreSearchAnyIter<'a, T>
{
    type Item = &'a Entry;

    /// Yield the next entry reachable from the visited tags.
    ///
    /// Queued keys are resolved first; a key with no entry in the store's data is skipped
    /// rather than ending the iteration. When the queue is empty, further tags are pulled from
    /// the range until one contributes a key or the range is exhausted, so `None` is only
    /// returned once there is nothing left to visit.
    fn next(&mut self) -> Option<Self::Item>
    {
        loop {
            while let Some(key) = self.2.pop_front() {
                if let Some(entry) = self.0.data.get(key) {
                    return Some(entry);
                }
            }
            // No range (empty query) or no tags left: the search is finished.
            let (_, &ti) = self.1.as_mut()?.next()?;
            // tag index get
            let data_hashes = &self.0.data_hashes;
            let iter = self.0.tag_mappings.get(ti)
                .map_into_iter()
                .filter_map(move |&idx| data_hashes.get(idx));
            self.2.extend(iter);
        }
    }

    /// Exactly zero items when there is no range; otherwise the count is unknown.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self.1 {
            None => (0, Some(0)),
            Some(_) => (0, None),
        }
    }
}
// `next` only returns `None` after both the queue and the tag range are drained, and neither is
// refilled afterwards, so it keeps returning `None` from then on.
impl<'a, T: ?Sized> iter::FusedIterator for StoreSearchAnyIter<'a, T> {}