From a5ad02a6de7b18da2738dba49f385d9f9ec3c711 Mon Sep 17 00:00:00 2001 From: Avril Date: Fri, 8 Apr 2022 04:02:49 +0100 Subject: [PATCH] Working primitive, slow, prototype. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for collect's current commit: Small blessing − 小吉 --- .gitignore | 10 +- Cargo.lock | 57 +++++++++++ Cargo.toml | 33 ++++++ collect.c | 289 ---------------------------------------------------- src/main.rs | 64 ++++++++++++ 5 files changed, 163 insertions(+), 290 deletions(-) create mode 100644 Cargo.lock create mode 100644 Cargo.toml delete mode 100644 collect.c create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore index c30ad65..667042e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,9 @@ -collect +/target +*~ + +# Profiling +*.png +*.svg +perf.* +vgcore.* + diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..5d26709 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,57 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + +[[package]] +name = "collect" +version = "0.1.0" +dependencies = [ + "bytes", + "jemallocator", + "libc", +] + +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + +[[package]] +name = "jemalloc-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69" +dependencies = [ + "jemalloc-sys", + "libc", +] + +[[package]] +name = "libc" +version = "0.2.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..8cc6b51 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "collect" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["jemalloc"] + +# TODO: mmap, memfd_create() ver + +# TODO: make `bytes` optional, and use Vec by default instead? idk... + +# Use jemalloc instead of system malloc. +# Seems to reduce overall memory usage at the cost of a very small speed drop. +jemalloc = ["jemallocator"] + +[profile.release] +opt-level = 3 +lto = "fat" +codegen-units = 1 +strip=true + +[profile.symbols] +inherits="release" +#incremental=true +strip=false + +[dependencies] +bytes = "1.1.0" +jemallocator = { version = "0.3.2", optional = true } +libc = "0.2.122" diff --git a/collect.c b/collect.c deleted file mode 100644 index fb45138..0000000 --- a/collect.c +++ /dev/null @@ -1,289 +0,0 @@ -// Collect all stdin into memory, then -#define _GNU_SOURCE - -#include -#include -#include -#include - -#include -#include -#include -#include - -#include - -#define LIKELY(expr) __builtin_expect(!!(expr), true) -#define UNLIKELY(expr) __builtin_expect(!!(expr), false) - -#define _$_if__L true -#define _$_if__U false -#define $if(l, expr) if(__builtin_expect(!!(expr), _$_if__ ## l)) - -#define F_STDIN 0 -#define F_STDOUT 1 -#define F_STDERR 2 - -typedef union arguments { - struct { - off_t pages_per_buffer; - } sized; - struct { - size_t buffsz; - } unsized; -} option_t; - -#define DEFAULT_OPTION ((option_t){ .sized = { .pages_per_buffer = 8 } }) - -static bool has_size(int fd, off_t* restrict size) -{ - struct stat st; - if( fstat(fd, &st) < 0 ) { perror("failed to stat stdin"); return false; } - else if (st.st_size > 0) { - // Non-zero size - *size = st.st_size; - return true; - } - fprintf(stderr, "returned sz (fd %d): %ld\n", fd, st.st_size); - return false; -} - -int collect_sized(off_t sz, const option_t* opt); -int collect_unsized(const option_t* opt); - -int main(void) -{ - off_t sz; - option_t args = DEFAULT_OPTION; - if(has_size(F_STDIN, &sz)) { - return collect_sized((size_t)sz, &args); - } else { - return collect_unsized(&args); - } -} - - -inline static -const void* map_input_buffer(int fd, size_t sz) -{ - void* map = mmap(NULL, sz, PROT_READ, MAP_PRIVATE, fd, 0); - - if(UNLIKELY(map == MAP_FAILED)) { - perror("input mmap()"); - return NULL; - } - - return map; -} - -inline static -bool unmap_mem(void* mem, size_t len) -{ - if(UNLIKELY( munmap(mem, len) != 0 )) { - perror("munmap()"); - return false; - } - return true; -} - -static int page_size() -{ - static int _page_size=0; - if(UNLIKELY(!_page_size)) return _page_size = getpagesize(); - return _page_size; -} - -inline static -bool alloc_pages(off_t pages, int *restrict _fd, size_t* restrict _size) -{ - int fd = memfd_create("collect-sized-buffer", O_RDWR); - $if(U, fd < 0) goto _e_memfd; - $if(U, fallocate(fd, 0, 0, __builtin_constant_p(_size) && !_size - ? pages * page_size() - : _size ? (off_t)( *_size = pages * page_size() ) - : pages * page_size()) != 0) goto _e_fallocate; - $if(L, _fd) *_fd = fd; - else close(fd); - - return true; - // +Unwind+ // -_e_fallocate: - perror("fallocate()"); - close(fd); - if(0) -_e_memfd: - perror("memfd_create()"); - // -Unwind- // - return false; -} - -struct map_fd { - void* map; - size_t len; - int fd; -}; - -static -bool map_pages(off_t pages, struct map_fd* restrict out) -{ - $if(U, !out) return alloc_pages(pages, NULL, NULL); - - $if(U, !alloc_pages(pages, &out->fd, &out->len)) goto _e_ap; - $if(U, (out->map = mmap(NULL, out->len, PROT_READ|PROT_WRITE, MAP_PRIVATE, out->fd, 0)) == MAP_FAILED) goto _e_map; - $if(U, madvise(out->map, out->len, MADV_MERGEABLE | MADV_WILLNEED)) goto _e_madv; - - return true; - - // +Unwind+ // -_e_madv: - perror("madv()"); - munmap(out->map, out->len); - if(0) -_e_map: - perror("mmap()"); - close(out->fd); - if(0) -_e_ap: - (void)0; // no perror() needed - // -Unwind- // - return false; -} - -inline static -void unmap_pages(struct map_fd in, int *restrict keep_fd) -{ - $if(U, munmap(in.map, in.len)) perror("munmap()"); - if(__builtin_constant_p(keep_fd) && keep_fd) *keep_fd = in.fd; - else { - if(!keep_fd) { - $if(U, close(in.fd)) perror("close()"); - } else *keep_fd = in.fd; - } -} - -int collect_sized(off_t isz, const option_t* gopt) -{ - register int rc=0; - __auto_type opt = gopt->sized; - const off_t real_max_size = page_size() * opt.pages_per_buffer; -// const off_t pages_per_isz = isz % page_size(); -// const off_t page_leftover_isz = isz / page_size(); - - struct map_fd buffer; - if(!map_pages(opt.pages_per_buffer, &buffer)) return 1; - - if(isz > real_max_size) { - // Multiple buffers needed - } else $if(U, isz == real_max_size) { - // Exactly one buffer (unlikely, but possible) - ssize_t r = splice(F_STDIN, NULL, - buffer.fd, NULL, - (size_t)isz, - SPLICE_F_MOVE); - switch(r) { - case -1: goto _e_splice; - case 0: /* TODO: splice reported end-of-input, should we ignore this? */ - rc = 10; - goto _cleanup_splice; - default: { - fprintf(stderr, "splice()'d %lu bytes into buffer (%ld size @ %d)\n", r, buffer.len, buffer.fd); - } - break; - } - //TODO: splice() all bytes from that buffer into STDOUT - rc = 0; - } else { - // Less than one buffer - ssize_t r = splice(F_STDIN, NULL, // TODO: XXX: WHY does splice() **ALWAYS** fail??? it literally never works??? - buffer.fd, NULL, - (size_t)isz, - SPLICE_F_MOVE); - switch(r) { - case -1: goto _e_splice; - case 0: /* TODO: splice reported end-of-input, should we ignore this? */ - rc = 10; - goto _cleanup_splice; - default: { - fprintf(stderr, "splice()'d %lu bytes into buffer (%ld size @ %d)\n", r, buffer.len, buffer.fd); - } - break; - } - // TODO: splice() isz bytes from buffer into stdout - rc = 0; - } - - // +Cleanup+ // -_cleanup_splice: if(0) -_e_splice: rc = (perror("splice()"), -1); - unmap_pages(buffer, NULL); - // -Cleanup- // - return rc; -} - -int collect_unsized(const option_t* opt) -{ - return 0; -} - -#if 0 -int collect_sized(off_t isz, const option_t* opt) -{ - const size_t sz = (size_t)isz; - fprintf(stderr, "size of input: %lu, max size of mapping: %lu (buffers %lu / lo %lu)\n", sz, opt->sized.maxsz, - sz % opt->sized.maxsz, - sz / opt->sized.maxsz); - - //fcntl(F_STDOUT, ... SOMETHING to make splice() work here... - //TODO :: XXX: : WHY can't we splice() here???? w/e.. -#if 1 - if( fallocate(F_STDOUT, 0 /* | FALLOC_FL_KEEP_SIZE*/, 0, isz) != 0) { - perror("fallocate(STDOUT)"); -// return 1; - } -#endif - - if( fcntl(F_STDOUT, F_SETFL, fcntl(F_STDOUT, F_GETFL) & ~O_APPEND) < 0 ) - { - perror("fcntl(stdout) + O_APPEND"); - return -O_APPEND; - } - ssize_t sprc = splice(F_STDIN, NULL, - F_STDOUT, NULL, //TODO: XXX: Why does this always fail? I've seen splice(1, 2) work before... - sz, - SPLICE_F_MOVE); - switch(sprc) { - case -1: perror("splice() whole buffer failed"); - return 1; - case 0: - fprintf(stderr, "splice() reported end-of-input. TODO: continue splicing, or ignore?\n"); - return 2; - default: - if((size_t)sprc == sz) return 0; - else if (sprc < sz) { - fprintf(stderr, "splice() moved only %ld / %lu bytes. TODO: move the other %lu bytes\n", - sprc, sz, - sz - (size_t)sprc); - return 3; - } else if(sprc > sz) fprintf(stderr, "splice() somehow moved %ld / %lu (+ %ld bytes more)\n", - sprc, sz, - (size_t)sprc - sz); - return -1; - } -#if 0 - // Map stdin - const void* stdin_map = map_input_buffer(F_STDIN, sz); - if(!stdin_map) goto e_map_input; - - - -cleanup: - - unmap_mem((void*)stdin_map, sz); - if(0) -e_map_input: - { fprintf(stderr, "failed to map stdin (%lu)\n", sz); rc = 1; } - return rc; -#endif -} - -#endif diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..34f4525 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,64 @@ +#[cfg(feature="jemalloc")] +extern crate jemallocator; + +#[cfg(feature="jemalloc")] +const _:() = { + #[global_allocator] + static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +}; + +use std::{ + io, + mem::MaybeUninit, + os::unix::prelude::*, + num::NonZeroUsize, +}; + +use bytes::{ + BytesMut, + Buf, + BufMut, +}; + +fn try_get_size(reader: &R) -> Option +where R: AsRawFd +{ + let fd = reader.as_raw_fd(); + use libc::{ + fstat64, + stat64, + }; + if fd < 0 { + return None; + } + let mut st: MaybeUninit = MaybeUninit::uninit(); + unsafe { + match fstat64(fd, st.as_mut_ptr()) { + 0 => { + NonZeroUsize::new(st.assume_init().st_size as usize) + }, + _ => None, + } + } +} + +fn main() -> io::Result<()> { + let (bytes, read) = { + let stdin = io::stdin(); + let mut bytes = match try_get_size(&stdin) { + Some(sz) => BytesMut::with_capacity(sz.into()), + None => BytesMut::new(), + }; + + let read = io::copy(&mut stdin.lock(), &mut (&mut bytes).writer())?; + (bytes.freeze(), read as usize) + }; + + let written = io::copy(&mut bytes.slice(..read).reader() , &mut io::stdout().lock())?; + + if read != written as usize { + return Err(io::Error::new(io::ErrorKind::BrokenPipe, format!("read {read} bytes, but only wrote {written}"))); + } + + Ok(()) +}