From 295f0a1bcdd82e40b0448250105791d1d9723273 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 28 Dec 2020 14:31:07 +0000 Subject: [PATCH] added completion stream support --- src/job.rs | 30 ++++++++++++++++++++++++++++-- src/main.rs | 4 +++- src/pool.rs | 4 ++-- src/state.rs | 13 ++++++++++--- 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/job.rs b/src/job.rs index 03960c6..ee7f605 100644 --- a/src/job.rs +++ b/src/job.rs @@ -8,12 +8,15 @@ use std::io::{ self, Read, }; +use std::{fmt,error}; #[derive(Debug)] pub struct Prelude { file: File, stat: Metadata, + + file_num: usize, offset: usize, } @@ -30,6 +33,7 @@ impl Prelude{ fd: self.file, stat: self.stat, offset: self.offset, + file_num: self.file_num, state, } } @@ -41,6 +45,7 @@ pub struct Job fd: File, stat: Metadata, + file_num: usize, /// We grab the slice of memory we write to from here state: state::State, /// From this offset @@ -83,6 +88,12 @@ impl Job self.state.slice(self.start() .. self.end()) } } + + /// Complete this job + pub fn complete(self, size: usize) -> Result<(), CompletionError> + { + self.state.send_complete(self.file_num, size).map_err(|_| CompletionError) + } } impl Read for Job @@ -97,7 +108,7 @@ impl Read for Job /// /// `sz` is the offset of the *end* of the last job. (or 0 for the first). /// `sz` is then updated with this file's size for this method to be used again on the next file. -pub fn create_from_file(file: impl AsRef, sz: &mut usize) -> io::Result +pub fn create_from_file(file_num: usize, file: impl AsRef, sz: &mut usize) -> io::Result { let file = OpenOptions::new() .read(true) @@ -107,9 +118,24 @@ pub fn create_from_file(file: impl AsRef, sz: &mut usize) -> io::Result) -> fmt::Result + { + write!(f, "unable to send completion signal because main thread's completion receiver was dropped.") + } +} + + diff --git a/src/main.rs b/src/main.rs index 178a94d..0333241 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,7 +36,9 @@ fn main() { // // - spawn the task thread pool // - move the output mapped file to the thread-safe refcounted `state::State`. - // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file (`job::Prelude::start`) + // - dispatch jobs to the pool with their fds, stats, and calculated output offsets; along with a reference to the output mapped file and a sender for the completion stream (`job::Prelude::start`) + + // - Read the completion stream receiver until all file jobs have been signaled as completed // - wait on all worker threads to complete. // - ensure all data was written. } diff --git a/src/pool.rs b/src/pool.rs index bc554f8..e74cd72 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -14,9 +14,9 @@ fn work(recv: state::PendingReceiver) -> Result<(), Box, + + completion: mpsc::Sender<(usize, usize)>, } // SAFETY: The whole point of this is internal mutablility across thread boundaries. @@ -26,9 +27,15 @@ pub struct State(Arc); impl State { /// Create a new state from this map - #[inline] pub fn new(map: super::map::MemoryMapMut) -> Self + #[inline] pub fn new(map: super::map::MemoryMapMut, completion: mpsc::Sender<(usize, usize)>) -> Self + { + Self(Arc::new(StateInner{map: UnsafeCell::new(map), completion})) + } + + /// Send a completion signal for the file of this index and this size. + pub fn send_complete(&self, idx: usize, size: usize) -> Result<(), mpsc::SendError<(usize, usize)>> { - Self(Arc::new(StateInner{map: UnsafeCell::new(map)})) + self.0.completion.send((idx, size)) } /// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one.