sync: Started sync `oneshot` channel impl

Fortune for reverse's current commit: Small blessing − 小吉
refactor
Avril 8 months ago
parent bcaa9e0703
commit bea2631fb7
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -95,6 +95,8 @@ pub mod thread_pool {
channel, channel,
}; };
//TODO: Implement a oneshot channel: `oneshot::{Sender, Receiver}`; works like Tokio's `sync::oneshot`, but is sync, not async: Can use `{Sender,Receiver}slot: {Arc, Weak}<Mutex<MaybeUninit<T>>>`, `waiter: {Weak/_PinnedPtrIntoArc<Pin<slot>>(avoids 2nd alloc)/Weak,Arc<_, A = SenderSharedBumpAllocator*>,Arc}<Condvar>` (NOTE: Bump allocator crate name `bumpalo`.)
//TODO: Implement `ThreadPool<'scope = 'static>` like below, but instead use `crossbeam::channel` mpsc to send commands to unparked threads (i.e. loaned threads from the pool that have an active user handle.) //TODO: Implement `ThreadPool<'scope = 'static>` like below, but instead use `crossbeam::channel` mpsc to send commands to unparked threads (i.e. loaned threads from the pool that have an active user handle.)
// The design will be far simpler, since the Arc/Weak, send+recv stuff will be handled fine by `channel` on both ends. The only thing we need dtors for is `force_push()`ing threads back into their pool's ringbuffer, and for force-unparking threads from a pool that are removed (same as loan, except sender should be dropped before unpark happens.) // The design will be far simpler, since the Arc/Weak, send+recv stuff will be handled fine by `channel` on both ends. The only thing we need dtors for is `force_push()`ing threads back into their pool's ringbuffer, and for force-unparking threads from a pool that are removed (same as loan, except sender should be dropped before unpark happens.)

@ -279,3 +279,26 @@ impl<T, E: Into<UnwindPayload>> UnwrapPanicResumeExt<T> for Result<T, E>
} }
} }
} }
/// Extension methods querying how unique an [`std::sync::Arc`] allocation is.
pub trait ArcExt
{
    /// If the strong-count == 1 and the weak-count is 0.
    ///
    /// When this holds no other `Arc` *or* `Weak` handle can observe the
    /// allocation, so the caller is its sole owner.
    fn is_fully_unique(&self) -> bool;
    /// If the strong-count == 1.
    ///
    /// If there are alive `Weak`s pointing to this object, those are not considered.
    fn is_currently_unique(&self) -> bool;
}
// NOTE(review): implemented *on* `Arc<T>` rather than via a `self: &Arc<Self>`
// receiver — nested receiver types are rejected on stable Rust (E0307, needs the
// unstable `arbitrary_self_types` feature). Call-sites (`arc.is_fully_unique()`)
// are unaffected by this restructuring.
impl<T: ?Sized> ArcExt for std::sync::Arc<T>
{
    #[inline]
    fn is_fully_unique(&self) -> bool {
        std::sync::Arc::strong_count(self) == 1 &&
            std::sync::Arc::weak_count(self) == 0
    }
    #[inline]
    fn is_currently_unique(&self) -> bool {
        std::sync::Arc::strong_count(self) == 1
    }
}

@ -1,6 +1,10 @@
#![cfg_attr(feature="unstable", feature(never_type))] // See `Never`. #![cfg_attr(feature="unstable", feature(never_type))] // See `Never`.
#[macro_use] mod ext; use ext::*; #[macro_use] mod ext; use ext::*;
#[cfg(any(feature="threads-async", feature="threads"))]
mod sync;
mod buffer; mod buffer;
mod part; mod part;

