You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
3.5 KiB

3 years ago
use super::*;
use job::Job;
use std::thread;
use std::thread::JoinHandle;
use std::ops::Drop;
use std::iter::FromIterator;
/// Number of worker threads to spawn for a pool.
///
/// This is one more than the count reported by `num_cpus::get()`.
#[inline] fn num_threads() -> usize
{
    let cpus = num_cpus::get();
    cpus + 1
}
fn work(recv: state::PendingReceiver<Job>) -> Result<(), Box<dyn std::error::Error>>
{
3 years ago
while let Some(mut job) = recv.recv()?
3 years ago
{
let written = work::work_on(&mut job)?;
job.complete(written)?;
3 years ago
}
Ok(())
}
/// Build an iterator that spawns one worker thread per item, each reading from a clone of `recv`.
///
/// # Notes
/// Nothing is spawned by calling this function: a thread is only started when the returned iterator is advanced.
/// Collect the iterator into a container to start every worker up front.
/// ```
/// # use crate::{state, pool};
/// let (tx, rx) = state::channel(16);
/// {
///     let handles: pool::Pool = pool::spawn_for_lazy(rx).collect();
///     // threads are now spawned, do work with job sender `tx`
/// } // when `handles` is dropped, it will wait on the worker threads
/// ```
fn spawn_for_lazy(recv: state::PendingReceiver<Job>) -> impl Iterator<Item = JoinHandle<()>>
{
    // Owns the original receiver and hands a fresh clone to every worker it starts.
    let spawn_worker = move |_: usize| {
        let rx = recv.clone();
        // The worker's `Result` is intentionally discarded.
        thread::spawn(move || drop(work(rx)))
    };
    (0..num_threads()).map(spawn_worker)
}
/// Spawn the thread pool and return handles to the spawned threads as a `Pool`.
3 years ago
#[inline] fn spawn_for_pool(recv: state::PendingReceiver<Job>) -> Pool
3 years ago
{
spawn_for_lazy(recv).collect()
}
/// Spawn and then immediately wait on a thread pool to complete.
///
3 years ago
/// Has the same behaviour as `drop(spawn_for_pool(recv))` but skips a potential heap allocation.
3 years ago
#[inline] pub fn spawn_for_and_join(recv: state::PendingReceiver<Job>)
{
join_all(spawn_for_lazy(recv));
}
/// Wait on every `JoinHandle` yielded by `i`, in order, throwing away each thread's result.
///
/// A worker that panicked makes its `join()` return `Err`; that error is discarded too,
/// so one failed worker never stops the remaining handles from being joined.
#[inline]
pub fn join_all<T>(i: impl IntoIterator<Item = JoinHandle<T>>)
{
    i.into_iter().for_each(|handle| {
        // Worker panics are deliberately ignored.
        drop(handle.join());
    });
}
3 years ago
/// Vector type used for thread pool waiting.
///
/// This container is compiled with an estimate of enough inline (stack) storage for a reasonably sized thread pool.
/// The estimate is `crate::consts::CPUS`, documented as the number of processors of the CPU this binary was built on;
/// the container will only heap allocate when it has to hold more thread handles than that number.
pub type PoolContainer = smallvec::SmallVec<[JoinHandle<()>; crate::consts::CPUS]>;
3 years ago
/// A pool container that ensures worker threads are waited on before termination.
3 years ago
#[derive(Debug)]
3 years ago
pub struct Pool(PoolContainer);
3 years ago
impl Pool
{
/// Join the whole pool
#[inline] pub fn join(self)
{
drop(self)
}
3 years ago
/// Detach the pool but do not join its worker threads.
3 years ago
#[inline] pub fn detach(mut self)
{
3 years ago
self.0 = Default::default();
3 years ago
}
/// The number of threads in the pool
#[inline] pub fn num_threads(&self) -> usize
{
self.0.len()
}
}
impl FromIterator<JoinHandle<()>> for Pool
{
fn from_iter<I: IntoIterator<Item = JoinHandle<()>>>(iter: I) -> Self
{
Self(iter.into_iter().collect())
}
}
impl IntoIterator for Pool
{
type Item= JoinHandle<()>;
3 years ago
type IntoIter = smallvec::IntoIter<[JoinHandle<()>; crate::consts::CPUS]>;
3 years ago
fn into_iter(mut self) -> Self::IntoIter
{
3 years ago
std::mem::replace(&mut self.0, Default::default()).into_iter()
3 years ago
}
}
impl Drop for Pool
{
fn drop(&mut self)
{
if self.0.len() > 0 {
3 years ago
join_all(std::mem::replace(&mut self.0, Default::default()));
3 years ago
}
}
}
3 years ago
/// Spawn a thread pool reading jobs from `recv` and return it as a [`Pool`].
///
/// The pool can be spawned in different ways; this uses the default one, which forwards to `spawn_for_pool`.
/// The others are not made public except [`spawn_for_and_join`], as their behaviour differences are very subtle.
#[inline(always)] pub fn pool(recv: state::PendingReceiver<Job>) -> Pool
{
spawn_for_pool(recv)
}