Attempted to add new feature flag `memfile-size-output`: which will pre- or post-set the size of `stdout` to the correct buffer size, allowing consumers of `collect`'s stdout pipe to know the size just like a file. However, `ftruncate()` always fails on stdout. Before re-enabling the feature, we must find out how to set the size of a pipe, if/when you even can, and what syscall(s) you need to do it with.

Fortune for collect's current commit: Small blessing − 小吉
exec
Avril 3 years ago
parent b2bf26f245
commit bc2357c6b6
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -48,6 +48,14 @@ memfile = ["bitflags", "lazy_static"]
# *NOTE*: Requires `getpagesz()` to be available in libc. # *NOTE*: Requires `getpagesz()` to be available in libc.
memfile-preallocate = ["memfile"] memfile-preallocate = ["memfile"]
# Set the size of `stdout` when it is known, so consumers can know exactly the size of the input.
# XXX: Currently doesn't work. TODO: Find out how to make `stdout` `ftruncate()`-able; or some other way to set its size.
memfile-size-output = ["memfile"]
# Pre-set the `memfile-size-output`.
# TODO: Maybe make this a separate feature? See comment about pre-setting in `work::memfd()`...
# memfile-size-output-preset = ["memfile-size-output"]
# Use jemalloc instead of system malloc. # Use jemalloc instead of system malloc.
# #
# Decreases memory-handling function calls, resulting in less "used" memory and faster allocation speeds at the "cost" of mapping a huge amount of virtual memory. # Decreases memory-handling function calls, resulting in less "used" memory and faster allocation speeds at the "cost" of mapping a huge amount of virtual memory.

@ -1,5 +1,13 @@
//! Extensions //! Extensions
use super::*; use super::*;
use std::{
mem::{
self,
ManuallyDrop,
},
marker::PhantomData,
ops,
};
/// Essentially equivelant bound as `eyre::StdError` (private trait) /// Essentially equivelant bound as `eyre::StdError` (private trait)
/// ///
@ -108,3 +116,128 @@ impl<T, E: EyreError> FlattenEyreResult<T, E> for Result<Option<T>, E>
} }
} }
} }
/// Internal state for `RunOnce`: either a still-available payload, or an
/// already-consumed (or never-present) slot.
///
/// The payload is kept in `ManuallyDrop` so that ownership can be moved out
/// through a `&mut` reference (see `take_now()`); the `Drop` impl below is
/// then responsible for dropping a payload that was never taken.
#[derive(Debug)]
enum RunOnceInternal<F>
{
    // Payload not yet consumed; must be dropped manually if never taken.
    Live(ManuallyDrop<F>),
    // Payload already taken out (or constructed empty via `never()`).
    Dead,
}
impl<F: Clone> Clone for RunOnceInternal<F>
{
    /// Duplicate the state: a `Live` payload is cloned, `Dead` stays `Dead`.
    #[inline]
    fn clone(&self) -> Self {
        if let Self::Live(payload) = self {
            Self::Live(payload.clone())
        } else {
            Self::Dead
        }
    }
}
impl<F> RunOnceInternal<F>
{
    /// Take `F` now, unless it doesn't need to be dropped.
    ///
    /// # Returns
    /// * if `!needs_drop::<F>()`, `None` is always returned.
    /// * if `self` is `Dead`, `None` is returned.
    /// * if `self` is `Live(f)`, `Some(f)` is returned, and `self` is set to `Dead`.
    #[inline(always)]
    fn take_now_for_drop(&mut self) -> Option<F>
    {
        // Types without drop glue can safely be left inside the
        // `ManuallyDrop` forever; skipping the take avoids state churn.
        if mem::needs_drop::<F>() {
            self.take_now()
        } else {
            None
        }
    }

