diff --git a/src/ext.rs b/src/ext.rs index 9507665..71b1727 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -126,10 +126,13 @@ where S: Stream { Poll::Pending => break, Poll::Ready(None) => { - if this.buffer.len() == 0 { - return Poll::Ready(None); - } - break; + return if this.buffer.len() == 0 { + Poll::Ready(None) + } else { + let this = self.project(); + Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into())) + }; + }, Poll::Ready(Some(item)) => this.buffer.push(item), } @@ -174,5 +177,43 @@ where S: Stream self.backing.size_hint() } } -//TODO: impl Stream for TimedGatedStream -//TODO: How to handle timeout for TimedGatedStream? + + +#[cfg(test)] +mod tests +{ + use super::*; + + #[tokio::test] + async fn stream_gating_with_timeout() + { + let mut stream = stream::iter(0i32..100) + .gate_with_timeout(16, Duration::from_millis(100)) + // .gate(16) + .lag(Duration::from_millis(10)); + let mut sum = 0i32; + while let Some(numbers) = stream.next().await + { + eprintln!("{}: {:?}", numbers.len(), numbers); + sum+=numbers.into_iter().sum::(); + } + println!("{}", sum); + assert_eq!((0..100).sum::(),sum); + } + + #[tokio::test] + async fn stream_gating() + { + let mut stream = stream::iter(0i32..100) + .gate(16) + .lag(Duration::from_millis(10)); + let mut sum = 0i32; + while let Some(numbers) = stream.next().await + { + eprintln!("{}: {:?}", numbers.len(), numbers); + sum+=numbers.into_iter().sum::(); + } + println!("{}", sum); + assert_eq!((0..100).sum::(),sum); + } +}