You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
2.8 KiB

//! Stream adaptors
use super::*;
use std::{
io::{
self,
Read,
Write,
},
};
use prov::prelude::*;
/// Wrapper around a stream (reader and/or writer) that can split transfers into chunks and
/// pause between them.
///
/// The pause lengths come from a [`ThrottleProvider`] and the chunk sizes from a
/// [`BufferProvider`]; the wrapped stream itself may be unsized.
#[derive(Debug)]
pub struct ThrottleAdaptor<S, T, B>
where
    S: ?Sized,
    T: ThrottleProvider,
    B: BufferProvider,
{
    /// Source of the timeouts applied around stream operations.
    timeout_provider: T,
    /// Source of the chunk sizes used to split buffers.
    // NOTE: the spelling of this field name is kept as-is because the impl blocks refer to it.
    buffer_size_provier: B,
    /// The wrapped stream. Must remain the last field so that `S` may be unsized.
    stream: S,
}
/// A [`ThrottleAdaptor`] whose throttle and buffer-size providers are boxed trait objects,
/// so that both can be chosen at runtime.
///
/// This is the type produced when an adaptor is configured from a `conf::State`; only the
/// wrapped stream type `S` remains generic.
pub type StateThrottleAdaptor<S> = ThrottleAdaptor<S, Box<dyn DynThrottleProvider<'static> + 'static>, Box<dyn BufferProvider + 'static>>;
impl<S> StateThrottleAdaptor<S>
{
    /// Build an adaptor over `stream`, taking its providers out of a `conf::State`.
    ///
    /// The throttle provider is `state.throttle_generator` when one is set; otherwise a
    /// boxed `NoThrottleProvider` is used. The buffer-size provider is `state.buffer_size`.
    #[inline]
    pub fn new_from_state(state: conf::State, stream: S) -> Self
    {
        Self {
            timeout_provider: match state.throttle_generator {
                Some(provider) => provider,
                // No throttle configured: fall back to the no-op provider.
                None => Box::new(NoThrottleProvider),
            },
            buffer_size_provier: state.buffer_size,
            stream,
        }
    }
}
/// Conversion from a `(state, stream)` pair; equivalent to calling
/// `StateThrottleAdaptor::new_from_state(state, stream)`.
impl<S> From<(conf::State, S)> for StateThrottleAdaptor<S>
where
    S: Read + Write,
{
    #[inline(always)]
    fn from(pair: (conf::State, S)) -> Self
    {
        let (state, stream) = pair;
        Self::new_from_state(state, stream)
    }
}
impl<S, T, B> ThrottleAdaptor<S, T, B>
where
    T: ThrottleProvider,
    B: BufferProvider,
{
    /// Build an adaptor over `stream` from explicitly supplied providers.
    ///
    /// * `throttle` - provider of the timeouts applied around stream operations.
    /// * `buffer` - provider of the chunk sizes used to split buffers.
    /// * `stream` - the reader/writer to wrap.
    #[inline]
    pub fn new(throttle: T, buffer: B, stream: S) -> Self
    {
        let timeout_provider = throttle;
        let buffer_size_provier = buffer;
        Self { timeout_provider, buffer_size_provier, stream }
    }
}
/// Write all of `buf` to `writer`, pausing before every write attempt.
///
/// Before each call to `writer.write`, `timeout` is asked for its next duration; when it
/// yields one, the current thread sleeps for that long. A partial write is followed by
/// another attempt with the unwritten remainder, until the whole buffer has been accepted.
///
/// # Returns
/// The number of bytes written. On success this equals `buf.len()`; for an empty buffer it
/// is `0` and neither `timeout` nor `writer` is touched.
///
/// # Errors
/// * Any error returned by `writer.write` is propagated immediately.
/// * [`io::ErrorKind::WriteZero`] if `writer` reports writing zero bytes while data is
///   still pending. Without this check the loop would never make progress and spin forever.
#[inline]
fn write_buf_with_timeout(writer: &mut (impl Write + ?Sized), timeout: &mut (impl DurationProvider + ?Sized), mut buf: &[u8]) -> io::Result<usize>
{
    let mut sz = 0;
    while !buf.is_empty() {
        // Wait
        if let Some(dur) = timeout.get_next_duration() {
            std::thread::sleep(dur);
        }
        // Write
        match writer.write(buf)? {
            // The writer accepted nothing: it cannot take more data, so retrying is futile.
            0 => return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer")),
            w => {
                sz += w;
                // Continue with whatever is left (empty once the whole buffer is written).
                buf = &buf[w..];
            }
        }
    }
    Ok(sz)
}
/// Throttled and optionally chunked writing through the adaptor.
impl<W: ?Sized + Write, T: ThrottleProvider, B: BufferProvider> Write for ThrottleAdaptor<W, T, B>
{
    /// Write `buf` to the wrapped stream, throttled by this adaptor's providers.
    ///
    /// The buffer-size provider is asked once for the next chunk size and the throttle
    /// provider once for a timeout source. When a chunk size is given, `buf` is split into
    /// pieces of at most that many bytes and each piece is written through
    /// `write_buf_with_timeout`; otherwise the whole of `buf` is passed in one call.
    ///
    /// # Returns
    /// The sum of the byte counts reported for every piece.
    ///
    /// # Errors
    /// The first error from the underlying write is returned and no further pieces are
    /// attempted; the count of bytes written before the error is not reported.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let chunk_len = self.buffer_size_provier.get_next_buffer_size().map(|x| x.get());
        let mut timeout = self.timeout_provider.get_timeout();
        match chunk_len {
            // Chunk buffer
            Some(chunk_len) => buf.chunks(chunk_len).try_fold(0usize, |written, chunk| {
                write_buf_with_timeout(&mut self.stream, &mut timeout, chunk).map(|n| written + n)
            }),
            // Don't chunk buffer
            None => write_buf_with_timeout(&mut self.stream, &mut timeout, buf),
        }
    }

    /// Flush the wrapped stream.
    ///
    /// The adaptor keeps no buffer of its own, but the wrapped stream may, so the request
    /// is forwarded rather than silently reported as successful.
    fn flush(&mut self) -> io::Result<()> {
        self.stream.flush()
    }
}
/// Reading through the adaptor. Not implemented yet.
impl<R, T, B> Read for ThrottleAdaptor<R, T, B>
where
    R: ?Sized + Read,
    T: ThrottleProvider,
    B: BufferProvider,
{
    /// Placeholder for the throttled read path.
    ///
    /// # Panics
    /// Always panics via `todo!`; no bytes are read and the buffer is left untouched.
    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
        todo!("read_buf_with_timeout()")
    }
}