You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
427 lines
14 KiB
427 lines
14 KiB
//! Partitioning even areas by delimiter byte.
|
|
use super::*;
|
|
use std::{
|
|
num::NonZeroUsize,
|
|
};
|
|
|
|
/// Size of one cache-line.
|
|
///
|
|
/// NOTE: alignment padded for `u8`.
|
|
///
|
|
/// TODO: Make this comptime env-var configurable (`option_env!()`) on debug builds. (See `SEARCH_CAP_GROW`.)
|
|
const CACHELINE_SIZE: usize = std::mem::size_of::<crossbeam_utils::CachePadded<u8>>();
|
|
|
|
/// A buffer that takes up exactly one cache-line.
|
|
///
|
|
/// This type is not `Copy` to ensure copies are made safely. `clone()` is trivial, and to copy explicitly in a const context use `.copied()`
|
|
///
|
|
/// # Alignment
|
|
/// Note that the buffer is *not* 1-cacheline aligned itself by default.
|
|
/// To ensure its alignment, you should use `crossbeam_utils::CachePadded<CachelineBuffer>` (or the type-alias `AlignedCachelineBuffer`.)
|
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
|
#[repr(transparent)]
|
|
pub struct CachelineBuffer([u8; CACHELINE_SIZE]);
|
|
|
|
impl Default for CachelineBuffer
|
|
{
|
|
#[inline]
|
|
fn default() -> Self
|
|
{
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
|
|
/// A buffer that takes up exactly one cache-line, which is itself aligned to 1 cacheline.
|
|
pub type AlignedCachelineBuffer = crossbeam_utils::CachePadded<CachelineBuffer>;
|
|
|
|
impl CachelineBuffer {
|
|
/// The size of the buffer (1 cacheline of bytes.)
|
|
pub const SIZE: usize = CACHELINE_SIZE;
|
|
|
|
/// Create a new, empty buffer.
|
|
#[inline]
|
|
pub const fn new() -> Self
|
|
{
|
|
Self([0; Self::SIZE])
|
|
}
|
|
|
|
/// Clone this value.
|
|
///
|
|
/// This is a `const fn` explicit trivial copy of the data.
|
|
#[inline]
|
|
pub const fn copied(&self) -> Self
|
|
{
|
|
Self(self.0)
|
|
}
|
|
|
|
/// Get a reference to the byte array.
|
|
pub const fn as_bytes(&self) -> &[u8; Self::SIZE]
|
|
{
|
|
&self.0
|
|
}
|
|
}
|
|
|
|
forward_newtype!(mut CachelineBuffer => [u8], 0);
|
|
forward_newtype!(move const CachelineBuffer => [u8; CACHELINE_SIZE], 0);
|
|
|
|
const _: () = {
|
|
debug_assert!(CachelineBuffer::SIZE > std::mem::size_of::<u8>(), "Invalid cacheline-padding size (`CACHELINE_SIZE`)");
|
|
//debug_assert!(CACHELINE_SIZE == 128, "Unexpected `CACHELINE_SIZE`");
|
|
};
|
|
|
|
/// Grow capacity exponentially when search fails.
|
|
///
|
|
/// TODO: Make this comptime env-var settable (`option_env!()`) on debug builds.
|
|
const SEARCH_CAP_GROW: bool = true;
|
|
|
|
/// Settings for a searcher (memory search method configuration.)
|
|
///
|
|
/// The default values provided to implementors are globally controlled and (debug-build only) env-var configurable (for benchmarking purposes.)
|
|
trait SynchonousSearcher {
|
|
/// Initial size of capacity
|
|
const CAP_SIZE: usize = CACHELINE_SIZE;
|
|
|
|
/// Should the capacity be grown on failed search?
|
|
const CAP_GROW: bool = SEARCH_CAP_GROW;
|
|
}
|
|
|
|
// Default impl global compiled capacity settings for each.
|
|
impl SynchonousSearcher for SearchPar {}
|
|
impl SynchonousSearcher for SearchSeq {}
|
|
|
|
/// Midpoint searcher (forwards & backwards)
|
|
trait MidpointFBSearcher<T=u8>: SynchonousSearcher
|
|
{
|
|
fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
|
|
fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
|
|
|
|
fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
|
|
}
|
|
|
|
/// Search the pivot for the needle sequentially.
|
|
///
|
|
/// The order of operations will be: `search_forward()?, search_backward()`.
|
|
#[derive(Debug, Clone, Default)]
|
|
struct SearchSeq;
|
|
|
|
#[inline]
|
|
fn get_max_cap_for_search_area(size: usize) -> Option<NonZeroUsize>
|
|
{
|
|
SYS_PAGE_SIZE.and_then(move |page| if size == 0 {
|
|
// Size is unknown, bound by page.
|
|
Some(page)
|
|
} else if size >= (page.get() << 2) {
|
|
// Size is huge, bound by page ^2
|
|
NonZeroUsize::new(page.get() << 1)
|
|
} else if size >= page.get() {
|
|
// Size is larger than page, bound by page.
|
|
Some(page)
|
|
} else {
|
|
// If the area size is lower than one page, do not bound the capacity growth.
|
|
None
|
|
})
|
|
}
|
|
|
|
impl MidpointFBSearcher<u8> for SearchSeq
|
|
{
|
|
#[inline(always)]
|
|
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
|
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
|
|
}
|
|
#[inline(always)]
|
|
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
|
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
|
|
}
|
|
|
|
#[inline]
|
|
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
|
|
let max_cap = get_max_cap_for_search_area(haystack.len());
|
|
|
|
match haystack.split_at(begin) {
|
|
([], []) => None,
|
|
([], x) => self.search_forward(x, needle),
|
|
(x, []) => self.search_backward(x, needle),
|
|
|
|
// If both the buffers are lower than `max_cap`, just do the entire operation on each
|
|
(x, y) if max_cap.map(|max| x.len() <= max.get() && y.len() <= max.get()).unwrap_or(false) => {
|
|
self.search_forward(y, needle)?;
|
|
self.search_backward(x, needle)
|
|
},
|
|
|
|
(mut x, mut y) => {
|
|
let len = std::cmp::min(x.len(), y.len());
|
|
let mut cap = std::cmp::min(len, Self::CAP_SIZE);
|
|
|
|
if let Some(&max) = max_cap.as_ref() {
|
|
// Bound `cap` to `max_cap` if it is set.
|
|
cap = std::cmp::min(cap, max.get());
|
|
}
|
|
|
|
while cap <= len {
|
|
// If cap is too large for one (or more) of the buffers, truncate it.
|
|
if cap > y.len() || cap > x.len() {
|
|
cap = std::cmp::min(y.len(), x.len());
|
|
}
|
|
|
|
// Search forwards in `y`. (up to `cap`)
|
|
if let Some(y) = self.search_forward(&y[..cap], needle) {
|
|
return Some(y);
|
|
}
|
|
// Search backwards in `x`. (down to `cap`)
|
|
if let Some(x) = self.search_backward(&x[(x.len()-cap)..], needle) {
|
|
return Some(x);
|
|
}
|
|
|
|
// Cut out `cap` bytes from the start of forwards
|
|
y = &y[cap..];
|
|
// Cut out `cap` bytes from the end of backwards.
|
|
x = &x[..cap];
|
|
|
|
if Self::CAP_GROW {
|
|
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
|
|
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
|
|
}
|
|
}
|
|
None
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Placeholder for a future async searcher; only compiled when the `async` feature is enabled.
#[cfg(feature="async")]
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
    /// Not-yet-implemented async searcher: holds a task spawner and the receiving end of a oneshot channel carrying a `usize`.
    #[derive(Debug, Clone)]
    struct SearchAsync<F>
    {
        spawn_task: F,
        result: oneshot::Receiver<usize>,
    }

    // TODO: The searcher methods are not implemented yet; this impl is an empty stub.
    #[cfg(feature="threads-async")]
    impl<F: Fn() -> Fu, Fu: futures::Future + Send + Sync + 'static> MidpointFBSearcher<u8> for SearchAsync<F>
    {

    }
};
|
|
|
|
|
|
/// Search in parallel.
|
|
///
|
|
/// # Warning
|
|
/// This search operation is heavy. It **always** spawns its own 2nd thread when `search_combined()` is invoked.
|
|
/// This may not be ideal... A lighter, thread-pool (async) or thread-reusing (sync) API would be better. (See below.)
|
|
#[derive(Debug, Clone, Default)]
|
|
struct SearchPar;
|
|
|
|
lazy_static::lazy_static! {
    /// Real system page size (raw), exactly as returned by the C function `getpagesize()`.
    static ref REAL_PAGE_SIZE: std::ffi::c_int = {
        use std::ffi::c_int;
        extern "C" {
            fn getpagesize() -> c_int;
        }
        // SAFETY: `getpagesize()` takes no arguments and only returns a value; the declaration above matches its C signature.
        unsafe {
            getpagesize()
        }
    };
    /// System page size.
    ///
    /// If the page size returned from `getpagesize()` (`REAL_PAGE_SIZE`) was invalid (below-or-equal to 0, or not representable as a `usize`,) this is `None`.
    static ref SYS_PAGE_SIZE: Option<NonZeroUsize> = {
        // Checked conversions only: a negative value fails `try_from`, and `0` is rejected by `NonZeroUsize::new`.
        usize::try_from(*REAL_PAGE_SIZE).ok().and_then(NonZeroUsize::new)
    };
}
|
|
|
|
#[cfg(feature="threads")]
impl MidpointFBSearcher<u8> for SearchPar
{
    /// Find the first occurrence of `needle` in `haystack` (via `memchr`.)
    #[inline(always)]
    fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Find the last occurrence of `needle` in `haystack` (via `memrchr`.)
    #[inline(always)]
    fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Search outwards from `begin` for `needle`, using two threads.
    ///
    /// `haystack` is split at `begin`. A scoped `forward-searcher` thread searches the part after the split forwards, while the calling thread searches the part before it backwards. Both search in windows moving away from `begin`; the window starts at `CAP_SIZE` bytes (bounded by the maximum window size, if any) and, if `CAP_GROW` is set, doubles after each miss.
    /// The two searches share a completion flag: whichever finds the needle sets it, and the other stops at its next check.
    ///
    /// If both searches find a match, the backward match is returned.
    /// Returns `None` when neither search finds the needle.
    ///
    /// # Panics
    /// * If `begin > haystack.len()`.
    /// * If the `forward-searcher` thread cannot be spawned.
    /// * If the `forward-searcher` thread panicked and its result is needed.
    fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {

        // Set by whichever side finds the needle first, so the other side can stop early.
        let complete = crossbeam::atomic::AtomicCell::new(false);
        std::thread::scope(|s| {
            let (hb, hf) = haystack.split_at(begin);

            let max_cap = get_max_cap_for_search_area(haystack.len());

            // Initial window size: at least one byte (so every round makes progress), capped to `max_cap` if there is one.
            let cap = {
                let base = std::cmp::max(Self::CAP_SIZE, 1);
                max_cap.map_or(base, |max| std::cmp::min(max.get(), base))
            };

            let forward = if !hf.is_empty() {
                let sf = &self;
                let complete = &complete;
                // Background thread: Forward search (`forward-searcher`.)
                Some(std::thread::Builder::new().name("forward-searcher".into()).spawn_scoped(s, move || -> Option<_> {
                    let mut hf = hf;
                    let mut cap = cap;

                    // Check completion before starting loop too.
                    if complete.load() {
                        return None;
                    }
                    // Keep going until all of `hf` has been searched.
                    while !hf.is_empty() {
                        // Search forward in `hf` up to `cap` bytes (fewer if `hf` is nearly exhausted.)
                        let (head, rest) = hf.split_at(std::cmp::min(cap, hf.len()));
                        if let Some(x) = sf.search_forward(head, needle) {
                            // Tell other operation we have found something.
                            complete.store(true);
                            return Some(x);
                        } else if complete.load() {
                            break;
                        }
                        // Cut out the searched window from the start.
                        hf = rest;
                        if Self::CAP_GROW {
                            // Double `cap` (not passing `max_cap` if there is one set.)
                            let doubled = cap.saturating_mul(2);
                            cap = max_cap.map_or(doubled, |max| std::cmp::min(max.get(), doubled));
                        }
                    }
                    None::<&'a u8>
                }).expect("Failed to spawn forward-searcher thread"))
            } else {
                None
            };
            //NOTE: There is no need to spawn another thread for the 2nd operation, since they are both join()'d at the end regardless and both already communicate completion.
            let backward = if !hb.is_empty() {
                let sf = &self;
                let complete = &complete;

                // Main thread: Backwards search.
                (move || -> Option<_> {
                    let mut hb = hb;
                    let mut cap = cap;

                    // Check completion before starting loop too.
                    if complete.load() {
                        return None;
                    } else {
                        // Allow previous thread to run if it is not.
                        std::thread::yield_now();
                    }
                    // Keep going until all of `hb` has been searched.
                    while !hb.is_empty() {
                        // Search backwards in the last `cap` bytes of `hb` (fewer if `hb` is nearly exhausted.)
                        let (rest, tail) = hb.split_at(hb.len() - std::cmp::min(cap, hb.len()));
                        if let Some(x) = sf.search_backward(tail, needle) {
                            // Tell other operation we have found something.
                            complete.store(true);
                            return Some(x);
                        } else if complete.load() {
                            break;
                        }
                        // Cut out the searched window from the end.
                        hb = rest;
                        if Self::CAP_GROW {
                            // Double `cap` (not passing `max_cap` if there is one set.)
                            let doubled = cap.saturating_mul(2);
                            cap = max_cap.map_or(doubled, |max| std::cmp::min(max.get(), doubled));
                        }
                    }
                    None::<&'a u8>
                })()
            } else {
                None
            };
            if backward.is_some() && forward.as_ref().map_or(false, |th| !th.is_finished()) {
                // `backward` found something, `forward` is still running.
                debug_assert_ne!(complete.load(), false, "Complete has not been set! (main thread waiting for forward-searcher thread");
                complete.store(true);
            }

            /// Propagate a panic from the `forward-searcher` thread.
            ///
            /// With debug assertions enabled this panics with a fixed message; otherwise the original panic payload is resumed.
            #[cold]
            #[inline(never)]
            fn _resume_unwind(e: Box<dyn std::any::Any + Send>) -> Never
            {
                if cfg!(debug_assertions) {
                    panic!("forward-searcher thread panic")
                } else {
                    std::panic::resume_unwind(e)
                }
            }

            match (forward, backward) {
                (None, None) => None,
                (None, back @ Some(_)) => back,
                // Prefer the backward result; only wait on the forward thread's result when backward found nothing.
                // (Also covers `(Some(forward), Some(_))`.)
                (Some(forward), backward) => backward.or_else(move || forward.join().unwrap_or_panic(_resume_unwind)),
            }

        })
    }
}
|
|
|
|
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
|
|
where S: MidpointFBSearcher<u8>
|
|
{
|
|
todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
|
|
}
|
|
|
|
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
|
|
|
|
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
|
|
|
|
//TODO: Add tests for `Search{Seq,Par}` partitioning methods.
|
|
#[cfg(test)]
mod test
{
    use super::*;
    use std::hint::black_box;
    //TODO: Add a generic randomised lorem-ipsum-like text data generator to drive the generic assertion helpers below with larger inputs.

    /// Run `method.search_combined()` for `b'\n'` over `haystack` split at `begin`, and assert that the returned byte sits at index `expected` of `haystack` (or that nothing was found, for `None`.)
    fn check_search<S>(method: &S, haystack: &[u8], begin: usize, expected: Option<usize>)
    where S: MidpointFBSearcher<u8>
    {
        let haystack = black_box(haystack);
        // Translate the returned reference back into an index into `haystack`.
        let found = method.search_combined(haystack, begin, b'\n')
            .map(|byte| byte as *const u8 as usize - haystack.as_ptr() as usize);
        assert_eq!(found, expected, "unexpected result for {:?} split at {}", haystack, begin);
    }

    /// Generic assertions any `MidpointFBSearcher<u8>` must satisfy on small inputs.
    ///
    /// Every haystack contains the needle at most once, so the expected result does not depend on which direction finds it first.
    fn check_small_inputs<S>(method: S)
    where S: MidpointFBSearcher<u8>
    {
        // Nothing to search.
        check_search(&method, b"", 0, None);
        // Needle is absent.
        check_search(&method, b"aaaabbbb", 4, None);
        // Needle is directly after the split (found by the forward search.)
        check_search(&method, b"aaaa\nbbbb", 4, Some(4));
        // Needle is before the split (found by the backward search.)
        check_search(&method, b"aaaa\nbbbb", 6, Some(4));
        // Split at the very start: forward search only.
        check_search(&method, b"aaaa\nbbbb", 0, Some(4));
        // Split at the very end: backward search only.
        check_search(&method, b"aaaa\nbbbb", 9, Some(4));
    }

    #[test]
    fn partition_seq()
    {
        check_small_inputs(SearchSeq);
    }

    #[cfg(feature="threads")]
    #[test]
    fn partition_par_heavy()
    {
        check_small_inputs(SearchPar);
    }

    //TODO: Thread-reusing parallel `MidpointFBSearcher` (`SearchPar` is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: MidpointFBSearcher is currently a synchronous-only interface; a pure-async pivot finder may require a refactor.)
    #[cfg(all(feature="threads-async", feature = "threads"))]
    #[test]
    fn partition_par_light()
    {
        unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented")
    }

    #[cfg(feature="threads-async")]
    #[/*tokio::*/test]
    fn partition_par_async()
    {
        unimplemented!("A pure async parallel searcher has not yet been implemented")
    }

    //TODO: Benchmarking the searchers' configuration about capacity size, growth and bounding.
}
|