You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
268 lines
6.2 KiB
268 lines
6.2 KiB
//! `Channel<T>`: Move objects between parent and child processes.
|
|
use super::*;
|
|
use bytes::{
|
|
Buf,
|
|
BufMut,
|
|
Bytes,
|
|
};
|
|
use std::marker::PhantomData;
|
|
use std::{fmt, error};
|
|
use std::io::{
|
|
self,
|
|
Read,
|
|
Write,
|
|
Cursor,
|
|
};
|
|
use serde::{
|
|
Serialize,
|
|
de::DeserializeOwned,
|
|
};
|
|
|
|
use sys::pipe;
|
|
|
|
/// A value that can be sent through a serial `Channel`
|
|
///
|
|
/// Bitwise channels can take `ChannelBitwiseValue` objects.
|
|
pub trait ChannelSerialValue: Serialize + DeserializeOwned{}
|
|
|
|
impl<T: ?Sized> ChannelSerialValue for T
|
|
where T: Serialize + DeserializeOwned{}
|
|
|
|
/// Marker for values that may travel through a *bitwise* `Channel`.
///
/// Any type that is both `Send` and `Unpin` qualifies; the blanket impl below
/// provides the trait automatically, so it never needs to be implemented by
/// hand.
///
/// Serial channels require `ChannelSerialValue` instead.
pub trait ChannelBitwiseValue: Send + Unpin {}

