part: started `partition_one_with()`: Perform one nearest-needle-to-half partition on a buffer. The behaviour of this partition can be controlled with its `method` parameter. There are currently two: `SearchSeq` - search forward then backward in sequence. `SearchPar` - search forward and backward in parallel (NOTE: `SearchPar` is heavy and not well optimised, it will *always* spawn at least one thread.)
Fortune for reverse's current commit: Future blessing − 末吉refactor-search-capext
parent
ff898fc9b3
commit
f9068beca1
@ -1,3 +1,268 @@
//! Partitioning even areas by delimiter byte.
use super::*;
|
use super::*;
|
||||||
|
use std::{
|
||||||
|
num::NonZeroUsize,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Midpoint searcher (forwards & backwards)
///
/// Implementors locate a `needle` within a `haystack` around a given split
/// point, searching forward (after the split) and backward (before it).
/// The element type defaults to bytes (`u8`).
trait MidpointFBSearcher<T=u8>
{
    /// Find an occurrence of `needle` scanning `haystack` forward.
    /// Returns a reference into `haystack`, or `None` if absent.
    fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
    /// Find an occurrence of `needle` scanning `haystack` backward.
    /// Returns a reference into `haystack`, or `None` if absent.
    fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;

    /// Search both halves of `haystack` split at `begin`: forward through
    /// `haystack[begin..]` and backward through `haystack[..begin]`.
    /// How the two directions are scheduled/combined is implementation-defined.
    fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
}
|
||||||
|
|
||||||
|
/// Search the pivot for the needle sequentially.
///
/// The order of operations will be: `search_forward()?, search_backward()`.
///
/// Stateless; suitable as the cheap default when the buffer is small enough
/// that spawning threads (see `SearchPar`) would not pay off.
#[derive(Debug)]
struct SearchSeq;
|
||||||
|
|
||||||
|
impl MidpointFBSearcher<u8> for SearchSeq
|
||||||
|
{
|
||||||
|
#[inline(always)]
|
||||||
|
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||||
|
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
|
||||||
|
}
|
||||||
|
#[inline(always)]
|
||||||
|
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||||
|
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
|
||||||
|
match haystack.split_at(begin) {
|
||||||
|
([], []) => None,
|
||||||
|
([], x) => self.search_forward(x, needle),
|
||||||
|
(x, []) => self.search_backward(x, needle),
|
||||||
|
|
||||||
|
(x, y) => {
|
||||||
|
self.search_forward(y, needle)?;
|
||||||
|
self.search_backward(x, needle)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Parked sketch of an async (futures-based) searcher, kept inside an unused
// `const` so it stays out of the build until finished (see the `_TODO` name
// and the `SearchAsync`/thread-pool notes at the bottom of the file).
#[cfg(feature="async")]
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
    /// Async midpoint searcher sketch.
    /// NOTE(review): never constructed anywhere visible here, and the impl
    /// below is an empty placeholder — this block will not compile if the
    /// gating features are enabled as-is.
    #[derive(Debug, Clone)]
    struct SearchAsync<F>
    {
        // Callback that launches the search work — presumably onto an
        // executor; confirm intended runtime (tokio?) before finishing.
        spawn_task: F,
        // Receives the index found by the spawned search task.
        result: oneshot::Receiver<usize>,
    }

    #[cfg(feature="threads-async")]
    impl<F, Fu> MidpointFBSearcher<u8> for SearchAsync<F>
    where F: Fn() -> Fu,
          Fu: futures::Future + Send + Sync + 'static
    {
        // TODO: trait methods intentionally unimplemented.
    }
};
|
||||||
|
|
||||||
|
|
||||||
|
/// Search in parallel.
///
/// # Warning
/// This search operation is heavy. It **always** spawns its own (up to) two threads when `search_combined()` is invoked. This may not be ideal...
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SearchPar
{
    // Initial per-thread search-window size in bytes; each worker grows its
    // window exponentially from this value (see `search_combined()`).
    cap_start: usize,
}
|
||||||
|
|
||||||
|
/// For f/b pivot-searching, the max area for each operation to attempt.
/// (Also the fallback window size when the system page size is unavailable.)
const DEFAULT_PIVOT_MAX_SEARCH_AREA: usize = 1024;
/// For f/b pivot-searching, the max *possible* area for each operation to attempt when it grows in capacity
const DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA: usize = (1024 * 1024 * 1024) * 2; // 2GB

/// The number of pages of memory loaded where non-page-bound operations assume they're using HP-mapped data.
/// (Used as a multiplier of `DEFAULT_PIVOT_MAX_SEARCH_AREA` in `SearchPar::search_combined()`.)
const DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES: usize = 4;
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
    /// Real system page size.
    ///
    /// Queried once via the libc `getpagesize()` function (POSIX; not
    /// portable to non-Unix targets — TODO confirm target support).
    /// The raw return value is kept as-is: callers in this file all guard
    /// against a non-positive result before using it.
    static ref REAL_PAGE_SIZE: std::ffi::c_int = {
        use std::ffi::c_int;
        // Not exposed by `std`; bind it straight from libc.
        extern "C" {
            fn getpagesize() -> c_int;
        }
        // SAFETY: `getpagesize()` takes no arguments, has no preconditions,
        // and only reads system configuration.
        unsafe {
            getpagesize()
        }
    };
}
|
||||||
|
/// Get a recommended bound for the pivot search area (if there is one.)
///
/// # Returns
/// The recommended max bound for a pivot search area, or `None` for unbounded search.
///
/// # Page kinds
/// If the operation is using huge-page mapped memory, set `use_hp` to true.
/// (Currently ignored — see the XXX at the bottom of the function.)
#[inline]
fn get_max_pivot_search_area(_use_hp: bool) -> Option<NonZeroUsize>
{
    use std::ffi::c_int;
    lazy_static::lazy_static! {
        // Page size clamped into a sane bound; computed once, on first call.
        static ref PAGE_SIZE: usize = {
            match *REAL_PAGE_SIZE {
                // `getpagesize()` failed or gave nonsense: fall back to the default bound.
                c_int::MIN..=0 => DEFAULT_PIVOT_MAX_SEARCH_AREA,
                // Very large (hp) — a 0 here becomes `None` (unbounded) via `NonZeroUsize::new` below.
                very_large if very_large as usize > DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA => 0,
                // Large (limit to upper bound of non-hp)
                large if large as usize >= DEFAULT_PIVOT_MAX_SEARCH_AREA => std::cmp::min(large as usize, DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA),
                // Smaller than default bound
                small => small as usize
            }

        };
    }
    //XXX: Should we return a different value if `use_hp` is enabled? ("using hugepage")
    NonZeroUsize::new(*PAGE_SIZE)
}
|
||||||
|
|
||||||
|
impl SearchPar {
    /// Create a `SearchPar` with the default starting window size
    /// (`DEFAULT_PIVOT_MAX_SEARCH_AREA` bytes).
    #[inline(always)]
    pub const fn new() -> Self
    {
        Self::with_capacity(DEFAULT_PIVOT_MAX_SEARCH_AREA)
    }
    /// Create a `SearchPar` whose workers start with a `cap_start`-byte
    /// search window.
    #[inline]
    pub const fn with_capacity(cap_start: usize) -> Self
    {
        Self { cap_start }
    }
    /// The starting search-window size, in bytes.
    #[inline]
    pub const fn cap(&self) -> usize
    {
        self.cap_start
    }

    /// Mutable access to the starting search-window size.
    ///
    /// # Safety
    /// NOTE(review): no safety contract is stated in the source; presumably
    /// callers must not install a degenerate window size (e.g. 0) — confirm
    /// the intended invariant before relying on this.
    #[inline(always)]
    pub unsafe fn cap_mut(&mut self) -> &mut usize
    {
        &mut self.cap_start
    }
}
|
||||||
|
|
||||||
|
impl Default for SearchPar
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn default() -> Self
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
cap_start: match *REAL_PAGE_SIZE {
|
||||||
|
std::ffi::c_int::MIN..=0 => DEFAULT_PIVOT_MAX_SEARCH_AREA,
|
||||||
|
above_zero => above_zero as usize,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature="threads")]
impl MidpointFBSearcher<u8> for SearchPar
{
    #[inline(always)]
    fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memchr(needle, haystack).map(move |i| &haystack[i])
    }
    #[inline(always)]
    fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Search both directions away from `begin` at once, on (up to) two
    /// scoped threads. Each worker scans windows that grow exponentially
    /// from `self.cap_start` (bounded by `get_max_pivot_search_area()`);
    /// whichever side finds the needle first sets `complete` so the other
    /// side can bail out early.
    fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
        // Cross-thread "someone already found it" flag.
        let complete = crossbeam::atomic::AtomicCell::new(false);
        std::thread::scope(|s| {
            let (mut hb, mut hf) = haystack.split_at(begin);

            // Assume huge-page memory if len is larger than 4 pages.
            let max_cap = match get_max_pivot_search_area(hf.len() > (DEFAULT_PIVOT_MAX_SEARCH_AREA * DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES)) {
                // On debug builds, cap search area to one system page only.
                _ignore if cfg!(debug_assertions) && (*REAL_PAGE_SIZE) > 0 =>
                    // SAFETY: We have checked that `*REAL_PAGE_SIZE` is positive above.
                    Some(unsafe { NonZeroUsize::new_unchecked(*REAL_PAGE_SIZE as usize) }),
                // Otherwise, use the detected value.
                cap => cap,
            };

            // Forward worker: scans `hf` (the half after `begin`) front-to-back.
            let forward = if hf.len() > 0 {
                let mut cap = self.cap_start;
                let sf = &self;
                let complete = &complete;
                Some(s.spawn(move || -> Option<_> {
                    // BUGFIX: the original loop (`while cap <= len`) compared the
                    // growing `cap` against a stale copy of the slice's ORIGINAL
                    // length while the slice itself shrank, so `&hf[..cap]` could
                    // index out of bounds (panic) and the tail could be left
                    // unsearched. Clamp every window to the current remainder
                    // and run until the slice is exhausted instead.
                    while !hf.is_empty() {
                        let window = std::cmp::min(cap, hf.len());
                        if let Some(x) = sf.search_forward(&hf[..window], needle) {
                            complete.store(true);
                            return Some(x);
                        } else if complete.load() {
                            // The backward worker already found a result.
                            break;
                        }
                        // Cut the searched bytes from the start.
                        hf = &hf[window..];
                        // Grow `cap` by ^2 (not passing `max_cap` if there is one set.)
                        cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
                    }
                    None::<&'a u8>
                }))
            } else {
                None
            };
            // Backward worker: scans `hb` (the half before `begin`) back-to-front.
            let backward = if hb.len() > 0 {
                let mut cap = self.cap_start;
                let sf = &self;
                let complete = &complete;
                Some(s.spawn(move || -> Option<_> {
                    // BUGFIX: the original shrank with `hb = &hb[..cap]`, which
                    // KEEPS the just-searched tail window instead of discarding
                    // it (re-searching the same bytes), and its `hb.len()-cap`
                    // could underflow once `cap` outgrew the stale length.
                    // Drop the searched suffix and clamp the window instead.
                    while !hb.is_empty() {
                        let window = std::cmp::min(cap, hb.len());
                        if let Some(x) = sf.search_backward(&hb[(hb.len() - window)..], needle) {
                            complete.store(true);
                            return Some(x);
                        } else if complete.load() {
                            // The forward worker already found a result.
                            break;
                        }
                        // Cut the searched bytes from the end.
                        hb = &hb[..(hb.len() - window)];
                        // Grow `cap` by ^2 (not passing `max_cap` if there is one set.)
                        cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
                    }
                    None::<&'a u8>
                }))
            } else {
                None
            };

            // Prefer the forward result, falling back to the backward one.
            // A panicked worker degrades to `None` rather than propagating.
            match (forward, backward) {
                (None, None) => None,
                (None, Some(back)) => back.join().unwrap_or(None),
                (Some(forward), None) => forward.join().unwrap_or(None),
                (Some(forward), Some(backward)) => forward.join().unwrap_or(None).
                    or_else(move || backward.join().unwrap_or(None)),
            }
        })
    }
}
|
||||||
|
|
||||||
|
/// Perform one nearest-needle-to-half partition on `buffer`, using `method`
/// to locate the `needle` byte closest to the buffer's midpoint.
///
/// NOTE(review): deliberate unimplemented stub — the `todo!` message below
/// records the intended semantics and the open thread-pool/async design
/// questions; see also the notes at the bottom of the file.
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
where S: MidpointFBSearcher<u8>
{
    todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
}
|
||||||
|
|
||||||
|
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
|
||||||
|
|
||||||
|
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
|
||||||
|
Loading…
Reference in new issue