diff --git a/src/buffer.rs b/src/buffer.rs new file mode 100644 index 0000000..eb8ee44 --- /dev/null +++ b/src/buffer.rs @@ -0,0 +1,349 @@ +//! Buffering and resource pooling utilities +//! +//! +use super::*; + +pub use bytes::{ + Buf, BufMut, + Bytes, BytesMut, +}; +use std::{ + fmt, error, + sync::Arc, + pin::Pin, +}; + + +#[cfg(feature="_loan")] +const _TODO_REWORK_THIS_INTERFACE_ITS_OVERCOMPLICATED_SHITE: () = { + type OwnedResult = Result; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + enum MaybeRef<'r, T> + { + Owned(T), + Borrowed(&'r T), + } + + #[derive(Debug)] + struct Loan<'owner, O: Loaner> + { + owner: MaybeRef<'owner, O>, + item: O::Item<'owner>, + } + + impl<'o, O> Loaned<'o, O> for Loan<'o, O> + where O: Loaner + { + fn try_release(self) -> Result<(), ::Error> { + self.item.try_release() + } + fn try_release_owned(self: Arc) -> OwnedResult, (), ::Error> { + + } + } + + pub trait Loaned<'owner, Owner: ?Sized + 'owner>: Sized + where Owner: Loaner + { + //fn owner(&self) -> Option<&Owner>; + + fn try_release(self) -> Result<(), Owner::Error>; + fn try_release_owned(self: Arc) -> OwnedResult, (), Owner::Error> + where Self: 'owner; // XXX: What? 'owner or 'static??? I think it's `'owner` == `'static` but how tf do I write that? + } + + pub trait Loaner { + type Item<'owner>: Loaned<'owner, Self> + where Self: 'owner; + type Error: error::Error + Send + 'static; + + fn try_loan(&self) -> Result, Self::Error>; + fn try_loan_owned(self: Arc) -> OwnedResult, Self::Item<'static>, Self::Error> + where Self: 'static; + + fn try_release(&self, item: Self::Item<'_>) -> OwnedResult, (), Self::Error>; + } + + compile_error!("This `trait Loan` design isn't going to fucking work ofc...") +}; + +/// Thread pooling +#[cfg(feature="threads")] +pub mod thread_pool { + use super::*; + + use std::{ + mem::{ + self, + ManuallyDrop, + }, + thread::{ + self, + Thread, ThreadId, + }, + ops::Drop, + sync::{Arc, Weak,}, + + }; + use parking_lot::{ + Condvar, + Mutex, + }; + use crossbeam::{ + queue::ArrayQueue, + }; + + #[derive(Debug)] + pub enum ThreadHandleKind<'scope, T> + { + Detached(Option), + Attached(thread::JoinHandle), + Scoped(thread::ScopedJoinHandle<'scope, T>), + } + + impl<'scope, T> ThreadHandleKind<'scope, T> + { + /// Get a reference to the attached thread (if there is one.) + #[inline] + pub fn thread(&self) -> Option<&Thread> + { + Some(match self { + Self::Scoped(s) => s.thread(), + Self::Attached(s) => s.thread(), + _ => return None, + }) + } + + /// Is this handle attached to a thread? + /// + /// If it is *attached*, this means the thread *should not* outlive the instance (unless the instance is `'static`.) + #[inline] + pub const fn is_attached(&self) -> bool + { + match self { + Self::Attached(_) | + Self::Scoped(_) => true, + Self::Detached(_) => false, + } + } + + /// Is this handle no longer referring to *any* thread? + /// + /// In a *poisoned* state, the handle is useless. + #[inline] + pub const fn is_poisoned(&self) -> bool + { + match self { + Self::Detached(None) => true, + _ => false, + } + } + + /// Get the thread id of the handle, if there is a referred to thread. + #[inline] + pub fn id(&self) -> Option + { + Some(match self { + Self::Scoped(x) => x.thread().id(), + Self::Attached(x) => x.thread().id(), + Self::Detached(Some(id)) => *id, + _ => return None + }) + } + } + + /// Represents a raw thread handle + #[derive(Debug)] + #[repr(transparent)] + pub struct ThreadHandle<'scope, T>(ThreadHandleKind<'scope, T>); + + //TODO: forward `ThreadHandle<'s, T>` methods to `ThreadHandleKind<'s, T>`: `thread(), id(), is_attached()`. Add methods `is_running(), join(), try_join(),` etc... + + impl<'s, T> Drop for ThreadHandle<'s, T> + { + #[inline] + fn drop(&mut self) { + match mem::replace(&mut self.0, ThreadHandleKind::Detached(None)) { + ThreadHandleKind::Attached(a) => drop(a.join()), // Attempt join, ignore panic. + ThreadHandleKind::Scoped(s) => drop(s.join().unwrap_or_resume()), // Attempt join, carry panic back through into scope. + _ => (), + } + } + } + + #[derive(Debug)] + enum PooledSendContext + { + /// An execute directive `Err(Some(func))` is taken by the thread and replaced by `Ok(func())`. + /// + /// # Direction + /// Bidirectional. + /// (Handle -> Thread; Thread -> Handle) + Execute(Result>>), + /// Pass a caught panic within an `Execute(Err(Some(F)))` payload back from the thread. + /// + /// # Direction + /// Thread -> Handle + Panic(UnwindPayload), + /// When the loaned `PooledThread` is moved back into the pool, tell the thread to park itself again. + /// + /// # Direction + /// Handle -> Thread + Park, + + /// The default invariant used for when an operation has been taken by a thread. + /// + /// # When `Taken`. + /// The replacement is always `Taken` after a `take(&mut self)` that contains a value. + /// The `Park` directive will not be replaced to `Taken`. + Taken, + + //NOTE: `Close` variant not needed because `handle` will have a `Weak` reference to `callback_passage`; when the upgrade fails, the thread will exit. + } + + impl PooledSendContext + { + pub fn take(&mut self) -> Option + { + Some(match self { + //XXX: I don't think not replacing cloneable variants is a good or useful default. (see below V) + //// Clone non-data-holding variants. + //Self::Park => Self::Park, + //// If the ctx has already been taken, return `None`. + //Self::Taken => return None, + // Move out of data-holding variants. + _ => mem::replace(self, Self::Taken) + }) + } + } + + /// A loanable thread that belongs to a pool. + /// + /// # Pooling + /// When taken from a `ThreadPool`, the object is *moved* out of the thread-pool's ring-buffer and then pinned in return (See `PooledThread`.) + /// + /// To return the thread to the pool, the `callback_passage` writes a `Park` command to the thread, unpins itself, and then on drop moves itself back into the start of the ring-buffer of the thread pool `owner` + #[derive(Debug)] + struct PooledThreadHandle<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope> + where F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope + { + + /// Passes `F` to `handle` and retrieves `T`. + callback_passage: Arc<(Condvar, Mutex>)>, + /// The backing thread, which when started parks itself. + /// The `PooledThread` object will unpark the thread when the object is loaned (moved) out of the pool, and re-parked when enter + /// + /// Will wait on `callback_passage?.0` for `Execute` directives, pass them back to the handle, and when a `Park` directive is recieved (i.e. when moved back into the pool) the thread will park itself and wait to be unparked when moved out of the pool or closed again. + /// + /// If `callback_message` is dropped, the thread will exit when unparked (dropped from pool.) + //TODO: Ensure unpark on drop? + handle: ThreadHandle<'scope, Result<(), ()>>, + } + + impl<'scope, T, F:?Sized> PooledThreadHandle<'scope, T, F> + where F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope + { + fn send_to_thread_raw(&self, message: PooledSendContext) -> bool + { + let mut ctx = self.callback_passage.1.lock(); + *ctx = message; + self.callback_passage.0.notify_one() + } + fn send_from_thread_raw(&self, message: PooledSendContext) + { + let mut ctx = self.callback_passage.1.lock(); + *ctx = message; + } + + fn read_in_thread_raw(&self, wait_for: bool) -> Option> + { + let mut ctx = self.callback_passage.1.lock(); // NOTE: Should we fair lock here? + // Wait on condvar if `wait_for` is true. + if wait_for { + self.callback_passage.0.wait(&mut ctx); + } + // Then replace the ctx with `Taken`. + ctx.take() + } + fn try_read_from_thread_raw(&self) -> Option> + { + let mut ctx = self.callback_passage.1.try_lock()?; + ctx.take() + } + fn read_from_thread_raw(&self) -> Option> + { + let mut ctx = self.callback_passage.1.lock(); + ctx.take() + } + } + + #[derive(Debug)] + pub struct PooledThread<'scope, 'pool: 'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope> + where F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope + { + /// The owner to return `thread` to on drop. + owner: Option<&'pool ThreadPool<'scope, T, F>>, + /// The pinned thread handle that is returned on drop. + thread: ManuallyDrop>, //XXX: Pin<> needed (or even useful at all) here? + } + + impl<'scope, 'pool: 'scope, T, F:?Sized> Drop for PooledThread<'scope, 'pool, T, F> + where F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope + { + fn drop(&mut self) { + let Some(loaned_from) = self.owner else { + // There is no owner. + // Instead, unpark the thread after drop. + let PooledThreadHandle { callback_passage, handle } = unsafe { ManuallyDrop::take(&mut self.thread) }; + + // Drop the Arc, so when `thread` wakes from unpark, it will fail to acquire context. + drop(callback_passage); + + if let Some(thread) = handle.0.thread() { + thread.unpark(); + } + + return todo!("How do we know *when* to unpark? How can we unpark *after* dropping the Arc?"); + }; + + // There is an owner. Tell the thread to park itself. + let mut handle = unsafe { ManuallyDrop::take(&mut self.thread) }; + handle.send_to_thread_raw(PooledSendContext::Park); + + // Read the message object, take it from the thread after it notifies us it is about to park. + let ctx = { + let mut ctx = handle.callback_passage.1.lock(); + // Before thread parks, it should signal condvar, and callback_passage should not be unique again yet. + handle.callback_passage.0.wait_while(&mut ctx, |_| Arc::strong_count(&handle.callback_passage) > 1); + ctx.take() + }; + + // Force insert back into thred pool. + if let Some( PooledThreadHandle { callback_passage, handle }) = loaned_from.pool_queue.force_push(handle) { + // If one is removed, drop the context.. + drop(callback_passage); + // Then force-unpark the thread. + match handle.0.thread() { + Some(thread) => thread.unpark(), + None => (), + } + } + + todo!("Should we force `join()` here? Or allow non-scoped handles to be forgotten?") + } + } + #[derive(Debug)] + pub struct ThreadPool<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope> + where F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope + { + pool_queue: ArrayQueue>, + } + + //TODO: Build a ThreadPool<'scope = 'static> using crossbeam::ArrayQueue<...> (XXX: See phone for details on how pool should use un/parking for loaning; and CondVar for passing tasks.) Also, add a `Loan` trait with assoc type `Item<'owner>` and methods `try_loan(&self) -> Result, ...>`, `return(&self, item: Self::Item<'_>) -> Result<(), ...>` and `try_loan_owned(self: Arc) -> `MovedResult, OwnedLoan<'static, Self>, ...>` (struct OwnedLoad<'scope, L>{ item: Option> } )`, etc. +} diff --git a/src/main.rs b/src/main.rs index 62b54d8..6e9480c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![cfg_attr(feature="unstable", feature(never_type))] // See `Never`. #[macro_use] mod ext; use ext::*; +mod buffer; mod part; //#[inline]