// Blanket impl: every `Send + Unpin` type (sized or not) is a bitwise value.
impl<T> ChannelBitwiseValue for T
where
    T: ?Sized + Send + Unpin,
{
}
|
|
|
|
//TODO: We could create a non-stream version of this using a memory-mapped ring buffer on an fd shared between parent and child (i.e., map two pages, with both the first and the second pointing to the start of the file). That might be unnecessary, or too much work, for this project's scope.
|
|
|
|
/// Controls how objects are sent through the streams.
|
|
pub trait TransferMethod
|
|
{
|
|
/// The returned buffer that is build from `send`.
|
|
///
|
|
/// `recv` takes any `impl Buf` object.
|
|
type Buffer: Buf;
|
|
/// An optional internal state object to hold any additional data needed.
|
|
/// This can be set to `()`.
|
|
type State;
|
|
|
|
/// Error for the send/recv conversion.
|
|
/// Must be able to convert into `channel::Error`.
|
|
type Error: Into<Error>;
|
|
|
|
/// Serialise the object `T` to a byte buffer for sending.
|
|
fn send<T>(state: &mut Self::State, obj: &T) -> Result<Self::Buffer, Error>;
|
|
/// Read the object `T` from a byte buffer that was read from the pipe.
|
|
///
|
|
/// The buffer's size is handled by the channel.
|
|
fn recv<T, B: Buf>(state: &mut Self::State, buffer: B) -> Result<T, Error>;
|
|
}
|
|
|
|
/// Transfer method that deeply serialises objects in order to send them.
///
/// This is a zero-sized marker type; it is the default method of `Sender`
/// and `Receiver`.
#[derive(Debug)]
pub struct TransferSerial;
|
|
|
|
impl TransferMethod for TransferSerial
|
|
{
|
|
type Buffer = Cursor<Vec<u8>>;
|
|
type State = ();
|
|
type Error = Error;
|
|
|
|
fn send<T>(_: &mut Self::State, obj: &T) -> Result<Self::Buffer, Error> {
|
|
todo!()
|
|
}
|
|
fn recv<T, B: Buf>(_: &mut Self::State, buffer: B) -> Result<T, Error> {
|
|
todo!()
|
|
}
|
|
}
|
|
|
|
/// Transfer method that sends objects through the stream as bitwise copies
/// of their in-memory representation (unsafe).
///
/// This is a zero-sized marker type.
#[derive(Debug)]
pub struct TransferBitwise;
|
|
|
|
impl TransferMethod for TransferBitwise
|
|
{
|
|
type Buffer = Bytes;
|
|
type State = ();
|
|
type Error = std::convert::Infallible;
|
|
|
|
fn send<T>(_: &mut Self::State, obj: &T) -> Result<Self::Buffer, Error> {
|
|
let bytes = unsafe {
|
|
std::slice::from_raw_parts(obj as *const T as *const _, std::mem::size_of::<T>())
|
|
};
|
|
Ok(bytes.into())
|
|
}
|
|
fn recv<T, B: Buf>(_: &mut Self::State, mut buffer: B) -> Result<T, Error> {
|
|
use std::mem::{
|
|
self,
|
|
MaybeUninit,
|
|
};
|
|
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
|
|
unsafe {
|
|
let slice = std::slice::from_raw_parts_mut(&mut value as *mut MaybeUninit<T> as *mut _, mem::size_of::<T>());
|
|
buffer.copy_to_slice(slice);
|
|
|
|
Ok(value.assume_init())
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Send objects of type `T` through parent-child streams.
|
|
#[derive(Debug)]
|
|
pub struct Sender<T, M: ?Sized + TransferMethod = TransferSerial>
|
|
{
|
|
tx: pipe::WriteHalf,
|
|
|
|
state: M::State,
|
|
|
|
_data: PhantomData<Vec<T>>,
|
|
_method: PhantomData<M>,
|
|
}
|
|
|
|
|
|
/// Receive objects of type `T` from a parent-child stream.
|
|
#[derive(Debug)]
|
|
pub struct Receiver<T, M: ?Sized + TransferMethod = TransferSerial>
|
|
{
|
|
rx: pipe::ReadHalf,
|
|
|
|
state: M::State,
|
|
|
|
_data: PhantomData<Vec<T>>,
|
|
_mathod: PhantomData<M>,
|
|
}
|
|
|
|
#[inline] pub fn pair<T: ChannelSerialValue>() -> (Sender<T, TransferSerial>, Receiver<T, TransferSerial>)
|
|
{
|
|
Sender::<_, TransferSerial>::new()
|
|
}
|
|
|
|
// TODO: TransferBitwise is very unsafe, document that here.
|
|
#[inline] pub unsafe fn pair_bitwise<T: ChannelBitwiseValue>() -> (Sender<T, TransferBitwise>, Receiver<T, TransferBitwise>)
|
|
{
|
|
Sender::<_, TransferBitwise>::new()
|
|
}
|
|
|
|
// TODO: What bounds should we have on this, if any? Send? Unpin? Maybe even restricted to Copy.
|
|
impl<T: ChannelBitwiseValue> Sender<T, TransferBitwise>
|
|
{
|
|
// TODO: TransferBitwise is very unsafe, document that here.
|
|
pub unsafe fn new() -> (Self, Receiver<T, TransferBitwise>)
|
|
{
|
|
todo!()
|
|
}
|
|
}
|
|
|
|
impl<T: ChannelSerialValue> Sender<T, TransferSerial>
|
|
{
|
|
pub fn new() -> (Self, Receiver<T, TransferSerial>)
|
|
{
|
|
todo!()
|
|
}
|
|
}
|
|
|
|
/// The kind of channel error
|
|
#[derive(Debug)]
|
|
#[non_exhaustive]
|
|
pub enum ErrorKind
|
|
{
|
|
Unknown,
|
|
|
|
Serialisation(serde_cbor::Error),
|
|
Deserialisation(serde_cbor::Error),
|
|
|
|
IO(io::Error),
|
|
}
|
|
|
|
/// An error in a channel operation.
|
|
#[derive(Debug)]
|
|
pub struct Error(Box<(ErrorKind, Option<bool>)>);
|
|
|
|
impl From<(ErrorKind, bool)> for Error
|
|
{
|
|
#[inline] fn from((k, s): (ErrorKind, bool)) -> Self
|
|
{
|
|
let from = (k, Some(s));
|
|
Self(Box::new(from))
|
|
}
|
|
}
|
|
|
|
impl From<ErrorKind> for Error
|
|
{
|
|
fn from(from: ErrorKind) -> Self
|
|
{
|
|
Self(Box::new((from, None)))
|
|
}
|
|
}
|
|
|
|
impl From<std::convert::Infallible> for Error
|
|
{
|
|
#[inline] fn from(from: std::convert::Infallible) -> Self
|
|
{
|
|
match from{}
|
|
}
|
|
}
|
|
|
|
// NOTE(review): `remove!` is not defined in this file (presumably it arrives
// through `use super::*`) and presumably discards its body. The items inside
// look like an earlier error design kept for reference — confirm before
// relying on or deleting them.
remove! {
    /// The kind of channel error
    #[derive(Debug)]
    #[non_exhaustive]
    pub enum ErrorKind
    {
        IO(io::Error),

        Unknown,
    }

    /// An error on a channel operation
    #[derive(Debug)]
    pub struct Error<T>(Box<(ErrorKind, bool, Option<T>)>);

    impl fmt::Display for ErrorKind
    {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            todo!()
        }
    }

    impl<T: fmt::Debug> fmt::Display for Error<T>
    {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            write!(f, "channel<{}>: failed to ", std::any::type_name::<T>())?;
            match (self.0.1, self.0.2.as_ref()) {
                (true, None) => write!(f, "send"),
                (false, None) => write!(f, "recv"),
                (true, Some(val)) => write!(f, "send (value [{:?}])", val),
                (false, Some(val)) => write!(f, "recv (value [{:?}])", val),
            }?;
            write!(f, ": {}", self.0.0)
        }
    }

    impl<T> Error<T>
    {
        pub fn recv(kind: ErrorKind, value: Option<T>) -> Self
        {
            Self(Box::new((
                kind,
                false,
                value
            )))
        }
        pub fn send(kind: ErrorKind, value: Option<T>) -> Self
        {
            Self(Box::new((
                kind,
                true,
                value
            )))
        }
    }
}
|