Compare commits
2 Commits
bcaa9e0703
...
491867fbbb
Author | SHA1 | Date |
---|---|---|
Avril | 491867fbbb | 8 months ago |
Avril | bea2631fb7 | 8 months ago |
@ -0,0 +1,236 @@
|
|||||||
|
//! Sync (inter-thread communication) helpers
|
||||||
|
use super::*;
|
||||||
|
use std::{
|
||||||
|
sync::{
|
||||||
|
Arc, Weak,
|
||||||
|
},
|
||||||
|
mem::{
|
||||||
|
self,
|
||||||
|
MaybeUninit,
|
||||||
|
},
|
||||||
|
ptr,
|
||||||
|
fmt, error,
|
||||||
|
};
|
||||||
|
use parking_lot::{
|
||||||
|
Condvar,
|
||||||
|
Mutex,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Send a single value across thread boundaries to a receiver.
|
||||||
|
///
|
||||||
|
/// This is a sync implementation of `tokio::sync::oneshot`.
|
||||||
|
pub mod oneshot {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// Error when sending
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum SendError
|
||||||
|
{
|
||||||
|
/// The corresponding `Receiver<T>` channel has been dropped.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// This operation **cannot** be re-tried
|
||||||
|
Closed,
|
||||||
|
//TODO: Should we (or *can* we even?) have a `send_wait()` method?
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error when receiving
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum RecvError
|
||||||
|
{
|
||||||
|
/// The corresponding `Sender<T>` channel has been dropped.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// This operation **cannot** be re-tried
|
||||||
|
Closed,
|
||||||
|
|
||||||
|
/// The `recv()` call timed out before a value was sent.
|
||||||
|
///
|
||||||
|
/// This operation can be re-tried
|
||||||
|
Timeout,
|
||||||
|
|
||||||
|
/// The `recv()` call was cancelled by a `StopToken` before a value was sent.
|
||||||
|
///
|
||||||
|
/// This operation can be re-tried
|
||||||
|
//TODO: Maybe merge this and `Timeout`?
|
||||||
|
Cancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO: Add impl Send/RecvError: `fn can_retry(&self) -> bool`
|
||||||
|
|
||||||
|
impl fmt::Display for SendError
|
||||||
|
{
|
||||||
|
#[inline(always)]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
f.write_str("send error")
|
||||||
|
// TODO: Should we `match self` for detailed error messages here?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for RecvError
|
||||||
|
{
|
||||||
|
#[inline(always)]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
f.write_str("recv error")
|
||||||
|
// TODO: Should we `match self` for detailed error messages here?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl error::Error for SendError{}
|
||||||
|
impl error::Error for RecvError{}
|
||||||
|
|
||||||
|
impl error::Error for Error
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
|
Some(match &self {
|
||||||
|
Self::Recv(r) => r,
|
||||||
|
Self::Send(s) => s,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Error
|
||||||
|
{
|
||||||
|
#[inline(always)]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
f.write_str("oneshot error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An error using the `oneshot` channel.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Error
|
||||||
|
{
|
||||||
|
/// An error regarding the sending of a value.
|
||||||
|
Send(SendError),
|
||||||
|
/// An error regarding the receiving of a value.
|
||||||
|
Recv(RecvError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SendError> for Error
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn from(from: SendError) -> Self
|
||||||
|
{
|
||||||
|
Self::Send(from)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<RecvError> for Error
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn from(from: RecvError) -> Self
|
||||||
|
{
|
||||||
|
Self::Recv(from)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO: impl fmt::Display error::Error for Try*Error[<T>]...
|
||||||
|
impl<T> fmt::Display for TrySendError<T>
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
write!(f, "send error (T = {})", std::any::type_name::<T>())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> fmt::Display for TryRecvError<T>
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
write!(f, "recv error (T = {})", std::any::type_name::<T>())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> error::Error for TryRecvError<T>
|
||||||
|
where T: fmt::Debug
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
|
Some(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> error::Error for TrySendError<T>
|
||||||
|
where T: fmt::Debug
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
|
Some(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> fmt::Display for TryError<T>
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||||
|
{
|
||||||
|
write!(f, "oneshot error (T = {})", std::any::type_name::<T>())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> error::Error for TryError<T>
|
||||||
|
where T: fmt::Debug
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
|
Some(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO: XXX: We might also want explicit `Debug` impls for all `Try*Error<T>`s, since `T` is irrelevent to the `Error` part.
|
||||||
|
|
||||||
|
/// Error when attempting to send a value using a `try_` function.
|
||||||
|
///
|
||||||
|
/// The `Sender<T>` object that originated this is stored in this object for a re-try of the operation.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TrySendError<T>(SendError, Sender<T>);
|
||||||
|
|
||||||
|
/// Error when attempting to receive a value using a `try_` function.
|
||||||
|
///
|
||||||
|
/// The `Receiver<T>` object that originated this is stored in this object for a re-try of the operation.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TryRecvError<T>(RecvError, Receiver<T>);
|
||||||
|
|
||||||
|
/// An error when attempting a oneshot function using a consuming `try_` function.
|
||||||
|
///
|
||||||
|
/// The `Sender<T>`/`Receiver<T>` object(s) that originated this error are stored in this object for a re-try of the operation.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TryError<T>(Error, (Option<Sender<T>>, Option<Receiver<T>>));
|
||||||
|
|
||||||
|
//TODO: Make a `feature=unstable` version that is allocator-aware *and* re-uses the same allocation for both `Arc` creations (e.g. C++ polymorphic allocator; `bumpalo` or another simple implementation `Sync` bump-allocator would suffice.)
|
||||||
|
/// Oneshot sender.
|
||||||
|
///
|
||||||
|
/// Sends one value of `T` to a corresponding `Receiver`, if it is still alive.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Sender<T>
|
||||||
|
{
|
||||||
|
/// The value to write (`Sender`) to / read (`Receiver`) from.
|
||||||
|
///
|
||||||
|
/// Write is bound to `send.notify_one()`, read is bound to `send.wait()`.
|
||||||
|
/// # Ownership
|
||||||
|
///
|
||||||
|
/// Note that `Receiver` has a `Weak` variant to hold this. It will fail to read if it cannot upgrade.
|
||||||
|
/// If this slot is unique, then `send`s should fast-fail as there is no corresponding `Receiver` anyway.
|
||||||
|
value: Weak<Mutex<MaybeUninit<T>>>,
|
||||||
|
/// Sends a signal to the receiver to read from `value`.
|
||||||
|
///
|
||||||
|
/// # Ownership
|
||||||
|
/// If this weak-ptr cannot be upgraded, then the `Receiver` ascosiated with this instance *cannot* be waiting on it, and therefore sending should fast-fail
|
||||||
|
//XXX: Is this order of Sender: `Arc, Weak`, Receiver: `Weak, Arc` correct? Check and think about it before proceeding pls...
|
||||||
|
//NOTE: It **is correct** to be this order.
|
||||||
|
send: Arc<Condvar>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Oneshot receiver.
|
||||||
|
///
|
||||||
|
/// Receive one value of `T` from a corresponding `Sender`, if it is still alive.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Receiver<T> {
|
||||||
|
value: Arc<Mutex<MaybeUninit<T>>>,
|
||||||
|
send: Weak<Condvar>,
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in new issue