use super::*; use futures::prelude::*; use std::sync::atomic::{Ordering, AtomicBool}; use std::mem; use tokio::time::{ Duration, Interval, self, }; use std::{ pin::Pin, task::{Poll, Context}, }; pub mod prelude { pub use super::StreamGateExt as _; pub use super::StreamLagExt as _; } /// A gated stream that releases every N items from the backing stream. #[pin_project] #[derive(Debug)] pub struct GatedStream { #[pin] backing: stream::Fuse, buffer: Vec, release_at: usize, force_release: AtomicBool, } /// A gated stream that also releases on a timeout. #[pin_project] #[derive(Debug)] pub struct TimedGatedStream { #[pin] backing: GatedStream, interval: Interval, // no need to Pin this, it's Unpin, we call `poll_next_unpin`. } pub trait StreamLagExt: Sized { /// Throttle this stream with this duration. fn lag(self, timeout: Duration) -> time::Throttle; } impl StreamLagExt for S where S: Stream { #[inline] fn lag(self, timeout: Duration) -> time::Throttle { time::throttle(timeout, self) } } pub trait StreamGateExt: Sized { /// Gate this stream every `n` elements. /// /// # Panics /// If `n` is 0. fn gate(self, n: usize) -> GatedStream; /// Gate this stream every `n` elements or after `timeout` completes. /// /// # Panics /// If `n` is 0. fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream; } impl StreamGateExt for S where S: Stream, { fn gate(self, n: usize) -> GatedStream { assert!(n > 0, "Size of gate must be above 0"); GatedStream { backing: self.fuse(), buffer: Vec::with_capacity(n), release_at: n, force_release: AtomicBool::new(false), } } fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream { TimedGatedStream { backing: self.gate(n), interval: tokio::time::interval(timeout), } } } impl GatedStream where S: Stream { /// Force release of next block whether its full or not pub fn force_release(&self) { self.force_release.store(true, Ordering::SeqCst); } /// Consume into the inner stream and the current buffer. pub fn into_inner(self) -> (S, Vec) { (self.backing.into_inner(), self.buffer) } /// Size of the gated block pub fn block_size(&self) -> usize { self.release_at } } impl Stream for GatedStream where S: Stream { type Item = Box<[S::Item]>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { while self.buffer.len() < self.release_at { let this = self.as_mut().project(); match this.backing.poll_next(cx) { Poll::Pending => break, Poll::Ready(None) => { if this.buffer.len() == 0 { return Poll::Ready(None); } break; }, Poll::Ready(Some(item)) => this.buffer.push(item), } } if self.buffer.len() > 0 && self.force_release.load(Ordering::Acquire) { let this = self.project(); let output = mem::replace(this.buffer, Vec::with_capacity(*this.release_at)); this.force_release.store(false, Ordering::Release); Poll::Ready(Some(output.into())) } else if self.buffer.len() == self.release_at { let this = self.project(); Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into())) } else { Poll::Pending } } fn size_hint(&self) -> (usize, Option) { let bshint = self.backing.size_hint(); (bshint.0 / self.release_at, bshint.1.map(|x| x / self.release_at)) } } impl Stream for TimedGatedStream where S: Stream { type Item = Box<[S::Item]>; #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); match this.interval.poll_next_unpin(cx) { Poll::Ready(_) => this.backing.force_release(), _ => (), } this.backing.poll_next(cx) } #[inline] fn size_hint(&self) -> (usize, Option) { self.backing.size_hint() } } //TODO: impl Stream for TimedGatedStream //TODO: How to handle timeout for TimedGatedStream?