From b3430ca19da24a9c775de1e451b829e3456e39db Mon Sep 17 00:00:00 2001 From: Avril Date: Thu, 6 Aug 2020 21:17:38 +0100 Subject: [PATCH] fork::detach_closure() is now async safe --- src/log/custom/global_logfile.rs | 2 +- src/main.rs | 45 +++++++--- src/sys/fork.rs | 136 +++++++++++++++++++++---------- 3 files changed, 129 insertions(+), 54 deletions(-) diff --git a/src/log/custom/global_logfile.rs b/src/log/custom/global_logfile.rs index ae0e90b..dfa153f 100644 --- a/src/log/custom/global_logfile.rs +++ b/src/log/custom/global_logfile.rs @@ -134,7 +134,7 @@ impl Hook for LogFileHook internal.fix_perms(&mut file); - if let Err(_err) = write_log(&mut file, command).awmait { + if let Err(_err) = write_log(&mut file, command).await { //crate::warn!("Failed writing to logfile {:?}: {}", path, err); } } diff --git a/src/main.rs b/src/main.rs index be97e59..6ba2757 100644 --- a/src/main.rs +++ b/src/main.rs @@ -75,20 +75,41 @@ async fn main() -> Result<(), Box> { log::init(log::Level::Debug); - let child = sys::fork::detach_closure(None, None, |parent| { - - println!("Parent: {:?}, us: {}", parent, sys::get_pid()); - //let pipe = parent.pipe(); - //use tokio::prelude::*; - //pipe.write_all(b"Hello there").await.expect("io error c"); - //tokio::time::delay_for(tokio::time::Duration::from_secs(3)).await; - - //std::thread::sleep_ms(3000); - - println!("Exiting child"); - }).await?; + let mut child = sys::fork::detach_closure(None, None, |mut parent| { + async move { + println!("Parent: {:?}, us: {}", parent, sys::get_pid()); + let pipe = parent.pipe(); + use tokio::prelude::*; + pipe.write_all(b"Hello there").await.expect("io error c"); + let fut1 = async { + tokio::time::delay_for(tokio::time::Duration::from_secs(4)).await; + }; + let fut2 = async { + let mut buf = [0u8; 7]; + pipe.read_exact(&mut buf[..]).await.expect("Child read failed"); + println!("Child buf read"); + buf + }; + tokio::pin!(fut1); + tokio::pin!(fut2); + let (_, buf) = tokio::join!(fut1, fut2); + //std::thread::sleep_ms(3000); + + println!("Exiting child with msg: {}", std::str::from_utf8(&buf[..]).expect("C Corruption")); + } + })?.await??; println!("Child: {:?}", child); + { + let pipe = child.pipe(); + let mut buf = [0u8; 11]; + use tokio::prelude::*; + pipe.read_exact(&mut buf[..]).await.expect("Child read failed"); + + println!("READ!!!: {}", std::str::from_utf8(&buf[..]).expect("Corruption")); + tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await; + pipe.write_all(b"Also hi").await.expect("Child write failed"); + } println!("Waitpid nb {:?}", child.await.map_err(|x| x.to_string())); // end test diff --git a/src/sys/fork.rs b/src/sys/fork.rs index fa4d317..0811dbe 100644 --- a/src/sys/fork.rs +++ b/src/sys/fork.rs @@ -8,6 +8,10 @@ use libc::{ setgid, setuid, }; +use tokio::{ + runtime::{Runtime, Handle}, + task::JoinHandle, +}; use std::{ fmt, }; @@ -120,66 +124,116 @@ pub struct Parent { comm: Pipe, } +pub struct Fork(Option>>, Errno>>); + +impl Future for Fork +{ + type Output = Result>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll + { + let res = std::mem::replace(&mut self.0, None).unwrap(); + match res { + Ok(handle) => { + println!("Spawn ok"); + let future = async move { + println!("r"); + let uh = handle.await.unwrap_or(Err(Errno::new(Error::Unknown))); +println!("What?"); + uh + }; + tokio::pin!(future); + future.poll(cx) + }, + Err(err) => Poll::Ready(Err(err)) + } + } +} + /// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`. /// +/// The signature is the same as +/// +/// ``` +/// async fn detach_closure Fu + Send + 'static, Fu: Future>(as_uid: Option, as_gid: Option, into: F) -> Result> +/// ``` +/// /// # Notes -/// This seems to corrupt the async runtime for the child, do not use it from the child. -pub async fn detach_closure(as_uid: Option, as_gid: Option, into: F) -> Result> { +/// This function spawns a speperate async runtime and blocks on it in the Child. Async/await should be safe here. +#[inline] pub fn detach_closure_test Fu + Send + 'static, Fu: Future>(as_uid: Option, as_gid: Option, into: F) -> Fork +{ + //this hangs? + + //Fork(Some(detach_closure_internal(as_uid, as_gid, into))) + todo!() +} + +/// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`. +/// +/// # Notes +/// This function spawns a speperate async runtime and blocks on it in the Child. Async/await should be safe here. +pub fn detach_closure Fu + Send + 'static, Fu: Future>(as_uid: Option, as_gid: Option, into: F) -> Result>>, Errno> { // let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?; let (mut ttx,mut trx) = pipe::multi().map_err(|x| Error::from(x))?; let (comm_p, comm_c) = pipe::multi().map_err(|x| Error::from(x))?; + let child = unsafe{fork()}; if child == 0 { // Is child - - use tokio::prelude::*; - let complete = || { - async { - unsafe { - if let Ok(_) = ttx.write_u32(!0u32).await - { - into(Parent{pid: libc::getppid(),comm:comm_c}); + let _ =std::thread::spawn(move || { //Any unwind will be caught here + let mut rt = Runtime::new().unwrap_or_else(|_| std::process::exit(1)); //Immediately exit if runtime cannot be created. + rt.block_on(async move { + + use tokio::prelude::*; + let complete = || { + async { + unsafe { + if let Ok(_) = ttx.write_u32(!0u32).await + { + into(Parent{pid: libc::getppid(),comm:comm_c}).await; + } + } } - } - } - }; + }; - unsafe { - loop { - if let Some(as_uid) = as_uid { - // Set UID - if setuid(as_uid) != 0 { - let _ = ttx.write_u32(Error::SetUid.into()).await; - break; - } - } - if let Some(as_gid) = as_gid { - // Set GID - if setgid(as_gid) != 0 { - let _ = ttx.write_u32(Error::SetGid.into()).await; + unsafe { + loop { + if let Some(as_uid) = as_uid { + // Set UID + if setuid(as_uid) != 0 { + let _ = ttx.write_u32(Error::SetUid.into()).await; + break; + } + } + if let Some(as_gid) = as_gid { + // Set GID + if setgid(as_gid) != 0 { + let _ = ttx.write_u32(Error::SetGid.into()).await; + break; + } + } + + complete().await; break; } } - - complete().await; - break; - } - } + }); + }).join(); std::process::exit(0); } else if child > 0 { // Fork succeeded - use tokio::prelude::*; - - let err: u32 = unsafe{ - timeout!(trx.read_u32(), tokio::time::Duration::from_secs(1)).unwrap_or(Ok(Error::Unknown as u32)).unwrap_or(Error::Unknown as u32) - }; - if err == !0u32 { - Ok(Child{pid:child, comm: comm_p}) - } else { - Err(Error::from(err).into()) - } + Ok(tokio::spawn(async move { + use tokio::prelude::*; + let err: u32 = unsafe { + timeout!(trx.read_u32(), tokio::time::Duration::from_secs(1)).unwrap_or(Ok(Error::Unknown as u32)).unwrap_or(Error::Unknown as u32) + }; + if err == !0u32 { + Ok(Child{pid:child, comm: comm_p}) + } else { + Err(Error::from(err).into()) + } + })) } else { let rval = Error::Fork.into(); Err(rval)