diff --git a/Cargo.lock b/Cargo.lock index 557f451..8e6c829 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,7 @@ dependencies = [ "generational-arena", "jemallocator", "memmap", + "pin-project", "serde", "smallvec", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6986236..828ec43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ futures = "0.3.8" generational-arena = "0.2.8" jemallocator = "0.3.2" memmap = "0.7.0" +pin-project = "1.0.2" serde = {version = "1.0.118", features= ["derive"]} smallvec = "1.5.1" tokio = {version = "0.2", features = ["full"]} diff --git a/src/data/mod.rs b/src/data/mod.rs index 2c57a77..90af972 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -30,6 +30,9 @@ pub use freeze::Freeze; mod search; pub use search::*; +/// The key used to look up a single entry in `O(1)` time. +pub type EntryKey = sha256::Sha256Hash; + #[derive(Debug)] pub struct Store { @@ -122,3 +125,13 @@ impl Store freeze.into_new() } } + +// Primitive access +impl Store +{ + /// Look up a single entry in `O(1)` time with its key. + #[inline] pub fn get(&self, key: &EntryKey) -> Option<&Entry> + { + self.data.get(key) + } +} diff --git a/src/data/search.rs b/src/data/search.rs index f3e865e..f3617bf 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -7,6 +7,13 @@ use std::collections::{ VecDeque, }; use smallvec::SmallVec; +use std::sync::Arc; +use tokio::{ + sync::{ + mpsc, + }, +}; +use futures::prelude::*; const STACK_CACHE_SIZE: usize = 8; @@ -21,13 +28,55 @@ pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option>(self: Arc, tags: impl IntoIterator) -> impl Stream + 'static + { + let (tx, rx) = mpsc::channel(16); + let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect(); + sorted.sort(); + tokio::spawn(async move { + let range = match (sorted.first().map(|x| x.as_ref()), sorted.last().map(|x| x.as_ref())) { + (Some(low), Some(high)) => self.tags.range::((std::ops::Bound::Included(low), std::ops::Bound::Included(high))), + _ => return, + }; + future::join_all(range.map(|(_, &ti)| { + let store = Arc::clone(&self); + let mut tx = tx.clone(); + tokio::spawn(async move { + let data_hashes = &store.data_hashes; + for x in store.tag_mappings.get(ti) + .map_into_iter() + .filter_map(move |&idx| data_hashes.get(idx)) + { + if tx.send(*x).await.is_err() { + return; + } + } + }) + })).await; + }); + rx + } /// Search for all entries with *all* of these provided tags. /// /// # Notes /// This is allowed to produce duplicate entries, if either: /// * An entry has multiple of the same tag set /// * An entry has multiple of the tags provided to this function set - //TODO: Parallelize this with futures::Stream ver to do the `tag_search` lookups in parallel. pub fn tag_search_all<'a, T: ?Sized + Ord + 'a>(&'a self, tags: impl IntoIterator) -> StoreSearchAllIter<'_, T> where String: Borrow { @@ -73,6 +122,10 @@ impl Store let _x: Vec<_> = self.tag_search_any(vec!["hello", "one", "two"]).dedup_ref().collect(); let _x: Vec<_> = self.tag_search_all(vec!["hello", "one", "two"]).dedup_ref().collect(); } + async fn _assert_test_search_par(self: Arc) + { + let _x: Vec<_> = self.tag_search_all_detached(vec!["hi"]).collect().await; + } } impl<'a, T: ?Sized> Iterator for StoreSearchAllIter<'a, T> @@ -93,7 +146,7 @@ where T: Ord, .map_into_iter() .filter_map(move |&idx| data_hashes.get(idx)) .filter_map(move |x| data.get(x)) - // Ensure all our `tags` are present in the entry's tags + // Ensure all our `tags` are present in the entry's tags .filter(move |entry| tags.iter() .filter(move |&x| entry.tags .binary_search_by_key(x, |t: &String| -> &T { t.borrow() }) diff --git a/src/ext.rs b/src/ext.rs index f19da8f..8a32d11 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,12 +1,16 @@ use std::iter::FusedIterator; use std::collections::BTreeSet; use std::borrow::Borrow; +use futures::prelude::*; /// An iterator that may be empty. #[derive(Debug, Clone)] pub struct MaybeIter(Option) where I: Iterator; +mod streams; +pub use streams::*; + pub trait OptionIterExt: Sized where I: Iterator { diff --git a/src/ext/streams.rs b/src/ext/streams.rs new file mode 100644 index 0000000..716daa0 --- /dev/null +++ b/src/ext/streams.rs @@ -0,0 +1,53 @@ +use super::*; +use pin_project::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +#[pin_project] +#[derive(Debug, Clone)] +pub struct MaybeStream(#[pin] Option) +where I: Stream; + +pub trait OptionStreamExt: Sized +where I: Stream +{ + /// Map this `Option` into a stream that will yield the items of the stream if it is present. + fn map_into_stream(self) -> MaybeStream; +} + +impl, T> Stream for MaybeStream +{ + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project().0.as_pin_mut() + { + Some(i) => i.poll_next(cx), + _ => Poll::Ready(None), + } + } + fn size_hint(&self) -> (usize, Option) { + match &self.0 { + Some(i) => i.size_hint(), + None => (0, Some(0)), + } + } +} +impl> OptionStreamExt for Result +where I: Stream +{ + #[inline] fn map_into_stream(self) -> MaybeStream { + MaybeStream(self.ok()) + } +} +impl> OptionStreamExt for Option +where I: Stream +{ + #[inline] fn map_into_stream(self) -> MaybeStream { + MaybeStream(self) + } +} + + +