From 24c0fcb584f5a08d35d1badcafd5b53e45f04f08 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 22 Sep 2021 17:46:44 +0100 Subject: [PATCH] Concurrent object Pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for lazy-rebuild's current commit: Future blessing − 末吉 --- Cargo.toml | 2 + src/main.rs | 2 + src/pool.rs | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 src/pool.rs diff --git a/Cargo.toml b/Cargo.toml index d668c45..789a1a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +atomic_refcell = "0.1.7" +crossbeam-queue = "0.3.2" cryptohelpers = { version = "1.8.2", features = ["full", "async"] } futures = "0.3.17" tokio = { version = "1.11.0", features = ["full"] } diff --git a/src/main.rs b/src/main.rs index a672905..16b3611 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,10 +3,12 @@ use std::convert::Infallible; use std::path::PathBuf; mod ext; use ext::*; +mod pool; mod handle; #[tokio::main] async fn main() { + //TODO: Is one hashing handler enough? Or should we have a pool of handlers, with a capacity of the number of threads or something? If we do that, we should disable tokio's threaded scheduler by default. let (tx, mut rx) = handle::spawn(Default::default()); let _res = tokio::join![ tokio::spawn(async move { diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 0000000..4004e0b --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,138 @@ +use super::*; +use std::sync::{Arc,Weak}; +use std::mem::ManuallyDrop; +use crossbeam_queue::ArrayQueue; +use std::ops::{ + Drop, + Deref, DerefMut, +}; +use std::borrow::Borrow; + +/// An owned handle to a rented value in a pool. +/// +/// When the handle is dropped, the value will be placed back in the pool if the pool has not already been filled by a replacement(s) or dropped.. +#[derive(Debug)] +pub struct Handle(ManuallyDrop, Weak>); + +impl Handle +{ + /// Detach this instance from its owning pool. + /// + /// The value will not be replaced when this handle is dropped. + pub fn detach(&mut self) + { + self.1 = Weak::new(); + } + /// Detach this instance, and insert a new value into the pool in its place. + /// + /// # Returns + /// `true` if the replacement succeeded. + /// `false` if there was no room in the pool, or if the pool has been dropped. + pub fn replace(&mut self) -> bool + where T: Default + { + match std::mem::replace(&mut self.1, Weak::new()).upgrade() + { + Some(v) => v.push(T::default()).is_ok(), + _ => false, + } + } + /// Remove the value from the pool. + pub fn into_inner(mut self) -> T + { + self.1 = Weak::new(); + let val = unsafe {ManuallyDrop::take(&mut self.0)}; + std::mem::forget(self); + val + } + /// Is the pool still alive? + pub fn has_owner(&self) -> bool + { + self.1.strong_count() > 0 + } +} + +impl Deref for Handle +{ + type Target = T; + #[inline] fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Handle +{ + #[inline] fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Borrow for Handle +{ + #[inline] fn borrow(&self) -> &T + { + &self.0 + } +} + +/// A self-replacing concurrent pool of rentable objects. +/// +/// # State +/// Objects are ephemeral and may be dropped and replaced whenever. +/// Do not rely on the state of objects in the pool to remain consistent. +#[derive(Debug)] +pub struct Pool +{ + objects: Arc>, +} + +impl Drop for Handle +{ + fn drop(&mut self) { + if let Some(owner) = self.1.upgrade() { + let value = unsafe {ManuallyDrop::take(&mut self.0)}; + drop(owner.push(value)); + } else { + unsafe { + ManuallyDrop::drop(&mut self.0); + } + } + } +} + +impl Pool +{ + /// Create a new pool with a specific number of objects. + /// This number as a maximum capacity will not change. + pub fn with_capacity(cap: usize) -> Self + { + let objects = ArrayQueue::new(cap); + for x in + std::iter::repeat_with(T::default).take(cap) + { + assert!(objects.push(x).is_ok()); + } + Self { + objects: Arc::new(objects), + } + } + /// Create a new pool constructed with the default number of objects. + #[inline] pub fn new() -> Self + { + Self::with_capacity(32) + } + /// Rent an object from the pool of objects. + /// If one is not available, a new one is constructed. + /// + /// The object is moved from the pool to a handle. + /// It is therefore recommended to box large objects that are in a pool. + pub fn rent(&self) -> Handle + { + if let Some(last) = self.objects.pop() + { + Handle(ManuallyDrop::new(last), Arc::downgrade(&self.objects)) + } else { + Handle(ManuallyDrop::new(T::default()), Arc::downgrade(&self.objects)) + } + } +}