From 795cbfb6a0adf52013483ab8e68d79c8633cbddf Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 29 Dec 2020 18:24:30 +0000 Subject: [PATCH] job sending oke --- src/main.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index a80d626..b1045ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use std::convert::TryFrom; +use std::sync::mpsc; #[macro_export] macro_rules! unwrap { (return $($code:expr)?; $err:expr) => { @@ -68,15 +69,25 @@ fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value - let state: state::State = {todo!()}; - - // - spawn the task thread pool + let (tx, recv_rx) = mpsc::channel(); // - move the output mapped file to the thread-safe refcounted `state::State`. + let state: state::State = state::State::new(output, tx); + // - spawn the task thread pool + let (job_tx, rx) = state::channel(jobs.len()); + let pool = pool::pool(rx); // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`) - + for job in jobs { + job_tx.send(job.start(state.clone()))?; + } // - Read the completion stream receiver until all file jobs have been signaled as completed + while let Ok((idx, sz)) = recv_rx.recv() { + *complete.get_mut(idx).unwrap() = job::Status::Complete(sz); + //TODO: Dispatch if in order + } // - wait on all worker threads to complete. + pool.join(); // - ensure all data was written. + // - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage) {