job sending oke

serial
Avril 4 years ago
parent a65590d86a
commit 795cbfb6a0
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,6 +1,7 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::convert::TryFrom; use std::convert::TryFrom;
use std::sync::mpsc;
#[macro_export] macro_rules! unwrap { #[macro_export] macro_rules! unwrap {
(return $($code:expr)?; $err:expr) => { (return $($code:expr)?; $err:expr) => {
@ -68,15 +69,25 @@ fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box<dyn st
// - create state // - create state
let mut complete: Vec<job::Status<usize>> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value let mut complete: Vec<job::Status<usize>> = vec_default!(jobs.len()); // when we receive completion from the stream (which yeilds `(usize, usize)`), the first item is the *index* in this vector to set, and the second is the value
let state: state::State = {todo!()}; let (tx, recv_rx) = mpsc::channel();
// - spawn the task thread pool
// - move the output mapped file to the thread-safe refcounted `state::State`. // - move the output mapped file to the thread-safe refcounted `state::State`.
let state: state::State = state::State::new(output, tx);
// - spawn the task thread pool
let (job_tx, rx) = state::channel(jobs.len());
let pool = pool::pool(rx);
// - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`) // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`)
for job in jobs {
job_tx.send(job.start(state.clone()))?;
}
// - Read the completion stream receiver until all file jobs have been signaled as completed // - Read the completion stream receiver until all file jobs have been signaled as completed
while let Ok((idx, sz)) = recv_rx.recv() {
*complete.get_mut(idx).unwrap() = job::Status::Complete(sz);
//TODO: Dispatch if in order
}
// - wait on all worker threads to complete. // - wait on all worker threads to complete.
pool.join();
// - ensure all data was written. // - ensure all data was written.
// - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage) // - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage)
{ {

Loading…
Cancel
Save