Concurrent object Pool<T>

Fortune for lazy-rebuild's current commit: Future blessing − 末吉
rust-version
Avril 3 years ago
parent 01d842f72f
commit 24c0fcb584
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -6,6 +6,8 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
atomic_refcell = "0.1.7"
crossbeam-queue = "0.3.2"
cryptohelpers = { version = "1.8.2", features = ["full", "async"] }
futures = "0.3.17"
tokio = { version = "1.11.0", features = ["full"] }

@ -3,10 +3,12 @@ use std::convert::Infallible;
use std::path::PathBuf;
mod ext; use ext::*;
mod pool;
mod handle;
#[tokio::main]
async fn main() {
//TODO: Is one hashing handler enough? Or should we have a pool of handlers, with a capacity of the number of threads or something? If we do that, we should disable tokio's threaded scheduler by default.
let (tx, mut rx) = handle::spawn(Default::default());
let _res = tokio::join![
tokio::spawn(async move {

@ -0,0 +1,138 @@
use super::*;
use std::sync::{Arc,Weak};
use std::mem::ManuallyDrop;
use crossbeam_queue::ArrayQueue;
use std::ops::{
Drop,
Deref, DerefMut,
};
use std::borrow::Borrow;
/// An owned handle to a rented value in a pool.
///
/// When the handle is dropped, the value will be placed back in the pool if the pool has not already been filled by a replacement(s) or dropped..
#[derive(Debug)]
pub struct Handle<T>(ManuallyDrop<T>, Weak<ArrayQueue<T>>);
impl<T> Handle<T>
{
/// Detach this instance from its owning pool.
///
/// The value will not be replaced when this handle is dropped.
pub fn detach(&mut self)
{
self.1 = Weak::new();
}
/// Detach this instance, and insert a new value into the pool in its place.
///
/// # Returns
/// `true` if the replacement succeeded.
/// `false` if there was no room in the pool, or if the pool has been dropped.
pub fn replace(&mut self) -> bool
where T: Default
{
match std::mem::replace(&mut self.1, Weak::new()).upgrade()
{
Some(v) => v.push(T::default()).is_ok(),
_ => false,
}
}
/// Remove the value from the pool.
pub fn into_inner(mut self) -> T
{
self.1 = Weak::new();
let val = unsafe {ManuallyDrop::take(&mut self.0)};
std::mem::forget(self);
val
}
/// Is the pool still alive?
pub fn has_owner(&self) -> bool
{
self.1.strong_count() > 0
}
}
impl<T> Deref for Handle<T>
{
type Target = T;
#[inline] fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for Handle<T>
{
#[inline] fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> Borrow<T> for Handle<T>
{
#[inline] fn borrow(&self) -> &T
{
&self.0
}
}
/// A self-replacing concurrent pool of rentable objects.
///
/// # State
/// Objects are ephemeral and may be dropped and replaced whenever.
/// Do not rely on the state of objects in the pool to remain consistent.
#[derive(Debug)]
pub struct Pool<T>
{
objects: Arc<ArrayQueue<T>>,
}
impl<T> Drop for Handle<T>
{
fn drop(&mut self) {
if let Some(owner) = self.1.upgrade() {
let value = unsafe {ManuallyDrop::take(&mut self.0)};
drop(owner.push(value));
} else {
unsafe {
ManuallyDrop::drop(&mut self.0);
}
}
}
}
impl<T: Default> Pool<T>
{
/// Create a new pool with a specific number of objects.
/// This number as a maximum capacity will not change.
pub fn with_capacity(cap: usize) -> Self
{
let objects = ArrayQueue::new(cap);
for x in
std::iter::repeat_with(T::default).take(cap)
{
assert!(objects.push(x).is_ok());
}
Self {
objects: Arc::new(objects),
}
}
/// Create a new pool constructed with the default number of objects.
#[inline] pub fn new() -> Self
{
Self::with_capacity(32)
}
/// Rent an object from the pool of objects.
/// If one is not available, a new one is constructed.
///
/// The object is moved from the pool to a handle.
/// It is therefore recommended to box large objects that are in a pool.
pub fn rent(&self) -> Handle<T>
{
if let Some(last) = self.objects.pop()
{
Handle(ManuallyDrop::new(last), Arc::downgrade(&self.objects))
} else {
Handle(ManuallyDrop::new(T::default()), Arc::downgrade(&self.objects))
}
}
}
Loading…
Cancel
Save