You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
478 lines
14 KiB
478 lines
14 KiB
//! Wrapper around internal pipe fds.
|
|
//!
|
|
//! These pipe fds are valid across forks, and can be used for IPC between parent and non-exec()d child process forks.
|
|
use super::*;
|
|
use errno::Errno;
|
|
use std::{error, fmt};
|
|
use std::ops::Drop;
|
|
use std::io::{
|
|
self,
|
|
Read, Write,
|
|
};
|
|
use std::panic::{
|
|
AssertUnwindSafe,
|
|
catch_unwind,
|
|
resume_unwind,
|
|
};
|
|
|
|
// TODO: Add non-`sys` type: `Channel<T>`, which can be used to send/recv typed data between parent and child.
|
|
// It should leverage the two below functions: `move_*_stream()`.
|
|
|
|
/// Move a value into a stream.
///
/// Returns `Ok` if and only if every byte of `val` was successfully written to the stream `to`.
/// If only a subset of the bytes were written, the data in the stream is corrupt and that is reported as an error.
///
/// # Safety
/// The object `val` is written into the stream `to` verbatim. Its destructor is not run (on success, on error, or when the writer panics).
/// The object should be read verbatim from the stream, with no errors or corruption, before it is dropped, or it will leak resources.
///
/// This function is safe, but reading from the stream into a new object is not.
///
/// ## Unpin
/// `T` should be `Unpin` in case it contains self-referential data, as those references will no longer be valid after being moved across a stream.
///
/// # Panics
/// Any panic raised by `to.write_all()` is caught, `val` is forgotten, and the panic is then resumed.
// TODO: Should we expose these to the user somehow, with an unsafe helper module maybe? They could be useful.
// Or perhaps: Add the functionality to move types to/from the child/parent into the library, similar to `mpsc`.

