From fa63d6154f3f828c1cf137319f3ea94e3ab0ae77 Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 29 Dec 2020 23:07:10 +0000 Subject: [PATCH] added single-threaded mode --- Cargo.toml | 5 ++++ src/arg.rs | 2 +- src/main.rs | 76 +++++------------------------------------------------ src/par.rs | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/ser.rs | 51 +++++++++++++++++++++++++++++++++++ 5 files changed, 137 insertions(+), 70 deletions(-) create mode 100644 src/par.rs create mode 100644 src/ser.rs diff --git a/Cargo.toml b/Cargo.toml index 1ada125..dc938ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["threads"] + +threads = [] + [dependencies] libc = "0.2.81" memmap = "0.7.0" diff --git a/src/arg.rs b/src/arg.rs index 2de83fe..71f3d80 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -14,7 +14,7 @@ pub fn usage() -> ! eprintln!("The output file is required, if one is not provided, we will just print this message and exit with error code `2`. If there are no input files, it will just create an empty output and return with error code `0` (unless there were other errors)"); eprintln!("Output files are always clobbered regardless if there are any inputs, or if the concatenation operation succeeds / fails"); eprintln!(); - eprintln!(" (compiled with write parallelisation enabled)"); //TODO: we will eventually feature gate the threaded scheduler based solution. but for now, it is the only one implemented so we'll just print this always + #[cfg(feature="threads")] eprintln!(" (compiled with write parallelisation enabled)"); std::process::exit(2) } diff --git a/src/main.rs b/src/main.rs index 91ee20a..35278f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ #![allow(dead_code)] use std::convert::TryFrom; -use std::sync::mpsc; #[macro_export] macro_rules! unwrap { (return $($code:expr)?; $err:expr) => { @@ -22,23 +21,13 @@ use std::sync::mpsc; }; } -macro_rules! vec_default { - ($num:expr) => { - { - let num = $num; - - let mut v= Vec::with_capacity(num); - v.resize_with(num, Default::default); - v - } - } -} mod state; mod pool; mod map; mod job; + mod work; mod arg; @@ -46,64 +35,13 @@ mod open; mod consts; -fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box> -{ - // todo: - // - create and open output file - let output = open::output(output)?; - // - open and stat all input files in order (`job::create_from_file`). - let inputs = open::input(inputs)?; - if inputs.len() == 0 { - output.into_inner().set_len(0)?; // Just truncate the output and return - return Ok(()); - } - let (jobs, size): (Vec<_>, _) = { - let mut off = 0; - ((0..).zip(inputs.into_iter()).map(|(i, input)| job::create(i, input, &mut off)).collect(), off) - }; - debug_assert!(u64::try_from(size).is_ok(), "Output file too large, would exceed unsigned 64bit integer"); - - // - `fallocate` the output file fd to the sum of all input file sizes - // - `mmap` the output file as writable - let output = output.complete(size)?; - - // - create state - let mut complete: Vec> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value - let (tx, recv_rx) = mpsc::channel(); - // - move the output mapped file to the thread-safe refcounted `state::State`. - let state: state::State = state::State::new(output, tx); - // - spawn the task thread pool - let (job_tx, rx) = state::channel(jobs.len()); - let pool = pool::pool(rx); - // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`) - for job in jobs { - job_tx.send(job.start(state.clone()))?; - } - drop(job_tx); - // - Read the completion stream receiver until all file jobs have been signaled as completed +#[cfg(feature="threads")] mod par; +#[cfg(not(feature="threads"))] mod ser; - let mut done=0; - while let Ok((idx, sz)) = recv_rx.recv() { - *complete.get_mut(idx).unwrap() = job::Status::Complete(sz); - work::output_if_able(&mut complete); - done+=1; - if done == complete.len() { - drop(recv_rx); - break; - } - } - // - wait on all worker threads to complete. - pool.join(); - // - TODO: ensure all data was written. - - // - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage) - { - use std::io::Write; - let mut output = state.try_into_inner().unwrap().unmap(); - output.set_len(u64::try_from(size).expect("File size is larger than u64::MAX"))?; - output.flush()?; - } - Ok(()) +#[inline] fn work(op: arg::Operation) -> Result<(), Box> +{ + #[cfg(feature="threads")] return par::work(op); + #[cfg(not(feature="threads"))] return ser::work(op); } fn main() { diff --git a/src/par.rs b/src/par.rs new file mode 100644 index 0000000..5d1c5b9 --- /dev/null +++ b/src/par.rs @@ -0,0 +1,73 @@ +use super::*; +use std::sync::mpsc; + +macro_rules! vec_default { + ($num:expr) => { + { + let num = $num; + + let mut v= Vec::with_capacity(num); + v.resize_with(num, Default::default); + v + } + } +} + +pub fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box> +{ + // - create and open output file + let output = open::output(output)?; + // - open and stat all input files in order (`job::create_from_file`). + let inputs = open::input(inputs)?; + if inputs.len() == 0 { + output.into_inner().set_len(0)?; // Just truncate the output and return + return Ok(()); + } + let (jobs, size): (Vec<_>, _) = { + let mut off = 0; + ((0..).zip(inputs.into_iter()).map(|(i, input)| job::create(i, input, &mut off)).collect(), off) + }; + debug_assert!(u64::try_from(size).is_ok(), "Output file too large, would exceed unsigned 64bit integer"); + + // - `fallocate` the output file fd to the sum of all input file sizes + // - `mmap` the output file as writable + let output = output.complete(size)?; + + // - create state + let mut complete: Vec> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value + let (tx, recv_rx) = mpsc::channel(); + // - move the output mapped file to the thread-safe refcounted `state::State`. + let state: state::State = state::State::new(output, tx); + // - spawn the task thread pool + let (job_tx, rx) = state::channel(jobs.len()); + let pool = pool::pool(rx); + // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`) + for job in jobs { + job_tx.send(job.start(state.clone()))?; + } + drop(job_tx); + // - Read the completion stream receiver until all file jobs have been signaled as completed + + let mut done=0; + while let Ok((idx, sz)) = recv_rx.recv() { + *complete.get_mut(idx).unwrap() = job::Status::Complete(sz); + work::output_if_able(&mut complete); + done+=1; + if done == complete.len() { + drop(recv_rx); + break; + } + } + // - wait on all worker threads to complete. + pool.join(); + // - TODO: ensure all data was written. + + // - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage) + { + use std::io::Write; + let mut output = state.try_into_inner().unwrap().unmap(); + output.set_len(u64::try_from(size).expect("File size is larger than u64::MAX"))?; + output.flush()?; + } + Ok(()) +} diff --git a/src/ser.rs b/src/ser.rs new file mode 100644 index 0000000..c1e9e58 --- /dev/null +++ b/src/ser.rs @@ -0,0 +1,51 @@ +use super::*; +use std::io::{self, Read}; + +fn read_into(mut job: impl Read, mut output: impl AsMut<[u8]>) -> io::Result +{ + let output = output.as_mut(); + let mut read=0; + while read < output.len() { + match job.read(&mut output[read..])? { + 0 => break, + current => read+=current, + } + } + Ok(read) +} + +pub fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box> +{ + // - create and open output file + let output = open::output(output)?; + // - open and stat all input files in order (`job::create_from_file`). + let inputs = open::input(inputs)?; + if inputs.len() == 0 { + output.into_inner().set_len(0)?; // Just truncate the output and return + return Ok(()); + } + let msz = usize::try_from(inputs.iter().map(|x| x.1.len()).sum::()).expect("Size too large"); + + let (mut file, _fsz) = { + let mut output = output.complete(msz)?; + let mut off=0; + for open::Input(file, stat) in inputs + { + let sz = usize::try_from(stat.len()).expect("Size too large"); + let next = read_into(file, &mut output.as_slice_mut()[off.. (off+sz)])?; + assert_eq!(sz, next, "Failed to read whole file"); + println!("{}", next); + off += next; + } + (output.unmap(), off) + }; + debug_assert_eq!(_fsz, msz); + + { + use std::io::Write; + file.set_len(msz as u64)?; + file.flush()?; + } + + Ok(()) +}