    /// If `Live`, return the value inside and set to `Dead`.
    /// Otherwise, return `None`.
    #[inline(always)]
    fn take_now(&mut self) -> Option<F>
    {
        if let Self::Live(live) = self {
            // SAFETY: the payload is moved out exactly once here, and
            // immediately afterwards `*self` is overwritten with `Dead`, so
            // the moved-out value can never be observed (or double-dropped
            // by the `Drop` impl) through this slot again.
            let val = unsafe {
                ManuallyDrop::take(live)
            };
            *self = Self::Dead;
            Some(val)
        } else {
            None
        }
    }
}
impl<F> ops::Drop for RunOnceInternal<F>
{
    /// Drop a payload that was never taken out of a `Live` slot.
    #[inline]
    fn drop(&mut self) {
        // Nothing to clean up for types without drop glue.
        if !mem::needs_drop::<F>() {
            return;
        }
        if let Self::Live(func) = self {
            // SAFETY: we are inside `drop()`, so `self` is never used again;
            // the payload is dropped exactly once here. (`take_now()` marks
            // taken slots `Dead`, so a taken payload never reaches this arm.)
            unsafe { ManuallyDrop::drop(func) };
        }
    }
}
/// Holds a 0 argument closure that will only be ran *once*.
///
/// The `PhantomData<fn () -> T>` records the output type without owning a
/// `T`, and keeps the struct covariant/`Send`-neutral in `T`.
#[derive(Debug, Clone)]
pub struct RunOnce<F, T>(PhantomData<fn () -> T>, RunOnceInternal<F>);

// SAFETY: the only non-trivially-`Send` field is the closure `F`, and this
// impl requires `F: Send`.
// NOTE(review): the auto `Send` impl would already apply whenever `F: Send`
// (`PhantomData<fn() -> T>` is unconditionally `Send`); this explicit impl
// additionally constrains `F: FnOnce() -> T` — confirm that narrowing is
// intended.
unsafe impl<T, F> Send for RunOnce<F, T>
where F: FnOnce() -> T + Send {}
impl<F, T> RunOnce<F, T>
where F: FnOnce() -> T
{
    /// Create a new `RunOnce` wrapping `func`.
    pub const fn new(func: F) -> Self
    {
        Self(PhantomData, RunOnceInternal::Live(ManuallyDrop::new(func)))
    }

    /// Create an already-consumed `RunOnce`; the closure will never run.
    pub const fn never() -> Self
    {
        Self(PhantomData, RunOnceInternal::Dead)
    }

    /// Take the closure out without running it.
    ///
    /// # Returns
    /// `Some(func)` on the first call (when constructed with `new()`);
    /// `None` on every subsequent call, and always for `never()`.
    #[inline]
    pub fn try_take(&mut self) -> Option<F>
    {
        // BUGFIX: this used to call `ManuallyDrop::take()` directly while
        // leaving the state `Live`, so a second `try_take()`/`try_run()`
        // would duplicate the closure, `is_runnable()` would keep reporting
        // `true`, and `Drop` would later drop the already-moved-out value
        // (double-drop: UB). `take_now()` moves the value out *and* marks
        // the slot `Dead`.
        self.1.take_now()
    }

    /// Run the closure if it has not already been consumed.
    #[inline]
    pub fn try_run(&mut self) -> Option<T>
    {
        self.try_take().map(|func| func())
    }

    /// Run the closure, consuming `self`.
    ///
    /// # Panics
    /// If the closure was already consumed via `try_take()`/`try_run()`.
    #[inline]
    pub fn run(mut self) -> T
    {
        self.try_run().expect("Function has already been consumed")
    }

    /// Take the closure out, consuming `self`.
    ///
    /// # Panics
    /// If the closure was already consumed via `try_take()`/`try_run()`.
    #[inline]
    pub fn take(mut self) -> F
    {
        self.try_take().expect("Function has already been consumed")
    }

    /// `true` if the closure is still present, i.e. `try_run()` would run it.
    #[inline]
    pub fn is_runnable(&self) -> bool
    {
        !matches!(&self.1, RunOnceInternal::Dead)
    }
}

