You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
116 lines
2.8 KiB
116 lines
2.8 KiB
//! Stream adaptors
|
|
use super::*;
|
|
use std::{
|
|
io::{
|
|
self,
|
|
Read,
|
|
Write,
|
|
},
|
|
};
|
|
use prov::prelude::*;
|
|
|
|
/// Adaptor over a reader/writer that can chunk and throttle either way.
|
|
#[derive(Debug)]
|
|
pub struct ThrottleAdaptor<S: ?Sized, T: ThrottleProvider, B: BufferProvider>
|
|
{
|
|
timeout_provider: T,
|
|
buffer_size_provier: B,
|
|
|
|
stream: S
|
|
}
|
|
|
|
/// `ThrottleAdaptor` with configuration created from a `conf::State`
|
|
pub type StateThrottleAdaptor<S> = ThrottleAdaptor<S, Box<dyn DynThrottleProvider<'static> + 'static>, Box<dyn BufferProvider + 'static>>;
|
|
|
|
impl<S> StateThrottleAdaptor<S>
|
|
{
|
|
/// Create a new instance from `conf::State`
|
|
#[inline]
|
|
pub fn new_from_state(state: conf::State, stream: S) -> Self
|
|
{
|
|
Self {
|
|
timeout_provider: state.throttle_generator.unwrap_or_else(|| Box::new(NoThrottleProvider)),
|
|
buffer_size_provier: state.buffer_size,
|
|
stream,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<S: Read + Write> From<(conf::State, S)> for StateThrottleAdaptor<S>
|
|
{
|
|
#[inline(always)]
|
|
fn from((state, stream): (conf::State, S)) -> Self
|
|
{
|
|
Self::new_from_state(state, stream)
|
|
}
|
|
}
|
|
|
|
|
|
impl<S, T: ThrottleProvider, B: BufferProvider> ThrottleAdaptor<S, T, B>
|
|
{
|
|
/// Create a new adaptor instance from these providers over this stream
|
|
#[inline]
|
|
pub fn new(throttle: T, buffer: B, stream: S) -> Self
|
|
{
|
|
Self {
|
|
timeout_provider: throttle,
|
|
buffer_size_provier: buffer,
|
|
stream,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[inline]
|
|
fn write_buf_with_timeout(writer: &mut (impl Write + ?Sized), timeout: &mut (impl DurationProvider + ?Sized), mut buf: &[u8]) -> io::Result<usize>
|
|
{
|
|
let mut sz = 0;
|
|
while !buf.is_empty() {
|
|
// Wait
|
|
if let Some(dur) = timeout.get_next_duration() {
|
|
std::thread::sleep(dur);
|
|
}
|
|
// Write
|
|
let w = writer.write(buf)?;
|
|
sz += w;
|
|
if w != buf.len() {
|
|
// Update buffer
|
|
buf = &buf[w..];
|
|
} else {
|
|
// Whole buffer written, we're done
|
|
break;
|
|
}
|
|
}
|
|
Ok(sz)
|
|
}
|
|
|
|
impl<W: ?Sized + Write, T: ThrottleProvider, B: BufferProvider> Write for ThrottleAdaptor<W, T, B>
|
|
{
|
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
let buffsz = self.buffer_size_provier.get_next_buffer_size().map(|x| x.get());
|
|
let mut timeout = self.timeout_provider.get_timeout();
|
|
|
|
Ok(match buffsz {
|
|
// Chunk buffer
|
|
Some(buffsz) => {
|
|
buf.chunks(buffsz).map(|buf| {
|
|
write_buf_with_timeout(&mut self.stream, &mut timeout, buf)
|
|
}).try_fold(0usize, |a, b| b.map(|b| b + a))?
|
|
},
|
|
// Don't chunk buffer
|
|
_ => write_buf_with_timeout(&mut self.stream, &mut timeout, buf)?
|
|
})
|
|
//self.reader.read(buf)
|
|
|
|
}
|
|
fn flush(&mut self) -> io::Result<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl<R: ?Sized + Read, T: ThrottleProvider, B: BufferProvider> Read for ThrottleAdaptor<R, T, B>
|
|
{
|
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
todo!("read_buf_with_timeout()")
|
|
}
|
|
}
|