`ThrottleAdaptor`: Added throttleing `Write` impl.

Added Config -> State -> Adaptor pipeline.

TODO: `impl Read for ThrottleAdaptor<S: Read, ...>`

TODO: Arg parsing into `Config`

Fortune for throttle's current commit: Small blessing − 小吉
master
Avril 3 years ago
parent 07764d4deb
commit 0a065ed2b6
Signed by: flanchan
GPG Key ID: 284488987C31F630

41
Cargo.lock generated

@ -2,6 +2,12 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "cc"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -61,11 +67,46 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
]
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "stackalloc"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4f5c9dd3feb8a4adc8eae861e5f48862a92f9a9f38cf8fc99b92fc6ec016121"
dependencies = [
"cc",
"rustc_version",
]
[[package]] [[package]]
name = "throttle" name = "throttle"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"rand", "rand",
"stackalloc",
] ]
[[package]] [[package]]

@ -1,5 +1,6 @@
[package] [package]
name = "throttle" name = "throttle"
description="pipes stdin to stdout with a throttle (slowdown) rate specified by user"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
@ -7,3 +8,4 @@ edition = "2021"
[dependencies] [dependencies]
rand = "0.8.5" rand = "0.8.5"
stackalloc = "1.1.1"

@ -0,0 +1,82 @@
//! Configuration
use super::*;
use prov::prelude::*;
/// Application state
pub struct State
{
pub throttle_generator: Option<Box<dyn DynThrottleProvider<'static> + 'static>>,
pub buffer_size: Box<dyn BufferProvider + 'static>,
}
impl State
{
/// Convert into an instance that can be used to create a `StateThrottleAdaptor`
#[inline(always)]
pub fn with_stream<S>(self, stream: S) -> (Self, S)
{
(self, stream)
}
}
/// Configuration for application
#[derive(Debug, Clone)]
pub struct Config
{
pub throttle: Option<(Duration, Option<Duration>)>,
pub buffer_size: Option<(usize, Option<usize>)>,
}
impl From<Config> for State
{
#[inline]
fn from(from: Config) -> Self
{
Self {
throttle_generator: if let Some(throttle) = from.throttle {
Some(match throttle {
(Duration::ZERO, Some(Duration::ZERO) | None) => Box::new(NoThrottleProvider),
(low, Some(high)) if low == high => Box::new(SingleThrottleProvider::from(low)),
(low, Some(high)) => Box::new(UniformThrottleProvider::from(low..high)),
(low, _) => Box::new(SingleThrottleProvider::from(low)),
})
} else {
None
},
buffer_size: if let Some(size) = from.buffer_size {
match size {
(0, Some(0) | None) => Box::new(NoBufferProvider),
(low, Some(high)) if low == high => Box::new(low),
(low, Some(high)) => Box::new(UniformBufferProvider::from(low..high)),
(low, _) => Box::new(low)
}
} else {
Box::new(DefaultBufferSize)
}
}
}
}
impl Default for Config
{
#[inline]
fn default() -> Self
{
Self::new()
}
}
impl Config
{
/// Create an empty new config
#[inline(always)]
pub const fn new() -> Self
{
Self {
throttle: None,
buffer_size: None,
}
}
}

@ -1,4 +1,5 @@
mod conf;
mod prov; mod prov;
mod stream; mod stream;

