From dd041b6571672df07b3c799a545408e40f21e22c Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 6 Aug 2020 02:58:36 +0100 Subject: [PATCH] fork::Child impl Future --- src/main.rs | 3 +- src/sys/fork.rs | 94 ++++++++++++++-- src/sys/mod.rs | 25 +++++ src/sys/pipe.rs | 287 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 399 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1ae70f4..553d630 100644 --- a/src/main.rs +++ b/src/main.rs @@ -131,7 +131,8 @@ async fn main() -> Result<(), Box> { }).await?; println!("Child: {:?}", child); - println!("Waitpid: {:?}", child.wait().await.map_err(|x| x.to_string())); + //println!("Waitpid: {:?}", child.wait().await.map_err(|x| x.to_string())); + println!("Waitpid nb {:?}", child.await.map_err(|x| x.to_string())); // end test log::Logger::global().add_hook(log::custom::LogFileHook::new(log::Level::Debug, "test.log", "test.err.log", false)); diff --git a/src/sys/fork.rs b/src/sys/fork.rs index 9803149..2dda10f 100644 --- a/src/sys/fork.rs +++ b/src/sys/fork.rs @@ -108,27 +108,30 @@ impl std::fmt::Display for Error } } +use pipe::Pipe; + /// Represents the detached child #[derive(Debug)] pub struct Child { pid: i32, + + comm: Pipe, } #[derive(Debug)] pub struct Parent { pid: i32, -} + comm: Pipe, +} /// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`. -/// -/// # Notes -/// This fork does not set up the async runtime. Using Tokio from the child process will require manually setting up the runtime pub async fn detach_closure(as_uid: Option, as_gid: Option, into: F) -> Result> { let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?; - + + let (comm_p, comm_c) = pipe::multi().map_err(|x| Error::from(x))?; let child = unsafe{fork()}; if child == 0 { // Is child @@ -137,7 +140,11 @@ pub async fn detach_closure(as_uid: Option, as_gid: Opti unsafe { if let Ok(_) = pipe_write_value(tx, &!0u32) { - into(Parent{pid: libc::getppid()}); + //if let Ok(mut rt) = new_runtime() { + // rt.block_on(async move { + into(Parent{pid: libc::getppid(),comm:comm_c}); + // }); + // } } } }; @@ -204,7 +211,7 @@ pub async fn detach_closure(as_uid: Option, as_gid: Opti } }; if err == !0u32 { - Ok(Child{pid:child}) + Ok(Child{pid:child, comm: comm_p}) } else { Err(Error::from(err).into()) } @@ -222,10 +229,12 @@ impl Child { /// Call `waitpid` on this child. Returns the status code if possible, or `Error` if error. /// + /// `Child` implements `Future`, and waiting on that directly is probably preferable to calling this method as it does no blocking. + /// /// # Notes /// - When `threaded` feature is disabled, this function will not block at all, instead returning error if there is not status available, this is to prevent deadlock. /// - This function (at present) will return `Error` when the child process exits, too. This is because we don't have `errno` access yet, I'm working on it. - pub async fn waitpid(&self) -> Result> + #[deprecated] pub async fn waitpid(&self) -> Result> { cfg_if! { if #[cfg(feature="threaded")] { @@ -253,12 +262,15 @@ impl Child /// Wait for the child process to end, ignoring any other signals. /// + /// `Child` implements `Future`, and waiting on that directly is probably preferable to calling this method as it does no blocking. + /// /// # Notes /// - When `threaded` feature is disabled, this function will return immediately, this is an error-handling bug because we don't have `errno` access as of yet. /// - This function will call `waitpid` untill it returns status `ECHILD` (child has exited). Other status values are ignored, if any. - pub async fn wait(&self) -> Result<(), Errno> + #[deprecated] pub async fn wait(&self) -> Result<(), Errno> { loop { + #[allow(deprecated)] match self.waitpid().await { Ok(v) => { // keep going until we get `no child process' @@ -275,3 +287,67 @@ impl Child } } } + +use futures::Future; + + +use std::{ + pin::Pin, + task::{ + Context, + Poll, + }, +}; +impl Future for Child +{ + type Output = Result<(), Errno>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll + { + let pid = self.pid; + let future = async { + loop { + let mut status: i32 = 0; + if unsafe{libc::waitpid(pid, &mut status as *mut i32, 1)} == pid { + if status == 0 { + tokio::task::yield_now().await; + } else { + //We got a signal, but we don't care. + debug!("Got status {} from child {}, ignoring. ", status, pid); + tokio::task::yield_now().await; + } + } else { + let err: Errno = Errno::from(Error::WaitPid).into(); + if err.error() == 0 { + tokio::task::yield_now().await; + } else if err.error() == 10 { + // Child exited + break Ok(()); + }else { + break Err(err); + } + } + } + }; + tokio::pin!(future); + future.poll(_cx) + } +} + +// -- boilerplate + +impl Child { + /// Get the pipe for talking to parent + pub fn pipe(&mut self) -> &mut Pipe + { + &mut self.comm + } +} + +impl Parent { + /// Get the pipe for talking to child + pub fn pipe(&mut self) -> &mut Pipe + { + &mut self.comm + } +} diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 904ba2d..807d4d2 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -2,6 +2,8 @@ use super::*; +use cfg_if::cfg_if; + pub mod user; pub mod pipe; @@ -15,3 +17,26 @@ use errno::ResultExt; { unsafe{libc::getpid()} } + +/// Start the async runtime manually +pub fn new_runtime() -> Result +{ + use tokio::runtime::Builder; + + + cfg_if! { + if #[cfg(feature="threaded")] { + Builder::new() + .enable_all() + .threaded_scheduler() + .thread_name("sys invoked runtime") + .build() + } else { + Builder::new() + .enable_all() + .basic_scheduler() + .thread_name("sys invoked runtime") + .build() + } + } +} diff --git a/src/sys/pipe.rs b/src/sys/pipe.rs index aa30b25..880b346 100644 --- a/src/sys/pipe.rs +++ b/src/sys/pipe.rs @@ -1,4 +1,5 @@ //! `pipe()` related operations +//! The async pipe is kinda broke, but that's fine because we don't really need it anyways. use super::*; use errno::Errno; @@ -103,3 +104,289 @@ pub(super) unsafe fn pipe_read_value(fd: i32) -> Result> Err(Error::Broken.into()) } } + +#[derive(Debug, PartialEq, Eq)] +pub struct WriteHalf(i32); +#[derive(Debug, PartialEq, Eq)] +pub struct ReadHalf(i32); + +#[derive(Debug, PartialEq, Eq)] +pub struct Pipe +{ + tx: WriteHalf, + rx: ReadHalf, +} + +impl Pipe +{ + /// Create from a split + pub fn from_split(tx: WriteHalf, rx: ReadHalf) -> Self + { + Self{ + tx, + rx, + } + } + + /// Create a new pipe + pub fn new() -> Result + { + let (tx,rx) = pipe()?; + Ok(Self::from_split(tx,rx)) + } + + /// Split into write and read half + pub fn split(self) -> (WriteHalf, ReadHalf) + { + (self.tx, self.rx) + } +} + +const GETFL: i32 = 3; +const SETFL: i32 = 4; +const NOBLOCK: i32 = 2048; + +fn set_non_blocking(fd: i32) +{ + use libc::fcntl; + + unsafe { + fcntl(fd, SETFL, fcntl(fd, GETFL, 0) | NOBLOCK); + } +} + +impl WriteHalf +{ + /// Create from a raw file descriptor + pub unsafe fn from_raw(fd: i32) -> Self + { + set_non_blocking(fd); + Self(fd) + } + + /// Consume and return the output file descriptor + pub fn into_raw(self) -> i32 + { + self.0 + } +} + +impl ReadHalf +{ + /// Create from a raw file descriptor + pub unsafe fn from_raw(fd: i32) -> Self + { + set_non_blocking(fd); + Self(fd) + } + + /// Consume and return the output file descriptor + pub fn into_raw(self) -> i32 + { + self.0 + } +} + +/// Create a new pipe's `Read` and `Write` halfs +pub fn pipe() -> Result<(WriteHalf, ReadHalf), Error> +{ + let (rx, tx) = unix_pipe().map_err(|x| x.into_inner())?; + + if rx <= 0 || tx <= 0 { + return Err(Error::Create); + } + unsafe { + Ok((WriteHalf::from_raw(tx), ReadHalf::from_raw(rx))) + } +} + +/// Create 2 linked together pipes. +/// +/// # Usage +/// Useful for `fork()`. 1st is parent, 2nd moved to client +pub fn multi() -> Result<(Pipe, Pipe), Error> +{ + let (tx_c, rx_p) = pipe()?; + let (tx_p, rx_c) = pipe()?; + + Ok((Pipe::from_split(tx_p, rx_p), Pipe::from_split(tx_c, rx_c))) +} + +use tokio::{ + io::{ + AsyncWrite, + AsyncRead, + }, +}; +use tokio::io::Error as AsyncError; +use std::{ + pin::Pin, + task::{ + Context, + Poll, + }, +}; + +const POLL_IN: i16 = 1; +const POLL_OUT: i16 = 4; + +impl AsyncWrite for WriteHalf +{ + #[inline] fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> + { + // as far as i can tell this is a no-op with `write()`? + Poll::Ready(Ok(())) + } + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> + { + let fd = self.0; + if let Poll::Ready(res) = self.poll_flush(cx) { + unsafe{libc::close(fd)}; + Poll::Ready(res) + } else { + Poll::Pending + } + } + fn poll_write(self: Pin<&mut Self>, _cx: &mut Context, buf: &[u8]) -> Poll> + { + use libc::{ + poll, + write, + pollfd, + }; + let mut fd = pollfd { + fd: self.0, + revents: 0, + events: POLL_OUT, + }; + let poll = unsafe { + poll(&mut fd as *mut pollfd, 1, 0) + }; + + Poll::Ready(if poll < 0 { + Err(AsyncError::from_raw_os_error(errno::raw())) + } else if poll > 0 { + if fd.revents & POLL_OUT == POLL_OUT { + let wr = unsafe { + write(self.0, &buf[0] as *const u8 as *const libc::c_void, buf.len()) + }; + if wr < 0 { + Err(AsyncError::from_raw_os_error(errno::raw())) + } else { + Ok(wr as usize) + } + } else { + Err(AsyncError::from_raw_os_error(errno::raw())) + } + } else { + return Poll::Pending; + }) + } +} + +impl AsyncRead for ReadHalf +{ + fn poll_read(self: Pin<&mut Self>, _cx: &mut Context, buf: &mut [u8]) -> Poll> + { + use libc::{ + poll, + read, + pollfd, + }; + let mut fd = pollfd { + fd: self.0, + revents: 0, + events: POLL_IN, + }; + let poll = unsafe { + poll(&mut fd as *mut pollfd, 1, 0) + }; + + Poll::Ready(if poll < 0 { + Err(AsyncError::from_raw_os_error(errno::raw())) + } else if poll > 0 { + if fd.revents & POLL_IN == POLL_IN { + let wr = unsafe { + read(self.0, &mut buf[0] as *mut u8 as *mut libc::c_void, buf.len()) + }; + if wr < 0 { + Err(AsyncError::from_raw_os_error(errno::raw())) + } else { + Ok(wr as usize) + } + } else { + Err(AsyncError::from_raw_os_error(errno::raw())) + } + } else { + return Poll::Pending; + }) + } +} + +use std::ops::Drop; + +impl Drop for WriteHalf +{ + fn drop(&mut self) + { + unsafe { + libc::close(self.0); + } + + } +} + +impl Drop for ReadHalf +{ + fn drop(&mut self) + { + unsafe { + libc::close(self.0); + } + + } +} + + +use tokio::prelude::*; +use futures::Future; +impl AsyncWrite for Pipe +{ + #[inline] fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> + { + let future = async { + self.tx.flush().await + }; + tokio::pin!(future); + future.poll(cx) + } + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> + { + + let future = async { + self.tx.shutdown().await + }; + tokio::pin!(future); + future.poll(cx) + } + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> + { + let future = async { + self.tx.write(buf).await + }; + tokio::pin!(future); + future.poll(cx) + } +} + +impl AsyncRead for Pipe +{ + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> + { + let future = async { + self.rx.read(buf).await + }; + tokio::pin!(future); + future.poll(cx) + } +}