detached parallel tag search

master
Avril 3 years ago
parent 89ffe57153
commit 2155e729d7
Signed by: flanchan
GPG Key ID: 284488987C31F630

Cargo.lock (generated)

@@ -461,6 +461,7 @@ dependencies = [
"generational-arena",
"jemallocator",
"memmap",
"pin-project",
"serde",
"smallvec",
"tokio",

@@ -14,6 +14,7 @@ futures = "0.3.8"
generational-arena = "0.2.8"
jemallocator = "0.3.2"
memmap = "0.7.0"
pin-project = "1.0.2"
serde = {version = "1.0.118", features= ["derive"]}
smallvec = "1.5.1"
tokio = {version = "0.2", features = ["full"]}

@@ -30,6 +30,9 @@ pub use freeze::Freeze;
mod search;
pub use search::*;
/// The key used to look up a single entry in `O(1)` time.
pub type EntryKey = sha256::Sha256Hash;
#[derive(Debug)]
pub struct Store
{
@@ -122,3 +125,13 @@ impl Store
freeze.into_new()
}
}
// Primitive access
impl Store
{
/// Look up a single entry in `O(1)` time with its key.
#[inline] pub fn get(&self, key: &EntryKey) -> Option<&Entry>
{
self.data.get(key)
}
}

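A minimal usage sketch for the new primitive accessor, assuming a `Store` and an `EntryKey` are already in scope (the function name below is hypothetical):

// Hypothetical sketch: O(1) lookup of a single entry by its key.
fn lookup_example(store: &Store, key: &EntryKey) -> bool
{
    match store.get(key) {
        Some(_entry) => true,  // the borrowed `Entry` lives as long as `store`
        None => false,         // nothing stored under this key
    }
}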
@@ -7,6 +7,13 @@ use std::collections::{
VecDeque,
};
use smallvec::SmallVec;
use std::sync::Arc;
use tokio::{
sync::{
mpsc,
},
};
use futures::prelude::*;
const STACK_CACHE_SIZE: usize = 8;
@@ -21,13 +28,55 @@ pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String,
// Searching by tags
impl Store
{
/// Create a `Stream` that searches, on background tasks, for all entries that have *all* of the provided tags.
///
/// The stream outputs `EntryKey`s, which can be used to look up the `Entry` in the store in `O(1)` time.
///
/// # Notes
/// This may produce duplicate entries, if either:
/// * An entry has the same tag set on it more than once
/// * An entry has more than one of the tags provided to this function set on it
///
/// # Panics
/// If one of the background tasks panics, the panic is ignored; in that case the stream may end before the search has completed, although this is unlikely.
///
/// This is only useful in a multithreaded environment where tasks can be scheduled on different threads; otherwise, use `tag_search_all`.
pub fn tag_search_all_detached<U: Into<String>>(self: Arc<Self>, tags: impl IntoIterator<Item=U>) -> impl Stream<Item = EntryKey> + 'static
{
let (tx, rx) = mpsc::channel(16);
let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
sorted.sort();
tokio::spawn(async move {
let range = match (sorted.first().map(|x| x.as_ref()), sorted.last().map(|x| x.as_ref())) {
(Some(low), Some(high)) => self.tags.range::<str, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
_ => return,
};
future::join_all(range.map(|(_, &ti)| {
let store = Arc::clone(&self);
let mut tx = tx.clone();
tokio::spawn(async move {
let data_hashes = &store.data_hashes;
for x in store.tag_mappings.get(ti)
.map_into_iter()
.filter_map(move |&idx| data_hashes.get(idx))
{
if tx.send(*x).await.is_err() {
return;
}
}
})
})).await;
});
rx
}
/// Search for all entries with *all* of these provided tags.
///
/// # Notes
/// This may produce duplicate entries, if either:
/// * An entry has the same tag set on it more than once
/// * An entry has more than one of the tags provided to this function set on it
//TODO: Parallelize this with a futures::Stream version to do the `tag_search` lookups in parallel.
pub fn tag_search_all<'a, T: ?Sized + Ord + 'a>(&'a self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAllIter<'_, T>
where String: Borrow<T>
{
@@ -73,6 +122,10 @@ impl Store
let _x: Vec<_> = self.tag_search_any(vec!["hello", "one", "two"]).dedup_ref().collect();
let _x: Vec<_> = self.tag_search_all(vec!["hello", "one", "two"]).dedup_ref().collect();
}
async fn _assert_test_search_par(self: Arc<Self>)
{
let _x: Vec<_> = self.tag_search_all_detached(vec!["hi"]).collect().await;
}
}
impl<'a, T: ?Sized> Iterator for StoreSearchAllIter<'a, T>
@@ -93,7 +146,7 @@
.map_into_iter()
.filter_map(move |&idx| data_hashes.get(idx))
.filter_map(move |x| data.get(x))
// Ensure all our `tags` are present in the entry's tags
// Ensure all our `tags` are present in the entry's tags
.filter(move |entry| tags.iter()
.filter(move |&x| entry.tags
.binary_search_by_key(x, |t: &String| -> &T { t.borrow() })

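A rough consumption sketch for the new detached search above, assuming a multithreaded tokio runtime and an already-populated store behind an `Arc` (the function name and tags are illustrative):

use std::sync::Arc;
use futures::prelude::*;

// Illustrative sketch: run the detached search on background tasks, then
// resolve each yielded key to its entry in O(1) time.
async fn detached_search_example(store: Arc<Store>)
{
    let keys: Vec<EntryKey> = Arc::clone(&store)
        .tag_search_all_detached(vec!["hello", "world"])
        .collect()
        .await;
    for key in keys.iter() {
        if let Some(_entry) = store.get(key) {
            // Use the entry; duplicate keys are possible (see the notes above).
        }
    }
}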
@@ -1,12 +1,16 @@
use std::iter::FusedIterator;
use std::collections::BTreeSet;
use std::borrow::Borrow;
use futures::prelude::*;
/// An iterator that may be empty.
#[derive(Debug, Clone)]
pub struct MaybeIter<I, T>(Option<I>)
where I: Iterator<Item=T>;
mod streams;
pub use streams::*;
pub trait OptionIterExt<I, T>: Sized
where I: Iterator<Item=T>
{

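For context, a small sketch of the `Option`-to-iterator helper as it is used by the search code above; `map_into_iter` and an impl of `OptionIterExt` for an `Option` of an iterator are assumed here, mirroring the stream impls added below:

// Illustrative sketch: an absent iterator yields nothing, a present one yields its items.
fn maybe_iter_example()
{
    let present: Vec<i32> = Some(0..3).map_into_iter().collect();
    assert_eq!(present, vec![0, 1, 2]);

    let absent: Vec<i32> = None::<std::ops::Range<i32>>.map_into_iter().collect();
    assert!(absent.is_empty());
}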
@@ -0,0 +1,53 @@
use super::*;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// A stream that may be empty.
#[pin_project]
#[derive(Debug, Clone)]
pub struct MaybeStream<I,T>(#[pin] Option<I>)
where I: Stream<Item=T>;
pub trait OptionStreamExt<I, T>: Sized
where I: Stream<Item=T>
{
/// Map this `Option<Stream>` into a stream that will yield the items of the stream if it is present.
fn map_into_stream(self) -> MaybeStream<I,T>;
}
impl<I: Stream<Item=T>, T> Stream for MaybeStream<I,T>
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.as_pin_mut()
{
Some(i) => i.poll_next(cx),
_ => Poll::Ready(None),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match &self.0 {
Some(i) => i.size_hint(),
None => (0, Some(0)),
}
}
}
impl<E, T, I: Stream<Item=T>> OptionStreamExt<I, T> for Result<I, E>
where I: Stream<Item = T>
{
#[inline] fn map_into_stream(self) -> MaybeStream<I, T> {
MaybeStream(self.ok())
}
}
impl<T, I: Stream<Item=T>> OptionStreamExt<I, T> for Option<I>
where I: Stream<Item = T>
{
#[inline] fn map_into_stream(self) -> MaybeStream<I, T> {
MaybeStream(self)
}
}
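A small sketch of the new stream helper in use, assuming an async context with `futures` available (names are illustrative):

use futures::prelude::*;
use futures::stream;

// Illustrative sketch: an `Option` of a stream becomes a stream that is simply
// empty when the option is `None`.
async fn maybe_stream_example()
{
    let present: Vec<i32> = Some(stream::iter(vec![1, 2, 3]))
        .map_into_stream()
        .collect()
        .await;
    assert_eq!(present, vec![1, 2, 3]);

    let absent: Vec<i32> = None::<stream::Iter<std::vec::IntoIter<i32>>>
        .map_into_stream()
        .collect()
        .await;
    assert!(absent.is_empty());
}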