added jitter, lag

new
Avril 4 years ago
parent dacbea8023
commit 5a2c30520f
Signed by: flanchan
GPG Key ID: 284488987C31F630

55
Cargo.lock generated

@ -519,6 +519,7 @@ dependencies = [
"lazy_static", "lazy_static",
"memmap", "memmap",
"pin-project", "pin-project",
"rand 0.8.3",
"rustc_version", "rustc_version",
"serde", "serde",
"serde_cbor", "serde_cbor",
@ -598,8 +599,8 @@ dependencies = [
"base64 0.12.3", "base64 0.12.3",
"crypto-mac", "crypto-mac",
"hmac", "hmac",
"rand", "rand 0.7.3",
"rand_core", "rand_core 0.5.1",
"sha2", "sha2",
"subtle", "subtle",
] ]
@ -684,9 +685,21 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [ dependencies = [
"rand_chacha", "rand_chacha 0.2.2",
"rand_core", "rand_core 0.5.1",
"rand_hc", "rand_hc 0.2.0",
]
[[package]]
name = "rand"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
"libc",
"rand_chacha 0.3.0",
"rand_core 0.6.2",
"rand_hc 0.3.0",
] ]
[[package]] [[package]]
@ -696,7 +709,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [ dependencies = [
"ppv-lite86", "ppv-lite86",
"rand_core", "rand_core 0.5.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
dependencies = [
"ppv-lite86",
"rand_core 0.6.2",
] ]
[[package]] [[package]]
@ -708,13 +731,31 @@ dependencies = [
"getrandom 0.1.15", "getrandom 0.1.15",
] ]
[[package]]
name = "rand_core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [
"getrandom 0.2.2",
]
[[package]] [[package]]
name = "rand_hc" name = "rand_hc"
version = "0.2.0" version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [ dependencies = [
"rand_core", "rand_core 0.5.1",
]
[[package]]
name = "rand_hc"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [
"rand_core 0.6.2",
] ]
[[package]] [[package]]

@ -19,6 +19,7 @@ jemallocator = "0.3.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
memmap = "0.7.0" memmap = "0.7.0"
pin-project = "1.0.2" pin-project = "1.0.2"
rand = "0.8.3"
serde = {version = "1.0.118", features= ["derive"]} serde = {version = "1.0.118", features= ["derive"]}
serde_cbor = "0.11.1" serde_cbor = "0.11.1"
serde_json = "1.0.60" serde_json = "1.0.60"

@ -58,7 +58,7 @@ fn defer_drop_sub() -> DeferredDropper
static ref TX: Shim = { static ref TX: Shim = {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
for val in rx.into_iter().lag(Duration::from_millis(10)) for val in rx.into_iter().lag_sync(Duration::from_millis(10))
{ {
//let _ = thread::spawn(move || drop(val)).join(); // To catch panic? //let _ = thread::spawn(move || drop(val)).join(); // To catch panic?
drop(val); // What if this panics? drop(val); // What if this panics?

@ -27,7 +27,7 @@ impl Hasher for Sha256TopHasher
{ {
#[inline] fn finish(&self) -> u64 { #[inline] fn finish(&self) -> u64 {
let mut bytes = [0u8; std::mem::size_of::<u64>()]; let mut bytes = [0u8; std::mem::size_of::<u64>()];
crate::slice::copy_bytes(self.0.as_ref(), &mut bytes[..]); bytes::memcpy(self.0.as_ref(), &mut bytes[..]);
u64::from_le_bytes(bytes) u64::from_le_bytes(bytes)
} }
#[inline] fn write(&mut self, bytes: &[u8]) { #[inline] fn write(&mut self, bytes: &[u8]) {

@ -1,6 +1,7 @@
//! Lagging iterators / (todo) streams. //! Lagging iterators / (todo) streams.
use super::*; use super::*;
use std::time::Duration; use std::time::Duration;
use tokio::time;
/// A lagged iterator /// A lagged iterator
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -31,15 +32,30 @@ where I: Iterator<Item=T>
} }
} }
pub trait LagStreamExt: Stream + Sized
{
/// Throttle a `Stream` by this duration.
fn lag(self, dur: Duration) -> time::Throttle<Self>;
}
impl<T> LagStreamExt for T
where T: Stream
{
#[inline] fn lag(self, dur: Duration) -> time::Throttle<Self>
{
time::throttle(dur, self)
}
}
pub trait LagIterExt: Iterator pub trait LagIterExt: Iterator
{ {
fn lag(self, dur: Duration) -> Lag<Self, Self::Item>; fn lag_sync(self, dur: Duration) -> Lag<Self, Self::Item>;
} }
impl<I> LagIterExt for I impl<I> LagIterExt for I
where I: Iterator where I: Iterator
{ {
#[inline] fn lag(self, dur: Duration) -> Lag<Self, Self::Item> #[inline] fn lag_sync(self, dur: Duration) -> Lag<Self, Self::Item>
{ {
Lag(dur, self) Lag(dur, self)
} }
@ -72,3 +88,20 @@ where I: Iterator<Item=T> + DoubleEndedIterator
self.1.next() self.1.next()
} }
} }
pub trait JitterExt<T>
{
/// Produce a random value between `self.0` and `self.1` inclusive
fn jitter(self) -> T;
}
impl<T> JitterExt<T> for (T, T)
where T: rand::distributions::uniform::SampleUniform
{
fn jitter(self) -> T
{
util::jitter(self.0, self.1)
}
}