@ -201,12 +201,135 @@ mod work {
.unwrap_or_else(|e| format!("<unknown: {e}>")) .unwrap_or_else(|e| format!("<unknown: {e}>"))
} }
#[cfg_attr(feature="logging", instrument(skip_all, err, fields(i = ?i.as_raw_fd())))]
#[inline]
fn truncate_file<S>(i: impl AsRawFd, to: S) -> eyre::Result<()>
where S: TryInto<u64>,
      <S as TryInto<u64>>::Error: EyreError
{
    // Convert the requested size up front so conversion failures get their
    // own context, then delegate the actual syscall to the raw helper.
    let size: u64 = to.try_into().wrap_err(eyre!("Size too large"))?;
    truncate_file_raw(i, size)?;
    Ok(())
}
/// Set the length of the open file described by `i` to `to` via `ftruncate(2)`.
///
/// # Returns
/// `Err` with the OS error if the `ftruncate()` call fails, or an
/// `InvalidInput` error if `to` does not fit in the signed `off_t` offset.
fn truncate_file_raw(i: impl AsRawFd, to: impl Into<u64>) -> io::Result<()>
{
    use libc::ftruncate;
    let fd = i.as_raw_fd();
    let to = {
        let to = to.into();
        #[cfg(feature="logging")]
        let span_size_chk = debug_span!("chk_size", size = ?to);
        #[cfg(feature="logging")]
        let _span = span_size_chk.enter();
        if_trace!{
            if to > i64::MAX as u64 {
                error!("Size too large (over max by {}) (max {})", to - (i64::MAX as u64), i64::MAX);
            } else {
                trace!("Setting {fd} size to {to}");
            }
        }
        // BUGFIX: the overflow check previously ran only under
        // `debug_assertions`; release builds did a plain `as i64` cast, which
        // wraps an oversized length to a negative offset and makes
        // `ftruncate()` fail with a confusing EINVAL. Check unconditionally —
        // the cost is negligible next to the syscall.
        i64::try_from(to).map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Size too large for ftruncate() offset"))?
    };
    // SAFETY: `ftruncate()` has no memory-safety preconditions; a bad fd or
    // offset is reported through the -1 return checked below.
    match unsafe { ftruncate(fd, to) } {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(())
    }
}
//TODO: How to `ftruncate()` stdout only once... If try_get_size succeeds, we want to do it then. If it doesn't, we want to do it when `stdin` has been consumed and we know the size of the memory-file... `RunOnce` won't work unless we can give it an argument....
// Build `set_stdout_len(len)`: a closure that (at most once) sets the length
// of stdout so pipe consumers can see a file-like size. With the
// `memfile-size-output` feature disabled it is a logging no-op.
// NOTE(review): with the feature enabled this currently crashes — see the
// feature-flag comment in Cargo.toml; `ftruncate()` fails on a pipe.
#[allow(unused_mut)]
let mut set_stdout_len = {
    cfg_if! {
        if #[cfg(feature="memfile-size-output")] {
            if_trace!(warn!("Feature `memfile-size-output` is not yet stable and will cause crash."));
            const STDOUT: memfile::fd::RawFileDescriptor = unsafe { memfile::fd::RawFileDescriptor::new_unchecked(libc::STDOUT_FILENO) }; //TODO: Get this from `std::io::Stdout.as_raw_fd()` instead.
            use std::sync::atomic::{self, AtomicUsize};
            #[cfg(feature="logging")]
            let span_ro = debug_span!("run_once", stdout = ?STDOUT);
            // `RunOnce` takes no argument, so the length is smuggled in
            // through a static; stored with `Release` below and loaded with
            // `Acquire` inside the one-shot closure.
            static LEN_HOLDER: AtomicUsize = AtomicUsize::new(0);
            let mut set_len = RunOnce::new(move || {
                #[cfg(feature="logging")]
                let _span = span_ro.enter();
                let len = LEN_HOLDER.load(atomic::Ordering::Acquire);
                if_trace!(debug!("Attempting single `ftruncate()` on `STDOUT_FILENO` -> {len}"));
                truncate_file(STDOUT, len)
                    .wrap_err(eyre!("Failed to set length of stdout ({STDOUT}) to {len}"))
            });
            // The returned closure publishes `len`, then fires the RunOnce;
            // second and later calls are silently ignored (Ok(())).
            move |len: usize| {
                #[cfg(feature="logging")]
                let span_ssl = info_span!("set_stdout_len", len = ?len);
                #[cfg(feature="logging")]
                let _span = span_ssl.enter();
                if_trace!(trace!("Setting static-stored len for RunOnce"));
                LEN_HOLDER.store(len, atomic::Ordering::Release);
                if_trace!(trace!("Calling RunOnce for `set_stdout_len`"));
                match set_len.try_run() {
                    Some(result) => result
                        .with_section(|| len.header("Attempted length set was"))
                        .with_warning(|| libc::off_t::MAX.header("Max length is"))
                        .with_note(|| STDOUT.header("STDOUT_FILENO is")),
                    None => {
                        if_trace!(warn!("Already called `set_stdout_len()`"));
                        Ok(())
                    },
                }
            }
        } else {
            // Feature disabled: keep the same call shape but do nothing.
            |len: usize| -> Result<(), std::convert::Infallible> {
                #[cfg(feature="logging")]
                let span_ssl = info_span!("set_stdout_len", len = ?len);
                #[cfg(feature="logging")]
                let _span = span_ssl.enter();
                if_trace!(info!("Feature `memfile-size-output` is disabled; ignoring."));
                let _ = len;
                Ok(())
            }
        }
    }
};
let (mut file, read) = { let (mut file, read) = {
let stdin = io::stdin(); let stdin = io::stdin();
let buffsz = try_get_size(&stdin); let buffsz = try_get_size(&stdin);
if_trace!(debug!("Attempted determining input size: {:?}", buffsz)); if_trace!(debug!("Attempted determining input size: {:?}", buffsz));
let buffsz = buffsz.or_else(DEFAULT_BUFFER_SIZE); let buffsz = if cfg!(feature="memfile-size-output") {
//TODO: XXX: Even if this actually works, is it safe to do this? Won't the consumer try to read `value` bytes before we've written them? Perhaps remove pre-setting entirely...
match buffsz {
y @ Some(ref value) => {
let value = value.get();
set_stdout_len(value).wrap_err("Failed to set stdout len to that of stdin")
.with_section(|| value.header("Stdin len was calculated as"))
.with_warning(|| "This is a pre-setting")?;
y
},
n => n,
}
} else { buffsz }.or_else(DEFAULT_BUFFER_SIZE);
if_trace!(if let Some(buf) = buffsz.as_ref() { if_trace!(if let Some(buf) = buffsz.as_ref() {
trace!("Failed to determine input size: preallocating to {}", buf); trace!("Failed to determine input size: preallocating to {}", buf);
} else { } else {
@ -305,6 +428,11 @@ mod work {
}; };
if_trace!(info!("collected {} from stdin. starting write.", read)); if_trace!(info!("collected {} from stdin. starting write.", read));
// TODO: XXX: Currently causes crash. But if we can get this to work, leaving this in is definitely safe (as opposed to the pre-setting (see above.))
set_stdout_len(read)
.wrap_err(eyre!("Failed to `ftruncate()` stdout after collection of {read} bytes"))
.with_note(|| "Was not pre-set")?;
let written = let written =
io::copy(&mut file, &mut io::stdout().lock()) io::copy(&mut file, &mut io::stdout().lock())
.with_section(|| read.header("Bytes read from stdin")) .with_section(|| read.header("Bytes read from stdin"))

@ -109,10 +109,19 @@ impl fmt::Display for BadFDError
pub type FileNo = RawFd; pub type FileNo = RawFd;
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)] #[repr(transparent)]
pub struct RawFileDescriptor(NonNegativeI32); pub struct RawFileDescriptor(NonNegativeI32);
impl fmt::Debug for RawFileDescriptor
{
    /// Render as `RawFileDescriptor(<fd>)`, skipping the inner
    /// `NonNegativeI32` newtype noise the derived impl would print.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let fd = self.0.get();
        write!(f, "RawFileDescriptor({fd})")
    }
}
impl RawFileDescriptor impl RawFileDescriptor
{ {
pub const STDIN: Self = Self(unsafe { NonNegativeI32::new_unchecked(0) }); pub const STDIN: Self = Self(unsafe { NonNegativeI32::new_unchecked(0) });

Loading…
Cancel
Save