added completion stream support

serial
Avril 4 years ago
parent 0ecee59258
commit 295f0a1bcd
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -8,12 +8,15 @@ use std::io::{
self, self,
Read, Read,
}; };
use std::{fmt,error};
#[derive(Debug)] #[derive(Debug)]
pub struct Prelude pub struct Prelude
{ {
file: File, file: File,
stat: Metadata, stat: Metadata,
file_num: usize,
offset: usize, offset: usize,
} }
@ -30,6 +33,7 @@ impl Prelude{
fd: self.file, fd: self.file,
stat: self.stat, stat: self.stat,
offset: self.offset, offset: self.offset,
file_num: self.file_num,
state, state,
} }
} }
@ -41,6 +45,7 @@ pub struct Job
fd: File, fd: File,
stat: Metadata, stat: Metadata,
file_num: usize,
/// We grab the slice of memory we write to from here /// We grab the slice of memory we write to from here
state: state::State, state: state::State,
/// From this offset /// From this offset
@ -83,6 +88,12 @@ impl Job
self.state.slice(self.start() .. self.end()) self.state.slice(self.start() .. self.end())
} }
} }
/// Complete this job
pub fn complete(self, size: usize) -> Result<(), CompletionError>
{
self.state.send_complete(self.file_num, size).map_err(|_| CompletionError)
}
} }
impl Read for Job impl Read for Job
@ -97,7 +108,7 @@ impl Read for Job
/// ///
/// `sz` is the offset of the *end* of the last job. (or 0 for the first). /// `sz` is the offset of the *end* of the last job. (or 0 for the first).
/// `sz` is then updated with this file's size for this method to be used again on the next file. /// `sz` is then updated with this file's size for this method to be used again on the next file.
pub fn create_from_file(file: impl AsRef<Path>, sz: &mut usize) -> io::Result<Prelude> pub fn create_from_file(file_num: usize, file: impl AsRef<Path>, sz: &mut usize) -> io::Result<Prelude>
{ {
let file = OpenOptions::new() let file = OpenOptions::new()
.read(true) .read(true)
@ -107,9 +118,24 @@ pub fn create_from_file(file: impl AsRef<Path>, sz: &mut usize) -> io::Result<Pr
let offset = *sz; let offset = *sz;
let prelude = Prelude { let prelude = Prelude {
file, stat, offset, file, stat, offset, file_num
}; };
*sz += prelude.len(); *sz += prelude.len();
Ok(prelude) Ok(prelude)
} }
/// Error returned when main thread's completion receiver was dropped. This should be fatal.
#[derive(Debug)]
pub struct CompletionError;
impl error::Error for CompletionError{}
impl fmt::Display for CompletionError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "unable to send completion signal because main thread's completion receiver was dropped.")
}
}

@ -36,7 +36,9 @@ fn main() {
// //
// - spawn the task thread pool // - spawn the task thread pool
// - move the output mapped file to the thread-safe refcounted `state::State`. // - move the output mapped file to the thread-safe refcounted `state::State`.
// - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file (`job::Prelude::start`) // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`)
// - Read the completion stream receiver until all file jobs have been signaled as completed
// - wait on all worker threads to complete. // - wait on all worker threads to complete.
// - ensure all data was written. // - ensure all data was written.
} }

@ -14,9 +14,9 @@ fn work(recv: state::PendingReceiver<Job>) -> Result<(), Box<dyn std::error::Err
{ {
while let Some(mut job) = recv.recv()? while let Some(mut job) = recv.recv()?
{ {
work::work_on(&mut job)?; let written = work::work_on(&mut job)?;
job.complete(written)?;
} }
Ok(()) Ok(())
} }

@ -11,10 +11,11 @@ use std::cell::UnsafeCell;
use std::{slice::SliceIndex, ops::RangeBounds}; use std::{slice::SliceIndex, ops::RangeBounds};
#[derive(Debug)] #[derive(Debug)]
#[repr(transparent)]
struct StateInner struct StateInner
{ {
map: UnsafeCell<super::map::MemoryMapMut>, map: UnsafeCell<super::map::MemoryMapMut>,
completion: mpsc::Sender<(usize, usize)>,
} }
// SAFETY: The whole point of this is internal mutablility across thread boundaries. // SAFETY: The whole point of this is internal mutablility across thread boundaries.
@ -26,9 +27,15 @@ pub struct State(Arc<StateInner>);
impl State impl State
{ {
/// Create a new state from this map /// Create a new state from this map
#[inline] pub fn new(map: super::map::MemoryMapMut) -> Self #[inline] pub fn new(map: super::map::MemoryMapMut, completion: mpsc::Sender<(usize, usize)>) -> Self
{
Self(Arc::new(StateInner{map: UnsafeCell::new(map), completion}))
}
/// Send a completion signal for the file of this index and this size.
pub fn send_complete(&self, idx: usize, size: usize) -> Result<(), mpsc::SendError<(usize, usize)>>
{ {
Self(Arc::new(StateInner{map: UnsafeCell::new(map)})) self.0.completion.send((idx, size))
} }
/// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one. /// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one.

Loading…
Cancel
Save