@ -0,0 +1,184 @@
//! Sync (inter-thread communication) helpers
use super::*;
use std::{
sync::{
Arc, Weak,
},
mem::{
self,
MaybeUninit,
},
ptr,
fmt, error,
};
use parking_lot::{
Condvar,
Mutex,
};
/// Send a single value across thread boundaries to a receiver.
///
/// This is a sync implementation of `tokio::sync::oneshot`.
pub mod oneshot {
use super::*;
/// Error when sending
/// Error when sending a value through a `oneshot` channel.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SendError
{
    /// The corresponding `Receiver<T>` channel has been dropped.
    ///
    /// # Note
    /// This operation **cannot** be re-tried: with the receiver gone, no value
    /// can ever be delivered.
    Closed,
    //TODO: Should we (or *can* we even?) have a `send_wait()` method?
}
/// Error when receiving
/// Error when receiving a value from a `oneshot` channel.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RecvError
{
    /// The corresponding `Sender<T>` channel has been dropped.
    ///
    /// # Note
    /// This operation **cannot** be re-tried: with the sender gone, no value
    /// can ever arrive.
    Closed,
    /// The `recv()` call timed out before a value was sent.
    ///
    /// This operation can be re-tried.
    Timeout,
    /// The `recv()` call was cancelled by a `StopToken` before a value was sent.
    ///
    /// This operation can be re-tried.
    //TODO: Maybe merge this and `Timeout`?
    Cancelled,
}
impl fmt::Display for SendError
{
    /// Human-readable description of the send failure.
    ///
    /// Previously `todo!()`, which panicked whenever the error was formatted
    /// (e.g. printed through an `error::Error` reporting chain).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // `#[non_exhaustive]` only restricts *foreign* crates; keeping this
        // match exhaustive forces a message to be written for new variants.
        match self {
            Self::Closed => f.write_str("the corresponding `Receiver` has been dropped"),
        }
    }
}
impl fmt::Display for RecvError
{
    /// Human-readable description of the receive failure.
    ///
    /// Previously `todo!()`, which panicked whenever the error was formatted
    /// (e.g. printed through an `error::Error` reporting chain).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // Exhaustive on purpose: a new variant must get a message here.
        match self {
            Self::Closed => f.write_str("the corresponding `Sender` has been dropped"),
            Self::Timeout => f.write_str("`recv()` timed out before a value was sent"),
            Self::Cancelled => f.write_str("`recv()` was cancelled before a value was sent"),
        }
    }
}
// Leaf errors: `source()` keeps its default `None`; the message comes from the
// `Display` impls above each.
impl error::Error for SendError{}
impl error::Error for RecvError{}
impl error::Error for Error
{
    /// The underlying cause is always present: the wrapped
    /// `SendError` / `RecvError` for the failing side.
    #[inline]
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::Send(inner) => Some(inner),
            Self::Recv(inner) => Some(inner),
        }
    }
}
impl fmt::Display for Error
{
    /// Writes a short category label; the detail is carried by `source()`.
    #[inline(always)]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let label = match self {
            Self::Send(_) => "send error",
            Self::Recv(_) => "recv error",
        };
        f.write_str(label)
    }
}
/// An error using the `oneshot` channel.
///
/// Unifies both sides' failures into one type; construct via the
/// `From<SendError>` / `From<RecvError>` impls below.
#[derive(Debug)]
pub enum Error
{
    /// An error regarding the sending of a value.
    Send(SendError),
    /// An error regarding the receiving of a value.
    Recv(RecvError),
}
impl From<SendError> for Error
{
#[inline]
fn from(from: SendError) -> Self
{
Self::Send(from)
}
}
impl From<RecvError> for Error
{
#[inline]
fn from(from: RecvError) -> Self
{
Self::Recv(from)
}
}
//TODO: impl fmt::Display error::Error for Try*Error[<T>]...
//TODO: XXX: We might also want explicit `Debug` impls for all `Try*Error<T>`s, since `T` is irrelevant to the `Error` part.
/// Error when attempting to send a value using a `try_` function.
///
/// The `Sender<T>` object that originated this is stored in this object for a re-try of the operation.
#[derive(Debug)]
pub struct TrySendError<T>(SendError, Sender<T>);
/// Error when attempting to receive a value using a `try_` function.
///
/// The `Receiver<T>` object that originated this is stored in this object for a re-try of the operation.
#[derive(Debug)]
pub struct TryRecvError<T>(RecvError, Receiver<T>);
/// An error when attempting a oneshot function using a consuming `try_` function.
///
/// The `Sender<T>`/`Receiver<T>` object(s) that originated this error are stored in this object for a re-try of the operation.
// NOTE(review): presumably a handle is `None` when it was consumed before the
// failure — confirm once the consuming `try_` functions are implemented.
#[derive(Debug)]
pub struct TryError<T>(Error, (Option<Sender<T>>, Option<Receiver<T>>));
//TODO: Make a `feature=unstable` version that is allocator-aware *and* re-uses the same allocation for both `Arc` creations (e.g. C++ polymorphic allocator; `bumpalo` or another simple implementation `Sync` bump-allocator would suffice.)
/// Oneshot sender.
///
/// Sends one value of `T` to a corresponding `Receiver`, if it is still alive.
#[derive(Debug)]
pub struct Sender<T>
{
    /// The value to write (`Sender`) to / read (`Receiver`) from.
    ///
    /// Write is bound to `send.notify_one()`, read is bound to `send.wait()`.
    ///
    /// # Ownership
    /// The `Receiver` holds the strong (`Arc`) side of this slot; this weak
    /// handle failing to upgrade means the `Receiver` is gone, so `send`s
    /// should fast-fail in that case — the value has nowhere to go.
    value: Weak<Mutex<MaybeUninit<T>>>,
    /// Sends a signal to the receiver to read from `value`.
    ///
    /// # Ownership
    /// Mirror of `value`: here the `Sender` holds the strong (`Arc`) side and
    /// the `Receiver` holds a `Weak`. If the receiver cannot upgrade its weak
    /// handle, the `Sender` is gone and receiving should fast-fail.
    //XXX: Is this order of Sender: `Arc, Weak`, Receiver: `Weak, Arc` correct? Check and think about it before proceeding pls...
    //NOTE: It **is correct** to be this order.
    send: Arc<Condvar>,
}
/// Oneshot receiver.
///
/// Receive one value of `T` from a corresponding `Sender`, if it is still alive.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Strong side of the shared value slot (see `Sender::value`); keeping the
    /// allocation alive so a `Sender` can upgrade its `Weak` and write into it.
    value: Arc<Mutex<MaybeUninit<T>>>,
    /// Weak handle to the sender's notification `Condvar` (see `Sender::send`).
    // NOTE(review): presumably an upgrade failure here signals the `Sender` is
    // dropped — confirm once `recv()` is implemented.
    send: Weak<Condvar>,
}
}
Loading…
Cancel
Save