Compare commits

...

2 Commits

Author SHA1 Message Date
Avril 9d2ae29e8b
part: `SearchSeq` now uses capacity-extended-switching between forward and backward searching (like `SearchPar` does.)
1 month ago
Avril f9068beca1
part: started `partition_one_with()`: Perform one nearest-needle-to-half partition on a buffer. The behaviour of this partition can be controlled with its `method` parameter. There are currently two: `SearchSeq` - search forward then backward in sequence. `SearchPar` - search forward and backward in parallel (NOTE: `SearchPar` is heavy and not well optimised, it will *always* spawn at least one thread.)
1 month ago

34
Cargo.lock generated

@ -184,6 +184,28 @@ version = "0.1.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68bc55329711cd719c2687bb147bc06211b0521f97ef398280108ccb23227e9"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
@ -360,6 +382,9 @@ name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin",
]
[[package]]
name = "libc"
@ -553,8 +578,9 @@ dependencies = [
"bytes",
"clap",
"color-eyre",
"crossbeam-queue",
"crossbeam",
"futures",
"lazy_static",
"mapped-file",
"memchr",
"num_cpus",
@ -639,6 +665,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"

@ -7,14 +7,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["mapped-file"]
default = ["mapped-file", "threads"]
unstable = ["parking_lot?/nightly", "memchr/compiler_builtins", "crossbeam-queue?/nightly", "futures?/unstable", "color-eyre/track-caller"]
unstable = ["parking_lot?/nightly", "memchr/compiler_builtins", "crossbeam?/nightly", "futures?/unstable", "color-eyre/track-caller"]
# Enables parallelising the operation where possible.
#
# Cli config options can control this, but in default auto setting the process will decide if parallelisation is worth using on the dimensions of the provided input and system.
threads = ["dep:rayon", "dep:parking_lot", "dep:crossbeam-queue", "dep:num_cpus", "threads-async"]
threads = ["dep:rayon", "dep:parking_lot", "dep:crossbeam", "dep:num_cpus", "threads-async"]
# Attempt `mmap()`ping the input instead of reading it.
# If the output can be mapped, it may also be.
@ -22,9 +22,9 @@ threads = ["dep:rayon", "dep:parking_lot", "dep:crossbeam-queue", "dep:num_cpus"
# Useable on unix-family systems. (TODO: Make this dependency target-dependent for unix instead of a manual feature addition like this.)
mapped-file = ["dep:mapped-file"]
# We are using `tokio_uring` & `tokio-stream` to allow parallel queued writes. (TODO: If we end up using (or needing) this feature for partitioning (e.g. we need a controlled IO thread that takes the data gathered by the parallelised partition system or we need a task pool (NOTE: the task pool will execute on tokio_uring's thread only; rayon's thread pool is being used for auto parallelisation currently) to spawn reverse-reader tasks per partition.) then: Remove the use of crossbeam (XXX: and maybe rayon?), and use Tokio runtime for manual parallelised partitioning/line-reversing.)
# TODO: Make this non-dependent feature of `threads`: use `tokio_uring` tasks for I/O, and pull in `tokio` **instead of** rayon for operations. For now, we'll see if we can get away with single-threaded async for `tokio_uring` usage and `rayon`'s auto parallisation for the chunk partitioning; if not, change this to not be internal.
#
# NOTE: **internal feature**
# We are using `tokio_uring` & `tokio-stream` to allow parallel queued writes. (TODO: If we end up using (or needing) this feature for partitioning (e.g. we need a controlled IO thread that takes the data gathered by the parallelised partition system or we need a task pool (NOTE: the task pool will execute on tokio_uring's thread only; rayon's thread pool is being used for auto parallelisation currently) to spawn reverse-reader tasks per partition.) then: Remove the use of crossbeam (XXX: and maybe rayon?), and use Tokio runtime for manual parallelised partitioning/line-reversing.)
threads-async = ["threads", "dep:futures", "dep:tokio-uring", "dep:tokio-stream"]
#rayon = ["dep:rayon"]
@ -72,8 +72,10 @@ strip=false
bytes = "1.6.0"
clap = { version = "4.5.4", features = ["derive", "string", "unicode", "wrap_help"] }
color-eyre = { version = "0.6.3", default-features = false }
crossbeam-queue = { version = "0.3.11", optional = true }
crossbeam = { version = "0.8.4", optional = true, features = ["crossbeam-queue", "crossbeam-channel", "crossbeam-epoch"] }
#crossbeam-queue = { version = "0.3.11", optional = true }
futures = { version = "0.3.30", optional = true, default-features = false, features = ["alloc", "std", "async-await"] }
lazy_static = { version = "1.4.0", features = ["spin"] }
mapped-file = { version = "0.0.8", features = ["file"], optional = true }
memchr = "2.7.2"
num_cpus = { version = "1.16.0", optional = true }

@ -1,3 +1,354 @@
//! Partitioning even areas by delimitor byte.
use super::*;
use std::{
num::NonZeroUsize,
};
/// Midpoint searcher (forwards & backwards)
trait MidpointFBSearcher<T=u8>
{
fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
}
/// Search the pivot for the needle sequentially.
///
/// The order of operations will be: `search_forward()?, search_backward()`.
#[derive(Debug)]
struct SearchSeq;
impl MidpointFBSearcher<u8> for SearchSeq
{
#[inline(always)]
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
}
#[inline(always)]
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
}
#[inline]
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
let max_cap = match get_max_pivot_search_area(haystack.len() > (DEFAULT_PIVOT_MAX_SEARCH_AREA * DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES)){ // Assume huge-page memory if len is larger than 4 pages.
// On debug builds, cap search area to one system page only.
_ignore if cfg!(debug_assertions) && (*REAL_PAGE_SIZE) > 0 =>
// SAFETY: We have checked if `*REAL_PAGE_SIZE` is non-zero above.
Some(unsafe { NonZeroUsize::new_unchecked(*REAL_PAGE_SIZE as usize) }),
// Otherwise, use the detected value.
cap => cap,
};
match haystack.split_at(begin) {
([], []) => None,
([], x) => self.search_forward(x, needle),
(x, []) => self.search_backward(x, needle),
// If both the buffers are lower than `max_cap`, just do the entire operation on each
(x, y) if max_cap.map(|max| x.len() <= max.get() && y.len() <= max.get()).unwrap_or(false) => {
self.search_forward(y, needle)?;
self.search_backward(x, needle)
},
(mut x, mut y) => {
let len = std::cmp::min(x.len(), y.len());
let mut cap = std::cmp::min(len, DEFAULT_PIVOT_MAX_SEARCH_AREA);
while cap <= len {
// If cap is too large for one (or more) of the buffers, truncate it.
if cap > y.len() || cap > x.len() {
cap = std::cmp::min(y.len(), x.len());
}
// Search forwards in `y`. (up to `cap`)
if let Some(y) = self.search_forward(&y[..cap], needle) {
return Some(y);
}
// Search backwards in `x`. (down to `cap`)
if let Some(x) = self.search_backward(&x[(x.len()-cap)..], needle) {
return Some(x);
}
// Cut out `cap` bytes from the start of forwards
y = &y[cap..];
// Cut out `cap` bytes from the end of backwards.
x = &x[..cap];
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
}
None
}
}
}
}
#[cfg(feature="async")]
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
#[derive(Debug, Clone)]
struct SearchAsync<F>
{
spawn_task: F,
result: oneshot::Receiver<usize>,
}
#[cfg(feature="threads-async")]
impl<F, Fu> MidpointFBSearcher<u8> for SearchAsync<F>
where F: Fn() -> Fu,
Fu: futures::Future + Send + Sync + 'static
{
}
};
/// Search in parallel.
///
/// # Warning
/// This search operation is heavy. It **always** spawns its own (up to) two threads when `search_combined()` is invoked. This may not be ideal...
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SearchPar
{
cap_start: usize,
}
/// For f/b pivot-searching, the max area for each operation to attempt.
const DEFAULT_PIVOT_MAX_SEARCH_AREA: usize = 1024;
/// For f/b pivot-searching, the max *possible* area for each operation to attempt when it grows in capacity
const DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA: usize = (1024 * 1024 * 1024) * 2; // 2GB
/// The number of pages of memory loaded where non-page-bound operations assume they're using HP-mapped data.
const DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES: usize = 4;
lazy_static::lazy_static! {
/// Real system page size.
static ref REAL_PAGE_SIZE: std::ffi::c_int = {
use std::ffi::c_int;
extern "C" {
fn getpagesize() -> c_int;
}
unsafe {
getpagesize()
}
};
}
/// Get a recommended bound for the pivot search area (if there is one.)
///
/// # Returns
/// The recommended max bound for a pivot search area, or `None` for unbounded search.
///
/// # Page kinds
/// If the operation is using huge-page mapped memory, set `use_hp` to true.
#[inline]
fn get_max_pivot_search_area(_use_hp: bool) -> Option<NonZeroUsize>
{
use std::ffi::c_int;
lazy_static::lazy_static! {
static ref PAGE_SIZE: usize = {
match *REAL_PAGE_SIZE {
c_int::MIN..=0 => DEFAULT_PIVOT_MAX_SEARCH_AREA,
// Very large (hp)
very_large if very_large as usize > DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA => 0,
// Large (limit to upper bound of non-hp)
large if large as usize >= DEFAULT_PIVOT_MAX_SEARCH_AREA => std::cmp::min(large as usize, DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA),
// Smaller than default bound
small => small as usize
}
};
}
//XXX: Should we return a different value if `use_hp` is enabled? ("using hugepage")
NonZeroUsize::new(*PAGE_SIZE)
}
impl SearchPar {
#[inline(always)]
pub const fn new() -> Self
{
Self::with_capacity(DEFAULT_PIVOT_MAX_SEARCH_AREA)
}
#[inline]
pub const fn with_capacity(cap_start: usize) -> Self
{
Self { cap_start }
}
#[inline]
pub const fn cap(&self) -> usize
{
self.cap_start
}
#[inline(always)]
pub unsafe fn cap_mut(&mut self) -> &mut usize
{
&mut self.cap_start
}
}
impl Default for SearchPar
{
#[inline]
fn default() -> Self
{
Self {
cap_start: match *REAL_PAGE_SIZE {
std::ffi::c_int::MIN..=0 => DEFAULT_PIVOT_MAX_SEARCH_AREA,
above_zero => above_zero as usize,
},
}
}
}
#[cfg(feature="threads")]
impl MidpointFBSearcher<u8> for SearchPar
{
#[inline(always)]
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
}
#[inline(always)]
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
}
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
let complete = crossbeam::atomic::AtomicCell::new(false);
std::thread::scope(|s| {
//let mut complete_val = UnsafeCell::new(false);
//let complete: parking_lot::Once = parking_lot::Once::new();
// let value_cont = (parking_lot::Condvar::new(), parking_lot::FairMutex::new(None::<&'a u8>));
let (mut hb, mut hf) = haystack.split_at(begin);
let max_cap = match get_max_pivot_search_area(hf.len() > (DEFAULT_PIVOT_MAX_SEARCH_AREA * DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES)){ // Assume huge-page memory if len is larger than 4 pages.
// On debug builds, cap search area to one system page only.
_ignore if cfg!(debug_assertions) && (*REAL_PAGE_SIZE) > 0 =>
// SAFETY: We have checked if `*REAL_PAGE_SIZE` is non-zero above.
Some(unsafe { NonZeroUsize::new_unchecked(*REAL_PAGE_SIZE as usize) }),
// Otherwise, use the detected value.
cap => cap,
};
// Cap the cap to `max_cap` if there is a max cap.
let cap = if let Some(max) = max_cap.as_ref() {
std::cmp::min(max.get(), self.cap_start)
} else {
self.cap_start
};
let forward = if hf.len() > 0 {
let cap = cap;
let sf = &self;
let complete = &complete;
Some(s.spawn(move || -> Option<_> {
let mut cap = std::cmp::min(cap, hf.len());
let len = hf.len();
while cap <= len {
// If `cap` is larger than the buffer `hf`, truncate it.
cap = std::cmp::min(cap, hf.len());
// Search forward in `hf` up to `cap` bytes.
if let /*v @ */Some(x) = sf.search_forward(&hf[..cap], needle) {
complete.store(true);
//complete.call_once(|| complete_val = true);
return Some(x);
} else if complete.load() {
break;
}
// Cut out `cap` bytes from the start.
hf = &hf[cap..];
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
}
None::<&'a u8>
}))
} else {
None
};
let backward = if hb.len() > 0 {
let cap = cap;
let sf = &self;
let complete = &complete;
Some(s.spawn(move || -> Option<_> {
let mut cap = std::cmp::min(cap, hb.len());
let len = hb.len();
while cap <= len {
// If `cap` is larger than the buffer `hb`, truncate it.
cap = std::cmp::min(cap, hb.len());
// Search backwards in `hb` up to `cap` bytes.
if let /*v @ */Some(x) = sf.search_backward(&hb[(hb.len()-cap)..], needle) {
complete.store(true);
//complete.call_once(|| complete_val = true);
return Some(x);
} else if complete.load() {
break;
}
// Cut out `cap` bytes from the end.
hb = &hb[..cap];
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
}
None::<&'a u8>
}))
} else {
None
};
match (forward, backward) {
(None, None) => None,
(None, Some(back)) => back.join().unwrap_or(None),
(Some(forward), None) => forward.join().unwrap_or(None),
(Some(forward), Some(backward)) => forward.join().unwrap_or(None).
or_else(move || backward.join().unwrap_or(None)),
}
})
}
}
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
where S: MidpointFBSearcher<u8>
{
todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
}
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
//TODO: Add tests for `Search{Seq,Par}` partitioning methods.
#[cfg(test)]
mod test
{
#[test]
fn partition_seq()
{
todo!("Test `SearchSeq` sequential partition searcher")
}
#[cfg(feature="threads")]
#[test]
fn partition_par_heavy()
{
todo!("Test `SearchPar` parallel partition searcher")
}
//TODO: Thread-reusing parallel `MidpointFBSearcher` (SearchSeq is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: MidpointFBSearcher is currently a synchonous-only interface; a pure-async pivot finder may require a refactor.)
#[cfg(all(feature="threads-async", feature = "threads"))]
#[test]
fn partition_par_light()
{
unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented")
}
#[cfg(feature="threads-async")]
#[/*tokio::*/test]
fn partition_par_async()
{
unimplemented!("A pure async parallel searcher has not yet been implemented")
}
}

Loading…
Cancel
Save