Compare commits
10 Commits
Author | SHA1 | Date |
---|---|---|
Avril | 491867fbbb | 8 months ago |
Avril | bea2631fb7 | 8 months ago |
Avril | bcaa9e0703 | 8 months ago |
Avril | 0ce496cafd | 8 months ago |
Avril | 03a3a1bfd0 | 8 months ago |
Avril | 49e0dd6073 | 8 months ago |
Avril | 9d2ae29e8b | 8 months ago |
Avril | f9068beca1 | 8 months ago |
Avril | ff898fc9b3 | 8 months ago |
Avril | 5144539191 | 8 months ago |
@ -0,0 +1,358 @@
|
||||
//! Buffering and resource pooling utilities
|
||||
//!
|
||||
//!
|
||||
use super::*;
|
||||
|
||||
pub use bytes::{
|
||||
Buf, BufMut,
|
||||
Bytes, BytesMut,
|
||||
};
|
||||
use std::{
|
||||
fmt, error,
|
||||
sync::Arc,
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
|
||||
#[cfg(feature="_loan")]
|
||||
const _TODO_REWORK_THIS_INTERFACE_ITS_OVERCOMPLICATED_SHITE: () = {
|
||||
type OwnedResult<S, T, E> = Result<T, (E, S)>;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
enum MaybeRef<'r, T>
|
||||
{
|
||||
Owned(T),
|
||||
Borrowed(&'r T),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Loan<'owner, O: Loaner>
|
||||
{
|
||||
owner: MaybeRef<'owner, O>,
|
||||
item: O::Item<'owner>,
|
||||
}
|
||||
|
||||
impl<'o, O> Loaned<'o, O> for Loan<'o, O>
|
||||
where O: Loaner
|
||||
{
|
||||
fn try_release(self) -> Result<(), <O as Loaner>::Error> {
|
||||
self.item.try_release()
|
||||
}
|
||||
fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), <O as Loaner>::Error> {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Loaned<'owner, Owner: ?Sized + 'owner>: Sized
|
||||
where Owner: Loaner
|
||||
{
|
||||
//fn owner(&self) -> Option<&Owner>;
|
||||
|
||||
fn try_release(self) -> Result<(), Owner::Error>;
|
||||
fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), Owner::Error>
|
||||
where Self: 'owner; // XXX: What? 'owner or 'static??? I think it's `'owner` == `'static` but how tf do I write that?
|
||||
}
|
||||
|
||||
pub trait Loaner {
|
||||
type Item<'owner>: Loaned<'owner, Self>
|
||||
where Self: 'owner;
|
||||
type Error: error::Error + Send + 'static;
|
||||
|
||||
fn try_loan(&self) -> Result<Self::Item<'_>, Self::Error>;
|
||||
fn try_loan_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, Self::Item<'static>, Self::Error>
|
||||
where Self: 'static;
|
||||
|
||||
fn try_release(&self, item: Self::Item<'_>) -> OwnedResult<Self::Item<'_>, (), Self::Error>;
|
||||
}
|
||||
|
||||
compile_error!("This `trait Loan` design isn't going to fucking work ofc...")
|
||||
};
|
||||
|
||||
/// Thread pooling
|
||||
#[cfg(feature="threads")]
|
||||
pub mod thread_pool {
|
||||
use super::*;
|
||||
|
||||
use std::{
|
||||
mem::{
|
||||
self,
|
||||
ManuallyDrop,
|
||||
},
|
||||
thread::{
|
||||
self,
|
||||
Thread, ThreadId,
|
||||
},
|
||||
ops::Drop,
|
||||
sync::{Arc, Weak,},
|
||||
|
||||
};
|
||||
use parking_lot::{
|
||||
Condvar,
|
||||
Mutex,
|
||||
};
|
||||
use crossbeam::{
|
||||
queue::ArrayQueue,
|
||||
channel,
|
||||
};
|
||||
|
||||
//TODO: Imlement a oneshot channel: `oneshot::{Sender, Receiver`; works like Tokio's `sync::oneshot`, but is sync, not async: Can use `{Sender,Receiver}slot: {Arc, Weak}<Mutex<MaybeUninit<T>>>, `waiter: {Weak/_PinnedPtrIntoArc<Pin<slot>>(avoids 2nd alloc)/Weak,Arc<_, A = SenderSharedBumpAllocator*>,Arc}<Condvar>` (NOTE: Bump allocator crate name `bumpalo`.)
|
||||
|
||||
//TODO: Implement `ThreadPool<'scope = 'static>` like below, but instead use `crossbeam::channel` mpsc to send commands to unparked threads (i.e. loaned threads from the pool that have an active user handle.)
|
||||
// The design will be far simpler, since the Arc/Weak, send+recv stuff will be handled fine by `channel` on both ends. The only thing we need dtors for is `force_push()`ing threads back into their pool's ringbuffer, and for force-unparking threads from a pool that are removed (same as loan, except sender should be dropped before unpark happens.)
|
||||
|
||||
#[cfg(feature="_thread-pool-no-mpsc")]
|
||||
const _: () = {
|
||||
#[derive(Debug)]
|
||||
pub enum ThreadHandleKind<'scope, T>
|
||||
{
|
||||
Detached(Option<ThreadId>),
|
||||
Attached(thread::JoinHandle<T>),
|
||||
Scoped(thread::ScopedJoinHandle<'scope, T>),
|
||||
}
|
||||
|
||||
impl<'scope, T> ThreadHandleKind<'scope, T>
|
||||
{
|
||||
/// Get a reference to the attached thread (if there is one.)
|
||||
#[inline]
|
||||
pub fn thread(&self) -> Option<&Thread>
|
||||
{
|
||||
Some(match self {
|
||||
Self::Scoped(s) => s.thread(),
|
||||
Self::Attached(s) => s.thread(),
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Is this handle attached to a thread?
|
||||
///
|
||||
/// If it is *attached*, this means the thread *should not* outlive the instance (unless the instance is `'static`.)
|
||||
#[inline]
|
||||
pub const fn is_attached(&self) -> bool
|
||||
{
|
||||
match self {
|
||||
Self::Attached(_) |
|
||||
Self::Scoped(_) => true,
|
||||
Self::Detached(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Is this handle no longer referring to *any* thread?
|
||||
///
|
||||
/// In a *poisoned* state, the handle is useless.
|
||||
#[inline]
|
||||
pub const fn is_poisoned(&self) -> bool
|
||||
{
|
||||
match self {
|
||||
Self::Detached(None) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the thread id of the handle, if there is a referred to thread.
|
||||
#[inline]
|
||||
pub fn id(&self) -> Option<ThreadId>
|
||||
{
|
||||
Some(match self {
|
||||
Self::Scoped(x) => x.thread().id(),
|
||||
Self::Attached(x) => x.thread().id(),
|
||||
Self::Detached(Some(id)) => *id,
|
||||
_ => return None
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a raw thread handle
|
||||
#[derive(Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct ThreadHandle<'scope, T>(ThreadHandleKind<'scope, T>);
|
||||
|
||||
//TODO: forward `ThreadHandle<'s, T>` methods to `ThreadHandleKind<'s, T>`: `thread(), id(), is_attached()`. Add methods `is_running(), join(), try_join(),` etc...
|
||||
|
||||
impl<'s, T> Drop for ThreadHandle<'s, T>
|
||||
{
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
match mem::replace(&mut self.0, ThreadHandleKind::Detached(None)) {
|
||||
ThreadHandleKind::Attached(a) => drop(a.join()), // Attempt join, ignore panic.
|
||||
ThreadHandleKind::Scoped(s) => drop(s.join().unwrap_or_resume()), // Attempt join, carry panic back through into scope.
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum PooledSendContext<T: Send, F: ?Sized + Send>
|
||||
{
|
||||
/// An execute directive `Err(Some(func))` is taken by the thread and replaced by `Ok(func())`.
|
||||
///
|
||||
/// # Direction
|
||||
/// Bidirectional.
|
||||
/// (Handle -> Thread; Thread -> Handle)
|
||||
Execute(Result<T, Option<Box<F>>>),
|
||||
/// Pass a caught panic within an `Execute(Err(Some(F)))` payload back from the thread.
|
||||
///
|
||||
/// # Direction
|
||||
/// Thread -> Handle
|
||||
Panic(UnwindPayload),
|
||||
/// When the loaned `PooledThread` is moved back into the pool, tell the thread to park itself again.
|
||||
///
|
||||
/// # Direction
|
||||
/// Handle -> Thread
|
||||
Park,
|
||||
|
||||
/// The default invariant used for when an operation has been taken by a thread.
|
||||
///
|
||||
/// # When `Taken`.
|
||||
/// The replacement is always `Taken` after a `take(&mut self)` that contains a value.
|
||||
/// The `Park` directive will not be replaced to `Taken`.
|
||||
Taken,
|
||||
|
||||
//NOTE: `Close` variant not needed because `handle` will have a `Weak` reference to `callback_passage`; when the upgrade fails, the thread will exit.
|
||||
}
|
||||
|
||||
impl<T: Send, F: ?Sized + Send> PooledSendContext<T, F>
|
||||
{
|
||||
pub fn take(&mut self) -> Option<Self>
|
||||
{
|
||||
Some(match self {
|
||||
//XXX: I don't think not replacing cloneable variants is a good or useful default. (see below V)
|
||||
//// Clone non-data-holding variants.
|
||||
//Self::Park => Self::Park,
|
||||
//// If the ctx has already been taken, return `None`.
|
||||
//Self::Taken => return None,
|
||||
// Move out of data-holding variants.
|
||||
_ => mem::replace(self, Self::Taken)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A loanable thread that belongs to a pool.
|
||||
///
|
||||
/// # Pooling
|
||||
/// When taken from a `ThreadPool`, the object is *moved* out of the thread-pool's ring-buffer and then pinned in return (See `PooledThread`.)
|
||||
///
|
||||
/// To return the thread to the pool, the `callback_passage` writes a `Park` command to the thread, unpins itself, and then on drop moves itself back into the start of the ring-buffer of the thread pool `owner`
|
||||
#[derive(Debug)]
|
||||
struct PooledThreadHandle<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
|
||||
where F: FnOnce() -> T + Send + 'scope,
|
||||
T: Send + 'scope
|
||||
{
|
||||
|
||||
/// Passes `F` to `handle` and retrieves `T`.
|
||||
callback_passage: Arc<(Condvar, Mutex<PooledSendContext<T, F>>)>,
|
||||
/// The backing thread, which when started parks itself.
|
||||
/// The `PooledThread` object will unpark the thread when the object is loaned (moved) out of the pool, and re-parked when enter
|
||||
///
|
||||
/// Will wait on `callback_passage?.0` for `Execute` directives, pass them back to the handle, and when a `Park` directive is recieved (i.e. when moved back into the pool) the thread will park itself and wait to be unparked when moved out of the pool or closed again.
|
||||
///
|
||||
/// If `callback_message` is dropped, the thread will exit when unparked (dropped from pool.)
|
||||
//TODO: Ensure unpark on drop?
|
||||
handle: ThreadHandle<'scope, Result<(), ()>>,
|
||||
}
|
||||
|
||||
impl<'scope, T, F:?Sized> PooledThreadHandle<'scope, T, F>
|
||||
where F: FnOnce() -> T + Send + 'scope,
|
||||
T: Send + 'scope
|
||||
{
|
||||
fn send_to_thread_raw(&self, message: PooledSendContext<T, F>) -> bool
|
||||
{
|
||||
let mut ctx = self.callback_passage.1.lock();
|
||||
*ctx = message;
|
||||
self.callback_passage.0.notify_one()
|
||||
}
|
||||
fn send_from_thread_raw(&self, message: PooledSendContext<T, F>)
|
||||
{
|
||||
let mut ctx = self.callback_passage.1.lock();
|
||||
*ctx = message;
|
||||
}
|
||||
|
||||
fn read_in_thread_raw(&self, wait_for: bool) -> Option<PooledSendContext<T, F>>
|
||||
{
|
||||
let mut ctx = self.callback_passage.1.lock(); // NOTE: Should we fair lock here?
|
||||
// Wait on condvar if `wait_for` is true.
|
||||
if wait_for {
|
||||
self.callback_passage.0.wait(&mut ctx);
|
||||
}
|
||||
// Then replace the ctx with `Taken`.
|
||||
ctx.take()
|
||||
}
|
||||
fn try_read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
|
||||
{
|
||||
let mut ctx = self.callback_passage.1.try_lock()?;
|
||||
ctx.take()
|
||||
}
|
||||
fn read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
|
||||
{
|
||||
let mut ctx = self.callback_passage.1.lock();
|
||||
ctx.take()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PooledThread<'scope, 'pool: 'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
|
||||
where F: FnOnce() -> T + Send + 'scope,
|
||||
T: Send + 'scope
|
||||
{
|
||||
/// The owner to return `thread` to on drop.
|
||||
owner: Option<&'pool ThreadPool<'scope, T, F>>,
|
||||
/// The pinned thread handle that is returned on drop.
|
||||
thread: ManuallyDrop<PooledThreadHandle<'scope, T, F>>, //XXX: Pin<> needed (or even useful at all) here?
|
||||
}
|
||||
|
||||
impl<'scope, 'pool: 'scope, T, F:?Sized> Drop for PooledThread<'scope, 'pool, T, F>
|
||||
where F: FnOnce() -> T + Send + 'scope,
|
||||
T: Send + 'scope
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
let Some(loaned_from) = self.owner else {
|
||||
// There is no owner.
|
||||
// Instead, unpark the thread after drop.
|
||||
let PooledThreadHandle { callback_passage, handle } = unsafe { ManuallyDrop::take(&mut self.thread) };
|
||||
|
||||
// Drop the Arc, so when `thread` wakes from unpark, it will fail to acquire context.
|
||||
drop(callback_passage);
|
||||
|
||||
if let Some(thread) = handle.0.thread() {
|
||||
thread.unpark();
|
||||
}
|
||||
|
||||
return todo!("How do we know *when* to unpark? How can we unpark *after* dropping the Arc?");
|
||||
};
|
||||
|
||||
// There is an owner. Tell the thread to park itself.
|
||||
let mut handle = unsafe { ManuallyDrop::take(&mut self.thread) };
|
||||
handle.send_to_thread_raw(PooledSendContext::Park);
|
||||
|
||||
// Read the message object, take it from the thread after it notifies us it is about to park.
|
||||
let ctx = {
|
||||
let mut ctx = handle.callback_passage.1.lock();
|
||||
// Before thread parks, it should signal condvar, and callback_passage should not be unique again yet.
|
||||
handle.callback_passage.0.wait_while(&mut ctx, |_| Arc::strong_count(&handle.callback_passage) > 1);
|
||||
ctx.take()
|
||||
};
|
||||
|
||||
// Force insert back into thred pool.
|
||||
if let Some( PooledThreadHandle { callback_passage, handle }) = loaned_from.pool_queue.force_push(handle) {
|
||||
// If one is removed, drop the context..
|
||||
drop(callback_passage);
|
||||
// Then force-unpark the thread.
|
||||
match handle.0.thread() {
|
||||
Some(thread) => thread.unpark(),
|
||||
None => (),
|
||||
}
|
||||
}
|
||||
|
||||
todo!("Should we force `join()` here? Or allow non-scoped handles to be forgotten?")
|
||||
}
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct ThreadPool<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
|
||||
where F: FnOnce() -> T + Send + 'scope,
|
||||
T: Send + 'scope
|
||||
{
|
||||
pool_queue: ArrayQueue<PooledThreadHandle<'scope, T, F>>,
|
||||
}
|
||||
};
|
||||
|
||||
//TODO: Build a ThreadPool<'scope = 'static> using crossbeam::ArrayQueue<...> (XXX: See phone for details on how pool should use un/parking for loaning; and CondVar for passing tasks.) Also, add a `Loan` trait with assoc type `Item<'owner>` and methods `try_loan(&self) -> Result<Self::Item<'_>, ...>`, `return(&self, item: Self::Item<'_>) -> Result<(), ...>` and `try_loan_owned(self: Arc<Self>) -> `MovedResult<Arc<Self>, OwnedLoan<'static, Self>, ...>` (struct OwnedLoad<'scope, L>{ item: Option<L::Item<'scope>> } )`, etc.
|
||||
}
|
@ -0,0 +1,304 @@
|
||||
//! Extension traits, global functions + macros.
|
||||
#![allow(unused)]
|
||||
use super::*;
|
||||
use std::convert::Infallible;
|
||||
|
||||
pub use std::{
|
||||
convert::{
|
||||
TryFrom, TryInto,
|
||||
},
|
||||
borrow::*,
|
||||
};
|
||||
|
||||
/// Add forwarding borrow + deref (+ optional `into_inner()`) impls for a type.
|
||||
///
|
||||
/// # Usage
|
||||
/// For a mutable forwarding newtype:
|
||||
/// ```
|
||||
/// # use crate::forward_newtype;
|
||||
/// # use core::borrow::*;
|
||||
///
|
||||
/// /// A mutable buffer newtype over an array.
|
||||
/// struct Buffer([u8; 16]);
|
||||
/// forward_newtype!(mut Buffer => [u8], 0); // Generates `Borrow<[u8]>`, `BorrowMut<[u8]>`, `Deref<Target=[u8]>`, and `DerefMut` impls for `Buffer` that return `<&[mut] self.>0` (as specified by `0`.)
|
||||
/// ```
|
||||
///
|
||||
/// For an immutable forwarding newtype:
|
||||
/// ```
|
||||
/// # use crate::forward_newtype;
|
||||
/// # use core::borrow::*;
|
||||
///
|
||||
/// /// A mutable buffer newtype over an array.
|
||||
/// struct Data([u8; 16]);
|
||||
/// forward_newtype!(ref Buffer => [u8], 0); // Generates `Borrow<[u8]>` and `Deref<Target=[u8]>` impls for `Buffer` that return `<& self.>0` (as specified by `0`.) Immutable access only is specified by `ref`.
|
||||
/// ```
|
||||
///
|
||||
/// ## Consuming into inner
|
||||
/// To generate an `into_inner(self) -> T` inherent impl for the type, the syntax `forward_newtype!(move [const] Type => Inner, accessor)` can be used.
|
||||
/// If `const` is passed, then the `into_inner()` function will be a `const fn`, if not, then it won't be.
|
||||
///
|
||||
/// To combine with ref-forwarding accessors, the syntax `forward_newtype!(move [const] {ref/mut} Type => Inner, accessor)` can be used to generate them all; the `Borrow`, `BorrowMut`, `Deref`, `DerefMut` and `pub [const] fn into_inner()`.
|
||||
/// This is the most likely to be useful.
|
||||
///
|
||||
/// If you need a seperate `into_inner()` impl, you can either not use the `move` declarator, or use the `ref`/`mut` accessor generator in a different statement than the `move` one:
|
||||
/// ```
|
||||
/// # use crate::forward_newtype;
|
||||
/// # use core::borrow::*;
|
||||
///
|
||||
/// /// A mutable buffer newtype over an array.
|
||||
/// struct Buffer([u8; 16]);
|
||||
/// forward_newtype!(mut Buffer => [u8], 0); // Generate a mutable & immutable forwarding ref to a slice of bytes.
|
||||
/// forward_newtype!(move const Buffer => [u8; 16], 0); // Generate a seperately typed `into_inner()` that returns the sized array.
|
||||
/// ```
|
||||
macro_rules! forward_newtype {
|
||||
(ref $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
impl Borrow<$inner> for $type
|
||||
{
|
||||
#[inline]
|
||||
fn borrow(&self) -> &$inner
|
||||
{
|
||||
&self.$($expr)+
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::ops::Deref for $type {
|
||||
type Target = $inner;
|
||||
#[inline]
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.borrow()
|
||||
}
|
||||
}
|
||||
};
|
||||
(mut $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
forward_newtype!(ref $type => $inner, $($expr)+);
|
||||
|
||||
impl BorrowMut<$inner> for $type
|
||||
{
|
||||
#[inline]
|
||||
fn borrow_mut(&mut self) -> &mut $inner
|
||||
{
|
||||
&mut self.$($expr)+
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::ops::DerefMut for $type {
|
||||
#[inline]
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.borrow_mut()
|
||||
}
|
||||
}
|
||||
};
|
||||
(move const $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
impl $type {
|
||||
/// Consume into the inner value.
|
||||
pub const fn into_inner(self) -> $inner {
|
||||
self.$($expr)+
|
||||
}
|
||||
}
|
||||
};
|
||||
(move $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
impl $type {
|
||||
/// Consume into the inner value.
|
||||
pub fn into_inner(self) -> $inner {
|
||||
self.$($expr)+
|
||||
}
|
||||
}
|
||||
};
|
||||
(move const ref $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
forward_newtype!(move const $type => $inner, $($expr)+);
|
||||
forward_newtype!(ref $type => $inner, $($expr)+);
|
||||
};
|
||||
(move ref $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
forward_newtype!(move $type => $inner, $($expr)+);
|
||||
forward_newtype!(ref $type => $inner, $($expr)+);
|
||||
};
|
||||
(move const mut $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
forward_newtype!(move const $type => $inner, $($expr)+);
|
||||
forward_newtype!(mut $type => $inner, $($expr)+);
|
||||
};
|
||||
(move mut $type:ty => $inner:ty, $($expr:tt)+) => {
|
||||
forward_newtype!(move $type => $inner, $($expr)+);
|
||||
forward_newtype!(mut $type => $inner, $($expr)+);
|
||||
};
|
||||
}
|
||||
|
||||
/// The default bottom type.
|
||||
///
|
||||
/// To use the `unwrap_infallible()`-like interface, functions that return `-> !` should be changed to `-> Never`.
|
||||
/// When `unstable` is enabled, this is an alias to `!` and `-> !` is not special cased.
|
||||
///
|
||||
/// # As return argument
|
||||
/// When feature `unstable` is enabled, `into_unreachable()` may not be required to ensure propogation to `!` from a function returning `-> Never`.
|
||||
#[cfg(feature="unstable")]
|
||||
pub type Never = !;
|
||||
|
||||
/// The default bottom type.
|
||||
///
|
||||
/// To use the `unwrap_infallible()`-like interface, functions special cased to `-> !` should be changed to `-> Never`.
|
||||
///
|
||||
/// # As return argument
|
||||
/// When feature `unstable` is not enabled, `into_unreachable()` may be required to be used when dealing with return bottom types other than the special case `-> !`.
|
||||
/// This is a current limitation of the type system.
|
||||
#[cfg(not(feature="unstable"))]
|
||||
pub type Never = Infallible;
|
||||
|
||||
/// Contractually ensures this type cannot exist (i.e. it is a bottom type.)
|
||||
///
|
||||
/// # Safety
|
||||
/// Instances of the impl type **cannot exist**.
|
||||
/// They must be bottom types (i.e. empty enums, types contatining an `Infallible` / `!` object, etc.)
|
||||
///
|
||||
/// # Auto-impl
|
||||
/// This trait is not intended to be implemented on any user-defined type other than empty enums.
|
||||
///
|
||||
/// By default it is implemented for the following types:
|
||||
/// - `core::convert::Infallible`
|
||||
/// - `!` (**feature**: `unstable`)
|
||||
/// - `Box<T>` *where* `T: ?Sized + Unreachable`
|
||||
pub unsafe trait Unreachable {
|
||||
/// Force control flow to terminate type checking here.
|
||||
///
|
||||
/// # Note
|
||||
/// This function will never be executed, it is used to terminate the value's existence in the type system, by converting it from any `Unreachable` type into the bottom return type `!`.
|
||||
/// If this function ever **can** be called at all, it is undefined behaviour.
|
||||
#[inline]
|
||||
#[cold]
|
||||
fn into_unreachable(self) -> !
|
||||
where Self: Sized
|
||||
{
|
||||
if cfg!(debug_assertions) {
|
||||
unreachable!("Unreachable conversion from {}!", std::any::type_name::<Self>())
|
||||
} else {
|
||||
// SAFETY: Contractually enforced by the trait impl itself.
|
||||
unsafe {
|
||||
std::hint::unreachable_unchecked()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Unreachable for Infallible {
|
||||
#[inline(always)]
|
||||
#[cold]
|
||||
fn into_unreachable(self) -> ! {
|
||||
match self {}
|
||||
}
|
||||
}
|
||||
#[cfg(feature="unstable")]
|
||||
unsafe impl Unreachable for ! {
|
||||
#[inline(always)]
|
||||
#[cold]
|
||||
fn into_unreachable(self) -> ! {
|
||||
match self {}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<T: ?Sized + Unreachable> Unreachable for Box<T> {}
|
||||
|
||||
pub trait UnwrapPanicExt<T, E> {
|
||||
/// Unwrap the result `Ok` value or panic as described by the non-returning function `F`, with the `Unreachable` bottom type `N`.
|
||||
/// This will usually be an `-> !` function, (or an `-> Never` function using the `Unreachable` interface.)
|
||||
///
|
||||
/// # Panic usage
|
||||
/// `func` must not return. It should panic, resume a panic, or exit the thread/program, trap, or terminate in an infinite loop.
|
||||
///
|
||||
/// It does not *have* to call `panic!()` if it terminates in another way that is not a panic, however.
|
||||
fn unwrap_or_panic<N: Unreachable, F: FnOnce(E) -> N>(self, func: F) -> T;
|
||||
|
||||
/// Unwrap the result `Ok` value or panic as described by the non-returning function `func` with the default bottom type `Never`.
|
||||
#[inline(always)]
|
||||
fn unwrap_or_panic_unreachable<F: FnOnce(E) -> Never>(self, func: F) -> T
|
||||
where Self: Sized {
|
||||
self.unwrap_or_panic::<Never, _>(func)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait UnwrapInfallibleExt<T> {
|
||||
/// Unwrapping is infallible and therefore safe to do so without checking.
|
||||
fn unwrap_infallible(self) -> T;
|
||||
}
|
||||
|
||||
pub trait UnwrapPanicResumeExt<T> {
|
||||
/// Unwrap or resume a previous unwind, with the unwind payload in the `Err` variant.
|
||||
fn unwrap_or_resume(self) -> T;
|
||||
}
|
||||
|
||||
impl<T, E> UnwrapPanicExt<T, E> for Result<T, E>
|
||||
{
|
||||
#[inline]
|
||||
fn unwrap_or_panic<N: Unreachable, F: FnOnce(E) -> N>(self, func: F) -> T {
|
||||
#[inline(never)]
|
||||
#[cold]
|
||||
fn _do_panic<Nn: Unreachable, Ee, Ff: FnOnce(Ee) -> Nn>(error: Ee, func: Ff) -> !
|
||||
{
|
||||
func(error).into_unreachable()
|
||||
}
|
||||
|
||||
match self {
|
||||
Ok(v) => v,
|
||||
Err(e) => _do_panic(e, func)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E: Unreachable> UnwrapInfallibleExt<T> for Result<T, E>
|
||||
{
|
||||
#[inline]
|
||||
fn unwrap_infallible(self) -> T {
|
||||
match self {
|
||||
Ok(v) => v,
|
||||
Err(e) => if cfg!(debug_assertions) {
|
||||
e.into_unreachable()
|
||||
} else {
|
||||
// SAFETY: Contract bound of `E: Unreachable` ensures this path will never be taken.
|
||||
unsafe {
|
||||
std::hint::unreachable_unchecked()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The type of a caught unwind payload.
|
||||
pub type UnwindPayload = Box<dyn std::any::Any + Send>;
|
||||
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
fn _resume_unwind<E: Into<UnwindPayload>>(e: E) -> !
|
||||
{
|
||||
std::panic::resume_unwind(e.into())
|
||||
}
|
||||
|
||||
impl<T, E: Into<UnwindPayload>> UnwrapPanicResumeExt<T> for Result<T, E>
|
||||
{
|
||||
#[inline]
|
||||
fn unwrap_or_resume(self) -> T {
|
||||
match self {
|
||||
Ok(v) => v,
|
||||
Err(e) => _resume_unwind(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ArcExt
|
||||
{
|
||||
/// If the strong-count == 1 and the weak-count is 0.
|
||||
fn is_fully_unique(self: &std::sync::Arc<Self>) -> bool;
|
||||
/// If the strong-count == 1.
|
||||
///
|
||||
/// If there are alive `Weak`s pointing to this object, those are not considered.
|
||||
fn is_currently_unique(self: &std::sync::Arc<Self>) -> bool;
|
||||
}
|
||||
|
||||
impl<T: ?Sized> ArcExt for T
|
||||
{
|
||||
#[inline]
|
||||
fn is_fully_unique(self: &std::sync::Arc<Self>) -> bool {
|
||||
std::sync::Arc::strong_count(&self) == 1 &&
|
||||
std::sync::Arc::weak_count(&self) == 0
|
||||
}
|
||||
#[inline]
|
||||
fn is_currently_unique(self: &std::sync::Arc<Self>) -> bool {
|
||||
std::sync::Arc::strong_count(&self) == 1
|
||||
}
|
||||
}
|
@ -0,0 +1,426 @@
|
||||
//! Partitioning even areas by delimitor byte.
|
||||
use super::*;
|
||||
use std::{
|
||||
num::NonZeroUsize,
|
||||
};
|
||||
|
||||
/// Size of one cache-line.
|
||||
///
|
||||
/// NOTE: alignment padded for `u8`.
|
||||
///
|
||||
/// TODO: Make this comptime env-var configurable (`option_env!()`) on debug builds. (See `SEARCH_CAP_GROW`.)
|
||||
const CACHELINE_SIZE: usize = std::mem::size_of::<crossbeam_utils::CachePadded<u8>>();
|
||||
|
||||
/// A buffer that takes up exactly one cache-line.
|
||||
///
|
||||
/// This type is not `Copy` to ensure copies are made safely. `clone()` is trivial, and to copy explicitly in a const context use `.copied()`
|
||||
///
|
||||
/// # Alignment
|
||||
/// Note that the buffer is *not* 1-cacheline aligned itself by default.
|
||||
/// To ensure its alignment, you should use `crossbeam_utils::CachePadded<CachelineBuffer>` (or the type-alias `AlignedCachelineBuffer`.)
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||
#[repr(transparent)]
|
||||
pub struct CachelineBuffer([u8; CACHELINE_SIZE]);
|
||||
|
||||
impl Default for CachelineBuffer
|
||||
{
|
||||
#[inline]
|
||||
fn default() -> Self
|
||||
{
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// A buffer that takes up exactly one cache-line, which is itself aligned to 1 cacheline.
|
||||
pub type AlignedCachelineBuffer = crossbeam_utils::CachePadded<CachelineBuffer>;
|
||||
|
||||
impl CachelineBuffer {
|
||||
/// The size of the buffer (1 cacheline of bytes.)
|
||||
pub const SIZE: usize = CACHELINE_SIZE;
|
||||
|
||||
/// Create a new, empty buffer.
|
||||
#[inline]
|
||||
pub const fn new() -> Self
|
||||
{
|
||||
Self([0; Self::SIZE])
|
||||
}
|
||||
|
||||
/// Clone this value.
|
||||
///
|
||||
/// This is a `const fn` explicit trivial copy of the data.
|
||||
#[inline]
|
||||
pub const fn copied(&self) -> Self
|
||||
{
|
||||
Self(self.0)
|
||||
}
|
||||
|
||||
/// Get a reference to the byte array.
|
||||
pub const fn as_bytes(&self) -> &[u8; Self::SIZE]
|
||||
{
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
forward_newtype!(mut CachelineBuffer => [u8], 0);
|
||||
forward_newtype!(move const CachelineBuffer => [u8; CACHELINE_SIZE], 0);
|
||||
|
||||
const _: () = {
|
||||
debug_assert!(CachelineBuffer::SIZE > std::mem::size_of::<u8>(), "Invalid cacheline-padding size (`CACHELINE_SIZE`)");
|
||||
//debug_assert!(CACHELINE_SIZE == 128, "Unexpected `CACHELINE_SIZE`");
|
||||
};
|
||||
|
||||
/// Grow capacity exponentially when search fails.
|
||||
///
|
||||
/// TODO: Make this comptime env-var settable (`option_env!()`) on debug builds.
|
||||
const SEARCH_CAP_GROW: bool = true;
|
||||
|
||||
/// Settings for a searcher (memory search method configuration.)
|
||||
///
|
||||
/// The default values provided to implementors are globally controlled and (debug-build only) env-var configurable (for benchmarking purposes.)
|
||||
trait SynchonousSearcher {
|
||||
/// Initial size of capacity
|
||||
const CAP_SIZE: usize = CACHELINE_SIZE;
|
||||
|
||||
/// Should the capacity be grown on failed search?
|
||||
const CAP_GROW: bool = SEARCH_CAP_GROW;
|
||||
}
|
||||
|
||||
// Default impl global compiled capacity settings for each.
|
||||
impl SynchonousSearcher for SearchPar {}
|
||||
impl SynchonousSearcher for SearchSeq {}
|
||||
|
||||
/// Midpoint searcher (forwards & backwards)
|
||||
trait MidpointFBSearcher<T=u8>: SynchonousSearcher
|
||||
{
|
||||
fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
|
||||
fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
|
||||
|
||||
fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
|
||||
}
|
||||
|
||||
/// Search the pivot for the needle sequentially.
|
||||
///
|
||||
/// The order of operations will be: `search_forward()?, search_backward()`.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct SearchSeq;
|
||||
|
||||
#[inline]
|
||||
fn get_max_cap_for_search_area(size: usize) -> Option<NonZeroUsize>
|
||||
{
|
||||
SYS_PAGE_SIZE.and_then(move |page| if size == 0 {
|
||||
// Size is unknown, bound by page.
|
||||
Some(page)
|
||||
} else if size >= (page.get() << 2) {
|
||||
// Size is huge, bound by page ^2
|
||||
NonZeroUsize::new(page.get() << 1)
|
||||
} else if size >= page.get() {
|
||||
// Size is larger than page, bound by page.
|
||||
Some(page)
|
||||
} else {
|
||||
// If the area size is lower than one page, do not bound the capacity growth.
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
impl MidpointFBSearcher<u8> for SearchSeq
|
||||
{
|
||||
#[inline(always)]
|
||||
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
|
||||
}
|
||||
#[inline(always)]
|
||||
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
|
||||
let max_cap = get_max_cap_for_search_area(haystack.len());
|
||||
|
||||
match haystack.split_at(begin) {
|
||||
([], []) => None,
|
||||
([], x) => self.search_forward(x, needle),
|
||||
(x, []) => self.search_backward(x, needle),
|
||||
|
||||
// If both the buffers are lower than `max_cap`, just do the entire operation on each
|
||||
(x, y) if max_cap.map(|max| x.len() <= max.get() && y.len() <= max.get()).unwrap_or(false) => {
|
||||
self.search_forward(y, needle)?;
|
||||
self.search_backward(x, needle)
|
||||
},
|
||||
|
||||
(mut x, mut y) => {
|
||||
let len = std::cmp::min(x.len(), y.len());
|
||||
let mut cap = std::cmp::min(len, Self::CAP_SIZE);
|
||||
|
||||
if let Some(&max) = max_cap.as_ref() {
|
||||
// Bound `cap` to `max_cap` if it is set.
|
||||
cap = std::cmp::min(cap, max.get());
|
||||
}
|
||||
|
||||
while cap <= len {
|
||||
// If cap is too large for one (or more) of the buffers, truncate it.
|
||||
if cap > y.len() || cap > x.len() {
|
||||
cap = std::cmp::min(y.len(), x.len());
|
||||
}
|
||||
|
||||
// Search forwards in `y`. (up to `cap`)
|
||||
if let Some(y) = self.search_forward(&y[..cap], needle) {
|
||||
return Some(y);
|
||||
}
|
||||
// Search backwards in `x`. (down to `cap`)
|
||||
if let Some(x) = self.search_backward(&x[(x.len()-cap)..], needle) {
|
||||
return Some(x);
|
||||
}
|
||||
|
||||
// Cut out `cap` bytes from the start of forwards
|
||||
y = &y[cap..];
|
||||
// Cut out `cap` bytes from the end of backwards.
|
||||
x = &x[..cap];
|
||||
|
||||
if Self::CAP_GROW {
|
||||
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
|
||||
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(feature="async")]
|
||||
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
|
||||
#[derive(Debug, Clone)]
|
||||
struct SearchAsync<F>
|
||||
{
|
||||
spawn_task: F,
|
||||
result: oneshot::Receiver<usize>,
|
||||
}
|
||||
|
||||
#[cfg(feature="threads-async")]
|
||||
impl<F, Fu> MidpointFBSearcher<u8> for SearchAsync<F>
|
||||
where F: Fn() -> Fu,
|
||||
Fu: futures::Future + Send + Sync + 'static
|
||||
{
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Search in parallel.
|
||||
///
|
||||
/// # Warning
|
||||
/// This search operation is heavy. It **always** spawns its own 2nd thread when `search_combined()` is invoked.
|
||||
/// This may not be ideal... A lighter, thread-pool (async) or thread-reusing (sync) API would be better. (See below.)
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct SearchPar;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
/// Real system page size (raw.)
|
||||
static ref REAL_PAGE_SIZE: std::ffi::c_int = {
|
||||
use std::ffi::c_int;
|
||||
extern "C" {
|
||||
fn getpagesize() -> c_int;
|
||||
}
|
||||
unsafe {
|
||||
getpagesize()
|
||||
}
|
||||
};
|
||||
/// System page size.
|
||||
///
|
||||
/// If the page size returned from `getpagesize()` (`REAL_PAGE_SIZE`) was invalid (below-or-equal to 0,) `None` will be returned.
|
||||
static ref SYS_PAGE_SIZE: Option<NonZeroUsize> = {
|
||||
match *REAL_PAGE_SIZE {
|
||||
std::ffi::c_int::MIN..=0 => None,
|
||||
// SAFETY: We have masked out `0` in the above branch.
|
||||
rest => unsafe {
|
||||
debug_assert!(usize::try_from(rest).is_ok(), "Page size `c_int` out of range of system `usize`??? (Got {})", rest);
|
||||
Some(NonZeroUsize::new_unchecked(rest as usize))
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(feature="threads")]
|
||||
impl MidpointFBSearcher<u8> for SearchPar
|
||||
{
|
||||
#[inline(always)]
|
||||
fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||
memchr::memchr(needle, haystack).map(move |i| &haystack[i])
|
||||
}
|
||||
#[inline(always)]
|
||||
fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
|
||||
memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
|
||||
}
|
||||
|
||||
fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
|
||||
|
||||
let complete = crossbeam::atomic::AtomicCell::new(false);
|
||||
std::thread::scope(|s| {
|
||||
//let mut complete_val = UnsafeCell::new(false);
|
||||
//let complete: parking_lot::Once = parking_lot::Once::new();
|
||||
// let value_cont = (parking_lot::Condvar::new(), parking_lot::FairMutex::new(None::<&'a u8>));
|
||||
|
||||
let (mut hb, mut hf) = haystack.split_at(begin);
|
||||
|
||||
let max_cap = get_max_cap_for_search_area(haystack.len());
|
||||
|
||||
// Cap the cap to `max_cap` if there is a max cap.
|
||||
let cap = if let Some(max) = max_cap.as_ref() {
|
||||
std::cmp::min(max.get(), Self::CAP_SIZE)
|
||||
} else {
|
||||
Self::CAP_SIZE
|
||||
};
|
||||
|
||||
let forward = if hf.len() > 0 {
|
||||
let cap = cap;
|
||||
let sf = &self;
|
||||
let complete = &complete;
|
||||
// Background thread: Forward search (`forward-searcher`.)
|
||||
Some(std::thread::Builder::new().name("forward-searcher".into()).spawn_scoped(s, move || -> Option<_> {
|
||||
let mut cap = std::cmp::min(cap, hf.len());
|
||||
let len = hf.len();
|
||||
|
||||
// Check completion before starting loop too.
|
||||
if complete.load() {
|
||||
return None;
|
||||
}
|
||||
while cap <= len {
|
||||
// If `cap` is larger than the buffer `hf`, truncate it.
|
||||
cap = std::cmp::min(cap, hf.len());
|
||||
// Search forward in `hf` up to `cap` bytes.
|
||||
if let /*v @ */Some(x) = sf.search_forward(&hf[..cap], needle) {
|
||||
// Tell other operation we have found something.
|
||||
complete.store(true);
|
||||
return Some(x);
|
||||
} else if complete.load() {
|
||||
break;
|
||||
}
|
||||
// Cut out `cap` bytes from the start.
|
||||
hf = &hf[cap..];
|
||||
if Self::CAP_GROW {
|
||||
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
|
||||
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
|
||||
}
|
||||
}
|
||||
None::<&'a u8>
|
||||
}).expect("Failed to spawn forward-searcher thread"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
//NOTE: There is no need to spawn another thread for the 2nd operation, since they are both join()'d at the end regardless and both already communicate completion.
|
||||
let backward = if hb.len() > 0 {
|
||||
let cap = cap;
|
||||
let sf = &self;
|
||||
let complete = &complete;
|
||||
|
||||
// Main thread: Backwards search.
|
||||
move || -> Option<_> {
|
||||
let mut cap = std::cmp::min(cap, hb.len());
|
||||
let len = hb.len();
|
||||
|
||||
// Check completion before starting loop too.
|
||||
if complete.load() {
|
||||
return None;
|
||||
} else {
|
||||
// Allow previous thread to run if it is not.
|
||||
std::thread::yield_now();
|
||||
}
|
||||
while cap <= len {
|
||||
// If `cap` is larger than the buffer `hb`, truncate it.
|
||||
cap = std::cmp::min(cap, hb.len());
|
||||
// Search backwards in `hb` up to `cap` bytes.
|
||||
if let /*v @ */Some(x) = sf.search_backward(&hb[(hb.len()-cap)..], needle) {
|
||||
complete.store(true);
|
||||
return Some(x);
|
||||
} else if complete.load() {
|
||||
break;
|
||||
}
|
||||
// Cut out `cap` bytes from the end.
|
||||
hb = &hb[..cap];
|
||||
// Grow `cap` by 1 ^2 (not passing `max_cap` if there is one set.)
|
||||
cap = max_cap.map(|max| std::cmp::min(max.get(), cap << 1)).unwrap_or_else(|| cap << 1);
|
||||
}
|
||||
None::<&'a u8>
|
||||
}()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if backward.is_some() && forward.as_ref().map(|th| !th.is_finished()).unwrap_or(false) {
|
||||
// `backward` found something, `forward` is still running.
|
||||
debug_assert_ne!(complete.load(), false, "Complete has not been set! (main thread waiting for forward-searcher thread");
|
||||
complete.store(true);
|
||||
}
|
||||
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
fn _resume_unwind(e: Box<dyn std::any::Any + Send>) -> Never
|
||||
{
|
||||
if cfg!(debug_assertions) {
|
||||
panic!("forward-searcher thread panic")
|
||||
} else {
|
||||
std::panic::resume_unwind(e)
|
||||
}
|
||||
}
|
||||
|
||||
match (forward, backward) {
|
||||
(None, None) => None,
|
||||
(None, back @ Some(_)) => back,
|
||||
(Some(forward), backward) => backward.or_else(move || forward.join().unwrap_or_panic(_resume_unwind)),
|
||||
//(Some(forward), Some(_)) => Handled ^
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
|
||||
where S: MidpointFBSearcher<u8>
|
||||
{
|
||||
todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
|
||||
}
|
||||
|
||||
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
|
||||
|
||||
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
|
||||
|
||||
//TODO: Add tests for `Search{Seq,Par}` partitioning methods.
|
||||
#[cfg(test)]
|
||||
mod test
|
||||
{
|
||||
use super::*;
|
||||
use std::hint::black_box;
|
||||
//TODO: Add a generic randomised lorem-ipsum-like text data generator & a generic assertion tester that can take a unique `MidpointFBSearcher`.
|
||||
|
||||
#[test]
|
||||
fn partition_seq()
|
||||
{
|
||||
todo!("Test `SearchSeq` sequential partition searcher")
|
||||
}
|
||||
|
||||
#[cfg(feature="threads")]
|
||||
#[test]
|
||||
fn partition_par_heavy()
|
||||
{
|
||||
todo!("Test `SearchPar` parallel partition searcher")
|
||||
}
|
||||
|
||||
//TODO: Thread-reusing parallel `MidpointFBSearcher` (SearchSeq is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: MidpointFBSearcher is currently a synchonous-only interface; a pure-async pivot finder may require a refactor.)
|
||||
#[cfg(all(feature="threads-async", feature = "threads"))]
|
||||
#[test]
|
||||
fn partition_par_light()
|
||||
{
|
||||
unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented")
|
||||
}
|
||||
|
||||
#[cfg(feature="threads-async")]
|
||||
#[/*tokio::*/test]
|
||||
fn partition_par_async()
|
||||
{
|
||||
unimplemented!("A pure async parallel searcher has not yet been implemented")
|
||||
}
|
||||
|
||||
//TODO: Benchmarking the searchers' configuration about capacity size, growth and bounding.
|
||||
}
|
@ -0,0 +1,236 @@
|
||||
//! Sync (inter-thread communication) helpers
|
||||
use super::*;
|
||||
use std::{
|
||||
sync::{
|
||||
Arc, Weak,
|
||||
},
|
||||
mem::{
|
||||
self,
|
||||
MaybeUninit,
|
||||
},
|
||||
ptr,
|
||||
fmt, error,
|
||||
};
|
||||
use parking_lot::{
|
||||
Condvar,
|
||||
Mutex,
|
||||
};
|
||||
|
||||
/// Send a single value across thread boundaries to a receiver.
|
||||
///
|
||||
/// This is a sync implementation of `tokio::sync::oneshot`.
|
||||
pub mod oneshot {
|
||||
use super::*;
|
||||
|
||||
/// Error when sending
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
#[non_exhaustive]
|
||||
pub enum SendError
|
||||
{
|
||||
/// The corresponding `Receiver<T>` channel has been dropped.
|
||||
///
|
||||
/// # Note
|
||||
/// This operation **cannot** be re-tried
|
||||
Closed,
|
||||
//TODO: Should we (or *can* we even?) have a `send_wait()` method?
|
||||
}
|
||||
|
||||
/// Error when receiving
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
#[non_exhaustive]
|
||||
pub enum RecvError
|
||||
{
|
||||
/// The corresponding `Sender<T>` channel has been dropped.
|
||||
///
|
||||
/// # Note
|
||||
/// This operation **cannot** be re-tried
|
||||
Closed,
|
||||
|
||||
/// The `recv()` call timed out before a value was sent.
|
||||
///
|
||||
/// This operation can be re-tried
|
||||
Timeout,
|
||||
|
||||
/// The `recv()` call was cancelled by a `StopToken` before a value was sent.
|
||||
///
|
||||
/// This operation can be re-tried
|
||||
//TODO: Maybe merge this and `Timeout`?
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
//TODO: Add impl Send/RecvError: `fn can_retry(&self) -> bool`
|
||||
|
||||
impl fmt::Display for SendError
|
||||
{
|
||||
#[inline(always)]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
f.write_str("send error")
|
||||
// TODO: Should we `match self` for detailed error messages here?
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RecvError
|
||||
{
|
||||
#[inline(always)]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
f.write_str("recv error")
|
||||
// TODO: Should we `match self` for detailed error messages here?
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for SendError{}
|
||||
impl error::Error for RecvError{}
|
||||
|
||||
impl error::Error for Error
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(match &self {
|
||||
Self::Recv(r) => r,
|
||||
Self::Send(s) => s,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Error
|
||||
{
|
||||
#[inline(always)]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
f.write_str("oneshot error")
|
||||
}
|
||||
}
|
||||
|
||||
/// An error using the `oneshot` channel.
|
||||
#[derive(Debug)]
|
||||
pub enum Error
|
||||
{
|
||||
/// An error regarding the sending of a value.
|
||||
Send(SendError),
|
||||
/// An error regarding the receiving of a value.
|
||||
Recv(RecvError),
|
||||
}
|
||||
|
||||
impl From<SendError> for Error
|
||||
{
|
||||
#[inline]
|
||||
fn from(from: SendError) -> Self
|
||||
{
|
||||
Self::Send(from)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RecvError> for Error
|
||||
{
|
||||
#[inline]
|
||||
fn from(from: RecvError) -> Self
|
||||
{
|
||||
Self::Recv(from)
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: impl fmt::Display error::Error for Try*Error[<T>]...
|
||||
impl<T> fmt::Display for TrySendError<T>
|
||||
{
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
write!(f, "send error (T = {})", std::any::type_name::<T>())
|
||||
}
|
||||
}
|
||||
impl<T> fmt::Display for TryRecvError<T>
|
||||
{
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
write!(f, "recv error (T = {})", std::any::type_name::<T>())
|
||||
}
|
||||
}
|
||||
impl<T> error::Error for TryRecvError<T>
|
||||
where T: fmt::Debug
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
impl<T> error::Error for TrySendError<T>
|
||||
where T: fmt::Debug
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
impl<T> fmt::Display for TryError<T>
|
||||
{
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
write!(f, "oneshot error (T = {})", std::any::type_name::<T>())
|
||||
}
|
||||
}
|
||||
impl<T> error::Error for TryError<T>
|
||||
where T: fmt::Debug
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: XXX: We might also want explicit `Debug` impls for all `Try*Error<T>`s, since `T` is irrelevent to the `Error` part.
|
||||
|
||||
/// Error when attempting to send a value using a `try_` function.
|
||||
///
|
||||
/// The `Sender<T>` object that originated this is stored in this object for a re-try of the operation.
|
||||
#[derive(Debug)]
|
||||
pub struct TrySendError<T>(SendError, Sender<T>);
|
||||
|
||||
/// Error when attempting to receive a value using a `try_` function.
|
||||
///
|
||||
/// The `Receiver<T>` object that originated this is stored in this object for a re-try of the operation.
|
||||
#[derive(Debug)]
|
||||
pub struct TryRecvError<T>(RecvError, Receiver<T>);
|
||||
|
||||
/// An error when attempting a oneshot function using a consuming `try_` function.
|
||||
///
|
||||
/// The `Sender<T>`/`Receiver<T>` object(s) that originated this error are stored in this object for a re-try of the operation.
|
||||
#[derive(Debug)]
|
||||
pub struct TryError<T>(Error, (Option<Sender<T>>, Option<Receiver<T>>));
|
||||
|
||||
//TODO: Make a `feature=unstable` version that is allocator-aware *and* re-uses the same allocation for both `Arc` creations (e.g. C++ polymorphic allocator; `bumpalo` or another simple implementation `Sync` bump-allocator would suffice.)
|
||||
/// Oneshot sender.
|
||||
///
|
||||
/// Sends one value of `T` to a corresponding `Receiver`, if it is still alive.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender<T>
|
||||
{
|
||||
/// The value to write (`Sender`) to / read (`Receiver`) from.
|
||||
///
|
||||
/// Write is bound to `send.notify_one()`, read is bound to `send.wait()`.
|
||||
/// # Ownership
|
||||
///
|
||||
/// Note that `Receiver` has a `Weak` variant to hold this. It will fail to read if it cannot upgrade.
|
||||
/// If this slot is unique, then `send`s should fast-fail as there is no corresponding `Receiver` anyway.
|
||||
value: Weak<Mutex<MaybeUninit<T>>>,
|
||||
/// Sends a signal to the receiver to read from `value`.
|
||||
///
|
||||
/// # Ownership
|
||||
/// If this weak-ptr cannot be upgraded, then the `Receiver` ascosiated with this instance *cannot* be waiting on it, and therefore sending should fast-fail
|
||||
//XXX: Is this order of Sender: `Arc, Weak`, Receiver: `Weak, Arc` correct? Check and think about it before proceeding pls...
|
||||
//NOTE: It **is correct** to be this order.
|
||||
send: Arc<Condvar>,
|
||||
}
|
||||
|
||||
/// Oneshot receiver.
|
||||
///
|
||||
/// Receive one value of `T` from a corresponding `Sender`, if it is still alive.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver<T> {
|
||||
value: Arc<Mutex<MaybeUninit<T>>>,
|
||||
send: Weak<Condvar>,
|
||||
}
|
||||
}
|
Loading…
Reference in new issue