You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
reverse/src/part.rs

427 lines
14 KiB

//! Partitioning even areas by delimitor byte.
use super::*;
use std::{
num::NonZeroUsize,
};
/// Size of one cache-line.
///
/// NOTE: alignment padded for `u8`.
///
/// TODO: Make this comptime env-var configurable (`option_env!()`) on debug builds. (See `SEARCH_CAP_GROW`.)
const CACHELINE_SIZE: usize = std::mem::size_of::<crossbeam_utils::CachePadded<u8>>();
/// A buffer that takes up exactly one cache-line.
///
/// This type is not `Copy` to ensure copies are made safely. `clone()` is trivial, and to copy explicitly in a const context use `.copied()`
///
/// # Alignment
/// Note that the buffer is *not* 1-cacheline aligned itself by default.
/// To ensure its alignment, you should use `crossbeam_utils::CachePadded<CachelineBuffer>` (or the type-alias `AlignedCachelineBuffer`.)
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub struct CachelineBuffer([u8; CACHELINE_SIZE]);
impl Default for CachelineBuffer
{
#[inline]
fn default() -> Self
{
Self::new()
}
}
/// A buffer that takes up exactly one cache-line, which is itself aligned to 1 cacheline.
pub type AlignedCachelineBuffer = crossbeam_utils::CachePadded<CachelineBuffer>;
impl CachelineBuffer {
/// The size of the buffer (1 cacheline of bytes.)
pub const SIZE: usize = CACHELINE_SIZE;
/// Create a new, empty buffer.
#[inline]
pub const fn new() -> Self
{
Self([0; Self::SIZE])
}
/// Clone this value.
///
/// This is a `const fn` explicit trivial copy of the data.
#[inline]
pub const fn copied(&self) -> Self
{
Self(self.0)
}
/// Get a reference to the byte array.
pub const fn as_bytes(&self) -> &[u8; Self::SIZE]
{
&self.0
}
}
forward_newtype!(mut CachelineBuffer => [u8], 0);
forward_newtype!(move const CachelineBuffer => [u8; CACHELINE_SIZE], 0);
const _: () = {
debug_assert!(CachelineBuffer::SIZE > std::mem::size_of::<u8>(), "Invalid cacheline-padding size (`CACHELINE_SIZE`)");
//debug_assert!(CACHELINE_SIZE == 128, "Unexpected `CACHELINE_SIZE`");
};
/// Grow capacity exponentially when search fails.
///
/// TODO: Make this comptime env-var settable (`option_env!()`) on debug builds.
const SEARCH_CAP_GROW: bool = true;
/// Settings for a searcher (memory search method configuration.)
///
/// The default values provided to implementors are globally controlled and (debug-build only) env-var configurable (for benchmarking purposes.)
trait SynchonousSearcher {
/// Initial size of capacity
const CAP_SIZE: usize = CACHELINE_SIZE;
/// Should the capacity be grown on failed search?
const CAP_GROW: bool = SEARCH_CAP_GROW;
}
// Default impl global compiled capacity settings for each.
impl SynchonousSearcher for SearchPar {}
impl SynchonousSearcher for SearchSeq {}
/// Midpoint searcher (forwards & backwards)
trait MidpointFBSearcher<T=u8>: SynchonousSearcher
{
fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
}
/// Search the pivot for the needle sequentially.
///
/// The order of operations will be: `search_forward()?, search_backward()`.
#[derive(Debug, Clone, Default)]
struct SearchSeq;
#[inline]
fn get_max_cap_for_search_area(size: usize) -> Option<NonZeroUsize>
{
SYS_PAGE_SIZE.and_then(move |page| if size == 0 {
// Size is unknown, bound by page.
Some(page)
} else if size >= (page.get() << 2) {
// Size is huge, bound by page ^2
NonZeroUsize::new(page.get() << 1)
} else if size >= page.get() {
// Size is larger than page, bound by page.
Some(page)
} else {
// If the area size is lower than one page, do not bound the capacity growth.
None
})
}
impl MidpointFBSearcher<u8> for SearchSeq
{
#[inline(always)]
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
}
#[inline(always)]
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
}
#[inline]
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
let max_cap = get_max_cap_for_search_area(haystack.len());
match haystack.split_at(begin) {
([], []) => None,
([], x) => self.search_forward(x, needle),
(x, []) => self.search_backward(x, needle),
// If both the buffers are lower than `max_cap`, just do the entire operation on each
(x, y) if max_cap.map(|max| x.len() <= max.get() && y.len() <= max.get()).unwrap_or(false) => {
self.search_forward(y, needle)?;
self.search_backward(x, needle)
},
(mut x, mut y) => {
let len = std::cmp::min(x.len(), y.len());
let mut cap = std::cmp::min(len, Self::CAP_SIZE);
if let Some(&max) = max_cap.as_ref() {
// Bound `cap` to `max_cap` if it is set.
cap = std::cmp::min(cap, max.get());
}
while cap <= len {
// If cap is too large for one (or more) of the buffers, truncate it.
if cap > y.len() || cap > x.len() {
cap = std::cmp::min(y.len(), x.len());
}
// Search forwards in `y`. (up to `cap`)
if let Some(y) = self.search_forward(&y[..cap], needle) {
return Some(y);
}
// Search backwards in `x`. (down to `cap`)
if let Some(x) = self.search_backward(&x[(x.len()-cap)..], needle) {
return Some(x);
}
// Cut out `cap` bytes from the start of forwards
y = &y[cap..];
// Cut out `cap` bytes from the end of backwards.
x = &x[..cap];
if Self::CAP_GROW {
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
}
}
None
}
}
}
}
#[cfg(feature="async")]
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
#[derive(Debug, Clone)]
struct SearchAsync<F>
{
spawn_task: F,
result: oneshot::Receiver<usize>,
}
#[cfg(feature="threads-async")]
impl<F, Fu> MidpointFBSearcher<u8> for SearchAsync<F>
where F: Fn() -> Fu,
Fu: futures::Future + Send + Sync + 'static
{
}
};
/// Search in parallel.
///
/// # Warning
/// This search operation is heavy. It **always** spawns its own 2nd thread when `search_combined()` is invoked.
/// This may not be ideal... A lighter, thread-pool (async) or thread-reusing (sync) API would be better. (See below.)
#[derive(Debug, Clone, Default)]
struct SearchPar;
lazy_static::lazy_static! {
/// Real system page size (raw.)
static ref REAL_PAGE_SIZE: std::ffi::c_int = {
use std::ffi::c_int;
extern "C" {
fn getpagesize() -> c_int;
}
unsafe {
getpagesize()
}
};
/// System page size.
///
/// If the page size returned from `getpagesize()` (`REAL_PAGE_SIZE`) was invalid (below-or-equal to 0,) `None` will be returned.
static ref SYS_PAGE_SIZE: Option<NonZeroUsize> = {
match *REAL_PAGE_SIZE {
std::ffi::c_int::MIN..=0 => None,
// SAFETY: We have masked out `0` in the above branch.
rest => unsafe {
debug_assert!(usize::try_from(rest).is_ok(), "Page size `c_int` out of range of system `usize`??? (Got {})", rest);
Some(NonZeroUsize::new_unchecked(rest as usize))
}
}
};
}
#[cfg(feature="threads")]
impl MidpointFBSearcher<u8> for SearchPar
{
#[inline(always)]
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
}
#[inline(always)]
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
}
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
let complete = crossbeam::atomic::AtomicCell::new(false);
std::thread::scope(|s| {
//let mut complete_val = UnsafeCell::new(false);
//let complete: parking_lot::Once = parking_lot::Once::new();
// let value_cont = (parking_lot::Condvar::new(), parking_lot::FairMutex::new(None::<&'a u8>));
let (mut hb, mut hf) = haystack.split_at(begin);
let max_cap = get_max_cap_for_search_area(haystack.len());
// Cap the cap to `max_cap` if there is a max cap.
let cap = if let Some(max) = max_cap.as_ref() {
std::cmp::min(max.get(), Self::CAP_SIZE)
} else {
Self::CAP_SIZE
};
let forward = if hf.len() > 0 {
let cap = cap;
let sf = &self;
let complete = &complete;
// Background thread: Forward search (`forward-searcher`.)
Some(std::thread::Builder::new().name("forward-searcher".into()).spawn_scoped(s, move || -> Option<_> {
let mut cap = std::cmp::min(cap, hf.len());
let len = hf.len();
// Check completion before starting loop too.
if complete.load() {
return None;
}
while cap <= len {
// If `cap` is larger than the buffer `hf`, truncate it.
cap = std::cmp::min(cap, hf.len());
// Search forward in `hf` up to `cap` bytes.
if let /*v @ */Some(x) = sf.search_forward(&hf[..cap], needle) {
// Tell other operation we have found something.
complete.store(true);
return Some(x);
} else if complete.load() {
break;
}
// Cut out `cap` bytes from the start.
hf = &hf[cap..];
if Self::CAP_GROW {
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
}
}
None::<&'a u8>
}).expect("Failed to spawn forward-searcher thread"))
} else {
None
};
//NOTE: There is no need to spawn another thread for the 2nd operation, since they are both join()'d at the end regardless and both already communicate completion.
let backward = if hb.len() > 0 {
let cap = cap;
let sf = &self;
let complete = &complete;
// Main thread: Backwards search.
move || -> Option<_> {
let mut cap = std::cmp::min(cap, hb.len());
let len = hb.len();
// Check completion before starting loop too.
if complete.load() {
return None;
} else {
// Allow previous thread to run if it is not.
std::thread::yield_now();
}
while cap <= len {
// If `cap` is larger than the buffer `hb`, truncate it.
cap = std::cmp::min(cap, hb.len());
// Search backwards in `hb` up to `cap` bytes.
if let /*v @ */Some(x) = sf.search_backward(&hb[(hb.len()-cap)..], needle) {
complete.store(true);
return Some(x);
} else if complete.load() {
break;
}
// Cut out `cap` bytes from the end.
hb = &hb[..cap];
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
}
None::<&'a u8>
}()
} else {
None
};
if backward.is_some() && forward.as_ref().map(|th| !th.is_finished()).unwrap_or(false) {
// `backward` found something, `forward` is still running.
debug_assert_ne!(complete.load(), false, "Complete has not been set! (main thread waiting for forward-searcher thread");
complete.store(true);
}
#[cold]
#[inline(never)]
fn _resume_unwind(e: Box<dyn std::any::Any + Send>) -> Never
{
if cfg!(debug_assertions) {
panic!("forward-searcher thread panic")
} else {
std::panic::resume_unwind(e)
}
}
match (forward, backward) {
(None, None) => None,
(None, back @ Some(_)) => back,
(Some(forward), backward) => backward.or_else(move || forward.join().unwrap_or_panic(_resume_unwind)),
//(Some(forward), Some(_)) => Handled ^
}
})
}
}
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
where S: MidpointFBSearcher<u8>
{
todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
}
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
//TODO: Add tests for `Search{Seq,Par}` partitioning methods.
#[cfg(test)]
mod test
{
use super::*;
use std::hint::black_box;
//TODO: Add a generic randomised lorem-ipsum-like text data generator & a generic assertion tester that can take a unique `MidpointFBSearcher`.
#[test]
fn partition_seq()
{
todo!("Test `SearchSeq` sequential partition searcher")
}
#[cfg(feature="threads")]
#[test]
fn partition_par_heavy()
{
todo!("Test `SearchPar` parallel partition searcher")
}
//TODO: Thread-reusing parallel `MidpointFBSearcher` (SearchSeq is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: MidpointFBSearcher is currently a synchonous-only interface; a pure-async pivot finder may require a refactor.)
#[cfg(all(feature="threads-async", feature = "threads"))]
#[test]
fn partition_par_light()
{
unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented")
}
#[cfg(feature="threads-async")]
#[/*tokio::*/test]
fn partition_par_async()
{
unimplemented!("A pure async parallel searcher has not yet been implemented")
}
//TODO: Benchmarking the searchers' configuration about capacity size, growth and bounding.
}