From 3d157e51d88d15554f2c191998cca728aa0db005 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 5 Aug 2020 18:36:48 +0100 Subject: [PATCH] fork::detach_closure; async waiting on child ok --- Cargo.lock | 10 ++ Cargo.toml | 3 +- src/main.rs | 10 ++ src/sys/fork.rs | 322 ++++++++++++++++++++++++++++++++++++++++++++++++ src/sys/mod.rs | 10 ++ src/sys/pipe.rs | 3 + 6 files changed, 357 insertions(+), 1 deletion(-) create mode 100644 src/sys/fork.rs create mode 100644 src/sys/pipe.rs diff --git a/Cargo.lock b/Cargo.lock index 9a19dc2..b8d78e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -395,6 +395,7 @@ dependencies = [ "generational-arena", "lazy_static", "libc", + "malloc-array", "mopa", "notify", "once_cell", @@ -405,6 +406,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "malloc-array" +version = "1.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f994770c7bb3f8f7db7c4160665fc8814c8c705c10ae59a3d7354f0b8838f5c" +dependencies = [ + "libc", +] + [[package]] name = "memchr" version = "2.3.3" diff --git a/Cargo.toml b/Cargo.toml index 9556ea4..130e914 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ watcher_unlimited = ["watcher"] watcher_timeout = ["watcher"] [dependencies] -tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "fs"]} +tokio = {version = "0.2", features=["time", "macros", "io-driver", "sync", "rt-core", "fs", "blocking"]} notify = "5.0.0-pre.3" futures= "0.3" sexp = "1.1" @@ -42,6 +42,7 @@ cfg-if = "0.1" generational-arena = "0.2" mopa = "0.2" readable-perms = "0.1" +malloc-array = "1.4" [build-dependencies] rustc_version = "0.2" diff --git a/src/main.rs b/src/main.rs index 541424a..4f1e572 100644 --- a/src/main.rs +++ b/src/main.rs @@ -122,6 +122,16 @@ fn print_stats() #[tokio::main] async fn main() -> Result<(), Box> { + let child = sys::fork::detach_closure(None, None, |parent| { + println!("Parent: {:?}, us: {}", parent, sys::get_pid()); + std::thread::sleep_ms(3000); + println!("Exiting child"); + }).await?; + + println!("Child: {:?}", child); + println!("Waitpid: {:?}", child.wait().await?); + // end test + log::init(log::Level::Debug); log::Logger::global().add_hook(log::custom::LogFileHook::new(log::Level::Debug, "test.log", "test.err.log", false)); diff --git a/src/sys/fork.rs b/src/sys/fork.rs new file mode 100644 index 0000000..59557fa --- /dev/null +++ b/src/sys/fork.rs @@ -0,0 +1,322 @@ +//! Forking utils + +use super::*; + +use libc::{ + fork, + setgid, + setuid, +}; +use std::{ + fmt, +}; +use crate::util::PhantomDrop; +use cfg_if::cfg_if; + +/// Forking error +#[derive(Debug)] +#[repr(u32)] +pub enum Error { + Fork = 1, + SetUid = 2, + SetGid = 3, + + Pipe = 4, + PipeRead = 5, + PipeWrite = 6, + PipeBroken = 7, + WaitPid = 8, + + Unknown = 0, +} + +impl Error +{ + #[inline] fn from_u32(from: u32) -> Self + { + use Error::*; + match from { + x if x == Fork as u32 => Fork, + x if x == SetUid as u32 => SetUid, + x if x == SetGid as u32 => SetGid, + x if x == Pipe as u32 => Pipe, + x if x == PipeRead as u32 => PipeRead, + x if x == PipeWrite as u32 => PipeWrite, + x if x == PipeBroken as u32 => PipeBroken, + x if x == WaitPid as u32 => WaitPid, + _ => Self::Unknown, + } + } +} + +impl From for Error +{ + #[inline] fn from(from: u32) -> Self + { + Self::from_u32(from) + } +} + +impl From for u32 +{ + #[inline] fn from(from: Error) -> Self + { + from as Self + } +} + +impl std::error::Error for Error{} +impl std::fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::Fork => write!(f, "fork() failed"), + Self::Pipe => write!(f, "pipe() failed"), + Self::PipeBroken => write!(f, "broken pipe with child"), + Self::PipeRead => write!(f, "pipe read failed"), + Self::PipeWrite => write!(f, "pipe write failed"), + Self::SetGid => write!(f, "child reported setgid() failed"), + Self::SetUid => write!(f, "child reported setuid() failed"), + Self::WaitPid => write!(f, "waitpid() failed unexpectedly"), + _ => write!(f, "child reported unknown error"), + } + } +} + +/// Represents the detached child +#[derive(Debug)] +pub struct Child +{ + pid: i32, +} + +#[derive(Debug)] +pub struct Parent { + pid: i32, +} + +/// Create with `pipe()` +fn unix_pipe() -> Result<(i32,i32), Error> +{ + use libc::{ + pipe, + }; + let mut pipe_fd: [libc::c_int; 2] = [0;2]; + if unsafe{pipe(&mut pipe_fd[0] as *mut libc::c_int)} == 0{ + Ok((pipe_fd[0], pipe_fd[1])) + } else { + Err(Error::Pipe) + } +} + +fn pipe_write(output: i32, buf: impl AsRef<[u8]>) -> Result +{ + let buf = buf.as_ref(); + let len = buf.len(); + + let read = unsafe{libc::write(output, &buf[0] as *const u8 as *const libc::c_void, len)}; + + if read < 0 { + Err(Error::PipeWrite) + } else { + Ok(read as usize) + } +} + + +fn pipe_read(input: i32, mut buf: impl AsMut<[u8]>) -> Result +{ + let buf = buf.as_mut(); + let len = buf.len(); + + let read = unsafe{libc::read(input, &mut buf[0] as *mut u8 as *mut libc::c_void, len)}; + + if read < 0 { + Err(Error::PipeRead) + } else { + Ok(read as usize) + } +} + +unsafe fn pipe_write_value(fd: i32, value: &T) -> Result<(), Error> +{ + let sz = std::mem::size_of_val(value); + + let write = pipe_write(fd, std::slice::from_raw_parts(value as *const T as *const u8, sz))?; + if write == sz { + Ok(()) + } else { + Err(Error::PipeBroken) + } +} + +unsafe fn pipe_read_value(fd: i32) -> Result +{ + use malloc_array::heap; + + let mut buf = heap![unsafe 0u8; std::mem::size_of::()]; + + let read = pipe_read(fd, &mut buf)?; + + if read == buf.len() { + let ptr = buf.reinterpret::(); + Ok(ptr.as_ptr().read()) + } else { + Err(Error::PipeBroken) + } +} + +/// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`. +/// +/// # Notes +/// This fork does not set up the async runtime. Using Tokio from the child process will require manually setting up the runtime +pub async fn detach_closure(as_uid: Option, as_gid: Option, into: F) -> Result { + + let (rx, tx) = unix_pipe()?; + + let child = unsafe{fork()}; + if child == 0 { + // Is child + + let complete = move || { + unsafe { + if let Ok(_) = pipe_write_value(tx, &!0u32) + { + into(Parent{pid: libc::getppid()}); + } + } + }; + + let complete_err = move |err: Error| { + unsafe{let _ = pipe_write_value(tx, &u32::from(err));} + }; + + { + let _guard = PhantomDrop::new((), move |_| { + unsafe { + libc::close(tx); + } + });// Close pipe sender when we're done here + + unsafe { + loop { + if let Some(as_uid) = as_uid { + // Set UID + if setuid(as_uid) != 0 { + complete_err(Error::SetUid); + break; + } + } + if let Some(as_gid) = as_gid { + // Set GID + if setgid(as_gid) != 0 { + complete_err(Error::SetGid); + break; + } + } + + complete(); + break; + } + } + } + std::process::exit(0); + } else if child > 0 { + // Fork succeeded + let _guard = PhantomDrop::new((), move |_| { + unsafe { + libc::close(rx); + } + }); + + + let err: u32 = unsafe{ + cfg_if! { + if #[cfg(feature="threaded")] { + use tokio::{ + task, + }; + let waiter = task::spawn_blocking(move || { + pipe_read_value(rx) + }); + + waiter.await.expect("Panic while waiting for child status")? + } else { + unsafe { + pipe_read_value(rx)? + } + } + } + }; + if err == !0u32 { + Ok(Child{pid:child}) + } else { + Err(err.into()) + } + } else { + unsafe { + libc::close(tx); + libc::close(rx); + } + Err(Error::Fork) + } +} + +impl Child +{ + /// Call `waitpid` on this child. Returns the status code if possible, or `Error` if error. + /// + /// # Notes + /// - When `threaded` feature is disabled, this function will not block at all, instead returning error if there is not status available, this is to prevent deadlock. + /// - This function (at present) will return `Error` when the child process exits, too. This is because we don't have `errno` access yet, I'm working on it. + pub async fn waitpid(&self) -> Result + { + cfg_if! { + if #[cfg(feature="threaded")] { + let pid = self.pid; + let waiter = tokio::task::spawn_blocking(move || { + let mut status: i32 = 0; + (unsafe{libc::waitpid(pid, &mut status as *mut i32, 0)}, status) + }); + if let (0, status) = waiter.await.expect("Waiter panicked") { + Ok(status) + } else { + Err(Error::WaitPid) + } + } else { + let mut status: i32 = 0; + if unsafe{libc::waitpid(self.pid, &mut status as *mut i32, 1)} == 0 { // We can't afford to block here + Ok(status) + } else { + Err(Error::WaitPid) + } + + } + } + } + + /// Wait for the child process to end, ignoring any other signals + /// + /// # Notes + /// - When `threaded` feature is disabled, this function will return immediately, this is an error-handling bug because we don't have `errno` access as of yet. + /// - This function will call `waitpid` untill it returns status `ECHILD` (child has exited). Other status values are ignored, if any. + pub async fn wait(&self) -> Result<(), Error> + { + loop { + match self.waitpid().await { + Ok(v) => { + // we don't want this, we want it to end + debug!("Got status {} from child {}, ignoring", v, self.pid); + }, + Err(e) => { + if let Error::WaitPid = e { + return Ok(()); + } else { + return Err(e); + } + } + } + } + } +} diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 8882a03..c17bddc 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -1,3 +1,13 @@ +//! For POSIX and system calls + use super::*; pub mod user; + +pub mod fork; + +/// Get pid of current process +#[inline] pub fn get_pid() -> i32 +{ + unsafe{libc::getpid()} +} diff --git a/src/sys/pipe.rs b/src/sys/pipe.rs new file mode 100644 index 0000000..ba6558f --- /dev/null +++ b/src/sys/pipe.rs @@ -0,0 +1,3 @@ +//! `pipe()` wrapper + +use super::*;