// XXX: This is not safe, and will cause a memory leak when moving from parent to child. If a box is passed to the child, then it will not be deep copied, nor will it be correctly freed on the parent. Idk how to handle this for complex types...
pub(crate) fn move_write_value<T: Send, S: Write>(mut to: S, val: T) -> io::Result<()>
{
    // SAFETY: `val` is owned by this frame and is not moved until after the closure below has
    // completed, so the pointer is valid for `size_of::<T>()` bytes for as long as `bytes` is read.
    let bytes = unsafe {
        std::slice::from_raw_parts(&val as *const T as *const u8, std::mem::size_of::<T>())
    };
    // Catch any panics that may happen in `write`, so `val` is forgotten on every path.
    let res = catch_unwind(AssertUnwindSafe(move || {
        to.write_all(bytes)
    }));
    // `bytes` was moved into the closure above and is never touched again, so nothing dangles
    // once `val` is handed to `forget()`. (Calling `drop()` on the reference would do nothing.)
    std::mem::forget(val);
    // Resume the unwind if one occurred, otherwise return the result of the write.
    match res {
        Ok(v) => v,
        Err(panic) => resume_unwind(panic),
    }
}
|
|
|
|
/// Write the bytes of a value into a stream, returning the number of bytes written.
///
/// The value can be unsized (e.g. `[u8]` or `str`): exactly `size_of_val(val)` bytes are written.
/// It must be `Send` as we cannot guarantee the stream will not be read on a separate thread.
///
/// This is a safe function, but reading from the other end of the stream is not.
/// Ownership is not transferred to the other side of the stream, so make sure that if the value is not `Copy` any resources owned by `val` are properly transferred to the read object.
///
/// # Errors
/// Returns the error from `to.write_all()` if the bytes could not all be written.
// Should this require `Send` as a bound?
pub(crate) fn copy_write_value<T: Send + ?Sized, S: Write>(mut to: S, val: &T) -> io::Result<usize>
{
    let sz = std::mem::size_of_val(val);
    // SAFETY: `val` is a live reference, so it points to `size_of_val(val)` readable bytes that
    // stay borrowed for the duration of this call.
    // NOTE(review): for types containing padding the padding bytes are read as well -- confirm callers only pass padding-free types.
    let bytes = unsafe {
        std::slice::from_raw_parts(val as *const T as *const u8, sz)
    };
    to.write_all(bytes)?;
    Ok(sz)
}
|
|
|
|
/// Read a value from a stream.
///
/// Exactly `size_of::<T>()` bytes are read from `from` and reinterpreted as a `T`.
///
/// # Errors
/// Returns the error from `from.read_exact()` if the stream could not supply enough bytes. No `T` is produced in that case.
///
/// # Safety
/// This is extremely unsafe, it reads the type `T` bitwise from the stream.
/// The caller must ensure that any pointers contained within `T` remain valid after being passed through a stream, and that the type passed through has not been corrupted within the stream.
pub(crate) unsafe fn copy_read_value<T: Send, S: Read>(mut from: S) -> io::Result<T>
{
    // Zero the storage first: the byte slice handed to the reader must refer to initialised
    // memory, since an arbitrary `Read` implementation is allowed to inspect the buffer.
    let mut value = std::mem::MaybeUninit::<T>::zeroed();
    // The slice covers exactly the storage of `value`, which lives until the end of this function.
    let bytes = std::slice::from_raw_parts_mut(value.as_mut_ptr() as *mut u8, std::mem::size_of::<T>());
    from.read_exact(bytes)?;
    // Every byte of `value` has now been overwritten from the stream; validity of the bit
    // pattern as a `T` is the caller's obligation (see `# Safety`).
    Ok(value.assume_init())
}
|
|
|
|
|
|
/// Read a value from a stream.
|
|
///
|
|
/// This is currently identical to `copy_read_value()`.
|
|
///
|
|
/// # Safety
|
|
/// This is extremely unsafe, it reads the type `T` bitwise from the stream.
|
|
/// The caller must ensure that any pointers contained within `T` remain valid after being passed through a stream, and that the type passed through has not been corrupted within the stream.
|
|
#[inline] pub(crate) unsafe fn move_read_value<T: Send, S: Read>(from: S) -> io::Result<T>
|
|
{
|
|
// These are effectively the same.
|
|
copy_read_value(from)
|
|
}
|
|
|
|
|
|
/// The category of a pipe error.
#[non_exhaustive]
#[derive(Debug)]
pub enum ErrorKind {
    /// The pipes could not be created.
    Create,
    /// An error of unknown origin.
    Unknown,
}
|
|
|
|
/// One end of a raw unix pipe.
///
/// Wraps a raw fd, preventing arbitrary operations on it while it is owned by the instance.
/// The fd is closed when the instance is dropped.
#[repr(transparent)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RawPipe(i32);
|
|
|
|
impl io::Read for RawPipe
|
|
{
|
|
#[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
Ok(read_raw(self.0, buf)?)
|
|
}
|
|
}
|
|
impl io::Write for RawPipe
|
|
{
|
|
#[inline] fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
Ok(write_raw(self.0, buf)?)
|
|
}
|
|
#[inline] fn flush(&mut self) -> io::Result<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl From<RawPipe> for i32
|
|
{
|
|
#[inline] fn from(from: RawPipe) -> Self
|
|
{
|
|
from.take()
|
|
}
|
|
}
|
|
|
|
impl RawPipe
|
|
{
|
|
/// Transfer ownership of the internal fd to the caller
|
|
#[inline] pub fn take(self) -> i32
|
|
{
|
|
let i = self.0;
|
|
std::mem::forget(self);
|
|
i
|
|
}
|
|
|
|
/// Transfer ownership of an fd into a new managed instance.
|
|
///
|
|
/// # Safety
|
|
/// The caller should ensure the fd is valid, or all operations on the instance returned will fail.
|
|
#[inline] pub const unsafe fn new_unchecked(int: i32) -> Self
|
|
{
|
|
Self(int)
|
|
}
|
|
|
|
/// Get a raw handle to the fd
|
|
///
|
|
/// # Safety
|
|
/// This operation is unsafe as the caller could close the fd while its still owned by the instance.
|
|
/// If you want to transfer ownership of the fd to yourself, use `take()` or `Into::into()`.
|
|
#[inline] pub const unsafe fn as_raw_fd(&self) -> i32
|
|
{
|
|
self.0
|
|
}
|
|
|
|
/// Transfer ownership of an fd into a new managed instance.
|
|
///
|
|
/// # Panics
|
|
/// If the fd value was below 0.
|
|
pub fn new(int: i32) -> Self
|
|
{
|
|
assert!(int > 0, "invalid fd");
|
|
Self(int)
|
|
}
|
|
}
|
|
|
|
impl Drop for RawPipe
|
|
{
|
|
fn drop(&mut self) {
|
|
unsafe {
|
|
libc::close(self.0);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Create a raw pipe pair. Returns `(tx, rx)`: writer then reader.
|
|
pub(crate) fn unix_pipe() -> Result<(RawPipe, RawPipe), Error>
|
|
{
|
|
use libc::pipe;
|
|
let mut pipe_fds: [libc::c_int; 2] = [-1;2];
|
|
if unsafe{pipe(pipe_fds.as_mut_ptr())} == 0 {
|
|
Ok((RawPipe(i32::from(pipe_fds[1])), RawPipe(i32::from(pipe_fds[0]))))
|
|
} else {
|
|
Err(ErrorKind::Create.into_error())
|
|
}
|
|
}
|
|
|
|
/// A writer for a bi-directinoal unix pipe
|
|
///
|
|
/// Created by splitting `Pipe`, data written to this is available to be read from its `ReadHalf` counterpart.
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct WriteHalf(RawPipe);
|
|
|
|
/// A reader for a bi-directional unix pipe.
|
|
///
|
|
/// Created by splitting `Pipe`, data read from this is data that was sent to its `WriteHalf` counterpart.
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct ReadHalf(RawPipe);
|
|
|
|
/// A bi-drectional unix pipe
|
|
///
|
|
/// Data written to the pipe is available to be read from it (unless the pipe is mismatched.)
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct Pipe
|
|
{
|
|
tx: RawPipe,
|
|
rx: RawPipe,
|
|
}
|
|
|
|
impl Pipe
|
|
{
|
|
/// Split this pipe into a read and write half.
|
|
///
|
|
/// Data written to the `WriteHalf` is sent to the `ReadHalf` (unless the pipe is mismatched.)
|
|
#[inline] pub fn split(self) -> (WriteHalf, ReadHalf)
|
|
{
|
|
(WriteHalf(self.tx), ReadHalf(self.rx))
|
|
}
|
|
|
|
/// Create a `Pipe` from two halves.
|
|
///
|
|
/// # Mismatched halves
|
|
/// The halves do not need to have originated from the same `Pipe`.
|
|
/// If they are not, then writing to the `Pipe` will send the data to its original receiver, and reading from it will take the data from its original sender.
|
|
#[inline] pub fn unsplit(tx: WriteHalf, rx: ReadHalf) -> Self
|
|
{
|
|
Self::new_from_raw((tx.0, rx.0))
|
|
}
|
|
|
|
/// Create a new `Pipe` from a pair of raw file descriptors.
|
|
///
|
|
/// # Safety
|
|
/// Does not check that the integers provided are valid and open file descriptors.
|
|
#[inline(always)] fn new_from_raw((tx, rx): (RawPipe, RawPipe)) -> Self
|
|
{
|
|
Self{tx,rx}
|
|
}
|
|
|
|
/// Create a new `Pipe` from a pair of raw file descriptors.
|
|
///
|
|
/// # Safety
|
|
/// The caller must check that `tx` and `rx` are valid, open file descriptors
|
|
// TODO: When writing the async version make sure to state that the fds need to be non-blocking.
|
|
#[inline] pub unsafe fn from_raw(tx: RawPipe, rx: RawPipe) -> Self
|
|
{
|
|
Self::new_from_raw((tx, rx))
|
|
}
|
|
|
|
/// Transfer the ownership of the socket's inner Tx and Rx fds.
|
|
pub fn into_raw(self) -> (RawPipe, RawPipe)
|
|
{
|
|
(self.tx, self.rx)
|
|
}
|
|
|
|
/// Create a new `Pipe` from two new file descriptors that stream to eachother.
|
|
///
|
|
/// Data written to this pipe will also be read from it.
|
|
/// The pipe can be `split()`, and then `unsplit()` with a different split pipe's half to create a stream between those two pipes.
|
|
#[inline] pub fn new() -> Result<Self, Error>
|
|
{
|
|
let sk = unix_pipe()?;
|
|
Ok(Self::new_from_raw(sk))
|
|
}
|
|
}
|
|
|
|
/// Create a pair of bi-directional linked pipes.
|
|
///
|
|
/// Writing into one of the pipes will send the data to the other one, and vise-versa.
|
|
pub fn pair() -> Result<(Pipe, Pipe), Error>
|
|
{
|
|
let (a, b) = (Pipe::new()?, Pipe::new()?);
|
|
let (atx, arx) = a.split();
|
|
let (btx, brx) = b.split();
|
|
Ok((
|
|
Pipe::unsplit(atx, brx),
|
|
Pipe::unsplit(btx, arx),
|
|
))
|
|
}
|
|
|
|
/// Write to a raw fd.
|
|
fn write_raw(sock: i32, data: &[u8]) -> Result<usize, Errno>
|
|
{
|
|
match unsafe {libc::write(sock, data.as_ptr() as *const _, data.len())} {
|
|
x if x < 0 => Err(Errno::last().unwrap_err()),
|
|
x => Ok(usize::try_from(x).unwrap()),
|
|
}
|
|
}
|
|
|
|
/// Read from a raw fd.
|
|
fn read_raw(sock: i32, data: &mut [u8]) -> Result<usize, Errno>
|
|
{
|
|
match unsafe {libc::read(sock, data.as_ptr() as *mut _, data.len())} {
|
|
x if x < 0 => Err(Errno::last().unwrap_err()),
|
|
x => Ok(usize::try_from(x).unwrap()),
|
|
}
|
|
}
|
|
|
|
// There's nothing to flush, the fd isn't buffered.
|
|
// fn flush_raw(sock: i32) -> Result<(), Errno>
|
|
// {
|
|
// libc::fsync(sock)
|
|
// }
|
|
|
|
impl io::Read for Pipe
|
|
{
|
|
#[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.rx.read(buf)
|
|
}
|
|
}
|
|
impl io::Write for Pipe
|
|
{
|
|
#[inline] fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
self.tx.write(buf)
|
|
}
|
|
#[inline] fn flush(&mut self) -> io::Result<()> {
|
|
self.tx.flush()
|
|
}
|
|
}
|
|
|
|
impl io::Read for ReadHalf
|
|
{
|
|
#[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.0.read(buf)
|
|
}
|
|
}
|
|
impl io::Write for WriteHalf
|
|
{
|
|
#[inline] fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
self.0.write(buf)
|
|
}
|
|
#[inline] fn flush(&mut self) -> io::Result<()> {
|
|
self.0.flush()
|
|
}
|
|
}
|
|
|
|
impl fmt::Display for ErrorKind
|
|
{
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
{
|
|
match self {
|
|
Self::Create => write!(f, "failed to create raw pipes"),
|
|
_ => write!(f, "unknown error"),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// An error on a pipe operation
|
|
#[derive(Debug)]
|
|
pub struct Error(pub(super) Box<(ErrorKind, Errno)>);
|
|
|
|
impl ErrorKind
|
|
{
|
|
/// Consume into `Error` using the last `errno` value, if it's not set then return `val`.
|
|
#[inline] pub(crate) fn try_into_error_or<T>(self, val: T) -> Result<T, Error>
|
|
{
|
|
Error::or_last(self, val)
|
|
}
|
|
/// Consume into `Error` using the last `errno` value, if it's not set then return the result of `fun`.
|
|
#[inline] pub(crate) fn try_into_error_or_with<F, T>(self, fun: F) -> Result<T, Error>
|
|
where F: FnOnce() -> T
|
|
{
|
|
Error::or_last_with(self, fun)
|
|
}
|
|
/// Consume into `Error` using the last `errno` value.
|
|
pub(crate) fn try_into_error(self) -> Result<(), Error>
|
|
{
|
|
Error::last(self)
|
|
}
|
|
|
|
/// Consume into an `Error` using the last `errno` value.
|
|
///
|
|
/// # Panics
|
|
/// If `errno` was success.
|
|
#[inline] pub(crate) fn into_error(self) -> Error
|
|
{
|
|
Self::try_into_error(self).expect_err("Expected an error to be set in errno")
|
|
}
|
|
}
|
|
|
|
impl Error
|
|
{
|
|
/// Create a new error of this kind from the last set errno if it's not success.
|
|
/// If it is, return `val`.
|
|
#[inline] pub(crate) fn or_last<T>(err: ErrorKind, val: T) -> Result<T, Self>
|
|
{
|
|
Self::or_last_with(err, move || val)
|
|
}
|
|
/// Create a new error of this kind from the last set errno if it's not success.
|
|
/// If it is, return the result of `fun`.
|
|
pub(crate) fn or_last_with<F, T>(err: ErrorKind, fun: F) -> Result<T, Self>
|
|
where F: FnOnce() -> T
|
|
{
|
|
Err(Self(Box::new((err, match Errno::or_last_with(fun) {
|
|
Ok(v) => return Ok(v),
|
|
Err(e) => e,
|
|
}))))
|
|
}
|
|
/// Create a new error of this kind from the last set errno (if it's not success.)
|
|
pub(crate) fn last(err: ErrorKind) -> Result<(), Self>
|
|
{
|
|
Err(Self(Box::new((err, match Errno::last() {
|
|
Ok(()) => return Ok(()),
|
|
Err(e) => e,
|
|
}))))
|
|
}
|
|
|
|
/// Create a new error of this kind with this errno value
|
|
#[inline] pub fn new(kind: ErrorKind, err: Errno) -> Self
|
|
{
|
|
Self(Box::new((kind, err)))
|
|
}
|
|
|
|
/// The kind of pipe error
|
|
#[inline] pub fn kind(&self) -> &ErrorKind
|
|
{
|
|
&self.0.0
|
|
}
|
|
|
|
/// The errno value
|
|
#[inline] pub fn error(&self) -> &Errno
|
|
{
|
|
&self.0.1
|
|
}
|
|
}
|
|
|
|
impl error::Error for Error
|
|
{
|
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
|
Some(&self.0.1)
|
|
}
|
|
}
|
|
impl fmt::Display for Error
|
|
{
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
{
|
|
write!(f, "{}: {}", self.0.0, self.0.1)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    /// Write a short message into a single `Pipe` and read the same bytes back out of it.
    #[test]
    fn pipe_single_rw() {
        const DATA: &[u8] = b"hello world";

        let mut pipe = super::Pipe::new().expect("Creation");
        print!("Pipe: {:?}", pipe);

        pipe.write_all(DATA).expect("write (ex)");

        let mut buffer = [0u8; DATA.len()];
        pipe.read_exact(&mut buffer[..]).expect("read (ex)");

        assert_eq!(&buffer[..], DATA, "data different");
    }
}
|