diff --git a/Cargo.lock b/Cargo.lock index 55f8c56..3340fd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + [[package]] name = "cfg-if" version = "1.0.0" @@ -61,11 +67,46 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver", +] + +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + +[[package]] +name = "stackalloc" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4f5c9dd3feb8a4adc8eae861e5f48862a92f9a9f38cf8fc99b92fc6ec016121" +dependencies = [ + "cc", + "rustc_version", +] + [[package]] name = "throttle" version = "0.1.0" dependencies = [ "rand", + "stackalloc", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4ab93b7..858a303 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "throttle" +description="pipes stdin to stdout with a throttle (slowdown) rate specified by user" version = "0.1.0" edition = "2021" @@ -7,3 +8,4 @@ edition = "2021" [dependencies] rand = "0.8.5" +stackalloc = "1.1.1" diff --git a/src/conf.rs b/src/conf.rs new file mode 100644 index 0000000..8a84bac --- /dev/null +++ b/src/conf.rs @@ -0,0 +1,82 @@ +//! Configuration +use super::*; +use prov::prelude::*; + + +/// Application state +pub struct State +{ + pub throttle_generator: Option + 'static>>, + pub buffer_size: Box, +} + +impl State +{ + /// Convert into an instance that can be used to create a `StateThrottleAdaptor` + #[inline(always)] + pub fn with_stream(self, stream: S) -> (Self, S) + { + (self, stream) + } +} + +/// Configuration for application +#[derive(Debug, Clone)] +pub struct Config +{ + pub throttle: Option<(Duration, Option)>, + pub buffer_size: Option<(usize, Option)>, +} + +impl From for State +{ + #[inline] + fn from(from: Config) -> Self + { + Self { + throttle_generator: if let Some(throttle) = from.throttle { + Some(match throttle { + (Duration::ZERO, Some(Duration::ZERO) | None) => Box::new(NoThrottleProvider), + (low, Some(high)) if low == high => Box::new(SingleThrottleProvider::from(low)), + (low, Some(high)) => Box::new(UniformThrottleProvider::from(low..high)), + (low, _) => Box::new(SingleThrottleProvider::from(low)), + }) + } else { + None + }, + buffer_size: if let Some(size) = from.buffer_size { + match size { + (0, Some(0) | None) => Box::new(NoBufferProvider), + (low, Some(high)) if low == high => Box::new(low), + (low, Some(high)) => Box::new(UniformBufferProvider::from(low..high)), + (low, _) => Box::new(low) + } + } else { + Box::new(DefaultBufferSize) + } + } + } +} + +impl Default for Config +{ + #[inline] + fn default() -> Self + { + Self::new() + } +} + +impl Config +{ + /// Create an empty new config + #[inline(always)] + pub const fn new() -> Self + { + Self { + throttle: None, + buffer_size: None, + } + } +} + diff --git a/src/main.rs b/src/main.rs index bf49002..a0ed8f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ +mod conf; mod prov; mod stream; diff --git a/src/prov.rs b/src/prov.rs index 5122b8f..7719710 100644 --- a/src/prov.rs +++ b/src/prov.rs @@ -1,9 +1,117 @@ //! Throttle provider -use std::time::Duration; use std::iter; +use std::num::NonZeroUsize; + +/// Buffer size for `DefaultBufferSize` +pub const DEFAULT_BUFFER_SIZE: usize = 4096; + +/// `BufferProvider` that only provides the `DEFAULT_BUFFER_SIZE`. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] +pub struct DefaultBufferSize; + +/// `BufferProvider` that never provides a chunking buffer size. +/// +/// Therefore, buffers are always consumed as-is. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct NoBufferProvider; + +impl BufferProvider for NoBufferProvider +{ + #[inline(always)] + fn get_next_buffer_size(&mut self) -> Option { + None + } +} + +impl BufferProvider for DefaultBufferSize +{ + #[inline(always)] + fn get_next_buffer_size(&mut self) -> Option { + NonZeroUsize::new(DEFAULT_BUFFER_SIZE) + } +} + +/// The type that dictates the amount of time to wait between each read. +pub type Duration = std::time::Duration; + +pub trait BufferProvider +{ + /// Try to get the next buffer size. + /// + /// # Implementations + /// * *Should* always return `Some` *at least* once with no upper bound on the number of `Some`s that can be returned. + /// * *Should* return `Some` until there are no more left (*can* be infinite.) + /// * After a `None`, there **must never** be another non-`None` value. + fn get_next_buffer_size(&mut self) -> Option; +} + +/// Iterator adaptor for `BufferProvider`'s `gen_next_buffer_size()`. +#[derive(Debug, Clone)] +pub struct BufferProviderIter(T); + +impl Iterator for BufferProviderIter +{ + type Item = NonZeroUsize; + #[inline] + fn next(&mut self) -> Option + { + self.0.get_next_buffer_size() + } +} + +pub trait BufferProviderIterExt<'a> +{ + type Iter: iter::Iterator + 'a; + fn get_all_buffer_sizes(self) -> Self::Iter; +} + +pub trait BufferProviderDynIterExt<'a> +{ + fn gen_all_buffer_sizes(self: Box) -> Box +'a>; +} + +impl<'a, T: BufferProvider + 'a> BufferProviderDynIterExt<'a> for T +{ + #[inline] + fn gen_all_buffer_sizes(self: Box) -> Box +'a> { + Box::new(BufferProviderIter(*self)) + } +} + +impl<'a, T: BufferProvider + 'a> BufferProviderIterExt<'a> for T +{ + type Iter = BufferProviderIter; + #[inline(always)] + fn get_all_buffer_sizes(self) -> Self::Iter { + BufferProviderIter(self) + } +} + +impl BufferProvider for usize +{ + #[inline(always)] + fn get_next_buffer_size(&mut self) -> Option { + NonZeroUsize::new(*self) + } +} + +impl BufferProvider for (usize, usize) +{ + #[inline] + fn get_next_buffer_size(&mut self) -> Option { + use rand::prelude::*; + NonZeroUsize::new(rand::thread_rng().gen_range(self.0..self.1)) + } +} pub trait DurationProvider { + /// Try to get the next `Duration` from this provider. If there is not one available, `None` will be returned. + /// + /// # Implementations + /// * *Should* always return `Some` *at least* once with no upper bound on the number of `Some`s that can be returned. + /// * *Should* return `Some` until there are no more left (*can* be infinite.) + /// * After a `None`, there **must never** be another non-`None` value. fn get_next_duration(&mut self) -> Option; } @@ -87,20 +195,47 @@ pub trait ThrottleProvider } -pub trait DynThrottleProvider +pub trait DynThrottleProvider<'a> { - fn get_timeout(&self) -> Box; + fn get_timeout(&self) -> Box; } -impl DynThrottleProvider for T -where T: ThrottleProvider + +impl<'a, T: ?Sized> DynThrottleProvider<'a> for T +where T: ThrottleProvider, + T::Timer: 'a { #[inline(always)] - fn get_timeout(&self) -> Box + fn get_timeout(&self) -> Box { Box::new(ThrottleProvider::get_timeout(&self)) } } +impl<'a> DurationProvider for Box +{ + #[inline(always)] + fn get_next_duration(&mut self) -> Option { + (**self).get_next_duration() + } +} + +impl<'a> ThrottleProvider for Box + 'a> +{ + type Timer = Box; + #[inline(always)] + fn get_timeout(&self) -> Self::Timer { + DynThrottleProvider::get_timeout(self) + } +} + +impl<'a> BufferProvider for Box +{ + #[inline(always)] + fn get_next_buffer_size(&mut self) -> Option { + (**self).get_next_buffer_size() + } +} + impl<'a, T: ?Sized> ThrottleProvider for &'a T where T: ThrottleProvider { @@ -119,10 +254,114 @@ impl ThrottleProvider for Duration } } +/// Provides a buffer size adaptor that returns a buffer size from a uniformly-distributed `BufferProvider` range each time `get_next_buffer_size()` is called. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] +pub struct UniformBufferProvider>(R, std::marker::PhantomData); + +impl From for UniformBufferProvider +where T: BufferProvider, + R: std::ops::RangeBounds +{ + #[inline(always)] + fn from(from: R) -> Self + { + Self(from, std::marker::PhantomData) + } +} + + +impl BufferProvider for UniformBufferProvider +where T: BufferProvider + rand::distributions::uniform::SampleUniform, + R: std::ops::RangeBounds+ rand::distributions::uniform::SampleRange, +//for<'r> &'r R: , +{ + #[inline] + fn get_next_buffer_size(&mut self) -> Option { + use rand::prelude::*; + rand::thread_rng().gen_range(self.0.clone()).get_next_buffer_size() + } +} + +/// Provides a duration adaptor that returns a `Duration` from a uniformly-distributed `T` range each time `get_next_duration()` is called. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] +pub struct UniformDurationProvider>(R, std::marker::PhantomData); + +impl From for UniformDurationProvider +where T: DurationProvider, + R: std::ops::RangeBounds +{ + #[inline(always)] + fn from(from: R) -> Self + { + Self(from, std::marker::PhantomData) + } +} + + +impl DurationProvider for UniformDurationProvider +where T: DurationProvider + rand::distributions::uniform::SampleUniform, + R: std::ops::RangeBounds+ rand::distributions::uniform::SampleRange, +//for<'r> &'r R: rand::distributions::uniform::SampleRange +{ + #[inline] + fn get_next_duration(&mut self) -> Option { + use rand::prelude::*; + rand::thread_rng().gen_range(self.0.clone()).get_next_duration() + } +} + /// Provides a throttle adaptor that returns a uniformly-distributed `T` each time `get_timeout()` is called. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct UniformThrottleProvider>(R, std::marker::PhantomData); +/// Provides a single `DurationProvider` on each call to `get_timeout()`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SingleThrottleProvider(D); + +/// Always provides no timeout duration. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct NoThrottleProvider; + +/// Never provides a timeout duration +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct NoDurationProvider; + +impl DurationProvider for NoDurationProvider +{ + #[inline(always)] + fn get_next_duration(&mut self) -> Option { + None + } +} + +impl ThrottleProvider for NoThrottleProvider +{ + type Timer = NoDurationProvider; + #[inline(always)] + fn get_timeout(&self) -> Self::Timer { + NoDurationProvider + } +} + +impl ThrottleProvider for SingleThrottleProvider +{ + type Timer = D; + #[inline(always)] + fn get_timeout(&self) -> Self::Timer { + self.0.clone() + } +} + +impl From for SingleThrottleProvider +{ + #[inline(always)] + fn from(from: Duration) -> Self + { + Self(from) + } +} + + impl From for UniformThrottleProvider where T: DurationProvider, R: std::ops::RangeBounds @@ -134,15 +373,48 @@ where T: DurationProvider, } } -impl ThrottleProvider for UniformThrottleProvider +impl ThrottleProvider for UniformThrottleProvider where T: DurationProvider + rand::distributions::uniform::SampleUniform, - R: std::ops::RangeBounds, -for <'r> &'r R: rand::distributions::uniform::SampleRange, + R: std::ops::RangeBounds + rand::distributions::uniform::SampleRange, +//for <'r> &'r R: rand::distributions::uniform::SampleRange, { type Timer = T; #[inline] fn get_timeout(&self) -> Self::Timer { use rand::prelude::*; - rand::thread_rng().gen_range(&self.0) + rand::thread_rng().gen_range(self.0.clone()) } } + +pub mod prelude +{ + pub use super::{ + ThrottleProvider, + DynThrottleProvider, + + Duration, + DurationProvider, + + BufferProvider, + + DurationProviderExt as _, + DurationProviderIterExt as _, + BufferProviderIterExt as _, + BufferProviderDynIterExt as _, + + DurationProviderIter, + BufferProviderIter, + + UniformDurationProvider, + UniformThrottleProvider, + UniformBufferProvider, + + SingleThrottleProvider, + + DefaultBufferSize, + + NoThrottleProvider, + NoDurationProvider, + NoBufferProvider, + }; +} diff --git a/src/stream.rs b/src/stream.rs index c2a9845..bcda6fe 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,9 +1,114 @@ //! Stream adaptors use super::*; +use std::{ + io::{ + self, + Read, + Write, + }, +}; +use prov::prelude::*; +/// Adaptor over a reader/writer that can chunk and throttle either way. +#[derive(Debug)] +pub struct ThrottleAdaptor +{ + timeout_provider: T, + buffer_size_provier: B, -pub struct ThrottleAdaptor + stream: S +} + +/// `ThrottleAdaptor` with configuration created from a `conf::State` +pub type StateThrottleAdaptor = ThrottleAdaptor + 'static>, Box>; + +impl StateThrottleAdaptor +{ + /// Create a new instance from `conf::State` + #[inline] + pub fn new_from_state(state: conf::State, stream: S) -> Self + { + Self { + timeout_provider: state.throttle_generator.unwrap_or_else(|| Box::new(NoThrottleProvider)), + buffer_size_provier: state.buffer_size, + stream, + } + } +} + +impl From<(conf::State, S)> for StateThrottleAdaptor +{ + #[inline(always)] + fn from((state, stream): (conf::State, S)) -> Self + { + Self::new_from_state(state, stream) + } +} + + +impl ThrottleAdaptor +{ + /// Create a new adaptor instance from these providers over this stream + #[inline] + pub fn new(throttle: T, buffer: B, stream: S) -> Self + { + Self { + timeout_provider: throttle, + buffer_size_provier: buffer, + stream, + } + } +} + +#[inline] +fn write_buf_with_timeout(writer: &mut (impl Write + ?Sized), timeout: &mut (impl DurationProvider + ?Sized), mut buf: &[u8]) -> io::Result +{ + let mut sz = 0; + while !buf.is_empty() { + // Wait + if let Some(dur) = timeout.get_next_duration() { + std::thread::sleep(dur); + } + // Write + let w = writer.write(buf)?; + sz += w; + if w != buf.len() { + // Update buffer + buf = &buf[w..]; + } else { + // Whole buffer written, we're done + break; + } + } + Ok(sz) +} + +impl Write for ThrottleAdaptor +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + let buffsz = self.buffer_size_provier.get_next_buffer_size().map(|x| x.get()); + let mut timeout = self.timeout_provider.get_timeout(); + + Ok(match buffsz { + // Chunk buffer + Some(buffsz) => { + buf.chunks(buffsz).map(|buf| { + write_buf_with_timeout(&mut self.stream, &mut timeout, buf) + }).try_fold(0usize, |a, b| b.map(|b| b + a))? + }, + _ => write_buf_with_timeout(&mut self.stream, &mut timeout, buf)? + }) + //self.reader.read(buf) + + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Read for ThrottleAdaptor { - reader: R, - timeout_provider: T + fn read(&mut self, buf: &mut [u8]) -> io::Result { + todo!("read_buf_with_timeout()") + } }