|
|
|
use super::*;
|
|
|
|
use futures::prelude::*;
|
|
|
|
use std::sync::atomic::{Ordering, AtomicBool};
|
|
|
|
use std::mem;
|
|
|
|
use tokio::time::{
|
|
|
|
Duration,
|
|
|
|
Interval,
|
|
|
|
self,
|
|
|
|
};
|
|
|
|
use std::{
|
|
|
|
pin::Pin,
|
|
|
|
task::{Poll, Context},
|
|
|
|
};
|
|
|
|
|
|
|
|
pub mod prelude
|
|
|
|
{
|
|
|
|
pub use super::StreamGateExt as _;
|
|
|
|
pub use super::StreamLagExt as _;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// A gated stream that releases every N items from the backing stream.
|
|
|
|
#[pin_project]
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct GatedStream<S,T>
|
|
|
|
{
|
|
|
|
#[pin] backing: stream::Fuse<S>,
|
|
|
|
buffer: Vec<T>,
|
|
|
|
release_at: usize,
|
|
|
|
force_release: AtomicBool,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A gated stream that also releases on a timeout.
|
|
|
|
#[pin_project]
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct TimedGatedStream<S, T>
|
|
|
|
{
|
|
|
|
#[pin] backing: GatedStream<S, T>,
|
|
|
|
interval: Interval, // no need to Pin this, it's Unpin, we call `poll_next_unpin`.
|
|
|
|
}
|
|
|
|
|
|
|
|
pub trait StreamLagExt: Sized
|
|
|
|
{
|
|
|
|
/// Throttle this stream with this duration.
|
|
|
|
fn lag(self, timeout: Duration) -> time::Throttle<Self>;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> StreamLagExt for S
|
|
|
|
where S: Stream
|
|
|
|
{
|
|
|
|
#[inline] fn lag(self, timeout: Duration) -> time::Throttle<Self>
|
|
|
|
{
|
|
|
|
time::throttle(timeout, self)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub trait StreamGateExt<T>: Sized
|
|
|
|
{
|
|
|
|
/// Gate this stream every `n` elements.
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
/// If `n` is 0.
|
|
|
|
fn gate(self, n: usize) -> GatedStream<Self, T>;
|
|
|
|
|
|
|
|
/// Gate this stream every `n` elements or after `timeout` completes.
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
/// If `n` is 0.
|
|
|
|
fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream<Self, T>;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> StreamGateExt<S::Item> for S
|
|
|
|
where S: Stream,
|
|
|
|
{
|
|
|
|
fn gate(self, n: usize) -> GatedStream<Self, S::Item>
|
|
|
|
{
|
|
|
|
assert!(n > 0, "Size of gate must be above 0");
|
|
|
|
GatedStream
|
|
|
|
{
|
|
|
|
backing: self.fuse(),
|
|
|
|
buffer: Vec::with_capacity(n),
|
|
|
|
release_at: n,
|
|
|
|
force_release: AtomicBool::new(false),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream<Self, S::Item> {
|
|
|
|
TimedGatedStream
|
|
|
|
{
|
|
|
|
backing: self.gate(n),
|
|
|
|
interval: tokio::time::interval(timeout),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> GatedStream<S, S::Item>
|
|
|
|
where S: Stream
|
|
|
|
{
|
|
|
|
/// Force release of next block whether its full or not
|
|
|
|
pub fn force_release(&self)
|
|
|
|
{
|
|
|
|
self.force_release.store(true, Ordering::SeqCst);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Consume into the inner stream and the current buffer.
|
|
|
|
pub fn into_inner(self) -> (S, Vec<S::Item>)
|
|
|
|
{
|
|
|
|
(self.backing.into_inner(), self.buffer)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Size of the gated block
|
|
|
|
pub fn block_size(&self) -> usize
|
|
|
|
{
|
|
|
|
self.release_at
|
|
|
|
}
|
|
|
|
}
|
|
|
|
impl<S> Stream for GatedStream<S, S::Item>
|
|
|
|
where S: Stream
|
|
|
|
{
|
|
|
|
type Item = Box<[S::Item]>;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
|
|
|
|
while self.buffer.len() < self.release_at
|
|
|
|
{
|
|
|
|
let this = self.as_mut().project();
|
|
|
|
match this.backing.poll_next(cx)
|
|
|
|
{
|
|
|
|
Poll::Pending => break,
|
|
|
|
Poll::Ready(None) => {
|
|
|
|
return if this.buffer.len() == 0 {
|
|
|
|
Poll::Ready(None)
|
|
|
|
} else {
|
|
|
|
let this = self.project();
|
|
|
|
Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into()))
|
|
|
|
};
|
|
|
|
|
|
|
|
},
|
|
|
|
Poll::Ready(Some(item)) => this.buffer.push(item),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.buffer.len() > 0 && self.force_release.load(Ordering::Acquire) {
|
|
|
|
let this = self.project();
|
|
|
|
let output = mem::replace(this.buffer, Vec::with_capacity(*this.release_at));
|
|
|
|
this.force_release.store(false, Ordering::Release);
|
|
|
|
Poll::Ready(Some(output.into()))
|
|
|
|
}
|
|
|
|
else if self.buffer.len() == self.release_at
|
|
|
|
{
|
|
|
|
let this = self.project();
|
|
|
|
Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into()))
|
|
|
|
} else {
|
|
|
|
Poll::Pending
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
|
let bshint = self.backing.size_hint();
|
|
|
|
(bshint.0 / self.release_at, bshint.1.map(|x| x / self.release_at))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl<S> Stream for TimedGatedStream<S, S::Item>
|
|
|
|
where S: Stream
|
|
|
|
{
|
|
|
|
type Item = Box<[S::Item]>;
|
|
|
|
#[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
let this = self.project();
|
|
|
|
match this.interval.poll_next_unpin(cx)
|
|
|
|
{
|
|
|
|
Poll::Ready(_) => this.backing.force_release(),
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
|
|
|
|
this.backing.poll_next(cx)
|
|
|
|
}
|
|
|
|
#[inline] fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
|
self.backing.size_hint()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Gating with a timeout must not lose items: the sum over all released
    /// blocks equals the sum of the source range.
    #[tokio::test]
    async fn stream_gating_with_timeout() {
        let mut gated = stream::iter(0i32..100)
            .gate_with_timeout(16, Duration::from_millis(100))
            .lag(Duration::from_millis(10));

        let mut sum = 0i32;
        while let Some(block) = gated.next().await {
            eprintln!("{}: {:?}", block.len(), block);
            sum += block.iter().sum::<i32>();
        }

        println!("{}", sum);
        assert_eq!((0..100).sum::<i32>(), sum);
    }

    /// Plain size-based gating must not lose items either, including the
    /// trailing partial block.
    #[tokio::test]
    async fn stream_gating() {
        let mut gated = stream::iter(0i32..100)
            .gate(16)
            .lag(Duration::from_millis(10));

        let mut sum = 0i32;
        while let Some(block) = gated.next().await {
            eprintln!("{}: {:?}", block.len(), block);
            sum += block.iter().sum::<i32>();
        }

        println!("{}", sum);
        assert_eq!((0..100).sum::<i32>(), sum);
    }
}
|