fork::detach_closure() is now async safe

master
Avril 4 years ago
parent 91c897d995
commit b3430ca19d
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -134,7 +134,7 @@ impl Hook for LogFileHook
internal.fix_perms(&mut file); internal.fix_perms(&mut file);
if let Err(_err) = write_log(&mut file, command).awmait { if let Err(_err) = write_log(&mut file, command).await {
//crate::warn!("Failed writing to logfile {:?}: {}", path, err); //crate::warn!("Failed writing to logfile {:?}: {}", path, err);
} }
} }

@ -75,20 +75,41 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::init(log::Level::Debug); log::init(log::Level::Debug);
let child = sys::fork::detach_closure(None, None, |parent| { let mut child = sys::fork::detach_closure(None, None, |mut parent| {
async move {
println!("Parent: {:?}, us: {}", parent, sys::get_pid()); println!("Parent: {:?}, us: {}", parent, sys::get_pid());
//let pipe = parent.pipe(); let pipe = parent.pipe();
//use tokio::prelude::*; use tokio::prelude::*;
//pipe.write_all(b"Hello there").await.expect("io error c"); pipe.write_all(b"Hello there").await.expect("io error c");
//tokio::time::delay_for(tokio::time::Duration::from_secs(3)).await; let fut1 = async {
tokio::time::delay_for(tokio::time::Duration::from_secs(4)).await;
//std::thread::sleep_ms(3000); };
let fut2 = async {
println!("Exiting child"); let mut buf = [0u8; 7];
}).await?; pipe.read_exact(&mut buf[..]).await.expect("Child read failed");
println!("Child buf read");
buf
};
tokio::pin!(fut1);
tokio::pin!(fut2);
let (_, buf) = tokio::join!(fut1, fut2);
//std::thread::sleep_ms(3000);
println!("Exiting child with msg: {}", std::str::from_utf8(&buf[..]).expect("C Corruption"));
}
})?.await??;
println!("Child: {:?}", child); println!("Child: {:?}", child);
{
let pipe = child.pipe();
let mut buf = [0u8; 11];
use tokio::prelude::*;
pipe.read_exact(&mut buf[..]).await.expect("Child read failed");
println!("READ!!!: {}", std::str::from_utf8(&buf[..]).expect("Corruption"));
tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;
pipe.write_all(b"Also hi").await.expect("Child write failed");
}
println!("Waitpid nb {:?}", child.await.map_err(|x| x.to_string())); println!("Waitpid nb {:?}", child.await.map_err(|x| x.to_string()));
// end test // end test

