You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
3.5 KiB

use super::*;
use job::Job;
use std::thread;
use std::thread::JoinHandle;
use std::ops::Drop;
use std::iter::FromIterator;
/// Number of worker threads to spawn: the value reported by `num_cpus::get()`, plus one.
#[inline]
fn num_threads() -> usize {
    let cpus = num_cpus::get();
    cpus + 1
}
fn work(recv: state::PendingReceiver<Job>) -> Result<(), Box<dyn std::error::Error>>
{
while let Some(mut job) = recv.recv()?
{
let written = work::work_on(&mut job)?;
job.complete(written)?;
}
Ok(())
}
/// Build a lazy iterator that spawns one worker thread per item, `num_threads()` items in total.
///
/// Every worker gets its own clone of `recv` and runs [`work`] on it; the `Result` returned by
/// the worker loop is discarded.
///
/// # Notes
/// Nothing is spawned by calling this function: a thread is only created each time the returned
/// iterator is advanced. Collect the iterator into a container to spawn all the threads.
/// Joining a handle *while* iterating means the next thread is not spawned until that join
/// returns, so collect first if the workers are meant to run concurrently.
/// ```ignore
/// # use crate::{state, pool};
/// let (tx, rx) = state::channel(16);
/// {
///     let handles: pool::Pool = pool::spawn_for_lazy(rx).collect();
///     // threads are now spawned, do work with job sender `tx`
/// } // when `handles` is dropped, it will wait on the worker threads
/// ```
fn spawn_for_lazy(recv: state::PendingReceiver<Job>) -> impl Iterator<Item = JoinHandle<()>> {
    let spawn_worker = move |_: usize| {
        let worker_recv = recv.clone();
        thread::spawn(move || {
            // Worker errors are deliberately dropped here.
            let _ = work(worker_recv);
        })
    };
    (0..num_threads()).map(spawn_worker)
}
/// Eagerly spawn every worker thread for `recv` and gather the handles into a [`Pool`].
#[inline]
fn spawn_for_pool(recv: state::PendingReceiver<Job>) -> Pool {
    spawn_for_lazy(recv).collect::<Pool>()
}
/// Spawn a thread pool for `recv` and block until every worker thread has finished.
///
/// Has the same behaviour as `drop(spawn_for_pool(recv))`: all workers are spawned first and
/// only then joined. Worker panics are ignored, as in [`join_all`].
#[inline]
pub fn spawn_for_and_join(recv: state::PendingReceiver<Job>) {
    // `spawn_for_lazy` only spawns a thread when its iterator is advanced. Feeding it straight
    // into `join_all` would spawn one worker, wait for it to finish, and only then spawn the
    // next one, so the "pool" would run a single thread at a time. Collect all handles first.
    let handles: Vec<_> = spawn_for_lazy(recv).collect();
    join_all(handles);
}
/// Wait for every `JoinHandle` produced by `i`, in iteration order, discarding each result.
///
/// A worker that panicked makes `join` return `Err`; that is ignored as well, so this function
/// never propagates a worker panic.
#[inline]
pub fn join_all<T>(i: impl IntoIterator<Item = JoinHandle<T>>) {
    i.into_iter().for_each(|handle| {
        // Ignore both the thread's return value and any panic it ended with.
        let _ = handle.join();
    });
}
/// Vector type used for thread pool waiting.
///
/// This container is compiled with an estimate of enough inline storage for a reasonably sized thread pool.
/// The number of processors of the CPU this binary was built on is used as this estimate, and it will only heap allocate when holding more threads than that number.
// NOTE(review): `num_threads()` in this file returns `num_cpus::get() + 1`; if `crate::consts::CPUS`
// is the plain processor count, a default-sized pool presumably exceeds the inline capacity by one
// when run on the build machine. Confirm against the definition of `CPUS`.
pub type PoolContainer = smallvec::SmallVec<[JoinHandle<()>; crate::consts::CPUS]>;
/// A pool container that ensures worker threads are waited on before termination.
///
/// Wraps a [`PoolContainer`] of worker `JoinHandle`s. Dropping a `Pool` joins every handle it
/// still holds (see its `Drop` impl); `detach` and `into_iter` take the handles out first, so
/// nothing is joined on drop in those cases.
#[derive(Debug)]
pub struct Pool(PoolContainer);
impl Pool
{
/// Join the whole pool
#[inline] pub fn join(self)
{
drop(self)
}
/// Detach the pool but do not join its worker threads.
#[inline] pub fn detach(mut self)
{
self.0 = Default::default();
}
/// The number of threads in the pool
#[inline] pub fn num_threads(&self) -> usize
{
self.0.len()
}
}
/// Build a `Pool` by collecting thread handles; consuming a lazy spawning iterator here is what
/// actually spawns its threads.
impl FromIterator<JoinHandle<()>> for Pool {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = JoinHandle<()>>,
    {
        let handles = iter.into_iter().collect();
        Self(handles)
    }
}
/// Hand the raw thread handles back to the caller without joining them.
impl IntoIterator for Pool {
    type Item = JoinHandle<()>;
    type IntoIter = smallvec::IntoIter<[JoinHandle<()>; crate::consts::CPUS]>;

    fn into_iter(mut self) -> Self::IntoIter {
        // `Pool` implements `Drop`, so the field cannot be moved out directly. Swap in an empty
        // container instead; the drop of `self` at the end of this call then joins nothing.
        let handles = std::mem::take(&mut self.0);
        handles.into_iter()
    }
}
/// Dropping the pool waits for every worker thread it still holds.
impl Drop for Pool {
    fn drop(&mut self) {
        // Already emptied (e.g. by `detach` or `into_iter`): nothing to wait for.
        if self.0.is_empty() {
            return;
        }
        // Move the handles out so they can be consumed by value and joined.
        join_all(std::mem::take(&mut self.0));
    }
}
/// Spawn the worker thread pool for `recv` and return it as a [`Pool`].
///
/// This is the default way to spawn the pool and simply forwards to `spawn_for_pool`.
/// The other spawning helpers stay private, except [`spawn_for_and_join`], because their
/// behaviour differences are very subtle.
#[inline(always)]
pub fn pool(recv: state::PendingReceiver<Job>) -> Pool {
    spawn_for_pool(recv)
}