Compare commits
8 Commits
master
...
refactor-t
Author | SHA1 | Date |
---|---|---|
Avril | 13be69c884 | 8 months ago |
Avril | 0ce496cafd | 8 months ago |
Avril | 03a3a1bfd0 | 8 months ago |
Avril | 49e0dd6073 | 8 months ago |
Avril | 9d2ae29e8b | 8 months ago |
Avril | f9068beca1 | 8 months ago |
Avril | ff898fc9b3 | 8 months ago |
Avril | 5144539191 | 8 months ago |
@ -0,0 +1,349 @@
|
|||||||
|
//! Buffering and resource pooling utilities
|
||||||
|
//!
|
||||||
|
//!
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
pub use bytes::{
|
||||||
|
Buf, BufMut,
|
||||||
|
Bytes, BytesMut,
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
fmt, error,
|
||||||
|
sync::Arc,
|
||||||
|
pin::Pin,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// Disabled draft of a resource-"loan" interface, kept for reference only.
// NOTE(review): gated behind the private `_loan` feature AND terminated by a
// `compile_error!`, so this block can never actually compile — it is a design
// sketch, not live code.
#[cfg(feature="_loan")]
const _TODO_REWORK_THIS_INTERFACE_ITS_OVERCOMPLICATED_SHITE: () = {
    /// `Ok(T)` on success; on failure the error is returned *alongside* the
    /// moved-in state `S`, so the caller can recover ownership of it.
    type OwnedResult<S, T, E> = Result<T, (E, S)>;

    /// Either an owned `T` or a shared borrow of one.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    enum MaybeRef<'r, T>
    {
        Owned(T),
        Borrowed(&'r T),
    }

    /// An item loaned out of `owner`, carrying either the owner itself or a
    /// borrow of it for the loan's lifetime.
    #[derive(Debug)]
    struct Loan<'owner, O: Loaner>
    {
        owner: MaybeRef<'owner, O>,
        item: O::Item<'owner>,
    }

    impl<'o, O> Loaned<'o, O> for Loan<'o, O>
    where O: Loaner
    {
        fn try_release(self) -> Result<(), <O as Loaner>::Error> {
            self.item.try_release()
        }
        fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), <O as Loaner>::Error> {
            // Body intentionally left unwritten in this abandoned sketch.
        }
    }

    /// A value loaned out of a `Loaner` (`Owner`); releasing hands it back.
    pub trait Loaned<'owner, Owner: ?Sized + 'owner>: Sized
    where Owner: Loaner
    {
        //fn owner(&self) -> Option<&Owner>;

        fn try_release(self) -> Result<(), Owner::Error>;
        fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), Owner::Error>
        where Self: 'owner; // XXX: What? 'owner or 'static??? I think it's `'owner` == `'static` but how tf do I write that?
    }

    /// A pool-like owner that can loan out `Item`s tied to its own lifetime.
    pub trait Loaner {
        type Item<'owner>: Loaned<'owner, Self>
        where Self: 'owner;
        type Error: error::Error + Send + 'static;

        fn try_loan(&self) -> Result<Self::Item<'_>, Self::Error>;
        fn try_loan_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, Self::Item<'static>, Self::Error>
        where Self: 'static;

        fn try_release(&self, item: Self::Item<'_>) -> OwnedResult<Self::Item<'_>, (), Self::Error>;
    }

    compile_error!("This `trait Loan` design isn't going to fucking work ofc...")
};
|
||||||
|
|
||||||
|
/// Thread pooling
|
||||||
|
#[cfg(feature="threads")]
|
||||||
|
pub mod thread_pool {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
mem::{
|
||||||
|
self,
|
||||||
|
ManuallyDrop,
|
||||||
|
},
|
||||||
|
thread::{
|
||||||
|
self,
|
||||||
|
Thread, ThreadId,
|
||||||
|
},
|
||||||
|
ops::Drop,
|
||||||
|
sync::{Arc, Weak,},
|
||||||
|
|
||||||
|
};
|
||||||
|
use parking_lot::{
|
||||||
|
Condvar,
|
||||||
|
Mutex,
|
||||||
|
};
|
||||||
|
use crossbeam::{
|
||||||
|
queue::ArrayQueue,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Discriminates how (and whether) a handle is bound to an OS thread.
#[derive(Debug)]
pub enum ThreadHandleKind<'scope, T>
{
    /// No join handle is held; only (optionally) the id of the referred-to
    /// thread. `Detached(None)` is the *poisoned* state.
    Detached(Option<ThreadId>),
    /// Owns a plain `JoinHandle` to a spawned thread.
    Attached(thread::JoinHandle<T>),
    /// Owns a handle to a thread spawned inside a `thread::scope`.
    Scoped(thread::ScopedJoinHandle<'scope, T>),
}

impl<'scope, T> ThreadHandleKind<'scope, T>
{
    /// Get a reference to the attached thread (if there is one.)
    #[inline]
    pub fn thread(&self) -> Option<&Thread>
    {
        match self {
            Self::Scoped(handle) => Some(handle.thread()),
            Self::Attached(handle) => Some(handle.thread()),
            Self::Detached(_) => None,
        }
    }

    /// Is this handle attached to a thread?
    ///
    /// If it is *attached*, this means the thread *should not* outlive the instance (unless the instance is `'static`.)
    #[inline]
    pub const fn is_attached(&self) -> bool
    {
        matches!(self, Self::Attached(_) | Self::Scoped(_))
    }

    /// Is this handle no longer referring to *any* thread?
    ///
    /// In a *poisoned* state, the handle is useless.
    #[inline]
    pub const fn is_poisoned(&self) -> bool
    {
        matches!(self, Self::Detached(None))
    }

    /// Get the thread id of the handle, if there is a referred-to thread.
    #[inline]
    pub fn id(&self) -> Option<ThreadId>
    {
        match self {
            Self::Scoped(handle) => Some(handle.thread().id()),
            Self::Attached(handle) => Some(handle.thread().id()),
            Self::Detached(id) => *id,
        }
    }
}
|
||||||
|
|
||||||
|
/// Represents a raw thread handle
///
/// Thin `repr(transparent)` newtype over [`ThreadHandleKind`]; its `Drop`
/// impl joins attached/scoped threads.
#[derive(Debug)]
#[repr(transparent)]
pub struct ThreadHandle<'scope, T>(ThreadHandleKind<'scope, T>);
|
||||||
|
|
||||||
|
//TODO: forward `ThreadHandle<'s, T>` methods to `ThreadHandleKind<'s, T>`: `thread(), id(), is_attached()`. Add methods `is_running(), join(), try_join(),` etc...
|
||||||
|
|
||||||
|
impl<'s, T> Drop for ThreadHandle<'s, T>
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn drop(&mut self) {
|
||||||
|
match mem::replace(&mut self.0, ThreadHandleKind::Detached(None)) {
|
||||||
|
ThreadHandleKind::Attached(a) => drop(a.join()), // Attempt join, ignore panic.
|
||||||
|
ThreadHandleKind::Scoped(s) => drop(s.join().unwrap_or_resume()), // Attempt join, carry panic back through into scope.
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Message protocol exchanged between the pool-side handle and its worker
/// thread through the shared `callback_passage` slot.
#[derive(Debug)]
enum PooledSendContext<T: Send, F: ?Sized + Send>
{
    /// An execute directive `Err(Some(func))` is taken by the thread and replaced by `Ok(func())`.
    ///
    /// # Direction
    /// Bidirectional.
    /// (Handle -> Thread; Thread -> Handle)
    Execute(Result<T, Option<Box<F>>>),
    /// Pass a caught panic within an `Execute(Err(Some(F)))` payload back from the thread.
    ///
    /// # Direction
    /// Thread -> Handle
    Panic(UnwindPayload),
    /// When the loaned `PooledThread` is moved back into the pool, tell the thread to park itself again.
    ///
    /// # Direction
    /// Handle -> Thread
    Park,

    /// The default invariant used for when an operation has been taken by a thread.
    ///
    /// # When `Taken`.
    /// The replacement is always `Taken` after a `take(&mut self)` that contains a value.
    /// The `Park` directive will not be replaced to `Taken`.
    /// (NOTE(review): the current `take()` impl replaces *every* variant with
    /// `Taken`, including `Park` — this doc and the code disagree; confirm.)
    Taken,

    //NOTE: `Close` variant not needed because `handle` will have a `Weak` reference to `callback_passage`; when the upgrade fails, the thread will exit.
}
|
||||||
|
|
||||||
|
impl<T: Send, F: ?Sized + Send> PooledSendContext<T, F>
{
    /// Move the current message out of the slot, leaving `Taken` behind.
    ///
    /// NOTE(review): as written this *always* swaps in `Taken` and returns
    /// `Some(..)` — even when the slot already holds `Taken` or `Park` (the
    /// commented-out arms below would have special-cased those). Callers can
    /// therefore never observe `None` here — confirm intended.
    pub fn take(&mut self) -> Option<Self>
    {
        Some(match self {
            //XXX: I don't think not replacing cloneable variants is a good or useful default. (see below V)
            //// Clone non-data-holding variants.
            //Self::Park => Self::Park,
            //// If the ctx has already been taken, return `None`.
            //Self::Taken => return None,
            // Move out of data-holding variants.
            _ => mem::replace(self, Self::Taken)
        })
    }
}
|
||||||
|
|
||||||
|
/// A loanable thread that belongs to a pool.
///
/// # Pooling
/// When taken from a `ThreadPool`, the object is *moved* out of the thread-pool's ring-buffer and then pinned in return (See `PooledThread`.)
///
/// To return the thread to the pool, the `callback_passage` writes a `Park` command to the thread, unpins itself, and then on drop moves itself back into the start of the ring-buffer of the thread pool `owner`
#[derive(Debug)]
struct PooledThreadHandle<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
      T: Send + 'scope
{
    /// Passes `F` to `handle` and retrieves `T`.
    ///
    /// The `Condvar` signals changes to the `Mutex`-guarded message slot
    /// (see `PooledSendContext` for the message protocol.)
    callback_passage: Arc<(Condvar, Mutex<PooledSendContext<T, F>>)>,
    /// The backing thread, which when started parks itself.
    /// The `PooledThread` object will unpark the thread when the object is loaned (moved) out of the pool, and re-parked when it is returned.
    /// (NOTE(review): the original sentence was truncated mid-word ("…re-parked when enter"); completed from the pooling docs above — confirm.)
    ///
    /// Will wait on `callback_passage?.0` for `Execute` directives, pass them back to the handle, and when a `Park` directive is received (i.e. when moved back into the pool) the thread will park itself and wait to be unparked when moved out of the pool or closed again.
    ///
    /// If `callback_message` is dropped, the thread will exit when unparked (dropped from pool.)
    //TODO: Ensure unpark on drop?
    handle: ThreadHandle<'scope, Result<(), ()>>,
}
|
||||||
|
|
||||||
|
impl<'scope, T, F:?Sized> PooledThreadHandle<'scope, T, F>
where F: FnOnce() -> T + Send + 'scope,
      T: Send + 'scope
{
    /// Overwrite the message slot with `message` and signal the worker.
    ///
    /// Returns the result of `Condvar::notify_one()`: `true` if a waiting
    /// thread was actually woken by the notification.
    ///
    /// NOTE(review): this unconditionally replaces the slot, so a previous
    /// unread message is silently lost — confirm intended.
    fn send_to_thread_raw(&self, message: PooledSendContext<T, F>) -> bool
    {
        let mut ctx = self.callback_passage.1.lock();
        *ctx = message;
        self.callback_passage.0.notify_one()
    }
    /// Overwrite the message slot from the worker side (no condvar signal.)
    fn send_from_thread_raw(&self, message: PooledSendContext<T, F>)
    {
        let mut ctx = self.callback_passage.1.lock();
        *ctx = message;
    }

    /// Worker-side read: optionally block on the condvar, then take the
    /// current message out of the slot (leaving `Taken` in its place.)
    ///
    /// NOTE(review): the `wait()` has no predicate loop, so a spurious
    /// wakeup returns whatever currently sits in the slot — confirm.
    fn read_in_thread_raw(&self, wait_for: bool) -> Option<PooledSendContext<T, F>>
    {
        let mut ctx = self.callback_passage.1.lock(); // NOTE: Should we fair lock here?
        // Wait on condvar if `wait_for` is true.
        if wait_for {
            self.callback_passage.0.wait(&mut ctx);
        }
        // Then replace the ctx with `Taken`.
        ctx.take()
    }
    /// Handle-side non-blocking read: `None` if the mutex is contended.
    fn try_read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
    {
        let mut ctx = self.callback_passage.1.try_lock()?;
        ctx.take()
    }
    /// Handle-side read: take the current message (blocks on the lock only,
    /// never on the condvar.)
    fn read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
    {
        let mut ctx = self.callback_passage.1.lock();
        ctx.take()
    }
}
|
||||||
|
|
||||||
|
/// A thread currently loaned out of a [`ThreadPool`].
///
/// On drop, the handle is told to park and is pushed back into the owner's
/// queue (see the `Drop` impl.)
#[derive(Debug)]
pub struct PooledThread<'scope, 'pool: 'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
      T: Send + 'scope
{
    /// The owner to return `thread` to on drop.
    /// (`None` means detached: the thread is released instead of returned.)
    owner: Option<&'pool ThreadPool<'scope, T, F>>,
    /// The pinned thread handle that is returned on drop.
    thread: ManuallyDrop<PooledThreadHandle<'scope, T, F>>, //XXX: Pin<> needed (or even useful at all) here?
}
|
||||||
|
|
||||||
|
impl<'scope, 'pool: 'scope, T, F:?Sized> Drop for PooledThread<'scope, 'pool, T, F>
where F: FnOnce() -> T + Send + 'scope,
      T: Send + 'scope
{
    /// Return the loaned worker to its pool, or (when detached) release it.
    ///
    /// NOTE(review): this path is unfinished — both branches end in a
    /// `todo!()`, so dropping a `PooledThread` currently panics.
    fn drop(&mut self) {
        let Some(loaned_from) = self.owner else {
            // There is no owner.
            // Instead, unpark the thread after drop.
            // SAFETY(review): `self.thread` is taken exactly once here and the
            // `ManuallyDrop` is never touched again — presumably sound; confirm.
            let PooledThreadHandle { callback_passage, handle } = unsafe { ManuallyDrop::take(&mut self.thread) };

            // Drop the Arc, so when `thread` wakes from unpark, it will fail to acquire context.
            drop(callback_passage);

            if let Some(thread) = handle.0.thread() {
                thread.unpark();
            }

            return todo!("How do we know *when* to unpark? How can we unpark *after* dropping the Arc?");
        };

        // There is an owner. Tell the thread to park itself.
        let mut handle = unsafe { ManuallyDrop::take(&mut self.thread) };
        handle.send_to_thread_raw(PooledSendContext::Park);

        // Read the message object, take it from the thread after it notifies us it is about to park.
        // NOTE(review): `ctx` is taken but never inspected below — presumably a
        // final payload that should be handled; confirm.
        let ctx = {
            let mut ctx = handle.callback_passage.1.lock();
            // Before thread parks, it should signal condvar, and callback_passage should not be unique again yet.
            handle.callback_passage.0.wait_while(&mut ctx, |_| Arc::strong_count(&handle.callback_passage) > 1);
            ctx.take()
        };

        // Force insert back into thread pool.
        if let Some( PooledThreadHandle { callback_passage, handle }) = loaned_from.pool_queue.force_push(handle) {
            // If one is removed, drop the context..
            drop(callback_passage);
            // Then force-unpark the thread.
            match handle.0.thread() {
                Some(thread) => thread.unpark(),
                None => (),
            }
        }

        todo!("Should we force `join()` here? Or allow non-scoped handles to be forgotten?")
    }
}
|
||||||
|
/// A fixed-capacity pool of parked worker threads.
///
/// Workers are loaned out as [`PooledThread`]s and pushed back into
/// `pool_queue` when the loan is dropped.
#[derive(Debug)]
pub struct ThreadPool<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
      T: Send + 'scope
{
    /// Idle worker handles awaiting loan (bounded crossbeam `ArrayQueue`.)
    pool_queue: ArrayQueue<PooledThreadHandle<'scope, T, F>>,
}
|
||||||
|
|
||||||
|
//TODO: Build a ThreadPool<'scope = 'static> using crossbeam::ArrayQueue<...> (XXX: See phone for details on how pool should use un/parking for loaning; and CondVar for passing tasks.) Also, add a `Loan` trait with assoc type `Item<'owner>` and methods `try_loan(&self) -> Result<Self::Item<'_>, ...>`, `return(&self, item: Self::Item<'_>) -> Result<(), ...>` and `try_loan_owned(self: Arc<Self>) -> `MovedResult<Arc<Self>, OwnedLoan<'static, Self>, ...>` (struct OwnedLoad<'scope, L>{ item: Option<L::Item<'scope>> } )`, etc.
|
||||||
|
}
|
@ -0,0 +1,281 @@
|
|||||||
|
//! Extension traits, global functions + macros.
|
||||||
|
#![allow(unused)]
|
||||||
|
use super::*;
|
||||||
|
use std::convert::Infallible;
|
||||||
|
|
||||||
|
pub use std::{
|
||||||
|
convert::{
|
||||||
|
TryFrom, TryInto,
|
||||||
|
},
|
||||||
|
borrow::*,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Add forwarding borrow + deref (+ optional `into_inner()`) impls for a type.
///
/// # Usage
/// For a mutable forwarding newtype:
/// ```
/// # use crate::forward_newtype;
/// # use core::borrow::*;
///
/// /// A mutable buffer newtype over an array.
/// struct Buffer([u8; 16]);
/// forward_newtype!(mut Buffer => [u8], 0); // Generates `Borrow<[u8]>`, `BorrowMut<[u8]>`, `Deref<Target=[u8]>`, and `DerefMut` impls for `Buffer` that return `<&[mut] self.>0` (as specified by `0`.)
/// ```
///
/// For an immutable forwarding newtype:
/// ```
/// # use crate::forward_newtype;
/// # use core::borrow::*;
///
/// /// An immutable data newtype over an array.
/// struct Data([u8; 16]);
/// forward_newtype!(ref Data => [u8], 0); // Generates `Borrow<[u8]>` and `Deref<Target=[u8]>` impls for `Data` that return `<& self.>0` (as specified by `0`.) Immutable access only is specified by `ref`.
/// ```
///
/// ## Consuming into inner
/// To generate an `into_inner(self) -> T` inherent impl for the type, the syntax `forward_newtype!(move [const] Type => Inner, accessor)` can be used.
/// If `const` is passed, then the `into_inner()` function will be a `const fn`, if not, then it won't be.
///
/// To combine with ref-forwarding accessors, the syntax `forward_newtype!(move [const] {ref/mut} Type => Inner, accessor)` can be used to generate them all; the `Borrow`, `BorrowMut`, `Deref`, `DerefMut` and `pub [const] fn into_inner()`.
/// This is the most likely to be useful.
///
/// If you need a seperate `into_inner()` impl, you can either not use the `move` declarator, or use the `ref`/`mut` accessor generator in a different statement than the `move` one:
/// ```
/// # use crate::forward_newtype;
/// # use core::borrow::*;
///
/// /// A mutable buffer newtype over an array.
/// struct Buffer([u8; 16]);
/// forward_newtype!(mut Buffer => [u8], 0); // Generate a mutable & immutable forwarding ref to a slice of bytes.
/// forward_newtype!(move const Buffer => [u8; 16], 0); // Generate a seperately typed `into_inner()` that returns the sized array.
/// ```
macro_rules! forward_newtype {
    (ref $type:ty => $inner:ty, $($expr:tt)+) => {
        impl Borrow<$inner> for $type
        {
            #[inline]
            fn borrow(&self) -> &$inner
            {
                &self.$($expr)+
            }
        }

        impl ::std::ops::Deref for $type {
            type Target = $inner;
            #[inline]
            fn deref(&self) -> &Self::Target {
                self.borrow()
            }
        }
    };
    (mut $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(ref $type => $inner, $($expr)+);

        impl BorrowMut<$inner> for $type
        {
            #[inline]
            fn borrow_mut(&mut self) -> &mut $inner
            {
                &mut self.$($expr)+
            }
        }

        impl ::std::ops::DerefMut for $type {
            #[inline]
            fn deref_mut(&mut self) -> &mut Self::Target {
                self.borrow_mut()
            }
        }
    };
    // NOTE(fix): the combined `move const ref/mut` and `move ref/mut` arms
    // must come *before* the bare `(move const $type:ty …)` / `(move $type:ty …)`
    // arms. macro_rules does not backtrack out of a committed fragment, so an
    // invocation like `move const mut Foo => …` that reached the bare arm
    // first would die with a hard "expected type, found `mut`" error instead
    // of falling through — the combined arms were previously unreachable.
    (move const ref $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move const $type => $inner, $($expr)+);
        forward_newtype!(ref $type => $inner, $($expr)+);
    };
    (move const mut $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move const $type => $inner, $($expr)+);
        forward_newtype!(mut $type => $inner, $($expr)+);
    };
    (move ref $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move $type => $inner, $($expr)+);
        forward_newtype!(ref $type => $inner, $($expr)+);
    };
    (move mut $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move $type => $inner, $($expr)+);
        forward_newtype!(mut $type => $inner, $($expr)+);
    };
    // `move const` must likewise precede bare `move` (a `:ty` fragment cannot
    // start with the `const` keyword.)
    (move const $type:ty => $inner:ty, $($expr:tt)+) => {
        impl $type {
            /// Consume into the inner value.
            pub const fn into_inner(self) -> $inner {
                self.$($expr)+
            }
        }
    };
    (move $type:ty => $inner:ty, $($expr:tt)+) => {
        impl $type {
            /// Consume into the inner value.
            pub fn into_inner(self) -> $inner {
                self.$($expr)+
            }
        }
    };
}
|
||||||
|
|
||||||
|
/// The default bottom type.
///
/// To use the `unwrap_infallible()`-like interface, functions that return `-> !` should be changed to `-> Never`.
/// When `unstable` is enabled, this is an alias to `!` and `-> !` is not special cased.
///
/// # As return argument
/// When feature `unstable` is enabled, `into_unreachable()` may not be required to ensure propagation to `!` from a function returning `-> Never`.
#[cfg(feature="unstable")]
pub type Never = !;

