added single-threaded mode

serial
Avril 4 years ago
parent 94d0778895
commit fa63d6154f
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -7,6 +7,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["threads"]
threads = []
[dependencies]
libc = "0.2.81"
memmap = "0.7.0"

@ -14,7 +14,7 @@ pub fn usage() -> !
eprintln!("The output file is required, if one is not provided, we will just print this message and exit with error code `2`. If there are no input files, it will just create an empty output and return with error code `0` (unless there were other errors)");
eprintln!("Output files are always clobbered regardless if there are any inputs, or if the concatenation operation succeeds / fails");
eprintln!();
eprintln!(" (compiled with write parallelisation enabled)"); //TODO: we will eventually feature gate the threaded scheduler based solution. but for now, it is the only one implemented so we'll just print this always
#[cfg(feature="threads")] eprintln!(" (compiled with write parallelisation enabled)");
std::process::exit(2)
}

@ -1,7 +1,6 @@
#![allow(dead_code)]
use std::convert::TryFrom;
use std::sync::mpsc;
#[macro_export] macro_rules! unwrap {
(return $($code:expr)?; $err:expr) => {
@ -22,23 +21,13 @@ use std::sync::mpsc;
};
}
/// Builds a `Vec` containing `$num` elements, each produced by `Default::default()`.
///
/// `$num` is evaluated exactly once and must be a `usize`; the element type is
/// inferred from the call site.
macro_rules! vec_default {
    ($num:expr) => {{
        let count: usize = $num;
        ::std::iter::repeat_with(Default::default)
            .take(count)
            .collect::<Vec<_>>()
    }};
}
mod state;
mod pool;
mod map;
mod job;
mod work;
mod arg;
@ -46,64 +35,13 @@ mod open;
mod consts;
fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box<dyn std::error::Error>>
{
// todo:
// - create and open output file
let output = open::output(output)?;
// - open and stat all input files in order (`job::create_from_file`).
let inputs = open::input(inputs)?;
if inputs.len() == 0 {
output.into_inner().set_len(0)?; // Just truncate the output and return
return Ok(());
}
let (jobs, size): (Vec<_>, _) = {
let mut off = 0;
((0..).zip(inputs.into_iter()).map(|(i, input)| job::create(i, input, &mut off)).collect(), off)
};
debug_assert!(u64::try_from(size).is_ok(), "Output file too large, would exceed unsigned 64bit integer");
#[cfg(feature="threads")] mod par;
#[cfg(not(feature="threads"))] mod ser;
// - `fallocate` the output file fd to the sum of all input file sizes
// - `mmap` the output file as writable
let output = output.complete(size)?;
// - create state
let mut complete: Vec<job::Status<usize>> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value
let (tx, recv_rx) = mpsc::channel();
// - move the output mapped file to the thread-safe refcounted `state::State`.
let state: state::State = state::State::new(output, tx);
// - spawn the task thread pool
let (job_tx, rx) = state::channel(jobs.len());
let pool = pool::pool(rx);
// - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`)
for job in jobs {
job_tx.send(job.start(state.clone()))?;
}
drop(job_tx);
// - Read the completion stream receiver until all file jobs have been signaled as completed
let mut done=0;
while let Ok((idx, sz)) = recv_rx.recv() {
*complete.get_mut(idx).unwrap() = job::Status::Complete(sz);
work::output_if_able(&mut complete);
done+=1;
if done == complete.len() {
drop(recv_rx);
break;
}
}
// - wait on all worker threads to complete.
pool.join();
// - TODO: ensure all data was written.
// - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage)
{
use std::io::Write;
let mut output = state.try_into_inner().unwrap().unmap();
output.set_len(u64::try_from(size).expect("File size is larger than u64::MAX"))?;
output.flush()?;
}
Ok(())
/// Runs the concatenation described by `op` using the backend selected at compile time.
///
/// With the `threads` feature enabled this forwards to `par::work`; without it, to
/// `ser::work`. The backend's result is returned unchanged.
#[inline] fn work(op: arg::Operation) -> Result<(), Box<dyn std::error::Error>>
{
    // Exactly one of these two bindings survives `cfg` stripping.
    #[cfg(feature="threads")] let outcome = par::work(op);
    #[cfg(not(feature="threads"))] let outcome = ser::work(op);
    outcome
}
fn main() {

@ -0,0 +1,73 @@
use super::*;
use std::sync::mpsc;
/// Expands to a `Vec` of `$num` default-constructed elements.
///
/// `$num` is evaluated once and must be a `usize`. The element type comes from the
/// surrounding context and has to implement `Default`.
macro_rules! vec_default {
    ($num:expr) => {{
        let len: usize = $num;
        (0..len).map(|_| Default::default()).collect::<Vec<_>>()
    }};
}
/// Concatenates all `inputs` into `output` by dispatching one job per input to the
/// worker pool and waiting for every job to report completion.
///
/// If there are no inputs the output is truncated to zero length and the function
/// returns immediately.
///
/// # Errors
/// Propagates failures from opening the files, sizing/mapping the output,
/// sending jobs to the pool, and the final `set_len` / `flush` on the output.
///
/// # Panics
/// Panics if a completion message carries an index outside the status table, if
/// the shared state cannot be unwrapped after the pool has been joined, or if the
/// total size does not fit in a `u64`.
pub fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box<dyn std::error::Error>>
{
    // Create and open the output file, then open and stat every input in order.
    let output = open::output(output)?;
    let inputs = open::input(inputs)?;

    // No inputs: just truncate the output and finish.
    if inputs.len() == 0 {
        output.into_inner().set_len(0)?;
        return Ok(());
    }

    // Build one job per input. `job::create` receives the running offset by mutable
    // reference; after the last job has been created that offset is used as the
    // total output size.
    let mut offset = 0;
    let jobs: Vec<_> = (0..)
        .zip(inputs.into_iter())
        .map(|(index, input)| job::create(index, input, &mut offset))
        .collect();
    let size = offset;
    debug_assert!(u64::try_from(size).is_ok(), "Output file too large, would exceed unsigned 64bit integer");

    // `fallocate` the output to the sum of all input sizes and `mmap` it as writable.
    let output = output.complete(size)?;

    // One status slot per job. A completion message `(idx, sz)` names the slot to
    // set (`idx`) and the value to store in it (`sz`).
    let mut statuses: Vec<job::Status<usize>> = vec_default!(jobs.len());
    let (done_tx, done_rx) = mpsc::channel();

    // Move the mapped output and the completion sender into the shared `state::State`.
    let state: state::State = state::State::new(output, done_tx);

    // Spawn the task thread pool, fed from the job channel.
    let (dispatch, queue) = state::channel(jobs.len());
    let pool = pool::pool(queue);

    // Hand every job to the pool together with a clone of the shared state, then
    // close the job channel by dropping its sending half.
    for job in jobs {
        dispatch.send(job.start(state.clone()))?;
    }
    drop(dispatch);

    // Drain the completion stream until every job has been signalled as complete.
    let mut finished = 0;
    loop {
        let (idx, sz) = match done_rx.recv() {
            Ok(message) => message,
            Err(_) => break,
        };
        let slot = statuses.get_mut(idx).unwrap();
        *slot = job::Status::Complete(sz);
        work::output_if_able(&mut statuses);
        finished += 1;
        if finished == statuses.len() {
            drop(done_rx);
            break;
        }
    }

    // Wait on all worker threads to complete.
    pool.join();

    // TODO: ensure all data was written.
    // Truncate the output to the exact total size: the file is not truncated before
    // it is `fallocate`d, so a previously larger file would otherwise keep trailing
    // garbage.
    {
        use std::io::Write;
        let mut output = state.try_into_inner().unwrap().unmap();
        let total = u64::try_from(size).expect("File size is larger than u64::MAX");
        output.set_len(total)?;
        output.flush()?;
    }
    Ok(())
}

@ -0,0 +1,51 @@
use super::*;
use std::io::{self, Read};
/// Reads from `job` into `output` until the buffer is full or the reader reports
/// end-of-stream.
///
/// Returns the number of bytes stored at the front of `output`. The count is smaller
/// than the buffer length only when `job` ran out of data first.
///
/// # Errors
/// Returns the first error from `job.read` that is not `ErrorKind::Interrupted`.
/// Interrupted reads are retried, as the `Read` contract recommends, so a signal
/// arriving mid-copy does not abort the whole operation.
fn read_into(mut job: impl Read, mut output: impl AsMut<[u8]>) -> io::Result<usize>
{
    let output = output.as_mut();
    let mut read = 0;
    while read < output.len() {
        match job.read(&mut output[read..]) {
            // A zero-length read with a non-empty buffer means end-of-stream.
            Ok(0) => break,
            Ok(current) => read += current,
            // Nothing was consumed; simply try again.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(read)
}
/// Concatenates all `inputs` into `output` on the calling thread.
///
/// Each input is read, in order, directly into its slice of the mapped output, and
/// the number of bytes copied from each input is printed to stdout on its own line.
/// If there are no inputs the output is truncated to zero length and the function
/// returns immediately.
///
/// # Errors
/// Propagates failures from opening the files, sizing/mapping the output, reading
/// an input, and the final `set_len` / `flush`. An input that yields fewer bytes
/// than its stat reported produces an `UnexpectedEof` I/O error instead of a panic,
/// since a file shrinking underneath us is an I/O condition rather than a bug.
///
/// # Panics
/// Panics with "Size too large" if an input size, or the sum of all input sizes,
/// does not fit in `usize`.
pub fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box<dyn std::error::Error>>
{
    // - create and open output file
    let output = open::output(output)?;
    // - open and stat all input files in order
    let inputs = open::input(inputs)?;
    if inputs.len() == 0 {
        output.into_inner().set_len(0)?; // Just truncate the output and return
        return Ok(());
    }

    // Total mapped size: the sum of every input's stat length.
    let msz = usize::try_from(inputs.iter().map(|x| x.1.len()).sum::<u64>()).expect("Size too large");
    let (mut file, _fsz) = {
        let mut output = output.complete(msz)?;
        let mut off = 0;
        for open::Input(file, stat) in inputs
        {
            let sz = usize::try_from(stat.len()).expect("Size too large");
            // The destination slice is exactly `sz` bytes, so `next` can never exceed `sz`.
            let next = read_into(file, &mut output.as_slice_mut()[off..(off + sz)])?;
            if next != sz {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    format!("Failed to read whole file: expected {} bytes, read {}", sz, next),
                ).into());
            }
            println!("{}", next);
            off += next;
        }
        (output.unmap(), off)
    };
    debug_assert_eq!(_fsz, msz);

    // Trim the file to the exact concatenated size and flush it.
    {
        use std::io::Write;
        file.set_len(msz as u64)?;
        file.flush()?;
    }
    Ok(())
}
Loading…
Cancel
Save