@ -1,9 +1,117 @@
//! Throttle provider //! Throttle provider
use std::time::Duration;
use std::iter; use std::iter;
use std::num::NonZeroUsize;
/// Buffer size for `DefaultBufferSize`
pub const DEFAULT_BUFFER_SIZE: usize = 4096;
/// `BufferProvider` that only provides the `DEFAULT_BUFFER_SIZE`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub struct DefaultBufferSize;
/// `BufferProvider` that never provides a chunking buffer size.
///
/// Therefore, buffers are always consumed as-is.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NoBufferProvider;
impl BufferProvider for NoBufferProvider
{
#[inline(always)]
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize> {
None
}
}
impl BufferProvider for DefaultBufferSize
{
#[inline(always)]
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize> {
NonZeroUsize::new(DEFAULT_BUFFER_SIZE)
}
}
/// The type that dictates the amount of time to wait between each read.
pub type Duration = std::time::Duration;
pub trait BufferProvider
{
/// Try to get the next buffer size.
///
/// # Implementations
/// * *Should* always return `Some` *at least* once with no upper bound on the number of `Some`s that can be returned.
/// * *Should* return `Some` until there are no more left (*can* be infinite.)
/// * After a `None`, there **must never** be another non-`None` value.
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize>;
}
/// Iterator adaptor for `BufferProvider`'s `gen_next_buffer_size()`.
#[derive(Debug, Clone)]
pub struct BufferProviderIter<T: ?Sized>(T);
impl<T: ?Sized + BufferProvider> Iterator for BufferProviderIter<T>
{
type Item = NonZeroUsize;
#[inline]
fn next(&mut self) -> Option<Self::Item>
{
self.0.get_next_buffer_size()
}
}
pub trait BufferProviderIterExt<'a>
{
type Iter: iter::Iterator<Item= NonZeroUsize> + 'a;
fn get_all_buffer_sizes(self) -> Self::Iter;
}
pub trait BufferProviderDynIterExt<'a>
{
fn gen_all_buffer_sizes(self: Box<Self>) -> Box<dyn Iterator<Item = NonZeroUsize> +'a>;
}
impl<'a, T: BufferProvider + 'a> BufferProviderDynIterExt<'a> for T
{
#[inline]
fn gen_all_buffer_sizes(self: Box<Self>) -> Box<dyn Iterator<Item = NonZeroUsize> +'a> {
Box::new(BufferProviderIter(*self))
}
}
impl<'a, T: BufferProvider + 'a> BufferProviderIterExt<'a> for T
{
type Iter = BufferProviderIter<T>;
#[inline(always)]
fn get_all_buffer_sizes(self) -> Self::Iter {
BufferProviderIter(self)
}
}
impl BufferProvider for usize
{
#[inline(always)]
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize> {
NonZeroUsize::new(*self)
}
}
impl BufferProvider for (usize, usize)
{
#[inline]
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize> {
use rand::prelude::*;
NonZeroUsize::new(rand::thread_rng().gen_range(self.0..self.1))
}
}
pub trait DurationProvider pub trait DurationProvider
{ {
/// Try to get the next `Duration` from this provider. If there is not one available, `None` will be returned.
///
/// # Implementations
/// * *Should* always return `Some` *at least* once with no upper bound on the number of `Some`s that can be returned.
/// * *Should* return `Some` until there are no more left (*can* be infinite.)
/// * After a `None`, there **must never** be another non-`None` value.
fn get_next_duration(&mut self) -> Option<Duration>; fn get_next_duration(&mut self) -> Option<Duration>;
} }
@ -87,20 +195,47 @@ pub trait ThrottleProvider
} }
pub trait DynThrottleProvider pub trait DynThrottleProvider<'a>
{ {
fn get_timeout(&self) -> Box<dyn DurationProvider + '_>; fn get_timeout(&self) -> Box<dyn DurationProvider + 'a>;
} }
impl<T: ?Sized> DynThrottleProvider for T
where T: ThrottleProvider impl<'a, T: ?Sized> DynThrottleProvider<'a> for T
where T: ThrottleProvider,
T::Timer: 'a
{ {
#[inline(always)] #[inline(always)]
fn get_timeout(&self) -> Box<dyn DurationProvider + '_> fn get_timeout(&self) -> Box<dyn DurationProvider + 'a>
{ {
Box::new(ThrottleProvider::get_timeout(&self)) Box::new(ThrottleProvider::get_timeout(&self))
} }
} }
impl<'a> DurationProvider for Box<dyn DurationProvider + 'a>
{
#[inline(always)]
fn get_next_duration(&mut self) -> Option<Duration> {
(**self).get_next_duration()
}
}
impl<'a> ThrottleProvider for Box<dyn DynThrottleProvider<'a> + 'a>
{
type Timer = Box<dyn DurationProvider + 'a>;
#[inline(always)]
fn get_timeout(&self) -> Self::Timer {
DynThrottleProvider::get_timeout(self)
}
}
impl<'a> BufferProvider for Box<dyn BufferProvider + 'a>
{
#[inline(always)]
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize> {
(**self).get_next_buffer_size()
}
}
impl<'a, T: ?Sized> ThrottleProvider for &'a T impl<'a, T: ?Sized> ThrottleProvider for &'a T
where T: ThrottleProvider where T: ThrottleProvider
{ {
@ -119,10 +254,114 @@ impl ThrottleProvider for Duration
} }
} }
/// Provides a buffer size adaptor that returns a buffer size from a uniformly-distributed `BufferProvider` range each time `get_next_buffer_size()` is called.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub struct UniformBufferProvider<T: ?Sized, R: std::ops::RangeBounds<T>>(R, std::marker::PhantomData<T>);
impl<T: ?Sized, R> From<R> for UniformBufferProvider<T, R>
where T: BufferProvider,
R: std::ops::RangeBounds<T>
{
#[inline(always)]
fn from(from: R) -> Self
{
Self(from, std::marker::PhantomData)
}
}
impl<T: ?Sized, R: Clone> BufferProvider for UniformBufferProvider<T, R>
where T: BufferProvider + rand::distributions::uniform::SampleUniform,
R: std::ops::RangeBounds<T>+ rand::distributions::uniform::SampleRange<T>,
//for<'r> &'r R: ,
{
#[inline]
fn get_next_buffer_size(&mut self) -> Option<NonZeroUsize> {
use rand::prelude::*;
rand::thread_rng().gen_range(self.0.clone()).get_next_buffer_size()
}
}
/// Provides a duration adaptor that returns a `Duration` from a uniformly-distributed `T` range each time `get_next_duration()` is called.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub struct UniformDurationProvider<T: ?Sized, R: std::ops::RangeBounds<T>>(R, std::marker::PhantomData<T>);
impl<T: ?Sized, R> From<R> for UniformDurationProvider<T, R>
where T: DurationProvider,
R: std::ops::RangeBounds<T>
{
#[inline(always)]
fn from(from: R) -> Self
{
Self(from, std::marker::PhantomData)
}
}
impl<T: ?Sized, R: Clone> DurationProvider for UniformDurationProvider<T, R>
where T: DurationProvider + rand::distributions::uniform::SampleUniform,
R: std::ops::RangeBounds<T>+ rand::distributions::uniform::SampleRange<T>,
//for<'r> &'r R: rand::distributions::uniform::SampleRange<T>
{
#[inline]
fn get_next_duration(&mut self) -> Option<Duration> {
use rand::prelude::*;
rand::thread_rng().gen_range(self.0.clone()).get_next_duration()
}
}
/// Provides a throttle adaptor that returns a uniformly-distributed `T` each time `get_timeout()` is called. /// Provides a throttle adaptor that returns a uniformly-distributed `T` each time `get_timeout()` is called.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct UniformThrottleProvider<T: ?Sized, R: std::ops::RangeBounds<T>>(R, std::marker::PhantomData<T>); pub struct UniformThrottleProvider<T: ?Sized, R: std::ops::RangeBounds<T>>(R, std::marker::PhantomData<T>);
/// Provides a single `DurationProvider` on each call to `get_timeout()`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SingleThrottleProvider<D>(D);
/// Always provides no timeout duration.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NoThrottleProvider;
/// Never provides a timeout duration
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NoDurationProvider;
impl DurationProvider for NoDurationProvider
{
#[inline(always)]
fn get_next_duration(&mut self) -> Option<Duration> {
None
}
}
impl ThrottleProvider for NoThrottleProvider
{
type Timer = NoDurationProvider;
#[inline(always)]
fn get_timeout(&self) -> Self::Timer {
NoDurationProvider
}
}
impl<D: Clone + DurationProvider> ThrottleProvider for SingleThrottleProvider<D>
{
type Timer = D;
#[inline(always)]
fn get_timeout(&self) -> Self::Timer {
self.0.clone()
}
}
impl From<Duration> for SingleThrottleProvider<Duration>
{
#[inline(always)]
fn from(from: Duration) -> Self
{
Self(from)
}
}
impl<T: ?Sized, R> From<R> for UniformThrottleProvider<T, R> impl<T: ?Sized, R> From<R> for UniformThrottleProvider<T, R>
where T: DurationProvider, where T: DurationProvider,
R: std::ops::RangeBounds<T> R: std::ops::RangeBounds<T>
@ -134,15 +373,48 @@ where T: DurationProvider,
} }
} }
impl<T, R> ThrottleProvider for UniformThrottleProvider<T, R> impl<T, R: Clone> ThrottleProvider for UniformThrottleProvider<T, R>
where T: DurationProvider + rand::distributions::uniform::SampleUniform, where T: DurationProvider + rand::distributions::uniform::SampleUniform,
R: std::ops::RangeBounds<T>, R: std::ops::RangeBounds<T> + rand::distributions::uniform::SampleRange<T>,
for <'r> &'r R: rand::distributions::uniform::SampleRange<T>, //for <'r> &'r R: rand::distributions::uniform::SampleRange<T>,
{ {
type Timer = T; type Timer = T;
#[inline] #[inline]
fn get_timeout(&self) -> Self::Timer { fn get_timeout(&self) -> Self::Timer {
use rand::prelude::*; use rand::prelude::*;
rand::thread_rng().gen_range(&self.0) rand::thread_rng().gen_range(self.0.clone())
} }
} }
pub mod prelude
{
pub use super::{
ThrottleProvider,
DynThrottleProvider,
Duration,
DurationProvider,
BufferProvider,
DurationProviderExt as _,
DurationProviderIterExt as _,
BufferProviderIterExt as _,
BufferProviderDynIterExt as _,
DurationProviderIter,
BufferProviderIter,
UniformDurationProvider,
UniformThrottleProvider,
UniformBufferProvider,
SingleThrottleProvider,
DefaultBufferSize,
NoThrottleProvider,
NoDurationProvider,
NoBufferProvider,
};
}

