You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
74 lines
2.7 KiB
74 lines
2.7 KiB
3 years ago
|
use super::*;
|
||
|
use std::sync::mpsc;
|
||
|
|
||
|
/// Expands to a `Vec` containing `$num` elements, each produced by `Default::default`.
///
/// `$num` is evaluated exactly once and must be a `usize`. The element type is
/// inferred from where the macro's result is used.
macro_rules! vec_default {
    ($num:expr) => {{
        // Bind first so that a side-effecting argument expression only runs once.
        let count = $num;
        ::std::iter::repeat_with(Default::default)
            .take(count)
            .collect::<Vec<_>>()
    }};
}
|
||
|
|
||
|
pub fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box<dyn std::error::Error>>
|
||
|
{
|
||
|
// - create and open output file
|
||
|
let output = open::output(output)?;
|
||
|
// - open and stat all input files in order (`job::create_from_file`).
|
||
|
let inputs = open::input(inputs)?;
|
||
|
if inputs.len() == 0 {
|
||
|
output.into_inner().set_len(0)?; // Just truncate the output and return
|
||
|
return Ok(());
|
||
|
}
|
||
|
let (jobs, size): (Vec<_>, _) = {
|
||
|
let mut off = 0;
|
||
|
((0..).zip(inputs.into_iter()).map(|(i, input)| job::create(i, input, &mut off)).collect(), off)
|
||
|
};
|
||
|
debug_assert!(u64::try_from(size).is_ok(), "Output file too large, would exceed unsigned 64bit integer");
|
||
|
|
||
|
// - `fallocate` the output file fd to the sum of all input file sizes
|
||
|
// - `mmap` the output file as writable
|
||
|
let output = output.complete(size)?;
|
||
|
|
||
|
// - create state
|
||
|
let mut complete: Vec<job::Status<usize>> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value
|
||
|
let (tx, recv_rx) = mpsc::channel();
|
||
|
// - move the output mapped file to the thread-safe refcounted `state::State`.
|
||
|
let state: state::State = state::State::new(output, tx);
|
||
|
// - spawn the task thread pool
|
||
|
let (job_tx, rx) = state::channel(jobs.len());
|
||
|
let pool = pool::pool(rx);
|
||
|
// - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`)
|
||
|
for job in jobs {
|
||
|
job_tx.send(job.start(state.clone()))?;
|
||
|
}
|
||
|
drop(job_tx);
|
||
|
// - Read the completion stream receiver until all file jobs have been signaled as completed
|
||
|
|
||
|
let mut done=0;
|
||
|
while let Ok((idx, sz)) = recv_rx.recv() {
|
||
|
*complete.get_mut(idx).unwrap() = job::Status::Complete(sz);
|
||
|
work::output_if_able(&mut complete);
|
||
|
done+=1;
|
||
|
if done == complete.len() {
|
||
|
drop(recv_rx);
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
// - wait on all worker threads to complete.
|
||
|
pool.join();
|
||
|
// - TODO: ensure all data was written.
|
||
|
|
||
|
// - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage)
|
||
|
{
|
||
|
use std::io::Write;
|
||
|
let mut output = state.try_into_inner().unwrap().unmap();
|
||
|
output.set_len(u64::try_from(size).expect("File size is larger than u64::MAX"))?;
|
||
|
output.flush()?;
|
||
|
}
|
||
|
Ok(())
|
||
|
}
|