diff --git a/Cargo.lock b/Cargo.lock index f696585..3bd25aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -519,6 +519,7 @@ dependencies = [ "lazy_static", "memmap", "pin-project", + "rand 0.8.3", "rustc_version", "serde", "serde_cbor", @@ -598,8 +599,8 @@ dependencies = [ "base64 0.12.3", "crypto-mac", "hmac", - "rand", - "rand_core", + "rand 0.7.3", + "rand_core 0.5.1", "sha2", "subtle", ] @@ -684,9 +685,21 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ - "rand_chacha", - "rand_core", - "rand_hc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc 0.2.0", +] + +[[package]] +name = "rand" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" +dependencies = [ + "libc", + "rand_chacha 0.3.0", + "rand_core 0.6.2", + "rand_hc 0.3.0", ] [[package]] @@ -696,7 +709,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.5.1", +] + +[[package]] +name = "rand_chacha" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.2", ] [[package]] @@ -708,13 +731,31 @@ dependencies = [ "getrandom 0.1.15", ] +[[package]] +name = "rand_core" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" +dependencies = [ + "getrandom 0.2.2", +] + [[package]] name = "rand_hc" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core", + "rand_core 0.5.1", +] + +[[package]] +name = "rand_hc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" +dependencies = [ + "rand_core 0.6.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e7cb250..79a15b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ jemallocator = "0.3.2" lazy_static = "1.4.0" memmap = "0.7.0" pin-project = "1.0.2" +rand = "0.8.3" serde = {version = "1.0.118", features= ["derive"]} serde_cbor = "0.11.1" serde_json = "1.0.60" diff --git a/src/ext/defer_drop.rs b/src/ext/defer_drop.rs index b4fdf4f..8157e69 100644 --- a/src/ext/defer_drop.rs +++ b/src/ext/defer_drop.rs @@ -58,7 +58,7 @@ fn defer_drop_sub() -> DeferredDropper static ref TX: Shim = { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - for val in rx.into_iter().lag(Duration::from_millis(10)) + for val in rx.into_iter().lag_sync(Duration::from_millis(10)) { //let _ = thread::spawn(move || drop(val)).join(); // To catch panic? drop(val); // What if this panics? diff --git a/src/ext/hashers.rs b/src/ext/hashers.rs index f546b0f..a797952 100644 --- a/src/ext/hashers.rs +++ b/src/ext/hashers.rs @@ -27,7 +27,7 @@ impl Hasher for Sha256TopHasher { #[inline] fn finish(&self) -> u64 { let mut bytes = [0u8; std::mem::size_of::()]; - crate::slice::copy_bytes(self.0.as_ref(), &mut bytes[..]); + bytes::memcpy(self.0.as_ref(), &mut bytes[..]); u64::from_le_bytes(bytes) } #[inline] fn write(&mut self, bytes: &[u8]) { diff --git a/src/ext/lag.rs b/src/ext/lag.rs index 5632ee5..f429483 100644 --- a/src/ext/lag.rs +++ b/src/ext/lag.rs @@ -1,6 +1,7 @@ //! Lagging iterators / (todo) streams. use super::*; use std::time::Duration; +use tokio::time; /// A lagged iterator #[derive(Debug, Clone)] @@ -31,15 +32,30 @@ where I: Iterator } } +pub trait LagStreamExt: Stream + Sized +{ + /// Throttle a `Stream` by this duration. + fn lag(self, dur: Duration) -> time::Throttle; +} + +impl LagStreamExt for T +where T: Stream +{ + #[inline] fn lag(self, dur: Duration) -> time::Throttle + { + time::throttle(dur, self) + } +} + pub trait LagIterExt: Iterator { - fn lag(self, dur: Duration) -> Lag; + fn lag_sync(self, dur: Duration) -> Lag; } impl LagIterExt for I where I: Iterator { - #[inline] fn lag(self, dur: Duration) -> Lag + #[inline] fn lag_sync(self, dur: Duration) -> Lag { Lag(dur, self) } @@ -72,3 +88,20 @@ where I: Iterator + DoubleEndedIterator self.1.next() } } + + +pub trait JitterExt +{ + /// Produce a random value between `self.0` and `self.1` inclusive + fn jitter(self) -> T; +} + +impl JitterExt for (T, T) +where T: rand::distributions::uniform::SampleUniform +{ + fn jitter(self) -> T + { + util::jitter(self.0, self.1) + } +} + diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 1e4e679..832c4c3 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -1,3 +1,4 @@ +use super::*; use std::iter::FusedIterator; use std::collections::BTreeSet; use std::borrow::Borrow; @@ -6,6 +7,13 @@ use futures::prelude::*; use std::cell::RefCell; use std::ptr; +/// General utilities +pub mod util; + +/// Functions for manipulating byte slices +pub mod bytes; + +// Internal modules mod iters; pub use iters::*; @@ -18,15 +26,13 @@ pub use hashers::*; mod hex; pub use hex::*; -/// Functions for manipulating byte slices -pub mod bytes; - mod lag; pub use lag::*; mod defer_drop; pub use defer_drop::*; +// The extension traits are defined in this file, no need to re-export anything from here. pub mod chunking; /// How many elements should `precollect` allocate on the stack before spilling to the heap. diff --git a/src/ext/util.rs b/src/ext/util.rs new file mode 100644 index 0000000..27a65cd --- /dev/null +++ b/src/ext/util.rs @@ -0,0 +1,14 @@ +//! Misc. Utilites +use super::*; + +/// Get a random value between these two inclusive +pub fn jitter(min: T, max: T) -> T + where T: rand::distributions::uniform::SampleUniform +{ + use rand::Rng; + + let mut thread = rand::thread_rng(); + let dist = rand::distributions::Uniform::new_inclusive(min, max); + + thread.sample(dist) +} diff --git a/src/main.rs b/src/main.rs index 52749e4..23a61fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,8 +19,6 @@ static GLOBAL: Jemalloc = Jemalloc; mod ext; use ext::*; -mod slice; - // Real stuff mod service; diff --git a/src/service/command.rs b/src/service/command.rs index 5be3e98..ecb6636 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -8,6 +8,19 @@ use tokio::sync::{ id_type!(pub CommandID: "ID of a sent command"); +bitflags! { + /// The requirements this command has + #[derive(Default)] + #[allow(non_snake_case)] + pub(super) struct CommandFlags: u64 + { + /// Requires nothing + const NONE = 0; + /// Requires global service lock + const GLOBAL_LOCK = 1<<0; + } +} + /// A response from the service for a sent command pub type Response = Box; @@ -17,6 +30,7 @@ pub enum CommandKind { } +//TODO: Add metadata map entry of `CommandFlags` for each disctiminant of `CommandKind` somehow. (maybe a proc-macro?) /// A command to send to a running service. /// @@ -25,6 +39,7 @@ pub enum CommandKind pub struct Command { id: CommandID, + flags: CommandFlags, //kind: CommandKind, // `CommandKind` -(sent to)> -(handle returned from send func)> `Command` resp: oneshot::Receiver>, diff --git a/src/service/config.rs b/src/service/config.rs index eaa28c9..ae60eda 100644 --- a/src/service/config.rs +++ b/src/service/config.rs @@ -49,8 +49,15 @@ pub struct ServiceSettings pub req_dispatch_force_timeout: Option, /// How long to wait before processing batches of requests pub req_dispatch_delay: Option, - /// Random delay between request batch processing - pub req_dispatch_jitter: Option<(Duration, Duration)>, + /// Random **millisecond** delay bounds between request batch processing + pub req_dispatch_jitter: Option<(i64, i64)>, + /// Filter requests in a block based on their lock flags, and process non-locking and locking commands concurrently in 2 seperate tasks. + /// This may result in commands being processed out-of-order sometimes. + /// + /// If this is `false`, the lock will be acquired for each block that contains a locking command, then each command will be processed in order within the current task, instead of being filtered and dispatched to the lock-held and the non-lock-held task. + // For filtering, we can create a type that implements `From>` and use it as the chunk collection type, maybe, possibly. Idk. + // Or we can just make a multiplexing mpsc wrapper i guess... + pub req_dispatch_internal_filter: bool, // Default: true } //TODO: impl Default for ServiceSettings diff --git a/src/service/mod.rs b/src/service/mod.rs index 9d893ab..e8764c6 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -15,6 +15,8 @@ use tokio::{ pub mod command; use command::{ CommandKind, + CommandFlags, + CommandID, }; @@ -41,7 +43,6 @@ struct ChannelInner opt: Box, } - /// The side of a `command::Command` that the running service sees. #[derive(Debug)] struct Request @@ -51,6 +52,12 @@ struct Request /// The actual command sent by the user. kind: CommandKind, + /// The metadata flags of this `CommandKind`. + /// + /// This is looked up by the sender (user), not the receiver (service) to save service batch processing time. + /// Although the lookup should be extremely fast. + metadata: CommandFlags, + /// Optional response sender. /// /// Just dropping this is the same as sending `None` as far as the user sees. diff --git a/src/slice.rs b/src/slice.rs deleted file mode 100644 index 0542c51..0000000 --- a/src/slice.rs +++ /dev/null @@ -1,6 +0,0 @@ -use super::*; - -pub use crate::ext::bytes::{ - memcpy as copy_bytes, - memmove as move_bytes, -};