From a4674cea6dfbfc22b3ebe6793249bba0afbe3a79 Mon Sep 17 00:00:00 2001 From: Avril Date: Fri, 12 Feb 2021 02:39:08 +0000 Subject: [PATCH] deferred dropping --- shuffle3rs/Cargo.lock | 8 ++ shuffle3rs/Cargo.toml | 8 ++ shuffle3rs/src/defer_drop.rs | 148 +++++++++++++++++++++++++++++++++++ shuffle3rs/src/ext.rs | 120 ++++++++++++++++++++++++++++ shuffle3rs/src/main.rs | 7 ++ shuffle3rs/src/shuffle.rs | 9 ++- 6 files changed, 298 insertions(+), 2 deletions(-) create mode 100644 shuffle3rs/src/defer_drop.rs diff --git a/shuffle3rs/Cargo.lock b/shuffle3rs/Cargo.lock index 4ccc27e..b8cd917 100644 --- a/shuffle3rs/Cargo.lock +++ b/shuffle3rs/Cargo.lock @@ -17,6 +17,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.86" @@ -73,6 +79,8 @@ dependencies = [ name = "shuffle3rs" version = "0.1.0" dependencies = [ + "cfg-if", + "lazy_static", "rand", ] diff --git a/shuffle3rs/Cargo.toml b/shuffle3rs/Cargo.toml index 8e725ee..0421c4b 100644 --- a/shuffle3rs/Cargo.toml +++ b/shuffle3rs/Cargo.toml @@ -6,5 +6,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["deferred-drop"] + +# Move large objects to a seperate thread to be dropped. +deferred-drop = ["lazy_static"] + [dependencies] +cfg-if = "1.0.0" +lazy_static = {version = "1.4.0", optional = true} rand = "0.8.3" diff --git a/shuffle3rs/src/defer_drop.rs b/shuffle3rs/src/defer_drop.rs new file mode 100644 index 0000000..57d03d6 --- /dev/null +++ b/shuffle3rs/src/defer_drop.rs @@ -0,0 +1,148 @@ +use super::*; + +use std::{ + thread, + time::Duration, + sync::mpsc, + any::Any, + marker::{Send, Sync}, +}; + +pub type Defer = Box; + +/// A deferred dropping handle. +#[derive(Debug, Clone)] +#[repr(transparent)] +pub struct DeferredDropper(mpsc::Sender); + +impl DeferredDropper +{ + /// Drop a Boxed item on the background thread + /// + /// # Panics + /// If the background thread has panicked. + #[inline] pub fn drop_boxed(&self, item: Box) + { + self.0.send(item).unwrap(); + } + /// Drop an item on the background thread + /// + /// # Panics + /// If the background thread has panicked. + #[inline(always)] pub fn drop(&self, item: T) + { + self.drop_boxed(Box::new(item)) + } + + /// Send this deferring drop + /// + /// # Panics + /// If the background thread has panicked. + #[inline(always)] pub fn send(&self, item: impl Into) + { + self.0.send(item.into()).unwrap() + } +} + +/// Subscribe to the deferred dropper. Usually avoid doing this, and just use the `drop!` macro, or use the thread-local static dropper reference `HANDLE` directly. +fn defer_drop_sub() -> DeferredDropper +{ + use std::sync::Mutex; // Can we get rid of this somehow? I don't think we can. Either we mutex lock here, or we mutex lock on every (I think every, should look into that..) send. I think this is preferrable. + #[repr(transparent)] + struct Shim(Mutex>); + + unsafe impl Sync for Shim{} + + lazy_static! { + static ref TX: Shim = { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + for val in rx.into_iter() + .lag(Duration::from_millis(10)) + { + //let _ = thread::spawn(move || drop(val)).join(); // To catch panic? + drop(val); // What if this panics? + } + }); + Shim(Mutex::new(tx)) + }; + } + + DeferredDropper(TX.0.lock().unwrap().clone()) +} + +thread_local! { + pub static HANDLE: DeferredDropper = defer_drop_sub(); +} + +#[cfg(test)] +mod tests +{ + #[test] + fn mac() + { + use crate::*; + let sub = super::defer_drop_sub(); + + let large_vec = vec![String::from("hello world"); 1000]; + + + drop!(in sub; large_vec.clone()); + drop!(large_vec); + drop!(box Box::new("hello world?")); + drop!(in sub; box Box::new("hello world?")); + } + + #[test] + fn dropping_larges() + { + for joiner in std::iter::repeat_with(|| { + let large_vec = vec![String::from("hello world"); 1000]; + let h = { + let mut large_vec = large_vec.clone(); + std::thread::spawn(move || { + large_vec.sort(); + drop!(large_vec); + }) + }; + drop!(large_vec); + h + }).take(1000) + { + joiner.join().unwrap(); + } + std::thread::sleep(std::time::Duration::from_millis(500)); + } + #[test] + fn dropping_vec() + { + let sub = super::defer_drop_sub(); + + let large_vec = vec![String::from("hello world"); 1000]; + + + sub.drop(large_vec.clone()); + sub.drop_boxed(Box::new(large_vec)) //we can't just send boxed slice because MUH SIZED??? + //FUCK THIS! J)QI EJOAIJAOIW + /* + unsafe { + let raw = Box::into_raw(large_vec.into_boxed_slice()); + sub.send(Box::from_raw(raw as *mut (dyn std::any::Any + Send + 'static))).unwrap(); + }*/ + } + #[test] + fn clone_shim() + { + for joiner in std::iter::repeat_with(|| { + std::thread::spawn(move || { + let mut subs = Vec::new(); + for _ in 0..100 { + subs.push(super::defer_drop_sub()); + } + }) + }).take(1000) + { + joiner.join().unwrap(); + } + } +} diff --git a/shuffle3rs/src/ext.rs b/shuffle3rs/src/ext.rs index 00a40d3..04ebeb6 100644 --- a/shuffle3rs/src/ext.rs +++ b/shuffle3rs/src/ext.rs @@ -1,3 +1,6 @@ +use std::time::Duration; +use std::iter::{FusedIterator, DoubleEndedIterator}; + use rand::Rng; use crate::shuffle; @@ -32,3 +35,120 @@ impl ShuffleExt for [T] todo!() } } + +/// A lagged iterator +#[derive(Debug, Clone)] +pub struct Lag(Duration, I) +where I: Iterator; + + +impl Lag +where I: Iterator +{ + /// Set the lag duration + #[inline] pub fn set_duration(&mut self, dur: Duration) + { + self.0 = dur; + } + /// Get the lag duration + #[inline] pub fn duration(&self) -> Duration + { + self.0 + } +} +impl Lag +where I: Iterator +{ + /// Consume into the inner iterator + #[inline] pub fn into_inner(self) -> I + { + self.1 + } +} + +pub trait LagIterExt: Iterator +{ + fn lag(self, dur: Duration) -> Lag; +} + +impl LagIterExt for I +where I: Iterator +{ + #[inline] fn lag(self, dur: Duration) -> Lag + { + Lag(dur, self) + } +} + +impl Iterator for Lag +where I: Iterator +{ + type Item = T; + fn next(&mut self) -> Option + { + std::thread::sleep(self.0); + self.1.next() + } + fn size_hint(&self) -> (usize, Option) { + self.1.size_hint() + } +} + +impl FusedIterator for Lag +where I: Iterator + FusedIterator{} +impl ExactSizeIterator for Lag +where I: Iterator + ExactSizeIterator{} +impl DoubleEndedIterator for Lag +where I: Iterator + DoubleEndedIterator +{ + fn next_back(&mut self) -> Option + { + std::thread::sleep(self.0); + self.1.next() + } +} + +cfg_if!{ + + if #[cfg(feature="deferred-drop")] { + /// Drop this item on another thread + #[macro_export] macro_rules! drop { + (in $sub:ident; box $val:expr) => { + $sub.drop_boxed($val) + }; + (in $sub:ident; $val:expr) => { + $sub.drop($val) + }; + (box $val:expr) => { + { + $crate::defer_drop::HANDLE.with(move |sub| { + sub.drop($val) + }); + } + }; + ($val:expr) => { + { + $crate::defer_drop::HANDLE.with(move |sub| { + sub.drop($val) + }); + } + }; + + } + } else { + /// Drop this item on another thread + #[macro_export] macro_rules! drop { + (box $val:expr) => { + { + drop($val) + } + }; + ($val:expr) => { + { + drop($val) + } + }; + + } + } +} diff --git a/shuffle3rs/src/main.rs b/shuffle3rs/src/main.rs index b4d26cb..834bfa5 100644 --- a/shuffle3rs/src/main.rs +++ b/shuffle3rs/src/main.rs @@ -1,9 +1,16 @@ #![allow(dead_code)] +#[cfg(feature="deferred-drop")] #[macro_use] extern crate lazy_static; +#[macro_use] extern crate cfg_if; + #[macro_use] mod ext; use ext::*; + +#[cfg(feature="deferred-drop")] mod defer_drop; mod shuffle; + + fn main() { println!("Hello, world!"); } diff --git a/shuffle3rs/src/shuffle.rs b/shuffle3rs/src/shuffle.rs index b15b491..8d84275 100644 --- a/shuffle3rs/src/shuffle.rs +++ b/shuffle3rs/src/shuffle.rs @@ -1,7 +1,6 @@ use rand::Rng; use std::mem::swap; - /// Get a random element in this slice. pub fn element_in<'a, T, R: Rng + ?Sized>(slice: &'a (impl AsRef<[T]> + ?Sized), with: &mut R) -> &'a T { @@ -33,8 +32,14 @@ fn shuffle_slice(slice: &mut [T], with: &mut R) } } -//TODO: unshuffle +fn unshuffle_slice(slice: &mut [T], with: &mut R) +{ + let indecies: Vec<_> = (1..slice.len()).map(|idx| with.gen_range(0..idx)).collect(); + + todo!(); + drop!(indecies); +} /// Shuffle this slice with this `Rng`. #[inline(always)] pub fn shuffle(mut slice: impl AsMut<[T]>, with: &mut R) {