@ -8,6 +8,10 @@ use libc::{
setgid, setgid,
setuid, setuid,
}; };
use tokio::{
runtime::{Runtime, Handle},
task::JoinHandle,
};
use std::{ use std::{
fmt, fmt,
}; };
@ -120,66 +124,116 @@ pub struct Parent {
comm: Pipe, comm: Pipe,
} }
pub struct Fork(Option<Result<JoinHandle<Result<Child, Errno<Error>>>, Errno<Error>>>);
impl Future for Fork
{
type Output = Result<Child, Errno<Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
{
let res = std::mem::replace(&mut self.0, None).unwrap();
match res {
Ok(handle) => {
println!("Spawn ok");
let future = async move {
println!("r");
let uh = handle.await.unwrap_or(Err(Errno::new(Error::Unknown)));
println!("What?");
uh
};
tokio::pin!(future);
future.poll(cx)
},
Err(err) => Poll::Ready(Err(err))
}
}
}
/// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`.
///
/// The signature is the same as
///
/// ```
/// async fn detach_closure<F: FnOnce(Parent) -> Fu + Send + 'static, Fu: Future>(as_uid: Option<u32>, as_gid: Option<u32>, into: F) -> Result<Child, Errno<Error>>
/// ```
///
/// # Notes
/// This function spawns a speperate async runtime and blocks on it in the Child. Async/await should be safe here.
#[inline] pub fn detach_closure_test<F: FnOnce(Parent) -> Fu + Send + 'static, Fu: Future>(as_uid: Option<u32>, as_gid: Option<u32>, into: F) -> Fork
{
//this hangs?
//Fork(Some(detach_closure_internal(as_uid, as_gid, into)))
todo!()
}
/// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`. /// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`.
/// ///
/// # Notes /// # Notes
/// This seems to corrupt the async runtime for the child, do not use it from the child. /// This function spawns a speperate async runtime and blocks on it in the Child. Async/await should be safe here.
pub async fn detach_closure<F: FnOnce(Parent)>(as_uid: Option<u32>, as_gid: Option<u32>, into: F) -> Result<Child, Errno<Error>> { pub fn detach_closure<F: FnOnce(Parent) -> Fu + Send + 'static, Fu: Future>(as_uid: Option<u32>, as_gid: Option<u32>, into: F) -> Result<JoinHandle<Result<Child, Errno<Error>>>, Errno<Error>> {
// let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?; // let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?;
let (mut ttx,mut trx) = pipe::multi().map_err(|x| Error::from(x))?; let (mut ttx,mut trx) = pipe::multi().map_err(|x| Error::from(x))?;
let (comm_p, comm_c) = pipe::multi().map_err(|x| Error::from(x))?; let (comm_p, comm_c) = pipe::multi().map_err(|x| Error::from(x))?;
let child = unsafe{fork()}; let child = unsafe{fork()};
if child == 0 { if child == 0 {
// Is child // Is child
let _ =std::thread::spawn(move || { //Any unwind will be caught here
use tokio::prelude::*; let mut rt = Runtime::new().unwrap_or_else(|_| std::process::exit(1)); //Immediately exit if runtime cannot be created.
let complete = || { rt.block_on(async move {
async {
unsafe { use tokio::prelude::*;
if let Ok(_) = ttx.write_u32(!0u32).await let complete = || {
{ async {
into(Parent{pid: libc::getppid(),comm:comm_c}); unsafe {
if let Ok(_) = ttx.write_u32(!0u32).await
{
into(Parent{pid: libc::getppid(),comm:comm_c}).await;
}
}
} }
} };
}
};
unsafe { unsafe {
loop { loop {
if let Some(as_uid) = as_uid { if let Some(as_uid) = as_uid {
// Set UID // Set UID
if setuid(as_uid) != 0 { if setuid(as_uid) != 0 {
let _ = ttx.write_u32(Error::SetUid.into()).await; let _ = ttx.write_u32(Error::SetUid.into()).await;
break; break;
} }
} }
if let Some(as_gid) = as_gid { if let Some(as_gid) = as_gid {
// Set GID // Set GID
if setgid(as_gid) != 0 { if setgid(as_gid) != 0 {
let _ = ttx.write_u32(Error::SetGid.into()).await; let _ = ttx.write_u32(Error::SetGid.into()).await;
break;
}
}
complete().await;
break; break;
} }
} }
});
complete().await; }).join();
break;
}
}
std::process::exit(0); std::process::exit(0);
} else if child > 0 { } else if child > 0 {
// Fork succeeded // Fork succeeded
use tokio::prelude::*; Ok(tokio::spawn(async move {
use tokio::prelude::*;
let err: u32 = unsafe{ let err: u32 = unsafe {
timeout!(trx.read_u32(), tokio::time::Duration::from_secs(1)).unwrap_or(Ok(Error::Unknown as u32)).unwrap_or(Error::Unknown as u32) timeout!(trx.read_u32(), tokio::time::Duration::from_secs(1)).unwrap_or(Ok(Error::Unknown as u32)).unwrap_or(Error::Unknown as u32)
}; };
if err == !0u32 { if err == !0u32 {
Ok(Child{pid:child, comm: comm_p}) Ok(Child{pid:child, comm: comm_p})
} else { } else {
Err(Error::from(err).into()) Err(Error::from(err).into())
} }
}))
} else { } else {
let rval = Error::Fork.into(); let rval = Error::Fork.into();
Err(rval) Err(rval)

Loading…
Cancel
Save