You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
6.2 KiB

//! `Channel<T>`: Move objects between parent and child processes.
use super::*;
use bytes::{
Buf,
BufMut,
Bytes,
};
use std::marker::PhantomData;
use std::{fmt, error};
use std::io::{
self,
Read,
Write,
Cursor,
};
use serde::{
Serialize,
de::DeserializeOwned,
};
use sys::pipe;
/// A value that can be sent through a serial `Channel`
///
/// Bitwise channels can take `ChannelBitwiseValue` objects.
pub trait ChannelSerialValue: Serialize + DeserializeOwned{}
impl<T: ?Sized> ChannelSerialValue for T
where T: Serialize + DeserializeOwned{}
/// A value that can be sent through a bitwise `Channel`
///
/// Serial channels require `ChannelSerialValue`.
pub trait ChannelBitwiseValue: Send + Unpin{}
impl<T: ?Sized> ChannelBitwiseValue for T
where T: Send + Unpin{}
//TODO: We can create a non-stream version of this using a memory mapped ring-buffer on an fd shared between parent and child. (i.e: map two pages, the first and second one pointing to the start of the file). But that might be unneeded or too much work for this project's scope.
/// Controls how objects are sent through the streams.
pub trait TransferMethod
{
/// The returned buffer that is build from `send`.
///
/// `recv` takes any `impl Buf` object.
type Buffer: Buf;
/// An optional internal state object to hold any additional data needed.
/// This can be set to `()`.
type State;
/// Error for the send/recv conversion.
/// Must be able to convert into `channel::Error`.
type Error: Into<Error>;
/// Serialise the object `T` to a byte buffer for sending.
fn send<T>(state: &mut Self::State, obj: &T) -> Result<Self::Buffer, Error>;
/// Read the object `T` from a byte buffer that was read from the pipe.
///
/// The buffer's size is handled by the channel.
fn recv<T, B: Buf>(state: &mut Self::State, buffer: B) -> Result<T, Error>;
}
/// Serialize objects deeply to send them.
#[derive(Debug)]
pub struct TransferSerial;
impl TransferMethod for TransferSerial
{
type Buffer = Cursor<Vec<u8>>;
type State = ();
type Error = Error;
fn send<T>(_: &mut Self::State, obj: &T) -> Result<Self::Buffer, Error> {
todo!()
}
fn recv<T, B: Buf>(_: &mut Self::State, buffer: B) -> Result<T, Error> {
todo!()
}
}
/// Send objects through the stream as bitwise copied (unsafe.)
#[derive(Debug)]
pub struct TransferBitwise;
impl TransferMethod for TransferBitwise
{
type Buffer = Bytes;
type State = ();
type Error = std::convert::Infallible;
fn send<T>(_: &mut Self::State, obj: &T) -> Result<Self::Buffer, Error> {
let bytes = unsafe {
std::slice::from_raw_parts(obj as *const T as *const _, std::mem::size_of::<T>())
};
Ok(bytes.into())
}
fn recv<T, B: Buf>(_: &mut Self::State, mut buffer: B) -> Result<T, Error> {
use std::mem::{
self,
MaybeUninit,
};
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
unsafe {
let slice = std::slice::from_raw_parts_mut(&mut value as *mut MaybeUninit<T> as *mut _, mem::size_of::<T>());
buffer.copy_to_slice(slice);
Ok(value.assume_init())
}
}
}
/// Send objects of type `T` through parent-child streams.
#[derive(Debug)]
pub struct Sender<T, M: ?Sized + TransferMethod = TransferSerial>
{
tx: pipe::WriteHalf,
state: M::State,
_data: PhantomData<Vec<T>>,
_method: PhantomData<M>,
}
/// Receive objects of type `T` from a parent-child stream.
#[derive(Debug)]
pub struct Receiver<T, M: ?Sized + TransferMethod = TransferSerial>
{
rx: pipe::ReadHalf,
state: M::State,
_data: PhantomData<Vec<T>>,
_mathod: PhantomData<M>,
}
#[inline] pub fn pair<T: ChannelSerialValue>() -> (Sender<T, TransferSerial>, Receiver<T, TransferSerial>)
{
Sender::<_, TransferSerial>::new()
}
// TODO: TransferBitwise is very unsafe, document that here.
#[inline] pub unsafe fn pair_bitwise<T: ChannelBitwiseValue>() -> (Sender<T, TransferBitwise>, Receiver<T, TransferBitwise>)
{
Sender::<_, TransferBitwise>::new()
}
// TODO: What bounds should we have on this, if any? Send? Unpin? Maybe even restricted to Copy.
impl<T: ChannelBitwiseValue> Sender<T, TransferBitwise>
{
// TODO: TransferBitwise is very unsafe, document that here.
pub unsafe fn new() -> (Self, Receiver<T, TransferBitwise>)
{
todo!()
}
}
impl<T: ChannelSerialValue> Sender<T, TransferSerial>
{
pub fn new() -> (Self, Receiver<T, TransferSerial>)
{
todo!()
}
}
/// The kind of channel error
#[derive(Debug)]
#[non_exhaustive]
pub enum ErrorKind
{
Unknown,
Serialisation(serde_cbor::Error),
Deserialisation(serde_cbor::Error),
IO(io::Error),
}
/// An error in a channel operation.
#[derive(Debug)]
pub struct Error(Box<(ErrorKind, Option<bool>)>);
impl From<(ErrorKind, bool)> for Error
{
#[inline] fn from((k, s): (ErrorKind, bool)) -> Self
{
let from = (k, Some(s));
Self(Box::new(from))
}
}
impl From<ErrorKind> for Error
{
fn from(from: ErrorKind) -> Self
{
Self(Box::new((from, None)))
}
}
impl From<std::convert::Infallible> for Error
{
#[inline] fn from(from: std::convert::Infallible) -> Self
{
match from{}
}
}
remove! {
/// The kind of channel error
#[derive(Debug)]
#[non_exhaustive]
pub enum ErrorKind
{
IO(io::Error),
Unknown,
}
/// An error on a channel operation
#[derive(Debug)]
pub struct Error<T>(Box<(ErrorKind, bool, Option<T>)>);
impl fmt::Display for ErrorKind
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
todo!()
}
}
impl<T: fmt::Debug> fmt::Display for Error<T>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "channel<{}>: failed to ", std::any::type_name::<T>())?;
match (self.0.1, self.0.2.as_ref()) {
(true, None) => write!(f, "send"),
(false, None) => write!(f, "recv"),
(true, Some(val)) => write!(f, "send (value [{:?}])", val),
(false, Some(val)) => write!(f, "recv (value [{:?}])", val),
}?;
write!(f, ": {}", self.0.0)
}
}
impl<T> Error<T>
{
pub fn recv(kind: ErrorKind, value: Option<T>) -> Self
{
Self(Box::new((
kind,
false,
value
)))
}
pub fn send(kind: ErrorKind, value: Option<T>) -> Self
{
Self(Box::new((
kind,
true,
value
)))
}
}
}