diff --git a/src/part.rs b/src/part.rs index e0666fb..9e30f57 100644 --- a/src/part.rs +++ b/src/part.rs @@ -32,14 +32,55 @@ impl MidpointFBSearcher for SearchSeq #[inline] fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> { + let max_cap = match get_max_pivot_search_area(haystack.len() > (DEFAULT_PIVOT_MAX_SEARCH_AREA * DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES)){ // Assume huge-page memory if len is larger than 4 pages. + + // On debug builds, cap search area to one system page only. + _ignore if cfg!(debug_assertions) && (*REAL_PAGE_SIZE) > 0 => + // SAFETY: We have checked if `*REAL_PAGE_SIZE` is non-zero above. + Some(unsafe { NonZeroUsize::new_unchecked(*REAL_PAGE_SIZE as usize) }), + // Otherwise, use the detected value. + cap => cap, + }; + + match haystack.split_at(begin) { ([], []) => None, ([], x) => self.search_forward(x, needle), (x, []) => self.search_backward(x, needle), - - (x, y) => { + + // If both the buffers are lower than `max_cap`, just do the entire operation on each + (x, y) if max_cap.map(|max| x.len() <= max.get() && y.len() <= max.get()).unwrap_or(false) => { self.search_forward(y, needle)?; self.search_backward(x, needle) + }, + + (mut x, mut y) => { + let len = std::cmp::min(x.len(), y.len()); + let mut cap = std::cmp::min(len, DEFAULT_PIVOT_MAX_SEARCH_AREA); + + while cap <= len { + // If cap is too large for one (or more) of the buffers, truncate it. + if cap > y.len() || cap > x.len() { + cap = std::cmp::min(y.len(), x.len()); + } + + // Search forwards in `y`. (up to `cap`) + if let Some(y) = self.search_forward(&y[..cap], needle) { + return Some(y); + } + // Search backwards in `x`. (down to `cap`) + if let Some(x) = self.search_backward(&x[(x.len()-cap)..], needle) { + return Some(x); + } + + // Cut out `cap` bytes from the start of forwards + y = &y[cap..]; + // Cut out `cap` bytes from the end of backwards. + x = &x[..cap]; + // Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.) + cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1); + } + None } } } @@ -193,15 +234,24 @@ impl MidpointFBSearcher for SearchPar // Otherwise, use the detected value. cap => cap, }; + // Cap the cap to `max_cap` if there is a max cap. + let cap = if let Some(max) = max_cap.as_ref() { + std::cmp::min(max.get(), self.cap_start) + } else { + self.cap_start + }; let forward = if hf.len() > 0 { - let cap = self.cap_start; + let cap = cap; let sf = &self; let complete = &complete; Some(s.spawn(move || -> Option<_> { let mut cap = std::cmp::min(cap, hf.len()); let len = hf.len(); while cap <= len { + // If `cap` is larger than the buffer `hf`, truncate it. + cap = std::cmp::min(cap, hf.len()); + // Search forward in `hf` up to `cap` bytes. if let /*v @ */Some(x) = sf.search_forward(&hf[..cap], needle) { complete.store(true); //complete.call_once(|| complete_val = true); @@ -220,13 +270,16 @@ impl MidpointFBSearcher for SearchPar None }; let backward = if hb.len() > 0 { - let cap = self.cap_start; + let cap = cap; let sf = &self; let complete = &complete; Some(s.spawn(move || -> Option<_> { let mut cap = std::cmp::min(cap, hb.len()); let len = hb.len(); while cap <= len { + // If `cap` is larger than the buffer `hb`, truncate it. + cap = std::cmp::min(cap, hb.len()); + // Search backwards in `hb` up to `cap` bytes. if let /*v @ */Some(x) = sf.search_backward(&hb[(hb.len()-cap)..], needle) { complete.store(true); //complete.call_once(|| complete_val = true); @@ -266,3 +319,36 @@ where S: MidpointFBSearcher //XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition... //Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.) + +//TODO: Add tests for `Search{Seq,Par}` partitioning methods. +#[cfg(test)] +mod test +{ + #[test] + fn partition_seq() + { + todo!("Test `SearchSeq` sequential partition searcher") + } + + #[cfg(feature="threads")] + #[test] + fn partition_par_heavy() + { + todo!("Test `SearchPar` parallel partition searcher") + } + + //TODO: Thread-reusing parallel `MidpointFBSearcher` (SearchSeq is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: MidpointFBSearcher is currently a synchonous-only interface; a pure-async pivot finder may require a refactor.) + #[cfg(all(feature="threads-async", feature = "threads"))] + #[test] + fn partition_par_light() + { + unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented") + } + + #[cfg(feature="threads-async")] + #[/*tokio::*/test] + fn partition_par_async() + { + unimplemented!("A pure async parallel searcher has not yet been implemented") + } +}