initial commit

serial
Avril 4 years ago
commit 411367f326
Signed by: flanchan
GPG Key ID: 284488987C31F630

2
.gitignore vendored

@ -0,0 +1,2 @@
/target
*~

66
Cargo.lock generated

@ -0,0 +1,66 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "hermit-abi"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
dependencies = [
"libc",
]
[[package]]
name = "libc"
version = "0.2.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
[[package]]
name = "mapcat"
version = "0.1.0"
dependencies = [
"memmap",
"num_cpus",
]
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

@ -0,0 +1,12 @@
[package]
name = "mapcat"
description = "memmap() based `cat`"
version = "0.1.0"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
memmap = "0.7.0"
num_cpus = "1.13.0"

@ -0,0 +1,7 @@
use super::*;
#[derive(Debug)]
pub struct Job
{
//todo
}

@ -0,0 +1,10 @@
#![allow(dead_code)]
mod state;
mod pool;
mod job;
fn main() {
println!("Hello, world!");
}

@ -0,0 +1,120 @@
use super::*;
use job::Job;
use std::thread;
use std::thread::JoinHandle;
use std::ops::Drop;
use std::iter::FromIterator;
#[inline] fn num_threads() -> usize
{
num_cpus::get()+1
}
fn work(recv: state::PendingReceiver<Job>) -> Result<(), Box<dyn std::error::Error>>
{
while let Some(job) = recv.recv()?
{
//todo: work on job
}
Ok(())
}
/// Set up the thread pool with this receiver
///
/// # Notes
/// This method does not spawn the threads itself, the iterator must be consumed to spawn them.
/// Collect the returned iterator into a container to spawn the threads.
/// ```
/// # use crate::{state, pool};
/// let (tx, rx) = state::channel(16);
/// {
/// let handles: pool::Pool = pool::spawn_for_lazy(rx).collect();
/// // threads are now spawned, do work with job sender `tx`
/// } // when `handles` is dropped, it will wait on the worker threads
/// ```
fn spawn_for_lazy(recv: state::PendingReceiver<Job>) -> impl Iterator<Item = JoinHandle<()>>
{
(0..num_threads()).map(move |_| {
let recv = recv.clone();
thread::spawn(move || {
let _ = work(recv);
})
})
}
/// Spawn the thread pool and return handles to the spawned threads as a `Pool`.
#[inline] pub fn spawn_for_pool(recv: state::PendingReceiver<Job>) -> Pool
{
spawn_for_lazy(recv).collect()
}
/// Spawn and then immediately wait on a thread pool to complete.
///
/// Has the same behaviour as `drop(spawn_for_pool(recv))` but skips a heap allocation.
#[inline] pub fn spawn_for_and_join(recv: state::PendingReceiver<Job>)
{
join_all(spawn_for_lazy(recv));
}
/// Join an iterator of `JoinHandle`s and discard their results.
#[inline]
pub fn join_all<T>(i: impl IntoIterator<Item = JoinHandle<T>>)
{
for x in i {
let _ = x.join(); //ignore worker panics
}
}
/// A pool container that ensures worker threads are waited on before termination.
pub struct Pool(Vec<JoinHandle<()>>);
impl Pool
{
/// Join the whole pool
#[inline] pub fn join(self)
{
drop(self)
}
/// Detach the pool and do not join its worker threads.
#[inline] pub fn detach(mut self)
{
self.0 = Vec::default();
}
/// The number of threads in the pool
#[inline] pub fn num_threads(&self) -> usize
{
self.0.len()
}
}
impl FromIterator<JoinHandle<()>> for Pool
{
fn from_iter<I: IntoIterator<Item = JoinHandle<()>>>(iter: I) -> Self
{
Self(iter.into_iter().collect())
}
}
impl IntoIterator for Pool
{
type Item= JoinHandle<()>;
type IntoIter = std::vec::IntoIter<JoinHandle<()>>;
fn into_iter(mut self) -> Self::IntoIter
{
std::mem::replace(&mut self.0, Vec::default()).into_iter()
}
}
impl Drop for Pool
{
fn drop(&mut self)
{
if self.0.len() > 0 {
join_all(std::mem::replace(&mut self.0, Vec::default()));
}
}
}

@ -0,0 +1,80 @@
use std::sync::{
mpsc,
Mutex,
Arc,
PoisonError,
};
use std::fmt;
use std::error;
/// A multi-consumer message receiver
#[derive(Debug)]
pub struct PendingReceiver<T>
{
recv: Arc<Mutex<mpsc::Receiver<T>>>,
}
impl<T> Clone for PendingReceiver<T>
{
fn clone(&self) -> Self
{
Self{
recv: Arc::clone(&self.recv),
}
}
}
impl<T> PendingReceiver<T>
{
/// Try to receive a message.
pub fn recv(&self) -> Result<Option<T>, PendingReceiverError>
{
Ok(self.recv.lock()?.recv().ok())
}
}
/// Create an `mpmc` channel.
pub fn channel<T>(cap: usize) -> (mpsc::SyncSender<T>, PendingReceiver<T>)
{
let (tx, rx) = mpsc::sync_channel(cap);
(tx, PendingReceiver{
recv: Arc::new(Mutex::new(rx))
})
}
#[derive(Debug)]
pub enum PendingReceiverError
{
Poisoned,
Closed,
}
impl error::Error for PendingReceiverError{}
impl fmt::Display for PendingReceiverError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Poisoned => write!(f, "poisoned"),
Self::Closed => write!(f, "receiver closed"),
}
}
}
impl<T> From<PoisonError<T>> for PendingReceiverError
{
#[inline] fn from(_from: PoisonError<T>) -> Self
{
Self::Poisoned
}
}
impl From<mpsc::RecvError> for PendingReceiverError
{
fn from(_from: mpsc::RecvError) -> Self
{
Self::Closed
}
}
Loading…
Cancel
Save