You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
128 lines
3.2 KiB
128 lines
3.2 KiB
use super::*;
|
|
use job::Job;
|
|
use std::thread;
|
|
use std::thread::JoinHandle;
|
|
use std::ops::Drop;
|
|
use std::iter::FromIterator;
|
|
|
|
#[inline] fn num_threads() -> usize
|
|
{
|
|
num_cpus::get()+1
|
|
}
|
|
|
|
fn work(recv: state::PendingReceiver<Job>) -> Result<(), Box<dyn std::error::Error>>
|
|
{
|
|
while let Some(mut job) = recv.recv()?
|
|
{
|
|
work::work_on(&mut job)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Set up the thread pool with this receiver
|
|
///
|
|
/// # Notes
|
|
/// This method does not spawn the threads itself, the iterator must be consumed to spawn them.
|
|
/// Collect the returned iterator into a container to spawn the threads.
|
|
/// ```
|
|
/// # use crate::{state, pool};
|
|
/// let (tx, rx) = state::channel(16);
|
|
/// {
|
|
/// let handles: pool::Pool = pool::spawn_for_lazy(rx).collect();
|
|
/// // threads are now spawned, do work with job sender `tx`
|
|
/// } // when `handles` is dropped, it will wait on the worker threads
|
|
/// ```
|
|
fn spawn_for_lazy(recv: state::PendingReceiver<Job>) -> impl Iterator<Item = JoinHandle<()>>
|
|
{
|
|
(0..num_threads()).map(move |_| {
|
|
let recv = recv.clone();
|
|
thread::spawn(move || {
|
|
let _ = work(recv);
|
|
})
|
|
})
|
|
}
|
|
|
|
/// Spawn the thread pool and return handles to the spawned threads as a `Pool`.
|
|
#[inline] pub fn spawn_for_pool(recv: state::PendingReceiver<Job>) -> Pool
|
|
{
|
|
spawn_for_lazy(recv).collect()
|
|
}
|
|
|
|
/// Spawn and then immediately wait on a thread pool to complete.
|
|
///
|
|
/// Has the same behaviour as `drop(spawn_for_pool(recv))` but skips a heap allocation.
|
|
#[inline] pub fn spawn_for_and_join(recv: state::PendingReceiver<Job>)
|
|
{
|
|
join_all(spawn_for_lazy(recv));
|
|
}
|
|
|
|
/// Join an iterator of `JoinHandle`s and discard their results.
|
|
#[inline]
|
|
pub fn join_all<T>(i: impl IntoIterator<Item = JoinHandle<T>>)
|
|
{
|
|
for x in i {
|
|
let _ = x.join(); //ignore worker panics
|
|
}
|
|
}
|
|
|
|
/// Vector type used for thread pool waiting.
|
|
///
|
|
/// This container is compiled will an estimate of enough stack storage for a reasonably sized thread pool.
|
|
/// The number of processors of the CPU this binary was built on is used as this estimate, and will only heap allocate when spawning more threads that number.
|
|
pub type PoolContainer = smallvec::SmallVec<[JoinHandle<()>; crate::consts::CPUS]>;
|
|
|
|
/// A pool container that ensures worker threads are waited on before termination.
|
|
#[derive(Debug)]
|
|
pub struct Pool(PoolContainer);
|
|
|
|
impl Pool
|
|
{
|
|
/// Join the whole pool
|
|
#[inline] pub fn join(self)
|
|
{
|
|
drop(self)
|
|
}
|
|
|
|
/// Detach the pool but do not join its worker threads.
|
|
#[inline] pub fn detach(mut self)
|
|
{
|
|
self.0 = Default::default();
|
|
}
|
|
|
|
/// The number of threads in the pool
|
|
#[inline] pub fn num_threads(&self) -> usize
|
|
{
|
|
self.0.len()
|
|
}
|
|
}
|
|
|
|
impl FromIterator<JoinHandle<()>> for Pool
|
|
{
|
|
fn from_iter<I: IntoIterator<Item = JoinHandle<()>>>(iter: I) -> Self
|
|
{
|
|
Self(iter.into_iter().collect())
|
|
}
|
|
}
|
|
|
|
impl IntoIterator for Pool
|
|
{
|
|
type Item= JoinHandle<()>;
|
|
type IntoIter = smallvec::IntoIter<[JoinHandle<()>; crate::consts::CPUS]>;
|
|
|
|
fn into_iter(mut self) -> Self::IntoIter
|
|
{
|
|
std::mem::replace(&mut self.0, Default::default()).into_iter()
|
|
}
|
|
}
|
|
|
|
impl Drop for Pool
|
|
{
|
|
fn drop(&mut self)
|
|
{
|
|
if self.0.len() > 0 {
|
|
join_all(std::mem::replace(&mut self.0, Default::default()));
|
|
}
|
|
}
|
|
}
|