/// The default bottom type.
///
/// To use the `unwrap_infallible()`-like interface, functions special cased to `-> !` should be changed to `-> Never`.
///
/// # As return argument
/// When feature `unstable` is not enabled, `into_unreachable()` may be required to be used when dealing with return bottom types other than the special case `-> !`.
/// This is a current limitation of the type system.
#[cfg(not(feature="unstable"))]
pub type Never = Infallible;
|
||||||
|
|
||||||
|
/// Contractually ensures this type cannot exist (i.e. it is a bottom type.)
///
/// # Safety
/// Instances of the impl type **cannot exist**.
/// They must be bottom types (i.e. empty enums, types containing an `Infallible` / `!` object, etc.)
///
/// # Auto-impl
/// This trait is not intended to be implemented on any user-defined type other than empty enums.
///
/// By default it is implemented for the following types:
/// - `core::convert::Infallible`
/// - `!` (**feature**: `unstable`)
/// - `Box<T>` *where* `T: ?Sized + Unreachable`
pub unsafe trait Unreachable {
    /// Force control flow to terminate type checking here.
    ///
    /// # Note
    /// This function will never be executed, it is used to terminate the value's existence in the type system, by converting it from any `Unreachable` type into the bottom return type `!`.
    /// If this function ever **can** be called at all, it is undefined behaviour.
    #[inline]
    #[cold]
    fn into_unreachable(self) -> !
    where Self: Sized
    {
        // Debug builds fail loudly if the impossible ever happens; release
        // builds emit an unchecked unreachable hint instead.
        if cfg!(debug_assertions) {
            unreachable!("Unreachable conversion from {}!", std::any::type_name::<Self>())
        } else {
            // SAFETY: Contractually enforced by the trait impl itself.
            unsafe {
                std::hint::unreachable_unchecked()
            }
        }
    }
}
|
||||||
|
|
||||||
|
// `Infallible` has no values, so the empty match proves unreachability with
// no unsafe code at all.
unsafe impl Unreachable for Infallible {
    #[inline(always)]
    #[cold]
    fn into_unreachable(self) -> ! {
        match self {}
    }
}
|
||||||
|
#[cfg(feature="unstable")]
|
||||||
|
unsafe impl Unreachable for ! {
|
||||||
|
#[inline(always)]
|
||||||
|
#[cold]
|
||||||
|
fn into_unreachable(self) -> ! {
|
||||||
|
match self {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl<T: ?Sized + Unreachable> Unreachable for Box<T> {}
|
||||||
|
|
||||||
|
pub trait UnwrapPanicExt<T, E> {
    /// Unwrap the result `Ok` value or panic as described by the non-returning function `F`, with the `Unreachable` bottom type `N`.
    /// This will usually be an `-> !` function, (or an `-> Never` function using the `Unreachable` interface.)
    ///
    /// # Panic usage
    /// `func` must not return. It should panic, resume a panic, or exit the thread/program, trap, or terminate in an infinite loop.
    ///
    /// It does not *have* to call `panic!()` if it terminates in another way that is not a panic, however.
    fn unwrap_or_panic<N: Unreachable, F: FnOnce(E) -> N>(self, func: F) -> T;

    /// Unwrap the result `Ok` value or panic as described by the non-returning function `func` with the default bottom type `Never`.
    ///
    /// Convenience wrapper over `unwrap_or_panic` fixing `N = Never`.
    #[inline(always)]
    fn unwrap_or_panic_unreachable<F: FnOnce(E) -> Never>(self, func: F) -> T
    where Self: Sized {
        self.unwrap_or_panic::<Never, _>(func)
    }
}
|
||||||
|
|
||||||
|
pub trait UnwrapInfallibleExt<T> {
|
||||||
|
/// Unwrapping is infallible and therefore safe to do so without checking.
|
||||||
|
fn unwrap_infallible(self) -> T;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait UnwrapPanicResumeExt<T> {
|
||||||
|
/// Unwrap or resume a previous unwind, with the unwind payload in the `Err` variant.
|
||||||
|
fn unwrap_or_resume(self) -> T;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E> UnwrapPanicExt<T, E> for Result<T, E>
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn unwrap_or_panic<N: Unreachable, F: FnOnce(E) -> N>(self, func: F) -> T {
|
||||||
|
#[inline(never)]
|
||||||
|
#[cold]
|
||||||
|
fn _do_panic<Nn: Unreachable, Ee, Ff: FnOnce(Ee) -> Nn>(error: Ee, func: Ff) -> !
|
||||||
|
{
|
||||||
|
func(error).into_unreachable()
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => _do_panic(e, func)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E: Unreachable> UnwrapInfallibleExt<T> for Result<T, E>
{
    /// Unwrap without checking: `E: Unreachable` guarantees `Err` cannot exist.
    ///
    /// Debug builds take the checked `into_unreachable()` path (which may
    /// panic with a diagnostic); release builds compile the `Err` arm down to
    /// an unchecked unreachable hint.
    #[inline]
    fn unwrap_infallible(self) -> T {
        match self {
            Ok(v) => v,
            Err(e) => if cfg!(debug_assertions) {
                e.into_unreachable()
            } else {
                // SAFETY: Contract bound of `E: Unreachable` ensures this path will never be taken.
                unsafe {
                    std::hint::unreachable_unchecked()
                }
            }
        }
    }
}
|
||||||
|
|
||||||
|
/// The type of a caught unwind payload.
pub type UnwindPayload = Box<dyn std::any::Any + Send>;

/// Re-raise a previously caught panic payload.
///
/// Kept out-of-line and `#[cold]` so the unwinding path stays off callers'
/// hot paths.
#[inline(never)]
#[cold]
fn _resume_unwind<E: Into<UnwindPayload>>(payload: E) -> !
{
    std::panic::resume_unwind(payload.into())
}
|
||||||
|
|
||||||
|
impl<T, E: Into<UnwindPayload>> UnwrapPanicResumeExt<T> for Result<T, E>
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn unwrap_or_resume(self) -> T {
|
||||||
|
match self {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => _resume_unwind(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,426 @@
|
|||||||
|
//! Partitioning even areas by delimitor byte.
|
||||||
|
use super::*;
|
||||||
|
use std::{
|
||||||
|
num::NonZeroUsize,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Size of one cache-line.
|
||||||
|
///
|
||||||
|
/// NOTE: alignment padded for `u8`.
|
||||||
|
///
|
||||||
|
/// TODO: Make this comptime env-var configurable (`option_env!()`) on debug builds. (See `SEARCH_CAP_GROW`.)
|
||||||
|
const CACHELINE_SIZE: usize = std::mem::size_of::<crossbeam_utils::CachePadded<u8>>();
|
||||||
|
|
||||||
|
/// A buffer that takes up exactly one cache-line.
|
||||||
|
///
|
||||||
|
/// This type is not `Copy` to ensure copies are made safely. `clone()` is trivial, and to copy explicitly in a const context use `.copied()`
|
||||||
|
///
|
||||||
|
/// # Alignment
|
||||||
|
/// Note that the buffer is *not* 1-cacheline aligned itself by default.
|
||||||
|
/// To ensure its alignment, you should use `crossbeam_utils::CachePadded<CachelineBuffer>` (or the type-alias `AlignedCachelineBuffer`.)
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||||
|
#[repr(transparent)]
|
||||||
|
pub struct CachelineBuffer([u8; CACHELINE_SIZE]);
|
||||||
|
|
||||||
|
impl Default for CachelineBuffer
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn default() -> Self
|
||||||
|
{
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// A buffer that takes up exactly one cache-line, which is itself aligned to 1 cacheline.
|
||||||
|
pub type AlignedCachelineBuffer = crossbeam_utils::CachePadded<CachelineBuffer>;
|
||||||
|
|
||||||
|
impl CachelineBuffer {
|
||||||
|
/// The size of the buffer (1 cacheline of bytes.)
|
||||||
|
pub const SIZE: usize = CACHELINE_SIZE;
|
||||||
|
|
||||||
|
/// Create a new, empty buffer.
|
||||||
|
#[inline]
|
||||||
|
pub const fn new() -> Self
|
||||||
|
{
|
||||||
|
Self([0; Self::SIZE])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clone this value.
|
||||||
|
///
|
||||||
|
/// This is a `const fn` explicit trivial copy of the data.
|
||||||
|
#[inline]
|
||||||
|
pub const fn copied(&self) -> Self
|
||||||
|
{
|
||||||
|
Self(self.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the byte array.
|
||||||
|
pub const fn as_bytes(&self) -> &[u8; Self::SIZE]
|
||||||
|
{
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
forward_newtype!(mut CachelineBuffer => [u8], 0);
|
||||||
|
forward_newtype!(move const CachelineBuffer => [u8; CACHELINE_SIZE], 0);
|
||||||
|
|
||||||
|
const _: () = {
|
||||||
|
debug_assert!(CachelineBuffer::SIZE > std::mem::size_of::<u8>(), "Invalid cacheline-padding size (`CACHELINE_SIZE`)");
|
||||||
|
//debug_assert!(CACHELINE_SIZE == 128, "Unexpected `CACHELINE_SIZE`");
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Grow capacity exponentially when search fails.
|
||||||
|
///
|
||||||
|
/// TODO: Make this comptime env-var settable (`option_env!()`) on debug builds.
|
||||||
|
const SEARCH_CAP_GROW: bool = true;
|
||||||
|
|
||||||
|
/// Settings for a searcher (memory search method configuration.)
///
/// The default values provided to implementors are globally controlled and (debug-build only) env-var configurable (for benchmarking purposes.)
///
/// NOTE(review): the trait name is a typo for "SynchronousSearcher"; left
/// as-is because renaming would have to touch every impl/bound in the module.
trait SynchonousSearcher {
    /// Initial size of capacity
    const CAP_SIZE: usize = CACHELINE_SIZE;

    /// Should the capacity be grown on failed search?
    const CAP_GROW: bool = SEARCH_CAP_GROW;
}
|
||||||
|
|
||||||
|
// Default impl global compiled capacity settings for each.
|
||||||
|
impl SynchonousSearcher for SearchPar {}
|
||||||
|
impl SynchonousSearcher for SearchSeq {}
|
||||||
|
|
||||||
|
/// Midpoint searcher (forwards & backwards)
trait MidpointFBSearcher<T=u8>: SynchonousSearcher
{
    /// Find the first matching element in `haystack`.
    fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
    /// Find the last matching element in `haystack`.
    fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;

    /// Search in both directions outward from the pivot index `begin`.
    fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
}
|
||||||
|
|
||||||
|
/// Search the pivot for the needle sequentially.
|
||||||
|
///
|
||||||
|
/// The order of operations will be: `search_forward()?, search_backward()`.
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct SearchSeq;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn get_max_cap_for_search_area(size: usize) -> Option<NonZeroUsize>
|
||||||
|
{
|
||||||
|
SYS_PAGE_SIZE.and_then(move |page| if size == 0 {
|
||||||
|
// Size is unknown, bound by page.
|
||||||
|
Some(page)
|
||||||
|
} else if size >= (page.get() << 2) {
|
||||||
|
// Size is huge, bound by page ^2
|
||||||
|
NonZeroUsize::new(page.get() << 1)
|
||||||
|
} else if size >= page.get() {
|
||||||
|
// Size is larger than page, bound by page.
|
||||||
|
Some(page)
|
||||||
|
} else {
|
||||||
|
// If the area size is lower than one page, do not bound the capacity growth.
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MidpointFBSearcher<u8> for SearchSeq
|
||||||
|
{
|
||||||
|
#[inline(always)]
|
||||||
|
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||||
|
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
|
||||||
|
}
|
||||||
|
#[inline(always)]
|
||||||
|
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||||
|
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
|
||||||
|
let max_cap = get_max_cap_for_search_area(haystack.len());
|
||||||
|
|
||||||
|
match haystack.split_at(begin) {
|
||||||
|
([], []) => None,
|
||||||
|
([], x) => self.search_forward(x, needle),
|
||||||
|
(x, []) => self.search_backward(x, needle),
|
||||||
|
|
||||||
|
// If both the buffers are lower than `max_cap`, just do the entire operation on each
|
||||||
|
(x, y) if max_cap.map(|max| x.len() <= max.get() && y.len() <= max.get()).unwrap_or(false) => {
|
||||||
|
self.search_forward(y, needle)?;
|
||||||
|
self.search_backward(x, needle)
|
||||||
|
},
|
||||||
|
|
||||||
|
(mut x, mut y) => {
|
||||||
|
let len = std::cmp::min(x.len(), y.len());
|
||||||
|
let mut cap = std::cmp::min(len, Self::CAP_SIZE);
|
||||||
|
|
||||||
|
if let Some(&max) = max_cap.as_ref() {
|
||||||
|
// Bound `cap` to `max_cap` if it is set.
|
||||||
|
cap = std::cmp::min(cap, max.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
while cap <= len {
|
||||||
|
// If cap is too large for one (or more) of the buffers, truncate it.
|
||||||
|
if cap > y.len() || cap > x.len() {
|
||||||
|
cap = std::cmp::min(y.len(), x.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search forwards in `y`. (up to `cap`)
|
||||||
|
if let Some(y) = self.search_forward(&y[..cap], needle) {
|
||||||
|
return Some(y);
|
||||||
|
}
|
||||||
|
// Search backwards in `x`. (down to `cap`)
|
||||||
|
if let Some(x) = self.search_backward(&x[(x.len()-cap)..], needle) {
|
||||||
|
return Some(x);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cut out `cap` bytes from the start of forwards
|
||||||
|
y = &y[cap..];
|
||||||
|
// Cut out `cap` bytes from the end of backwards.
|
||||||
|
x = &x[..cap];
|
||||||
|
|
||||||
|
if Self::CAP_GROW {
|
||||||
|
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
|
||||||
|
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// TODO stash: sketch of a pure-async searcher, parked inside an unused const
// so it is type-checked (when the feature is enabled) but emits nothing.
#[cfg(feature="async")]
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
    /// Async searcher: spawns search tasks via `spawn_task` and receives the
    /// found index over a oneshot channel.
    #[derive(Debug, Clone)]
    struct SearchAsync<F>
    {
        // Factory used to spawn the search future/task.
        spawn_task: F,
        // Receives the index found by the spawned task.
        result: oneshot::Receiver<usize>,
    }

    // NOTE(review): this impl body is empty — it is missing the trait's
    // required methods and will not compile if both features are enabled.
    // Left as-is: it is an explicit work-in-progress stash.
    #[cfg(feature="threads-async")]
    impl<F, Fu> MidpointFBSearcher<u8> for SearchAsync<F>
    where F: Fn() -> Fu,
          Fu: futures::Future + Send + Sync + 'static
    {

    }
};
|
||||||
|
|
||||||
|
|
||||||
|
/// Search in parallel.
///
/// The forward half is searched on a freshly spawned scoped thread
/// (`forward-searcher`) while the backward half is searched on the calling
/// thread; the two signal completion to each other via an atomic flag.
///
/// # Warning
/// This search operation is heavy. It **always** spawns its own 2nd thread when `search_combined()` is invoked.
/// This may not be ideal... A lighter, thread-pool (async) or thread-reusing (sync) API would be better. (See below.)
#[derive(Debug, Clone, Default)]
struct SearchPar;
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
    /// Real system page size (raw.)
    ///
    /// Result of a single `getpagesize()` call; may be below-or-equal to 0 on
    /// failure (see `SYS_PAGE_SIZE` for the validated form.)
    static ref REAL_PAGE_SIZE: std::ffi::c_int = {
        use std::ffi::c_int;
        extern "C" {
            fn getpagesize() -> c_int;
        }
        // SAFETY: `getpagesize()` takes no arguments and has no preconditions.
        unsafe {
            getpagesize()
        }
    };
    /// System page size.
    ///
    /// If the page size returned from `getpagesize()` (`REAL_PAGE_SIZE`) was invalid (below-or-equal to 0,) `None` will be returned.
    static ref SYS_PAGE_SIZE: Option<NonZeroUsize> = {
        match *REAL_PAGE_SIZE {
            // Non-positive page size: invalid, report as `None`.
            std::ffi::c_int::MIN..=0 => None,
            // Positive: the value is known non-zero, so the checked
            // constructor always succeeds here — no need for the previous
            // `unsafe { new_unchecked() }`.
            rest => {
                debug_assert!(usize::try_from(rest).is_ok(), "Page size `c_int` out of range of system `usize`??? (Got {})", rest);
                NonZeroUsize::new(rest as usize)
            }
        }
    };
}
|
||||||
|
|
||||||
|
#[cfg(feature="threads")]
impl MidpointFBSearcher<u8> for SearchPar
{
    /// Forward search: first occurrence of `needle` via `memchr`.
    #[inline(always)]
    fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Backward search: last occurrence of `needle` via `memrchr`.
    #[inline(always)]
    fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Search both halves of `haystack` (split at `begin`) concurrently:
    /// the forward half on a spawned scoped thread, the backward half on the
    /// calling thread. Whichever side finds `needle` first sets `complete`,
    /// telling the other side to stop early.
    fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
        // Cross-thread "found it" flag; checked by both searchers each window.
        let complete = crossbeam::atomic::AtomicCell::new(false);
        std::thread::scope(|s| {
            let (mut hb, mut hf) = haystack.split_at(begin);

            let max_cap = get_max_cap_for_search_area(haystack.len());

            // Cap the cap to `max_cap` if there is a max cap.
            let cap = if let Some(max) = max_cap.as_ref() {
                std::cmp::min(max.get(), Self::CAP_SIZE)
            } else {
                Self::CAP_SIZE
            };

            let forward = if hf.len() > 0 {
                let cap = cap;
                let sf = &self;
                let complete = &complete;
                // Background thread: Forward search (`forward-searcher`.)
                Some(std::thread::Builder::new().name("forward-searcher".into()).spawn_scoped(s, move || -> Option<_> {
                    let mut cap = std::cmp::min(cap, hf.len());
                    let len = hf.len();

                    // Check completion before starting loop too.
                    if complete.load() {
                        return None;
                    }
                    while cap <= len {
                        // If `cap` is larger than the buffer `hf`, truncate it.
                        cap = std::cmp::min(cap, hf.len());
                        // BUGFIX: once `hf` is exhausted `cap` truncates to 0
                        // and the loop would previously spin forever.
                        if cap == 0 {
                            break;
                        }
                        // Search forward in `hf` up to `cap` bytes.
                        if let Some(x) = sf.search_forward(&hf[..cap], needle) {
                            // Tell other operation we have found something.
                            complete.store(true);
                            return Some(x);
                        } else if complete.load() {
                            break;
                        }
                        // Cut out `cap` bytes from the start.
                        hf = &hf[cap..];
                        if Self::CAP_GROW {
                            // Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
                            cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
                        }
                    }
                    None::<&'a u8>
                }).expect("Failed to spawn forward-searcher thread"))
            } else {
                None
            };

            //NOTE: There is no need to spawn another thread for the 2nd operation, since they are both join()'d at the end regardless and both already communicate completion.
            let backward = if hb.len() > 0 {
                let cap = cap;
                let sf = &self;
                let complete = &complete;

                // Main thread: Backwards search.
                move || -> Option<_> {
                    let mut cap = std::cmp::min(cap, hb.len());
                    let len = hb.len();

                    // Check completion before starting loop too.
                    if complete.load() {
                        return None;
                    } else {
                        // Allow previous thread to run if it is not.
                        std::thread::yield_now();
                    }
                    while cap <= len {
                        // If `cap` is larger than the buffer `hb`, truncate it.
                        cap = std::cmp::min(cap, hb.len());
                        // BUGFIX: prevent the same zero-progress spin as the
                        // forward searcher when `hb` is exhausted.
                        if cap == 0 {
                            break;
                        }
                        // Search backwards in `hb` up to `cap` bytes.
                        if let Some(x) = sf.search_backward(&hb[(hb.len()-cap)..], needle) {
                            complete.store(true);
                            return Some(x);
                        } else if complete.load() {
                            break;
                        }
                        // Cut out `cap` bytes from the end.
                        // BUGFIX: was `hb = &hb[..cap]`, which kept the *first*
                        // `cap` bytes instead of dropping the searched tail,
                        // skipping the middle of the backward half.
                        hb = &hb[..hb.len() - cap];
                        if Self::CAP_GROW {
                            // Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
                            // BUGFIX(consistency): growth was previously
                            // unconditional here, unlike the forward searcher
                            // and `SearchSeq` which gate it on `CAP_GROW`.
                            cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
                        }
                    }
                    None::<&'a u8>
                }()
            } else {
                None
            };

            if backward.is_some() && forward.as_ref().map(|th| !th.is_finished()).unwrap_or(false) {
                // `backward` found something, `forward` is still running.
                debug_assert_ne!(complete.load(), false, "Complete has not been set! (main thread waiting for forward-searcher thread");
                complete.store(true);
            }

            // Re-raise a panic from the forward-searcher thread on this one.
            #[cold]
            #[inline(never)]
            fn _resume_unwind(e: Box<dyn std::any::Any + Send>) -> Never
            {
                if cfg!(debug_assertions) {
                    panic!("forward-searcher thread panic")
                } else {
                    std::panic::resume_unwind(e)
                }
            }

            // Prefer the backward (pre-pivot) result; otherwise join the
            // forward thread and take its result.
            match (forward, backward) {
                (None, None) => None,
                (None, back @ Some(_)) => back,
                (Some(forward), backward) => backward.or_else(move || forward.join().unwrap_or_panic(_resume_unwind)),
            }
        })
    }
}
|
||||||
|
|
||||||
|
/// Partition `buffer` once at the occurrence of `needle` nearest its midpoint,
/// located with `method.search_combined()`, returning the two halves.
///
/// Currently unimplemented — see the `todo!` message for the design notes.
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
where S: MidpointFBSearcher<u8>
{
    todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
}
|
||||||
|
|
||||||
|
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
|
||||||
|
|
||||||
|
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
|
||||||
|
|
||||||
|
//TODO: Add tests for `Search{Seq,Par}` partitioning methods.
|
||||||
|
/// Tests for the partition searchers. All currently stubs (`todo!` /
/// `unimplemented!`) awaiting the generic fixtures described below.
#[cfg(test)]
mod test
{
    use super::*;
    use std::hint::black_box;
    //TODO: Add a generic randomised lorem-ipsum-like text data generator & a generic assertion tester that can take a unique `MidpointFBSearcher`.

    // Stub: sequential (`SearchSeq`) partition search.
    #[test]
    fn partition_seq()
    {
        todo!("Test `SearchSeq` sequential partition searcher")
    }

    // Stub: thread-spawning (`SearchPar`) partition search.
    #[cfg(feature="threads")]
    #[test]
    fn partition_par_heavy()
    {
        todo!("Test `SearchPar` parallel partition searcher")
    }

    //TODO: Thread-reusing parallel `MidpointFBSearcher` (SearchSeq is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: MidpointFBSearcher is currently a synchonous-only interface; a pure-async pivot finder may require a refactor.)
    #[cfg(all(feature="threads-async", feature = "threads"))]
    #[test]
    fn partition_par_light()
    {
        unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented")
    }

    // Stub: pure-async searcher (will need a tokio test attribute once real.)
    #[cfg(feature="threads-async")]
    #[/*tokio::*/test]
    fn partition_par_async()
    {
        unimplemented!("A pure async parallel searcher has not yet been implemented")
    }

    //TODO: Benchmarking the searchers' configuration about capacity size, growth and bounding.
}
|
Loading…
Reference in new issue