buffer: Added `ThreadPool<"scope = "static>` skeleton. Old design removed, redesign using `mpsc` (`crossbeam::channel`) instead of bare-bones channel impl we were using with `Arc<(Condvar, Mutex<...>)>` (See comment in `buffer::thread_pool` about previous impl"s shortcomings and needless complexity.)

Fortune for reverse's current commit: Curse − 凶
refactor
Avril 9 months ago
parent 0ce496cafd
commit bcaa9e0703
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -0,0 +1,356 @@
//! Buffering and resource pooling utilities
//!
//!
use super::*;
pub use bytes::{
Buf, BufMut,
Bytes, BytesMut,
};
use std::{
fmt, error,
sync::Arc,
pin::Pin,
};
#[cfg(feature="_loan")]
const _TODO_REWORK_THIS_INTERFACE_ITS_OVERCOMPLICATED_SHITE: () = {
type OwnedResult<S, T, E> = Result<T, (E, S)>;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum MaybeRef<'r, T>
{
Owned(T),
Borrowed(&'r T),
}
#[derive(Debug)]
struct Loan<'owner, O: Loaner>
{
owner: MaybeRef<'owner, O>,
item: O::Item<'owner>,
}
impl<'o, O> Loaned<'o, O> for Loan<'o, O>
where O: Loaner
{
fn try_release(self) -> Result<(), <O as Loaner>::Error> {
self.item.try_release()
}
fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), <O as Loaner>::Error> {
}
}
pub trait Loaned<'owner, Owner: ?Sized + 'owner>: Sized
where Owner: Loaner
{
//fn owner(&self) -> Option<&Owner>;
fn try_release(self) -> Result<(), Owner::Error>;
fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), Owner::Error>
where Self: 'owner; // XXX: What? 'owner or 'static??? I think it's `'owner` == `'static` but how tf do I write that?
}
pub trait Loaner {
type Item<'owner>: Loaned<'owner, Self>
where Self: 'owner;
type Error: error::Error + Send + 'static;
fn try_loan(&self) -> Result<Self::Item<'_>, Self::Error>;
fn try_loan_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, Self::Item<'static>, Self::Error>
where Self: 'static;
fn try_release(&self, item: Self::Item<'_>) -> OwnedResult<Self::Item<'_>, (), Self::Error>;
}
compile_error!("This `trait Loan` design isn't going to fucking work ofc...")
};
/// Thread pooling
#[cfg(feature="threads")]
pub mod thread_pool {
use super::*;
use std::{
mem::{
self,
ManuallyDrop,
},
thread::{
self,
Thread, ThreadId,
},
ops::Drop,
sync::{Arc, Weak,},
};
use parking_lot::{
Condvar,
Mutex,
};
use crossbeam::{
queue::ArrayQueue,
channel,
};
//TODO: Implement `ThreadPool<'scope = 'static>` like below, but instead use `crossbeam::channel` mpsc to send commands to unparked threads (i.e. loaned threads from the pool that have an active user handle.)
// The design will be far simpler, since the Arc/Weak, send+recv stuff will be handled fine by `channel` on both ends. The only thing we need dtors for is `force_push()`ing threads back into their pool's ringbuffer, and for force-unparking threads from a pool that are removed (same as loan, except sender should be dropped before unpark happens.)
#[cfg(feature="_thread-pool-no-mpsc")]
const _: () = {
#[derive(Debug)]
pub enum ThreadHandleKind<'scope, T>
{
Detached(Option<ThreadId>),
Attached(thread::JoinHandle<T>),
Scoped(thread::ScopedJoinHandle<'scope, T>),
}
impl<'scope, T> ThreadHandleKind<'scope, T>
{
/// Get a reference to the attached thread (if there is one.)
#[inline]
pub fn thread(&self) -> Option<&Thread>
{
Some(match self {
Self::Scoped(s) => s.thread(),
Self::Attached(s) => s.thread(),
_ => return None,
})
}
/// Is this handle attached to a thread?
///
/// If it is *attached*, this means the thread *should not* outlive the instance (unless the instance is `'static`.)
#[inline]
pub const fn is_attached(&self) -> bool
{
match self {
Self::Attached(_) |
Self::Scoped(_) => true,
Self::Detached(_) => false,
}
}
/// Is this handle no longer referring to *any* thread?
///
/// In a *poisoned* state, the handle is useless.
#[inline]
pub const fn is_poisoned(&self) -> bool
{
match self {
Self::Detached(None) => true,
_ => false,
}
}
/// Get the thread id of the handle, if there is a referred to thread.
#[inline]
pub fn id(&self) -> Option<ThreadId>
{
Some(match self {
Self::Scoped(x) => x.thread().id(),
Self::Attached(x) => x.thread().id(),
Self::Detached(Some(id)) => *id,
_ => return None
})
}
}
/// Represents a raw thread handle
#[derive(Debug)]
#[repr(transparent)]
pub struct ThreadHandle<'scope, T>(ThreadHandleKind<'scope, T>);
//TODO: forward `ThreadHandle<'s, T>` methods to `ThreadHandleKind<'s, T>`: `thread(), id(), is_attached()`. Add methods `is_running(), join(), try_join(),` etc...
impl<'s, T> Drop for ThreadHandle<'s, T>
{
#[inline]
fn drop(&mut self) {
match mem::replace(&mut self.0, ThreadHandleKind::Detached(None)) {
ThreadHandleKind::Attached(a) => drop(a.join()), // Attempt join, ignore panic.
ThreadHandleKind::Scoped(s) => drop(s.join().unwrap_or_resume()), // Attempt join, carry panic back through into scope.
_ => (),
}
}
}
#[derive(Debug)]
enum PooledSendContext<T: Send, F: ?Sized + Send>
{
/// An execute directive `Err(Some(func))` is taken by the thread and replaced by `Ok(func())`.
///
/// # Direction
/// Bidirectional.
/// (Handle -> Thread; Thread -> Handle)
Execute(Result<T, Option<Box<F>>>),
/// Pass a caught panic within an `Execute(Err(Some(F)))` payload back from the thread.
///
/// # Direction
/// Thread -> Handle
Panic(UnwindPayload),
/// When the loaned `PooledThread` is moved back into the pool, tell the thread to park itself again.
///
/// # Direction
/// Handle -> Thread
Park,
/// The default invariant used for when an operation has been taken by a thread.
///
/// # When `Taken`.
/// The replacement is always `Taken` after a `take(&mut self)` that contains a value.
/// The `Park` directive will not be replaced to `Taken`.
Taken,
//NOTE: `Close` variant not needed because `handle` will have a `Weak` reference to `callback_passage`; when the upgrade fails, the thread will exit.
}
impl<T: Send, F: ?Sized + Send> PooledSendContext<T, F>
{
pub fn take(&mut self) -> Option<Self>
{
Some(match self {
//XXX: I don't think not replacing cloneable variants is a good or useful default. (see below V)
//// Clone non-data-holding variants.
//Self::Park => Self::Park,
//// If the ctx has already been taken, return `None`.
//Self::Taken => return None,
// Move out of data-holding variants.
_ => mem::replace(self, Self::Taken)
})
}
}
/// A loanable thread that belongs to a pool.
///
/// # Pooling
/// When taken from a `ThreadPool`, the object is *moved* out of the thread-pool's ring-buffer and then pinned in return (See `PooledThread`.)
///
/// To return the thread to the pool, the `callback_passage` writes a `Park` command to the thread, unpins itself, and then on drop moves itself back into the start of the ring-buffer of the thread pool `owner`
#[derive(Debug)]
struct PooledThreadHandle<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
/// Passes `F` to `handle` and retrieves `T`.
callback_passage: Arc<(Condvar, Mutex<PooledSendContext<T, F>>)>,
/// The backing thread, which when started parks itself.
/// The `PooledThread` object will unpark the thread when the object is loaned (moved) out of the pool, and re-parked when enter
///
/// Will wait on `callback_passage?.0` for `Execute` directives, pass them back to the handle, and when a `Park` directive is recieved (i.e. when moved back into the pool) the thread will park itself and wait to be unparked when moved out of the pool or closed again.
///
/// If `callback_message` is dropped, the thread will exit when unparked (dropped from pool.)
//TODO: Ensure unpark on drop?
handle: ThreadHandle<'scope, Result<(), ()>>,
}
impl<'scope, T, F:?Sized> PooledThreadHandle<'scope, T, F>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
fn send_to_thread_raw(&self, message: PooledSendContext<T, F>) -> bool
{
let mut ctx = self.callback_passage.1.lock();
*ctx = message;
self.callback_passage.0.notify_one()
}
fn send_from_thread_raw(&self, message: PooledSendContext<T, F>)
{
let mut ctx = self.callback_passage.1.lock();
*ctx = message;
}
fn read_in_thread_raw(&self, wait_for: bool) -> Option<PooledSendContext<T, F>>
{
let mut ctx = self.callback_passage.1.lock(); // NOTE: Should we fair lock here?
// Wait on condvar if `wait_for` is true.
if wait_for {
self.callback_passage.0.wait(&mut ctx);
}
// Then replace the ctx with `Taken`.
ctx.take()
}
fn try_read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
{
let mut ctx = self.callback_passage.1.try_lock()?;
ctx.take()
}
fn read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
{
let mut ctx = self.callback_passage.1.lock();
ctx.take()
}
}
#[derive(Debug)]
pub struct PooledThread<'scope, 'pool: 'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
/// The owner to return `thread` to on drop.
owner: Option<&'pool ThreadPool<'scope, T, F>>,
/// The pinned thread handle that is returned on drop.
thread: ManuallyDrop<PooledThreadHandle<'scope, T, F>>, //XXX: Pin<> needed (or even useful at all) here?
}
impl<'scope, 'pool: 'scope, T, F:?Sized> Drop for PooledThread<'scope, 'pool, T, F>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
fn drop(&mut self) {
let Some(loaned_from) = self.owner else {
// There is no owner.
// Instead, unpark the thread after drop.
let PooledThreadHandle { callback_passage, handle } = unsafe { ManuallyDrop::take(&mut self.thread) };
// Drop the Arc, so when `thread` wakes from unpark, it will fail to acquire context.
drop(callback_passage);
if let Some(thread) = handle.0.thread() {
thread.unpark();
}
return todo!("How do we know *when* to unpark? How can we unpark *after* dropping the Arc?");
};
// There is an owner. Tell the thread to park itself.
let mut handle = unsafe { ManuallyDrop::take(&mut self.thread) };
handle.send_to_thread_raw(PooledSendContext::Park);
// Read the message object, take it from the thread after it notifies us it is about to park.
let ctx = {
let mut ctx = handle.callback_passage.1.lock();
// Before thread parks, it should signal condvar, and callback_passage should not be unique again yet.
handle.callback_passage.0.wait_while(&mut ctx, |_| Arc::strong_count(&handle.callback_passage) > 1);
ctx.take()
};
// Force insert back into thred pool.
if let Some( PooledThreadHandle { callback_passage, handle }) = loaned_from.pool_queue.force_push(handle) {
// If one is removed, drop the context..
drop(callback_passage);
// Then force-unpark the thread.
match handle.0.thread() {
Some(thread) => thread.unpark(),
None => (),
}
}
todo!("Should we force `join()` here? Or allow non-scoped handles to be forgotten?")
}
}
#[derive(Debug)]
pub struct ThreadPool<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
pool_queue: ArrayQueue<PooledThreadHandle<'scope, T, F>>,
}
};
//TODO: Build a ThreadPool<'scope = 'static> using crossbeam::ArrayQueue<...> (XXX: See phone for details on how pool should use un/parking for loaning; and CondVar for passing tasks.) Also, add a `Loan` trait with assoc type `Item<'owner>` and methods `try_loan(&self) -> Result<Self::Item<'_>, ...>`, `return(&self, item: Self::Item<'_>) -> Result<(), ...>` and `try_loan_owned(self: Arc<Self>) -> `MovedResult<Arc<Self>, OwnedLoan<'static, Self>, ...>` (struct OwnedLoad<'scope, L>{ item: Option<L::Item<'scope>> } )`, etc.
}

@ -1,6 +1,7 @@
#![cfg_attr(feature="unstable", feature(never_type))] // See `Never`.
#[macro_use] mod ext; use ext::*;
mod buffer;
mod part;
//#[inline]

Loading…
Cancel
Save