@ -1,3 +1,4 @@
use super::*;
use std::iter::FusedIterator; use std::iter::FusedIterator;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::borrow::Borrow; use std::borrow::Borrow;
@ -6,6 +7,13 @@ use futures::prelude::*;
use std::cell::RefCell; use std::cell::RefCell;
use std::ptr; use std::ptr;
/// General utilities
pub mod util;
/// Functions for manipulating byte slices
pub mod bytes;
// Internal modules
mod iters; mod iters;
pub use iters::*; pub use iters::*;
@ -18,15 +26,13 @@ pub use hashers::*;
mod hex; mod hex;
pub use hex::*; pub use hex::*;
/// Functions for manipulating byte slices
pub mod bytes;
mod lag; mod lag;
pub use lag::*; pub use lag::*;
mod defer_drop; mod defer_drop;
pub use defer_drop::*; pub use defer_drop::*;
// The extension traits are defined in this file, no need to re-export anything from here.
pub mod chunking; pub mod chunking;
/// How many elements should `precollect` allocate on the stack before spilling to the heap. /// How many elements should `precollect` allocate on the stack before spilling to the heap.

@ -0,0 +1,14 @@
//! Misc. Utilites
use super::*;
/// Get a random value between these two inclusive
pub fn jitter<T>(min: T, max: T) -> T
where T: rand::distributions::uniform::SampleUniform
{
use rand::Rng;
let mut thread = rand::thread_rng();
let dist = rand::distributions::Uniform::new_inclusive(min, max);
thread.sample(dist)
}

@ -19,8 +19,6 @@ static GLOBAL: Jemalloc = Jemalloc;
mod ext; mod ext;
use ext::*; use ext::*;
mod slice;
// Real stuff // Real stuff
mod service; mod service;

@ -8,6 +8,19 @@ use tokio::sync::{
id_type!(pub CommandID: "ID of a sent command"); id_type!(pub CommandID: "ID of a sent command");
bitflags! {
/// The requirements this command has
#[derive(Default)]
#[allow(non_snake_case)]
pub(super) struct CommandFlags: u64
{
/// Requires nothing
const NONE = 0;
/// Requires global service lock
const GLOBAL_LOCK = 1<<0;
}
}
/// A response from the service for a sent command /// A response from the service for a sent command
pub type Response = Box<dyn Any + Send + 'static>; pub type Response = Box<dyn Any + Send + 'static>;
@ -17,6 +30,7 @@ pub enum CommandKind
{ {
} }
//TODO: Add metadata map entry of `CommandFlags` for each disctiminant of `CommandKind` somehow. (maybe a proc-macro?)
/// A command to send to a running service. /// A command to send to a running service.
/// ///
@ -25,6 +39,7 @@ pub enum CommandKind
pub struct Command pub struct Command
{ {
id: CommandID, id: CommandID,
flags: CommandFlags,
//kind: CommandKind, // `CommandKind` -(sent to)> <running service> -(handle returned from send func)> `Command` //kind: CommandKind, // `CommandKind` -(sent to)> <running service> -(handle returned from send func)> `Command`
resp: oneshot::Receiver<Option<Response>>, resp: oneshot::Receiver<Option<Response>>,

@ -49,8 +49,15 @@ pub struct ServiceSettings
pub req_dispatch_force_timeout: Option<Duration>, pub req_dispatch_force_timeout: Option<Duration>,
/// How long to wait before processing batches of requests /// How long to wait before processing batches of requests
pub req_dispatch_delay: Option<Duration>, pub req_dispatch_delay: Option<Duration>,
/// Random delay between request batch processing /// Random **millisecond** delay bounds between request batch processing
pub req_dispatch_jitter: Option<(Duration, Duration)>, pub req_dispatch_jitter: Option<(i64, i64)>,
/// Filter requests in a block based on their lock flags, and process non-locking and locking commands concurrently in 2 seperate tasks.
/// This may result in commands being processed out-of-order sometimes.
///
/// If this is `false`, the lock will be acquired for each block that contains a locking command, then each command will be processed in order within the current task, instead of being filtered and dispatched to the lock-held and the non-lock-held task.
// For filtering, we can create a type that implements `From<Vec<T>>` and use it as the chunk collection type, maybe, possibly. Idk.
// Or we can just make a multiplexing mpsc wrapper i guess...
pub req_dispatch_internal_filter: bool, // Default: true
} }
//TODO: impl Default for ServiceSettings //TODO: impl Default for ServiceSettings

@ -15,6 +15,8 @@ use tokio::{
pub mod command; pub mod command;
use command::{ use command::{
CommandKind, CommandKind,
CommandFlags,
CommandID, CommandID,
}; };
@ -41,7 +43,6 @@ struct ChannelInner
opt: Box<config::ServiceSettings>, opt: Box<config::ServiceSettings>,
} }
/// The side of a `command::Command` that the running service sees. /// The side of a `command::Command` that the running service sees.
#[derive(Debug)] #[derive(Debug)]
struct Request struct Request
@ -51,6 +52,12 @@ struct Request
/// The actual command sent by the user. /// The actual command sent by the user.
kind: CommandKind, kind: CommandKind,
/// The metadata flags of this `CommandKind`.
///
/// This is looked up by the sender (user), not the receiver (service) to save service batch processing time.
/// Although the lookup should be extremely fast.
metadata: CommandFlags,
/// Optional response sender. /// Optional response sender.
/// ///
/// Just dropping this is the same as sending `None` as far as the user sees. /// Just dropping this is the same as sending `None` as far as the user sees.

@ -1,6 +0,0 @@
use super::*;
pub use crate::ext::bytes::{
memcpy as copy_bytes,
memmove as move_bytes,
};
Loading…
Cancel
Save