fork::Child impl Future

master
Avril 4 years ago
parent afa1d93212
commit dd041b6571
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -131,7 +131,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}).await?;
println!("Child: {:?}", child);
println!("Waitpid: {:?}", child.wait().await.map_err(|x| x.to_string()));
//println!("Waitpid: {:?}", child.wait().await.map_err(|x| x.to_string()));
println!("Waitpid nb {:?}", child.await.map_err(|x| x.to_string()));
// end test
log::Logger::global().add_hook(log::custom::LogFileHook::new(log::Level::Debug, "test.log", "test.err.log", false));

@ -108,27 +108,30 @@ impl std::fmt::Display for Error
}
}
use pipe::Pipe;
/// Represents the detached child
#[derive(Debug)]
pub struct Child
{
pid: i32,
comm: Pipe,
}
#[derive(Debug)]
pub struct Parent {
pid: i32,
}
comm: Pipe,
}
/// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`.
///
/// # Notes
/// This fork does not set up the async runtime. Using Tokio from the child process will require manually setting up the runtime
pub async fn detach_closure<F: FnOnce(Parent)>(as_uid: Option<u32>, as_gid: Option<u32>, into: F) -> Result<Child, Errno<Error>> {
let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?;
let (comm_p, comm_c) = pipe::multi().map_err(|x| Error::from(x))?;
let child = unsafe{fork()};
if child == 0 {
// Is child
@ -137,7 +140,11 @@ pub async fn detach_closure<F: FnOnce(Parent)>(as_uid: Option<u32>, as_gid: Opti
unsafe {
if let Ok(_) = pipe_write_value(tx, &!0u32)
{
into(Parent{pid: libc::getppid()});
//if let Ok(mut rt) = new_runtime() {
// rt.block_on(async move {
into(Parent{pid: libc::getppid(),comm:comm_c});
// });
// }
}
}
};
@ -204,7 +211,7 @@ pub async fn detach_closure<F: FnOnce(Parent)>(as_uid: Option<u32>, as_gid: Opti
}
};
if err == !0u32 {
Ok(Child{pid:child})
Ok(Child{pid:child, comm: comm_p})
} else {
Err(Error::from(err).into())
}
@ -222,10 +229,12 @@ impl Child
{
/// Call `waitpid` on this child. Returns the status code if possible, or `Error` if error.
///
/// `Child` implements `Future`, and waiting on that directly is probably preferable to calling this method as it does no blocking.
///
/// # Notes
/// - When `threaded` feature is disabled, this function will not block at all, instead returning error if there is not status available, this is to prevent deadlock.
/// - This function (at present) will return `Error` when the child process exits, too. This is because we don't have `errno` access yet, I'm working on it.
pub async fn waitpid(&self) -> Result<i32, Errno<Error>>
#[deprecated] pub async fn waitpid(&self) -> Result<i32, Errno<Error>>
{
cfg_if! {
if #[cfg(feature="threaded")] {
@ -253,12 +262,15 @@ impl Child
/// Wait for the child process to end, ignoring any other signals.
///
/// `Child` implements `Future`, and waiting on that directly is probably preferable to calling this method as it does no blocking.
///
/// # Notes
/// - When `threaded` feature is disabled, this function will return immediately, this is an error-handling bug because we don't have `errno` access as of yet.
/// - This function will call `waitpid` untill it returns status `ECHILD` (child has exited). Other status values are ignored, if any.
pub async fn wait(&self) -> Result<(), Errno<Error>>
#[deprecated] pub async fn wait(&self) -> Result<(), Errno<Error>>
{
loop {
#[allow(deprecated)]
match self.waitpid().await {
Ok(v) => {
// keep going until we get `no child process'
@ -275,3 +287,67 @@ impl Child
}
}
}
use futures::Future;
use std::{
pin::Pin,
task::{
Context,
Poll,
},
};
impl Future for Child
{
type Output = Result<(), Errno<Error>>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output>
{
let pid = self.pid;
let future = async {
loop {
let mut status: i32 = 0;
if unsafe{libc::waitpid(pid, &mut status as *mut i32, 1)} == pid {
if status == 0 {
tokio::task::yield_now().await;
} else {
//We got a signal, but we don't care.
debug!("Got status {} from child {}, ignoring. ", status, pid);
tokio::task::yield_now().await;
}
} else {
let err: Errno<Error> = Errno::from(Error::WaitPid).into();
if err.error() == 0 {
tokio::task::yield_now().await;
} else if err.error() == 10 {
// Child exited
break Ok(());
}else {
break Err(err);
}
}
}
};
tokio::pin!(future);
future.poll(_cx)
}
}
// -- boilerplate
impl Child {
/// Get the pipe for talking to parent
pub fn pipe(&mut self) -> &mut Pipe
{
&mut self.comm
}
}
impl Parent {
/// Get the pipe for talking to child
pub fn pipe(&mut self) -> &mut Pipe
{
&mut self.comm
}
}

@ -2,6 +2,8 @@
use super::*;
use cfg_if::cfg_if;
pub mod user;
pub mod pipe;
@ -15,3 +17,26 @@ use errno::ResultExt;
{
unsafe{libc::getpid()}
}
/// Start the async runtime manually
pub fn new_runtime() -> Result<tokio::runtime::Runtime, tokio::io::Error>
{
use tokio::runtime::Builder;
cfg_if! {
if #[cfg(feature="threaded")] {
Builder::new()
.enable_all()
.threaded_scheduler()
.thread_name("sys invoked runtime")
.build()
} else {
Builder::new()
.enable_all()
.basic_scheduler()
.thread_name("sys invoked runtime")
.build()
}
}
}

@ -1,4 +1,5 @@
//! `pipe()` related operations
//! The async pipe is kinda broke, but that's fine because we don't really need it anyways.
use super::*;
use errno::Errno;
@ -103,3 +104,289 @@ pub(super) unsafe fn pipe_read_value<T>(fd: i32) -> Result<T, Errno<Error>>
Err(Error::Broken.into())
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct WriteHalf(i32);
#[derive(Debug, PartialEq, Eq)]
pub struct ReadHalf(i32);
#[derive(Debug, PartialEq, Eq)]
pub struct Pipe
{
tx: WriteHalf,
rx: ReadHalf,
}
impl Pipe
{
/// Create from a split
pub fn from_split(tx: WriteHalf, rx: ReadHalf) -> Self
{
Self{
tx,
rx,
}
}
/// Create a new pipe
pub fn new() -> Result<Self, Error>
{
let (tx,rx) = pipe()?;
Ok(Self::from_split(tx,rx))
}
/// Split into write and read half
pub fn split(self) -> (WriteHalf, ReadHalf)
{
(self.tx, self.rx)
}
}
const GETFL: i32 = 3;
const SETFL: i32 = 4;
const NOBLOCK: i32 = 2048;
fn set_non_blocking(fd: i32)
{
use libc::fcntl;
unsafe {
fcntl(fd, SETFL, fcntl(fd, GETFL, 0) | NOBLOCK);
}
}
impl WriteHalf
{
/// Create from a raw file descriptor
pub unsafe fn from_raw(fd: i32) -> Self
{
set_non_blocking(fd);
Self(fd)
}
/// Consume and return the output file descriptor
pub fn into_raw(self) -> i32
{
self.0
}
}
impl ReadHalf
{
/// Create from a raw file descriptor
pub unsafe fn from_raw(fd: i32) -> Self
{
set_non_blocking(fd);
Self(fd)
}
/// Consume and return the output file descriptor
pub fn into_raw(self) -> i32
{
self.0
}
}
/// Create a new pipe's `Read` and `Write` halfs
pub fn pipe() -> Result<(WriteHalf, ReadHalf), Error>
{
let (rx, tx) = unix_pipe().map_err(|x| x.into_inner())?;
if rx <= 0 || tx <= 0 {
return Err(Error::Create);
}
unsafe {
Ok((WriteHalf::from_raw(tx), ReadHalf::from_raw(rx)))
}
}
/// Create 2 linked together pipes.
///
/// # Usage
/// Useful for `fork()`. 1st is parent, 2nd moved to client
pub fn multi() -> Result<(Pipe, Pipe), Error>
{
let (tx_c, rx_p) = pipe()?;
let (tx_p, rx_c) = pipe()?;
Ok((Pipe::from_split(tx_p, rx_p), Pipe::from_split(tx_c, rx_c)))
}
use tokio::{
io::{
AsyncWrite,
AsyncRead,
},
};
use tokio::io::Error as AsyncError;
use std::{
pin::Pin,
task::{
Context,
Poll,
},
};
const POLL_IN: i16 = 1;
const POLL_OUT: i16 = 4;
impl AsyncWrite for WriteHalf
{
#[inline] fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), AsyncError>>
{
// as far as i can tell this is a no-op with `write()`?
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), AsyncError>>
{
let fd = self.0;
if let Poll::Ready(res) = self.poll_flush(cx) {
unsafe{libc::close(fd)};
Poll::Ready(res)
} else {
Poll::Pending
}
}
fn poll_write(self: Pin<&mut Self>, _cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, AsyncError>>
{
use libc::{
poll,
write,
pollfd,
};
let mut fd = pollfd {
fd: self.0,
revents: 0,
events: POLL_OUT,
};
let poll = unsafe {
poll(&mut fd as *mut pollfd, 1, 0)
};
Poll::Ready(if poll < 0 {
Err(AsyncError::from_raw_os_error(errno::raw()))
} else if poll > 0 {
if fd.revents & POLL_OUT == POLL_OUT {
let wr = unsafe {
write(self.0, &buf[0] as *const u8 as *const libc::c_void, buf.len())
};
if wr < 0 {
Err(AsyncError::from_raw_os_error(errno::raw()))
} else {
Ok(wr as usize)
}
} else {
Err(AsyncError::from_raw_os_error(errno::raw()))
}
} else {
return Poll::Pending;
})
}
}
impl AsyncRead for ReadHalf
{
fn poll_read(self: Pin<&mut Self>, _cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, AsyncError>>
{
use libc::{
poll,
read,
pollfd,
};
let mut fd = pollfd {
fd: self.0,
revents: 0,
events: POLL_IN,
};
let poll = unsafe {
poll(&mut fd as *mut pollfd, 1, 0)
};
Poll::Ready(if poll < 0 {
Err(AsyncError::from_raw_os_error(errno::raw()))
} else if poll > 0 {
if fd.revents & POLL_IN == POLL_IN {
let wr = unsafe {
read(self.0, &mut buf[0] as *mut u8 as *mut libc::c_void, buf.len())
};
if wr < 0 {
Err(AsyncError::from_raw_os_error(errno::raw()))
} else {
Ok(wr as usize)
}
} else {
Err(AsyncError::from_raw_os_error(errno::raw()))
}
} else {
return Poll::Pending;
})
}
}
use std::ops::Drop;
impl Drop for WriteHalf
{
fn drop(&mut self)
{
unsafe {
libc::close(self.0);
}
}
}
impl Drop for ReadHalf
{
fn drop(&mut self)
{
unsafe {
libc::close(self.0);
}
}
}
use tokio::prelude::*;
use futures::Future;
impl AsyncWrite for Pipe
{
#[inline] fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), AsyncError>>
{
let future = async {
self.tx.flush().await
};
tokio::pin!(future);
future.poll(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), AsyncError>>
{
let future = async {
self.tx.shutdown().await
};
tokio::pin!(future);
future.poll(cx)
}
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, AsyncError>>
{
let future = async {
self.tx.write(buf).await
};
tokio::pin!(future);
future.poll(cx)
}
}
impl AsyncRead for Pipe
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, AsyncError>>
{
let future = async {
self.rx.read(buf).await
};
tokio::pin!(future);
future.poll(cx)
}
}

Loading…
Cancel
Save