Compare commits
2 Commits
bcaa9e0703
...
491867fbbb
Author | SHA1 | Date |
---|---|---|
Avril | 491867fbbb | 8 months ago |
Avril | bea2631fb7 | 8 months ago |
@ -0,0 +1,236 @@
|
||||
//! Sync (inter-thread communication) helpers
|
||||
use super::*;
|
||||
use std::{
|
||||
sync::{
|
||||
Arc, Weak,
|
||||
},
|
||||
mem::{
|
||||
self,
|
||||
MaybeUninit,
|
||||
},
|
||||
ptr,
|
||||
fmt, error,
|
||||
};
|
||||
use parking_lot::{
|
||||
Condvar,
|
||||
Mutex,
|
||||
};
|
||||
|
||||
/// Send a single value across thread boundaries to a receiver.
|
||||
///
|
||||
/// This is a sync implementation of `tokio::sync::oneshot`.
|
||||
pub mod oneshot {
|
||||
use super::*;
|
||||
|
||||
/// Error when sending
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
#[non_exhaustive]
|
||||
pub enum SendError
|
||||
{
|
||||
/// The corresponding `Receiver<T>` channel has been dropped.
|
||||
///
|
||||
/// # Note
|
||||
/// This operation **cannot** be re-tried
|
||||
Closed,
|
||||
//TODO: Should we (or *can* we even?) have a `send_wait()` method?
|
||||
}
|
||||
|
||||
/// Error when receiving
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
#[non_exhaustive]
|
||||
pub enum RecvError
|
||||
{
|
||||
/// The corresponding `Sender<T>` channel has been dropped.
|
||||
///
|
||||
/// # Note
|
||||
/// This operation **cannot** be re-tried
|
||||
Closed,
|
||||
|
||||
/// The `recv()` call timed out before a value was sent.
|
||||
///
|
||||
/// This operation can be re-tried
|
||||
Timeout,
|
||||
|
||||
/// The `recv()` call was cancelled by a `StopToken` before a value was sent.
|
||||
///
|
||||
/// This operation can be re-tried
|
||||
//TODO: Maybe merge this and `Timeout`?
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
//TODO: Add impl Send/RecvError: `fn can_retry(&self) -> bool`
|
||||
|
||||
impl fmt::Display for SendError
|
||||
{
|
||||
#[inline(always)]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
f.write_str("send error")
|
||||
// TODO: Should we `match self` for detailed error messages here?
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RecvError
|
||||
{
|
||||
#[inline(always)]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
f.write_str("recv error")
|
||||
// TODO: Should we `match self` for detailed error messages here?
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for SendError{}
|
||||
impl error::Error for RecvError{}
|
||||
|
||||
impl error::Error for Error
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(match &self {
|
||||
Self::Recv(r) => r,
|
||||
Self::Send(s) => s,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Error
|
||||
{
|
||||
#[inline(always)]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
f.write_str("oneshot error")
|
||||
}
|
||||
}
|
||||
|
||||
/// An error using the `oneshot` channel.
|
||||
#[derive(Debug)]
|
||||
pub enum Error
|
||||
{
|
||||
/// An error regarding the sending of a value.
|
||||
Send(SendError),
|
||||
/// An error regarding the receiving of a value.
|
||||
Recv(RecvError),
|
||||
}
|
||||
|
||||
impl From<SendError> for Error
|
||||
{
|
||||
#[inline]
|
||||
fn from(from: SendError) -> Self
|
||||
{
|
||||
Self::Send(from)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RecvError> for Error
|
||||
{
|
||||
#[inline]
|
||||
fn from(from: RecvError) -> Self
|
||||
{
|
||||
Self::Recv(from)
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: impl fmt::Display error::Error for Try*Error[<T>]...
|
||||
impl<T> fmt::Display for TrySendError<T>
|
||||
{
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
write!(f, "send error (T = {})", std::any::type_name::<T>())
|
||||
}
|
||||
}
|
||||
impl<T> fmt::Display for TryRecvError<T>
|
||||
{
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
write!(f, "recv error (T = {})", std::any::type_name::<T>())
|
||||
}
|
||||
}
|
||||
impl<T> error::Error for TryRecvError<T>
|
||||
where T: fmt::Debug
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
impl<T> error::Error for TrySendError<T>
|
||||
where T: fmt::Debug
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
impl<T> fmt::Display for TryError<T>
|
||||
{
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
||||
{
|
||||
write!(f, "oneshot error (T = {})", std::any::type_name::<T>())
|
||||
}
|
||||
}
|
||||
impl<T> error::Error for TryError<T>
|
||||
where T: fmt::Debug
|
||||
{
|
||||
#[inline]
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: XXX: We might also want explicit `Debug` impls for all `Try*Error<T>`s, since `T` is irrelevent to the `Error` part.
|
||||
|
||||
/// Error when attempting to send a value using a `try_` function.
|
||||
///
|
||||
/// The `Sender<T>` object that originated this is stored in this object for a re-try of the operation.
|
||||
#[derive(Debug)]
|
||||
pub struct TrySendError<T>(SendError, Sender<T>);
|
||||
|
||||
/// Error when attempting to receive a value using a `try_` function.
|
||||
///
|
||||
/// The `Receiver<T>` object that originated this is stored in this object for a re-try of the operation.
|
||||
#[derive(Debug)]
|
||||
pub struct TryRecvError<T>(RecvError, Receiver<T>);
|
||||
|
||||
/// An error when attempting a oneshot function using a consuming `try_` function.
|
||||
///
|
||||
/// The `Sender<T>`/`Receiver<T>` object(s) that originated this error are stored in this object for a re-try of the operation.
|
||||
#[derive(Debug)]
|
||||
pub struct TryError<T>(Error, (Option<Sender<T>>, Option<Receiver<T>>));
|
||||
|
||||
//TODO: Make a `feature=unstable` version that is allocator-aware *and* re-uses the same allocation for both `Arc` creations (e.g. C++ polymorphic allocator; `bumpalo` or another simple implementation `Sync` bump-allocator would suffice.)
|
||||
/// Oneshot sender.
|
||||
///
|
||||
/// Sends one value of `T` to a corresponding `Receiver`, if it is still alive.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender<T>
|
||||
{
|
||||
/// The value to write (`Sender`) to / read (`Receiver`) from.
|
||||
///
|
||||
/// Write is bound to `send.notify_one()`, read is bound to `send.wait()`.
|
||||
/// # Ownership
|
||||
///
|
||||
/// Note that `Receiver` has a `Weak` variant to hold this. It will fail to read if it cannot upgrade.
|
||||
/// If this slot is unique, then `send`s should fast-fail as there is no corresponding `Receiver` anyway.
|
||||
value: Weak<Mutex<MaybeUninit<T>>>,
|
||||
/// Sends a signal to the receiver to read from `value`.
|
||||
///
|
||||
/// # Ownership
|
||||
/// If this weak-ptr cannot be upgraded, then the `Receiver` ascosiated with this instance *cannot* be waiting on it, and therefore sending should fast-fail
|
||||
//XXX: Is this order of Sender: `Arc, Weak`, Receiver: `Weak, Arc` correct? Check and think about it before proceeding pls...
|
||||
//NOTE: It **is correct** to be this order.
|
||||
send: Arc<Condvar>,
|
||||
}
|
||||
|
||||
/// Oneshot receiver.
|
||||
///
|
||||
/// Receive one value of `T` from a corresponding `Sender`, if it is still alive.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver<T> {
|
||||
value: Arc<Mutex<MaybeUninit<T>>>,
|
||||
send: Weak<Condvar>,
|
||||
}
|
||||
}
|
Loading…
Reference in new issue