//! Stream adaptors use super::*; use std::{ io::{ self, Read, Write, }, }; use prov::prelude::*; /// Adaptor over a reader/writer that can chunk and throttle either way. #[derive(Debug)] pub struct ThrottleAdaptor { timeout_provider: T, buffer_size_provier: B, stream: S } /// `ThrottleAdaptor` with configuration created from a `conf::State` pub type StateThrottleAdaptor = ThrottleAdaptor + 'static>, Box>; impl StateThrottleAdaptor { /// Create a new instance from `conf::State` #[inline] pub fn new_from_state(state: conf::State, stream: S) -> Self { Self { timeout_provider: state.throttle_generator.unwrap_or_else(|| Box::new(NoThrottleProvider)), buffer_size_provier: state.buffer_size, stream, } } } impl From<(conf::State, S)> for StateThrottleAdaptor { #[inline(always)] fn from((state, stream): (conf::State, S)) -> Self { Self::new_from_state(state, stream) } } impl ThrottleAdaptor { /// Create a new adaptor instance from these providers over this stream #[inline] pub fn new(throttle: T, buffer: B, stream: S) -> Self { Self { timeout_provider: throttle, buffer_size_provier: buffer, stream, } } } #[inline] fn write_buf_with_timeout(writer: &mut (impl Write + ?Sized), timeout: &mut (impl DurationProvider + ?Sized), mut buf: &[u8]) -> io::Result { let mut sz = 0; while !buf.is_empty() { // Wait if let Some(dur) = timeout.get_next_duration() { std::thread::sleep(dur); } // Write let w = writer.write(buf)?; sz += w; if w != buf.len() { // Update buffer buf = &buf[w..]; } else { // Whole buffer written, we're done break; } } Ok(sz) } impl Write for ThrottleAdaptor { fn write(&mut self, buf: &[u8]) -> io::Result { let buffsz = self.buffer_size_provier.get_next_buffer_size().map(|x| x.get()); let mut timeout = self.timeout_provider.get_timeout(); Ok(match buffsz { // Chunk buffer Some(buffsz) => { buf.chunks(buffsz).map(|buf| { write_buf_with_timeout(&mut self.stream, &mut timeout, buf) }).try_fold(0usize, |a, b| b.map(|b| b + a))? }, // Don't chunk buffer _ => write_buf_with_timeout(&mut self.stream, &mut timeout, buf)? }) //self.reader.read(buf) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl Read for ThrottleAdaptor { fn read(&mut self, buf: &mut [u8]) -> io::Result { todo!("read_buf_with_timeout()") } }