From 411367f326d03c1d1783b76ef906e28883d0c5b7 Mon Sep 17 00:00:00 2001
From: Avril
Date: Sat, 26 Dec 2020 19:38:01 +0000
Subject: [PATCH] initial commit

---
 .gitignore   |   2 +
 Cargo.lock   |  66 ++++++++++++++++++++++++++++
 Cargo.toml   |  12 ++++++
 src/job.rs   |   7 +++
 src/main.rs  |  10 +++++
 src/pool.rs  | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/state.rs |  80 ++++++++++++++++++++++++++++++++++
 7 files changed, 297 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Cargo.lock
 create mode 100644 Cargo.toml
 create mode 100644 src/job.rs
 create mode 100644 src/main.rs
 create mode 100644 src/pool.rs
 create mode 100644 src/state.rs

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e2a3069
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target
+*~
diff --git a/Cargo.lock b/Cargo.lock
new file mode 100644
index 0000000..62fa319
--- /dev/null
+++ b/Cargo.lock
@@ -0,0 +1,66 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+[[package]]
+name = "hermit-abi"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "libc"
+version = "0.2.81"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
+
+[[package]]
+name = "mapcat"
+version = "0.1.0"
+dependencies = [
+ "memmap",
+ "num_cpus",
+]
+
+[[package]]
+name = "memmap"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
+[[package]]
+name = "num_cpus"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..006a1b1
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "mapcat"
+description = "memmap() based `cat`"
+version = "0.1.0"
+authors = ["Avril "]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+memmap = "0.7.0"
+num_cpus = "1.13.0"
diff --git a/src/job.rs b/src/job.rs
new file mode 100644
index 0000000..7d19477
--- /dev/null
+++ b/src/job.rs
@@ -0,0 +1,7 @@
+use super::*;
+
+#[derive(Debug)]
+pub struct Job
+{
+    // TODO: the job payload fields are not defined yet
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..efaae78
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,10 @@
+
+#![allow(dead_code)]
+
+mod state;
+mod pool;
+mod job;
+
+fn main() {
+    println!("Hello, world!");
+}
diff --git a/src/pool.rs b/src/pool.rs
new file mode 100644
index 0000000..a533c39
--- /dev/null
+++ b/src/pool.rs
@@ -0,0 +1,120 @@
+use super::*;
+use job::Job;
+use std::thread;
+use std::thread::JoinHandle;
+use std::ops::Drop;
+use std::iter::FromIterator;
+
+#[inline] fn num_threads() -> usize
+{
+    num_cpus::get()+1
+}
+
+fn work(recv: state::PendingReceiver<Job>) -> Result<(), Box<dyn std::error::Error>>
+{
+    while let Some(_job) = recv.recv()?
+    {
+        // TODO: process `_job`; for now each job is received and discarded
+    }
+
+    Ok(())
+}
+
+/// Set up the thread pool with this receiver
+///
+/// # Notes
+/// This method does not spawn the threads itself, the iterator must be consumed to spawn them.
+/// Collect the returned iterator into a container to spawn the threads.
+/// ```
+/// # use crate::{state, pool};
+/// let (tx, rx) = state::channel(16);
+/// {
+///   let handles: pool::Pool = pool::spawn_for_lazy(rx).collect();
+///   // threads are now spawned, do work with job sender `tx`
+/// } // when `handles` is dropped, it will wait on the worker threads
+/// ```
+fn spawn_for_lazy(recv: state::PendingReceiver<Job>) -> impl Iterator<Item = JoinHandle<()>>
+{
+    (0..num_threads()).map(move |_| {
+        let recv = recv.clone();
+        thread::spawn(move || {
+            let _ = work(recv);
+        })
+    })
+}
+
+/// Spawn the thread pool and return handles to the spawned threads as a `Pool`.
+#[inline] pub fn spawn_for_pool(recv: state::PendingReceiver<Job>) -> Pool
+{
+    spawn_for_lazy(recv).collect()
+}
+
+/// Spawn and then immediately wait on a thread pool to complete.
+///
+/// Has the same behaviour as `drop(spawn_for_pool(recv))` but skips a heap allocation.
+#[inline] pub fn spawn_for_and_join(recv: state::PendingReceiver<Job>)
+{
+    join_all(spawn_for_lazy(recv));
+}
+
+/// Join an iterator of `JoinHandle`s and discard their results.
+#[inline]
+pub fn join_all(i: impl IntoIterator<Item = JoinHandle<()>>)
+{
+    for x in i {
+        let _ = x.join(); //ignore worker panics
+    }
+}
+
+/// A pool container that ensures worker threads are waited on before termination.
+pub struct Pool(Vec<JoinHandle<()>>);
+
+impl Pool
+{
+    /// Join the whole pool
+    #[inline] pub fn join(self)
+    {
+        drop(self)
+    }
+
+    /// Detach the pool and do not join its worker threads.
+    #[inline] pub fn detach(mut self)
+    {
+        self.0 = Vec::default();
+    }
+
+    /// The number of threads in the pool
+    #[inline] pub fn num_threads(&self) -> usize
+    {
+        self.0.len()
+    }
+}
+
+impl FromIterator<JoinHandle<()>> for Pool
+{
+    fn from_iter<I: IntoIterator<Item = JoinHandle<()>>>(iter: I) -> Self
+    {
+        Self(iter.into_iter().collect())
+    }
+}
+
+impl IntoIterator for Pool
+{
+    type Item= JoinHandle<()>;
+    type IntoIter = std::vec::IntoIter<JoinHandle<()>>;
+
+    fn into_iter(mut self) -> Self::IntoIter
+    {
+        std::mem::take(&mut self.0).into_iter()
+    }
+}
+
+impl Drop for Pool
+{
+    fn drop(&mut self)
+    {
+        if !self.0.is_empty() {
+            join_all(std::mem::take(&mut self.0));
+        }
+    }
+}
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..461e804
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,80 @@
+use std::sync::{
+    mpsc,
+    Mutex,
+    Arc,
+
+    PoisonError,
+};
+use std::fmt;
+use std::error;
+
+/// A multi-consumer message receiver
+#[derive(Debug)]
+pub struct PendingReceiver<T>
+{
+    recv: Arc<Mutex<mpsc::Receiver<T>>>,
+}
+
+impl<T> Clone for PendingReceiver<T>
+{
+    fn clone(&self) -> Self
+    {
+        Self{
+            recv: Arc::clone(&self.recv),
+        }
+    }
+}
+
+impl<T> PendingReceiver<T>
+{
+    /// Receive the next message while holding the lock; `Ok(None)` if the channel's `recv()` fails, `Err(Poisoned)` if the lock is poisoned.
+    pub fn recv(&self) -> Result<Option<T>, PendingReceiverError>
+    {
+        Ok(self.recv.lock()?.recv().ok())
+    }
+}
+
+/// Create a bounded (`cap`) channel whose receiver is cloneable and shared behind an `Arc<Mutex<_>>`.
+pub fn channel<T>(cap: usize) -> (mpsc::SyncSender<T>, PendingReceiver<T>)
+{
+    let (tx, rx) = mpsc::sync_channel(cap);
+
+    (tx, PendingReceiver{
+        recv: Arc::new(Mutex::new(rx))
+    })
+}
+
+#[derive(Debug)]
+pub enum PendingReceiverError
+{
+    Poisoned,
+    Closed,
+}
+
+impl error::Error for PendingReceiverError{}
+impl fmt::Display for PendingReceiverError
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
+    {
+        match self {
+            Self::Poisoned => write!(f, "poisoned"),
+            Self::Closed => write!(f, "receiver closed"),
+        }
+    }
+}
+
+impl<T> From<PoisonError<T>> for PendingReceiverError
+{
+    #[inline] fn from(_from: PoisonError<T>) -> Self
+    {
+        Self::Poisoned
+    }
+}
+
+impl From<mpsc::RecvError> for PendingReceiverError
+{
+    fn from(_from: mpsc::RecvError) -> Self
+    {
+        Self::Closed
+    }
+}