diff --git a/Cargo.lock b/Cargo.lock
index 735484d..3f99d76 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -184,6 +184,28 @@ version = "0.1.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d68bc55329711cd719c2687bb147bc06211b0521f97ef398280108ccb23227e9"
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.5"
@@ -360,6 +382,9 @@ name = "lazy_static"
 version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+dependencies = [
+ "spin",
+]
 
 [[package]]
 name = "libc"
@@ -553,8 +578,9 @@ dependencies = [
  "bytes",
  "clap",
  "color-eyre",
- "crossbeam-queue",
+ "crossbeam",
  "futures",
+ "lazy_static",
  "mapped-file",
  "memchr",
  "num_cpus",
@@ -639,6 +665,12 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "spin"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
+
 [[package]]
 name = "stable_deref_trait"
 version = "1.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index ac9d24c..212e9e3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,14 +7,14 @@ edition = "2018"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [features]
-default = ["mapped-file"]
+default = ["mapped-file", "threads"]
 
-unstable = ["parking_lot?/nightly", "memchr/compiler_builtins", "crossbeam-queue?/nightly", "futures?/unstable", "color-eyre/track-caller"]
+unstable = ["parking_lot?/nightly", "memchr/compiler_builtins", "crossbeam?/nightly", "futures?/unstable", "color-eyre/track-caller"]
 
 # Enables parallelising the operation where possible.
 #
 # Cli config options can control this, but in the default auto setting the process will decide if parallelisation is worth using based on the dimensions of the provided input and the system.
-threads = ["dep:rayon", "dep:parking_lot", "dep:crossbeam-queue", "dep:num_cpus", "threads-async"]
+threads = ["dep:rayon", "dep:parking_lot", "dep:crossbeam", "dep:num_cpus", "threads-async"]
 
 # Attempt `mmap()`ping the input instead of reading it.
 # If the output can be mapped, it may also be.
 #
@@ -22,9 +22,9 @@
 # Useable on unix-family systems. (TODO: Make this dependency target-dependent for unix instead of a manual feature addition like this.)
 mapped-file = ["dep:mapped-file"]
 
-# We are using `tokio_uring` & `tokio-stream` to allow parallel queued writes. (TODO: If we end up using (or needing) this feature for partitioning (e.g. we need a controlled IO thread that takes the data gathered by the parallelised partition system or we need a task pool (NOTE: the task pool will execute on tokio_uring's thread only; rayon's thread pool is being used for auto parallelisation currently) to spawn reverse-reader tasks per partition.) then: Remove the use of crossbeam (XXX: and maybe rayon?), and use Tokio runtime for manual parallelised partitioning/line-reversing.)
+# TODO: Make this a feature that isn't dependent on `threads`: use `tokio_uring` tasks for I/O, and pull in `tokio` **instead of** rayon for operations. For now, we'll see if we can get away with single-threaded async for the `tokio_uring` usage and `rayon`'s auto-parallelisation for the chunk partitioning; if not, change this to no longer be an internal feature.
 #
-# NOTE: **internal feature**
+# We are using `tokio_uring` & `tokio-stream` to allow parallel queued writes. (TODO: If we end up using (or needing) this feature for partitioning (e.g. we need a controlled IO thread that takes the data gathered by the parallelised partition system, or we need a task pool (NOTE: the task pool would execute on tokio_uring's thread only; rayon's thread pool is currently used for auto-parallelisation) to spawn reverse-reader tasks per partition), then: remove the use of crossbeam (XXX: and maybe rayon?) and use the Tokio runtime for manually parallelised partitioning/line-reversing.)
 threads-async = ["threads", "dep:futures", "dep:tokio-uring", "dep:tokio-stream"]
 
 #rayon = ["dep:rayon"]
@@ -72,8 +72,10 @@ strip=false
 bytes = "1.6.0"
 clap = { version = "4.5.4", features = ["derive", "string", "unicode", "wrap_help"] }
 color-eyre = { version = "0.6.3", default-features = false }
-crossbeam-queue = { version = "0.3.11", optional = true }
+crossbeam = { version = "0.8.4", optional = true, features = ["crossbeam-queue", "crossbeam-channel", "crossbeam-epoch"] }
+#crossbeam-queue = { version = "0.3.11", optional = true }
 futures = { version = "0.3.30", optional = true, default-features = false, features = ["alloc", "std", "async-await"] }
+lazy_static = { version = "1.4.0", features = ["spin"] }
 mapped-file = { version = "0.0.8", features = ["file"], optional = true }
 memchr = "2.7.2"
 num_cpus = { version = "1.16.0", optional = true }
diff --git a/src/part.rs b/src/part.rs
index d003187..e0666fb 100644
--- a/src/part.rs
+++ b/src/part.rs
@@ -1,3 +1,268 @@
 //! Partitioning even areas by delimiter byte.
 
 use super::*;
+use std::{
+    num::NonZeroUsize,
+};
+
+/// Midpoint searcher (forwards & backwards).
+trait MidpointFBSearcher<T = u8>
+{
+    fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
+    fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
+
+    fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
+}
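+
+// Contract sketch (an illustrative note, not part of the original API surface): an
+// implementor splits `haystack` at `begin` and hunts for `needle` in both halves,
+// returning a reference *into* `haystack` rather than an index. For example:
+//
+//     let hay = b"aa\nbb";
+//     // With `begin == 2` the forward half is b"\nbb", so a conforming searcher
+//     // (e.g. `SearchSeq` below) yields a reference to `hay[2]`:
+//     let found = SearchSeq.search_combined(hay, 2, b'\n');
+//     assert!(std::ptr::eq(found.unwrap(), &hay[2]));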
+
+/// Search the pivot for the needle sequentially.
+///
+/// The forward half (from `begin`) is searched first; the backward half is only searched if the forward half contains no needle.
+#[derive(Debug)]
+struct SearchSeq;
+
+impl MidpointFBSearcher for SearchSeq
+{
+    #[inline(always)]
+    fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
+        memchr::memchr(needle, haystack).map(move |i| &haystack[i])
+    }
+    #[inline(always)]
+    fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
+        memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
+    }
+
+    #[inline]
+    fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
+        match haystack.split_at(begin) {
+            ([], []) => None,
+            ([], x) => self.search_forward(x, needle),
+            (x, []) => self.search_backward(x, needle),
+
+            // Return a forward match if there is one; otherwise fall back to the backward half.
+            (x, y) => self.search_forward(y, needle)
+                .or_else(|| self.search_backward(x, needle)),
+        }
+    }
+}
+
+// Placeholder sketch for a future async combined search; compiled out for now (no `async` feature is defined in Cargo.toml.)
+#[cfg(feature="async")]
+const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
+    #[derive(Debug, Clone)]
+    struct SearchAsync<F, T>
+    {
+        spawn_task: F,
+        result: oneshot::Receiver<T>,
+    }
+
+    #[cfg(feature="threads-async")]
+    impl<F, Fu> MidpointFBSearcher for SearchAsync<F, u8>
+    where F: Fn() -> Fu,
+          Fu: futures::Future + Send + Sync + 'static
+    {
+
+    }
+};
+
+/// Search in parallel.
+///
+/// # Warning
+/// This search operation is heavy: it **always** spawns its own (up to) two threads when `search_combined()` is invoked. This may not be ideal...
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct SearchPar
+{
+    cap_start: usize,
+}
+
+/// For forward/backward pivot-searching, the default max area for each search attempt.
+const DEFAULT_PIVOT_MAX_SEARCH_AREA: usize = 1024;
+/// For forward/backward pivot-searching, the max *possible* area for each attempt as its search window grows in capacity.
+const DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA: usize = (1024 * 1024 * 1024) * 2; // 2 GiB
+
+/// The number of loaded pages above which non-page-bound operations assume they're working on huge-page-mapped data.
+const DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES: usize = 4;
+
+lazy_static::lazy_static! {
+    /// Real system page size.
+    static ref REAL_PAGE_SIZE: std::ffi::c_int = {
+        use std::ffi::c_int;
+        extern "C" {
+            fn getpagesize() -> c_int;
+        }
+        unsafe {
+            getpagesize()
+        }
+    };
+}
+
+/// Get a recommended bound for the pivot search area (if there is one.)
+///
+/// # Returns
+/// The recommended max bound for a pivot search area, or `None` for an unbounded search.
+///
+/// # Page kinds
+/// If the operation is using huge-page mapped memory, set `use_hp` to true.
+#[inline]
+fn get_max_pivot_search_area(_use_hp: bool) -> Option<NonZeroUsize>
+{
+    use std::ffi::c_int;
+    lazy_static::lazy_static! {
+        static ref PAGE_SIZE: usize = {
+            match *REAL_PAGE_SIZE {
+                // Invalid reported page size: fall back to the default bound.
+                c_int::MIN..=0 => DEFAULT_PIVOT_MAX_SEARCH_AREA,
+                // Very large (hp): `0` becomes `None` below, i.e. an unbounded search.
+                very_large if very_large as usize > DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA => 0,
+                // Large (limit to the upper bound of non-hp)
+                large if large as usize >= DEFAULT_PIVOT_MAX_SEARCH_AREA => std::cmp::min(large as usize, DEFAULT_PIVOT_MAX_POSSIBLE_SEARCH_AREA),
+                // Smaller than the default bound
+                small => small as usize
+            }
+        };
+    }
+    //XXX: Should we return a different value if `use_hp` is enabled? ("using hugepage")
+    NonZeroUsize::new(*PAGE_SIZE)
+}
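+
+// Usage sketch (descriptive only; mirrors how `SearchPar::search_combined` below
+// consumes the bound): each directional probe starts with a small window and
+// doubles it on every miss, clamped to this bound when one exists:
+//
+//     let max_cap = get_max_pivot_search_area(false);
+//     cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or(cap << 1);
+//
+// A `None` bound therefore means "keep doubling unboundedly".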
("using hugepage") + NonZeroUsize::new(*PAGE_SIZE) +} + +impl SearchPar { + #[inline(always)] + pub const fn new() -> Self + { + Self::with_capacity(DEFAULT_PIVOT_MAX_SEARCH_AREA) + } + #[inline] + pub const fn with_capacity(cap_start: usize) -> Self + { + Self { cap_start } + } + #[inline] + pub const fn cap(&self) -> usize + { + self.cap_start + } + + #[inline(always)] + pub unsafe fn cap_mut(&mut self) -> &mut usize + { + &mut self.cap_start + } +} + +impl Default for SearchPar +{ + #[inline] + fn default() -> Self + { + Self { + cap_start: match *REAL_PAGE_SIZE { + std::ffi::c_int::MIN..=0 => DEFAULT_PIVOT_MAX_SEARCH_AREA, + above_zero => above_zero as usize, + }, + } + } +} + +#[cfg(feature="threads")] +impl MidpointFBSearcher for SearchPar +{ + #[inline(always)] + fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> { + memchr::memchr(needle, haystack).map(move |i| &haystack[i]) + } + #[inline(always)] + fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> { + memchr::memrchr(needle, haystack).map(move |i| &haystack[i]) + } + + fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> { + + let complete = crossbeam::atomic::AtomicCell::new(false); + std::thread::scope(|s| { + //let mut complete_val = UnsafeCell::new(false); + //let complete: parking_lot::Once = parking_lot::Once::new(); + // let value_cont = (parking_lot::Condvar::new(), parking_lot::FairMutex::new(None::<&'a u8>)); + + let (mut hb, mut hf) = haystack.split_at(begin); + + let max_cap = match get_max_pivot_search_area(hf.len() > (DEFAULT_PIVOT_MAX_SEARCH_AREA * DEFAULT_MEM_DETECT_HUGE_SIZE_PAGES)){ // Assume huge-page memory if len is larger than 4 pages. + + // On debug builds, cap search area to one system page only. + _ignore if cfg!(debug_assertions) && (*REAL_PAGE_SIZE) > 0 => + // SAFETY: We have checked if `*REAL_PAGE_SIZE` is non-zero above. + Some(unsafe { NonZeroUsize::new_unchecked(*REAL_PAGE_SIZE as usize) }), + // Otherwise, use the detected value. + cap => cap, + }; + + let forward = if hf.len() > 0 { + let cap = self.cap_start; + let sf = &self; + let complete = &complete; + Some(s.spawn(move || -> Option<_> { + let mut cap = std::cmp::min(cap, hf.len()); + let len = hf.len(); + while cap <= len { + if let /*v @ */Some(x) = sf.search_forward(&hf[..cap], needle) { + complete.store(true); + //complete.call_once(|| complete_val = true); + return Some(x); + } else if complete.load() { + break; + } + // Cut out `cap` bytes from the start. + hf = &hf[cap..]; + // Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.) + cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1); + } + None::<&'a u8> + })) + } else { + None + }; + let backward = if hb.len() > 0 { + let cap = self.cap_start; + let sf = &self; + let complete = &complete; + Some(s.spawn(move || -> Option<_> { + let mut cap = std::cmp::min(cap, hb.len()); + let len = hb.len(); + while cap <= len { + if let /*v @ */Some(x) = sf.search_backward(&hb[(hb.len()-cap)..], needle) { + complete.store(true); + //complete.call_once(|| complete_val = true); + return Some(x); + } else if complete.load() { + break; + } + // Cut out `cap` bytes from the end. + hb = &hb[..cap]; + // Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.) 
+
+fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
+where S: MidpointFBSearcher
+{
+    todo!("Perform a single buffer partition (`buffer/2.at_nearest_mpr(needle)`, using `method.search_combined()`) and return its parts. Fast-pathing around `search_combined()` is okay (e.g. if `buffer/2` is small enough that `SearchSeq` will beat `S`, use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools: we may be able to do this manually with crossbeam, or we might just have to embrace `spawn_blocking()` async / a default tokio multi-threaded runtime, since a parallel partition needs at least two threads to search both directions at a time.)")
+}
+
+//XXX: Should we add a `SearchAsync`? Or an impl for `SearchPar` that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
+
+// Parallel (feature="threads") byte-area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (parallel impls should balance the number of partitions against the number of logical CPUs & the input(/desired-chunk) size.)
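+
+// Sketch of the intended `partition_once_with` shape (kept as a comment, since the
+// fast-path threshold and the split-after-delimiter choice here are assumptions):
+//
+//     let mid = buffer.len() / 2;
+//     let found = if mid <= DEFAULT_PIVOT_MAX_SEARCH_AREA {
+//         // Fast path: `buffer/2` is too small to be worth `S`'s threads.
+//         SearchSeq.search_combined(buffer, mid, needle)
+//     } else {
+//         method.search_combined(buffer, mid, needle)
+//     };
+//     match found {
+//         // Split *after* the delimiter, so it stays with the left part.
+//         Some(p) => {
+//             let at = (p as *const u8 as usize) - (buffer.as_ptr() as usize);
+//             buffer.split_at(at + 1)
+//         },
+//         // No delimiter anywhere: the buffer cannot be partitioned further.
+//         None => (buffer, &buffer[buffer.len()..]),
+//     }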