serial
Avril 4 years ago
parent 795cbfb6a0
commit 94d0778895
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -79,17 +79,24 @@ fn work(arg::Operation{output, inputs}: arg::Operation) -> Result<(), Box<dyn st
for job in jobs { for job in jobs {
job_tx.send(job.start(state.clone()))?; job_tx.send(job.start(state.clone()))?;
} }
drop(job_tx);
// - Read the completion stream receiver until all file jobs have been signaled as completed // - Read the completion stream receiver until all file jobs have been signaled as completed
let mut done=0;
while let Ok((idx, sz)) = recv_rx.recv() { while let Ok((idx, sz)) = recv_rx.recv() {
*complete.get_mut(idx).unwrap() = job::Status::Complete(sz); *complete.get_mut(idx).unwrap() = job::Status::Complete(sz);
//TODO: Dispatch if in order work::output_if_able(&mut complete);
done+=1;
if done == complete.len() {
drop(recv_rx);
break;
}
} }
// - wait on all worker threads to complete. // - wait on all worker threads to complete.
pool.join(); pool.join();
// - ensure all data was written. // - TODO: ensure all data was written.
// - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage) // - truncate the output file to the correct size (sum of all the input sizes) if needed (we do not truncate before fallocate()ing it, so if a previous larger file existed there, it will have trailing garbage)
{ {
use std::io::Write; use std::io::Write;
let mut output = state.try_into_inner().unwrap().unmap(); let mut output = state.try_into_inner().unwrap().unmap();

@ -15,7 +15,7 @@ struct StateInner
{ {
map: UnsafeCell<super::map::MemoryMapMut>, map: UnsafeCell<super::map::MemoryMapMut>,
completion: mpsc::Sender<(usize, usize)>, completion: Mutex<mpsc::Sender<(usize, usize)>>,
} }
// SAFETY: The whole point of this is internal mutablility across thread boundaries. // SAFETY: The whole point of this is internal mutablility across thread boundaries.
@ -29,13 +29,13 @@ impl State
/// Create a new state from this map /// Create a new state from this map
#[inline] pub fn new(map: super::map::MemoryMapMut, completion: mpsc::Sender<(usize, usize)>) -> Self #[inline] pub fn new(map: super::map::MemoryMapMut, completion: mpsc::Sender<(usize, usize)>) -> Self
{ {
Self(Arc::new(StateInner{map: UnsafeCell::new(map), completion})) Self(Arc::new(StateInner{map: UnsafeCell::new(map), completion: Mutex::new(completion)}))
} }
/// Send a completion signal for the file of this index and this size. /// Send a completion signal for the file of this index and this size.
pub fn send_complete(&self, idx: usize, size: usize) -> Result<(), mpsc::SendError<(usize, usize)>> pub fn send_complete(&self, idx: usize, size: usize) -> Result<(), mpsc::SendError<(usize, usize)>>
{ {
self.0.completion.send((idx, size)) self.0.completion.lock().unwrap().send((idx, size))
} }
/// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one. /// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one.

@ -1,6 +1,7 @@
use super::*; use super::*;
use job::Job; use job::Job;
use std::io::{self, Read}; use std::io::{self, Read};
use std::fmt;
pub fn work_on(job: &mut Job) -> io::Result<usize> pub fn work_on(job: &mut Job) -> io::Result<usize>
{ {
@ -19,3 +20,21 @@ pub fn work_on(job: &mut Job) -> io::Result<usize>
} }
Ok(read) Ok(read)
} }
pub fn output_if_able<T, D>(ar: &mut T)
where T: AsMut<[job::Status<D>]> + ?Sized,
D: fmt::Display
{
let slice = ar.as_mut();
for item in slice.iter_mut()
{
if item.is_pending() {
return
} else {
if let Some(item) = item.take() {
println!("{}", item);
}
}
}
}

Loading…
Cancel
Save