@ -1,9 +1,114 @@
//! Stream adaptors //! Stream adaptors
use super::*; use super::*;
use std::{
io::{
self,
Read,
Write,
},
};
use prov::prelude::*;
/// Adaptor over a reader/writer that can chunk and throttle either way.
#[derive(Debug)]
pub struct ThrottleAdaptor<S: ?Sized, T: ThrottleProvider, B: BufferProvider>
{
timeout_provider: T,
buffer_size_provier: B,
stream: S
}
/// `ThrottleAdaptor` with configuration created from a `conf::State`
pub type StateThrottleAdaptor<S> = ThrottleAdaptor<S, Box<dyn DynThrottleProvider<'static> + 'static>, Box<dyn BufferProvider + 'static>>;
impl<S> StateThrottleAdaptor<S>
{
/// Create a new instance from `conf::State`
#[inline]
pub fn new_from_state(state: conf::State, stream: S) -> Self
{
Self {
timeout_provider: state.throttle_generator.unwrap_or_else(|| Box::new(NoThrottleProvider)),
buffer_size_provier: state.buffer_size,
stream,
}
}
}
impl<S: Read + Write> From<(conf::State, S)> for StateThrottleAdaptor<S>
{
#[inline(always)]
fn from((state, stream): (conf::State, S)) -> Self
{
Self::new_from_state(state, stream)
}
}
impl<S, T: ThrottleProvider, B: BufferProvider> ThrottleAdaptor<S, T, B>
{
/// Create a new adaptor instance from these providers over this stream
#[inline]
pub fn new(throttle: T, buffer: B, stream: S) -> Self
{
Self {
timeout_provider: throttle,
buffer_size_provier: buffer,
stream,
}
}
}
#[inline]
fn write_buf_with_timeout(writer: &mut (impl Write + ?Sized), timeout: &mut (impl DurationProvider + ?Sized), mut buf: &[u8]) -> io::Result<usize>
{
let mut sz = 0;
while !buf.is_empty() {
// Wait
if let Some(dur) = timeout.get_next_duration() {
std::thread::sleep(dur);
}
// Write
let w = writer.write(buf)?;
sz += w;
if w != buf.len() {
// Update buffer
buf = &buf[w..];
} else {
// Whole buffer written, we're done
break;
}
}
Ok(sz)
}
pub struct ThrottleAdaptor<R, T> impl<W: ?Sized + Write, T: ThrottleProvider, B: BufferProvider> Write for ThrottleAdaptor<W, T, B>
{ {
reader: R, fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
timeout_provider: T let buffsz = self.buffer_size_provier.get_next_buffer_size().map(|x| x.get());
let mut timeout = self.timeout_provider.get_timeout();
Ok(match buffsz {
// Chunk buffer
Some(buffsz) => {
buf.chunks(buffsz).map(|buf| {
write_buf_with_timeout(&mut self.stream, &mut timeout, buf)
}).try_fold(0usize, |a, b| b.map(|b| b + a))?
},
_ => write_buf_with_timeout(&mut self.stream, &mut timeout, buf)?
})
//self.reader.read(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<R: ?Sized + Read, T: ThrottleProvider, B: BufferProvider> Read for ThrottleAdaptor<R, T, B>
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
todo!("read_buf_with_timeout()")
}
} }

Loading…
Cancel
Save