diff --git a/Cargo.toml b/Cargo.toml index 130e914..88960dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,8 @@ threaded = ["tokio/rt-threaded"] debug_logger = [] # Hot-reload config files -watcher = ["threaded"] +# When `threaded` is off, this should spawn in a child process with `fork`. +watcher = [] # FS watcher will have infinite backlog, instead of ignoring if it goes over its backlog. # This can help DoS, but potentially cause OOM. diff --git a/TODO b/TODO index 1418aee..20797f7 100644 --- a/TODO +++ b/TODO @@ -2,3 +2,5 @@ Log filtering rules in config file. For `Trace`s, `Level`s, sublevels, etc. Log normalise source path in terminal print Log have #[cfg(feature="threaded")] for deferring write to background worker + +Warning: I dunno how safe calling fork() in an async context is... Which is currently what we're doing. Make it return an actual future instead, and do no awaiting on the child. Set pipe to block for child, but not for parent, etc. \ No newline at end of file diff --git a/src/hot.rs b/src/hot.rs index d7f92b8..a745e3e 100644 --- a/src/hot.rs +++ b/src/hot.rs @@ -267,8 +267,9 @@ pub fn watch(path: impl AsRef) -> Oneesan Ok(event) => if let Err(err) = { #[cfg(feature="watcher_unlimited")] {tx.send(event)} #[cfg(not(feature="watcher_unlimited"))] {tx.try_send(event)} - } - {warn!("Watcher failed to pass event: {}", err)}, + } { + warn!("Watcher failed to pass event: {}", err); + }, Err(e) => error!("Watcher returned error: {}", e), } }).expect("Failed to initialise watcher"); diff --git a/src/log/custom/.#global_logfile.rs b/src/log/custom/.#global_logfile.rs new file mode 120000 index 0000000..cd5bf53 --- /dev/null +++ b/src/log/custom/.#global_logfile.rs @@ -0,0 +1 @@ +avril@flan-laptop.58785:1596643711 \ No newline at end of file diff --git a/src/log/custom/global_logfile.rs b/src/log/custom/global_logfile.rs index 40bd0b1..dfa153f 100644 --- a/src/log/custom/global_logfile.rs +++ b/src/log/custom/global_logfile.rs @@ -126,16 +126,16 @@ impl Hook for LogFileHook .write(true) .open(path).await { Ok(file) => file, - Err(err) => { - crate::warn!("Could not open logfile {:?} for writing: {}", path, err); + Err(_err) => { + //crate::warn!("Could not open logfile {:?} for writing: {}", path, err); continue; }, }; internal.fix_perms(&mut file); - if let Err(err) = write_log(&mut file, command).await { - crate::warn!("Failed writing to logfile {:?}: {}", path, err); + if let Err(_err) = write_log(&mut file, command).await { + //crate::warn!("Failed writing to logfile {:?}: {}", path, err); } } }); diff --git a/src/main.rs b/src/main.rs index 553d630..be97e59 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,8 @@ mod hot; mod context; mod job; +mod stat; + //This is a test function, when we have a real job server, remove it. async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Box> { @@ -66,58 +68,7 @@ async fn do_thing_every() -> Result<(mpsc::Sender<()>, task::JoinHandle<()>), Bo Ok((tx, handle)) } -fn print_stats() -{ - use recolored::Colorize; - lazy_static! { - static ref AUTHORS: String = env!("CARGO_PKG_AUTHORS").replace( ":", ", "); - - }; - - #[cfg(debug_assertions)] - lazy_static!{ - static ref BUILD_IDENT: recolored::ColoredString = "debug".bright_blue(); - } - #[cfg(not(debug_assertions))] - lazy_static!{ - static ref BUILD_IDENT: recolored::ColoredString = "release".bright_red(); - } - - #[allow(unused_imports)] - use std::ops::Deref; - status!("This is the lolicron daemon version {} by {} ({} build)", env!("CARGO_PKG_VERSION"), &AUTHORS[..], BUILD_IDENT.deref()); - status!("---"); - status!("Compiled with ({} ({}), {} ({})):", "on".bright_red(), "default".red(), "off".bright_blue(), "default".blue()); - - #[cfg(nightly)] status!(" +nightly".bright_red()); - #[cfg(debug_assertions)] status!(" +debug_assertions".red()); - - status!("features:"); - - #[cfg(feature="threaded")] status!(" +threaded".red()); - #[cfg(not(feature="threaded"))] status!(" -threaded".bright_blue()); - - #[cfg(feature="watcher")] status!(" +watcher".red()); - #[cfg(not(feature="watcher"))] status!(" -watcher".bright_blue()); - - #[cfg(feature="debug_logger")] status!(" +debug_logger".red()); - #[cfg(not(feature="debug_logger"))] status!(" -debug_logger".bright_blue()); - - #[cfg(feature="watcher_unlimited")] status!(" +watcher_unlimited".bright_red()); - #[cfg(not(feature="watcher_unlimited"))] status!(" -watcher_unlimited".blue()); - - #[cfg(feature="watcher_timeout")] status!(" +watcher_timeout".red()); - #[cfg(not(feature="watcher_timeout"))] status!(" -watcher_timeout".bright_blue()); - - status!(""); - config::build::stat(); - - status!("GPl'd with <3"); - status!("Please enjoy"); - - status!("---"); -} #[tokio::main] async fn main() -> Result<(), Box> { @@ -125,23 +76,28 @@ async fn main() -> Result<(), Box> { log::init(log::Level::Debug); let child = sys::fork::detach_closure(None, None, |parent| { + println!("Parent: {:?}, us: {}", parent, sys::get_pid()); - std::thread::sleep_ms(3000); + //let pipe = parent.pipe(); + //use tokio::prelude::*; + //pipe.write_all(b"Hello there").await.expect("io error c"); + //tokio::time::delay_for(tokio::time::Duration::from_secs(3)).await; + + //std::thread::sleep_ms(3000); + println!("Exiting child"); }).await?; println!("Child: {:?}", child); - //println!("Waitpid: {:?}", child.wait().await.map_err(|x| x.to_string())); println!("Waitpid nb {:?}", child.await.map_err(|x| x.to_string())); // end test log::Logger::global().add_hook(log::custom::LogFileHook::new(log::Level::Debug, "test.log", "test.err.log", false)); debug!("Logger initialised"); //TODO: Parse config first - print_stats(); - - //debug!("{:?}",sys::user::get_users()); + stat::print_stats(); + //debug!("{:?}",sys::user::get_users()); #[cfg(feature="watcher")] { let oneesan = hot::watch("."); diff --git a/src/stat.rs b/src/stat.rs new file mode 100644 index 0000000..74c8813 --- /dev/null +++ b/src/stat.rs @@ -0,0 +1,56 @@ +//! Print splash shit + +use super::*; + +pub fn print_stats() +{ + use recolored::Colorize; + lazy_static! { + static ref AUTHORS: String = env!("CARGO_PKG_AUTHORS").replace( ":", ", "); + + }; + + #[cfg(debug_assertions)] + lazy_static!{ + static ref BUILD_IDENT: recolored::ColoredString = "debug".bright_blue(); + } + #[cfg(not(debug_assertions))] + lazy_static!{ + static ref BUILD_IDENT: recolored::ColoredString = "release".bright_red(); + } + + #[allow(unused_imports)] + use std::ops::Deref; + status!("This is the lolicron daemon version {} by {} ({} build)", env!("CARGO_PKG_VERSION"), &AUTHORS[..], BUILD_IDENT.deref()); + status!("---"); + status!("Compiled with ({} ({}), {} ({})):", "on".bright_red(), "default".red(), "off".bright_blue(), "default".blue()); + + #[cfg(nightly)] status!(" +nightly".bright_red()); + #[cfg(debug_assertions)] status!(" +debug_assertions".red()); + + status!("features:"); + + #[cfg(feature="threaded")] status!(" +threaded".red()); + #[cfg(not(feature="threaded"))] status!(" -threaded".bright_blue()); + + #[cfg(feature="watcher")] status!(" +watcher".red()); + #[cfg(not(feature="watcher"))] status!(" -watcher".bright_blue()); + + #[cfg(feature="debug_logger")] status!(" +debug_logger".red()); + #[cfg(not(feature="debug_logger"))] status!(" -debug_logger".bright_blue()); + + #[cfg(feature="watcher_unlimited")] status!(" +watcher_unlimited".bright_red()); + #[cfg(not(feature="watcher_unlimited"))] status!(" -watcher_unlimited".blue()); + + #[cfg(feature="watcher_timeout")] status!(" +watcher_timeout".red()); + #[cfg(not(feature="watcher_timeout"))] status!(" -watcher_timeout".bright_blue()); + + status!(""); + + config::build::stat(); + + status!("GPl'd with <3"); + status!("Please enjoy"); + + status!("---"); +} diff --git a/src/sys/fork.rs b/src/sys/fork.rs index 2dda10f..fa4d317 100644 --- a/src/sys/fork.rs +++ b/src/sys/fork.rs @@ -11,15 +11,9 @@ use libc::{ use std::{ fmt, }; -use crate::util::PhantomDrop; use cfg_if::cfg_if; -use super::pipe::{ - self, - unix_pipe, - pipe_read_value, - pipe_write_value, -}; +use super::pipe; /// Forking error #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] @@ -127,88 +121,59 @@ pub struct Parent { } /// Run a closure as a fork and then exit, optionally trying to set `uid` and `gid`. +/// +/// # Notes +/// This seems to corrupt the async runtime for the child, do not use it from the child. pub async fn detach_closure(as_uid: Option, as_gid: Option, into: F) -> Result> { - let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?; - + // let (rx, tx) = unix_pipe().map_inner(|x| Error::from(x))?; + let (mut ttx,mut trx) = pipe::multi().map_err(|x| Error::from(x))?; + let (comm_p, comm_c) = pipe::multi().map_err(|x| Error::from(x))?; let child = unsafe{fork()}; if child == 0 { // Is child - let complete = move || { - unsafe { - if let Ok(_) = pipe_write_value(tx, &!0u32) - { - //if let Ok(mut rt) = new_runtime() { - // rt.block_on(async move { - into(Parent{pid: libc::getppid(),comm:comm_c}); - // }); - // } + use tokio::prelude::*; + let complete = || { + async { + unsafe { + if let Ok(_) = ttx.write_u32(!0u32).await + { + into(Parent{pid: libc::getppid(),comm:comm_c}); + } } } }; - let complete_err = move |err: Error| { - unsafe{let _ = pipe_write_value(tx, &u32::from(err));} - }; - - { - let _guard = PhantomDrop::new((), move |_| { - unsafe { - libc::close(tx); - } - });// Close pipe sender when we're done here - - unsafe { - loop { - if let Some(as_uid) = as_uid { - // Set UID - if setuid(as_uid) != 0 { - complete_err(Error::SetUid); - break; - } + unsafe { + loop { + if let Some(as_uid) = as_uid { + // Set UID + if setuid(as_uid) != 0 { + let _ = ttx.write_u32(Error::SetUid.into()).await; + break; } - if let Some(as_gid) = as_gid { - // Set GID - if setgid(as_gid) != 0 { - complete_err(Error::SetGid); - break; - } + } + if let Some(as_gid) = as_gid { + // Set GID + if setgid(as_gid) != 0 { + let _ = ttx.write_u32(Error::SetGid.into()).await; + break; } - - complete(); - break; } + + complete().await; + break; } } std::process::exit(0); } else if child > 0 { // Fork succeeded - let _guard = PhantomDrop::new((), move |_| { - unsafe { - libc::close(rx); - } - }); - + use tokio::prelude::*; let err: u32 = unsafe{ - cfg_if! { - if #[cfg(feature="threaded")] { - use tokio::{ - task, - }; - let waiter = task::spawn_blocking(move || { - pipe_read_value(rx).map_inner(|e| Error::from(e)) - }); - - waiter.await.expect("Panic while waiting for child status")? - } else { - unsafe { - pipe_read_value(rx)? - } - } - } + timeout!(trx.read_u32(), tokio::time::Duration::from_secs(1)).unwrap_or(Ok(Error::Unknown as u32)).unwrap_or(Error::Unknown as u32) }; if err == !0u32 { Ok(Child{pid:child, comm: comm_p}) @@ -217,10 +182,6 @@ pub async fn detach_closure(as_uid: Option, as_gid: Opti } } else { let rval = Error::Fork.into(); - unsafe { - libc::close(tx); - libc::close(rx); - } Err(rval) } } @@ -250,8 +211,9 @@ impl Child }); waiter.await.expect("Waiter panicked") } else { + let pid = self.pid; let mut status: i32 = 0; - if unsafe{libc::waitpid(self.pid, &mut status as *mut i32, 1)} == pid { // We can't afford to block here + if unsafe{libc::waitpid(pid, &mut status as *mut i32, 1)} == pid { // We can't afford to block here Ok(status) } else { Err(Error::WaitPid.into()) diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 807d4d2..b960b3e 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -10,6 +10,7 @@ pub mod pipe; pub mod fork; pub mod errno; +#[allow(unused_imports)] use errno::ResultExt; /// Get pid of current process diff --git a/src/sys/pipe.rs b/src/sys/pipe.rs index 880b346..5123e8f 100644 --- a/src/sys/pipe.rs +++ b/src/sys/pipe.rs @@ -237,89 +237,99 @@ impl AsyncWrite for WriteHalf // as far as i can tell this is a no-op with `write()`? Poll::Ready(Ok(())) } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> + #[inline] fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - let fd = self.0; - if let Poll::Ready(res) = self.poll_flush(cx) { - unsafe{libc::close(fd)}; - Poll::Ready(res) - } else { - Poll::Pending + unsafe { + libc::close(self.0); } + + Poll::Ready(Ok(())) } - fn poll_write(self: Pin<&mut Self>, _cx: &mut Context, buf: &[u8]) -> Poll> + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { use libc::{ poll, write, pollfd, + c_void, }; - let mut fd = pollfd { - fd: self.0, - revents: 0, - events: POLL_OUT, - }; - let poll = unsafe { - poll(&mut fd as *mut pollfd, 1, 0) - }; - - Poll::Ready(if poll < 0 { - Err(AsyncError::from_raw_os_error(errno::raw())) - } else if poll > 0 { - if fd.revents & POLL_OUT == POLL_OUT { - let wr = unsafe { - write(self.0, &buf[0] as *const u8 as *const libc::c_void, buf.len()) + use tokio::task::yield_now; + + let future = async { + loop { + let mut fd = pollfd { + fd: self.0, + revents: 0, + events: POLL_OUT, }; - if wr < 0 { - Err(AsyncError::from_raw_os_error(errno::raw())) - } else { - Ok(wr as usize) + let poll = unsafe { + poll(&mut fd as *mut pollfd, 1, 0) + }; + if poll < 0 { + break Err(AsyncError::from_raw_os_error(errno::raw())); + } else if poll > 0 { + if fd.revents & POLL_OUT == POLL_OUT { + // Write ready + let wr = unsafe{write(self.0, &buf[0] as *const u8 as *const c_void, buf.len())}; + if wr < 0 { + break Err(AsyncError::from_raw_os_error(errno::raw())); + } + else { + break Ok(wr as usize); + } + } } - } else { - Err(AsyncError::from_raw_os_error(errno::raw())) + // Either no poll, or no POLLOUT event + yield_now().await; } - } else { - return Poll::Pending; - }) + }; + tokio::pin!(future); + future.poll(cx) } } impl AsyncRead for ReadHalf { - fn poll_read(self: Pin<&mut Self>, _cx: &mut Context, buf: &mut [u8]) -> Poll> + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { use libc::{ poll, read, pollfd, + c_void, }; - let mut fd = pollfd { - fd: self.0, - revents: 0, - events: POLL_IN, - }; - let poll = unsafe { - poll(&mut fd as *mut pollfd, 1, 0) - }; - - Poll::Ready(if poll < 0 { - Err(AsyncError::from_raw_os_error(errno::raw())) - } else if poll > 0 { - if fd.revents & POLL_IN == POLL_IN { - let wr = unsafe { - read(self.0, &mut buf[0] as *mut u8 as *mut libc::c_void, buf.len()) + use tokio::task::yield_now; + + let future = async { + loop { + let mut fd = pollfd { + fd: self.0, + revents: 0, + events: POLL_IN, + }; + let poll = unsafe { + poll(&mut fd as *mut pollfd, 1, 0) }; - if wr < 0 { - Err(AsyncError::from_raw_os_error(errno::raw())) - } else { - Ok(wr as usize) + if poll < 0 { + break Err(AsyncError::from_raw_os_error(errno::raw())); + } else if poll > 0 { + if fd.revents & POLL_IN == POLL_IN { + // Read ready + let wr = unsafe{read(self.0, &mut buf[0] as *mut u8 as *mut c_void, buf.len())}; + if wr < 0 { + break Err(AsyncError::from_raw_os_error(errno::raw())); + } + else { + break Ok(wr as usize); + } + } } - } else { - Err(AsyncError::from_raw_os_error(errno::raw())) + // Either no poll, or no POLLIN event + yield_now().await; } - } else { - return Poll::Pending; - }) + }; + tokio::pin!(future); + future.poll(cx) } } diff --git a/src/util/macros.rs b/src/util/macros.rs index c5be1d8..eeb4726 100644 --- a/src/util/macros.rs +++ b/src/util/macros.rs @@ -3,3 +3,49 @@ const _: &[bool; ((($ex) == true) as usize)] = &[true]; } } + +#[macro_export] macro_rules! timeout { + ($fut:expr, $dur:expr) => { + { + let dur = $dur; + tokio::select! { + output = $fut => { + Ok(output) + } + _ = tokio::time::delay_for(dur) => { + Err($crate::util::TimeoutError::from(dur)) + } + } + } + } +} + + +/// Returned from timeout macro +#[derive(Debug)] +pub struct TimeoutError(tokio::time::Duration); +impl std::error::Error for TimeoutError{} +impl std::fmt::Display for TimeoutError +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result + { + write!(f, "timeout of {} ms reached", self.0.as_millis()) + } +} +impl From for TimeoutError +{ + fn from(from: tokio::time::Duration) -> Self + { + TimeoutError(from) + } +} +impl TimeoutError +{ + /// Get the timeout that this error lapsed on + #[inline] pub fn timeout(&self) -> &tokio::time::Duration + { + &self.0 + } +} + +