use super::*; use futures::prelude::*; use std::sync::atomic::{Ordering, AtomicBool}; use std::mem; use tokio::time::{ Duration, Interval, self, }; use std::{ pin::Pin, task::{Poll, Context}, }; pub mod prelude { pub use super::StreamGateExt as _; pub use super::StreamLagExt as _; pub use super::INodeExt as _; } pub trait INodeExt { /// Get the `ino` of this fs object metadata. fn inode(&self) -> data::INode; } impl INodeExt for std::fs::Metadata { #[inline] fn inode(&self) -> data::INode { data::INode::new(self) } } impl INodeExt for tokio::fs::DirEntry { #[inline] fn inode(&self) -> data::INode { use std::os::unix::fs::DirEntryExt as _; unsafe { data::INode::new_unchecked(self.ino()) } } } /// A gated stream that releases every N items from the backing stream. #[pin_project] #[derive(Debug)] pub struct GatedStream { #[pin] backing: stream::Fuse, buffer: Vec, release_at: usize, force_release: AtomicBool, } /// A gated stream that also releases on a timeout. #[pin_project] #[derive(Debug)] pub struct TimedGatedStream { #[pin] backing: GatedStream, interval: Interval, // no need to Pin this, it's Unpin, we call `poll_next_unpin`. } pub trait StreamLagExt: Sized { /// Throttle this stream with this duration. fn lag(self, timeout: Duration) -> time::Throttle; } impl StreamLagExt for S where S: Stream { #[inline] fn lag(self, timeout: Duration) -> time::Throttle { time::throttle(timeout, self) } } pub trait StreamGateExt: Sized { /// Gate this stream every `n` elements. /// /// # Panics /// If `n` is 0. fn gate(self, n: usize) -> GatedStream; /// Gate this stream every `n` elements or after `timeout` completes. /// /// # Panics /// If `n` is 0. fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream; } impl StreamGateExt for S where S: Stream, { fn gate(self, n: usize) -> GatedStream { assert!(n > 0, "Size of gate must be above 0"); GatedStream { backing: self.fuse(), buffer: Vec::with_capacity(n), release_at: n, force_release: AtomicBool::new(false), } } fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream { TimedGatedStream { backing: self.gate(n), interval: tokio::time::interval(timeout), } } } impl GatedStream where S: Stream { /// Force release of next block whether its full or not pub fn force_release(&self) { self.force_release.store(true, Ordering::SeqCst); } /// Consume into the inner stream and the current buffer. pub fn into_inner(self) -> (S, Vec) { (self.backing.into_inner(), self.buffer) } /// Size of the gated block pub fn block_size(&self) -> usize { self.release_at } } impl Stream for GatedStream where S: Stream { type Item = Vec;//Box<[S::Item]>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { while self.buffer.len() < self.release_at { let this = self.as_mut().project(); match this.backing.poll_next(cx) { Poll::Pending => break, Poll::Ready(None) => { return if this.buffer.len() == 0 { Poll::Ready(None) } else { let this = self.project(); Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into())) }; }, Poll::Ready(Some(item)) => this.buffer.push(item), } } if self.buffer.len() > 0 && self.force_release.load(Ordering::Acquire) { let this = self.project(); let output = mem::replace(this.buffer, Vec::with_capacity(*this.release_at)); this.force_release.store(false, Ordering::Release); Poll::Ready(Some(output.into())) } else if self.buffer.len() == self.release_at { let this = self.project(); Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into())) } else { Poll::Pending } } fn size_hint(&self) -> (usize, Option) { let bshint = self.backing.size_hint(); (bshint.0 / self.release_at, bshint.1.map(|x| x / self.release_at)) } } impl Stream for TimedGatedStream where S: Stream { type Item = Vec;//Box<[S::Item]>; #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); match this.interval.poll_next_unpin(cx) { Poll::Ready(_) => this.backing.force_release(), _ => (), } this.backing.poll_next(cx) } #[inline] fn size_hint(&self) -> (usize, Option) { self.backing.size_hint() } } /// Create a duration with time suffix `h`, `m`, `s`, `ms` or `ns`. /// /// # Combination /// These can also be combined. /// ``` /// #use crate::duration; /// duration!(1 h, 20 m, 30 s); /// ``` #[macro_export ]macro_rules! duration { (0 $($_any:tt)?) => (::std::time::Duration::from_secs(0)); ($dur:literal ms) => (::std::time::Duration::from_millis($dur)); ($dur:literal ns) => (::std::time::Duration::from_nanos($dur)); ($dur:literal s) => (::std::time::Duration::from_secs($dur)); ($dur:literal m) => (::std::time::Duration::from_secs($dur * 60)); ($dur:literal h) => (::std::time::Duration::from_secs($dur * 60 * 60)); ( $($dur:literal $unit:tt),*)=> { duration!(0 s) $( + duration!($dur $unit) )* }; } #[cfg(test)] mod tests { use super::*; #[test] fn duration() { let dur = duration!(1 h) + duration!(10 m) + duration!(1 s) + duration!(10 ms, 2 ns); println!("{:?}", dur); let dur2 = duration!( 1 h, 10 m, 1 s, 10 ms, 2 ns ); println!("{:?}", dur2); assert_eq!(dur, dur2); } #[tokio::test] async fn stream_gating_with_timeout() { let mut stream = stream::iter(0i32..100) .gate_with_timeout(16, Duration::from_millis(100)) // .gate(16) .lag(Duration::from_millis(10)); let mut sum = 0i32; while let Some(numbers) = stream.next().await { eprintln!("{}: {:?}", numbers.len(), numbers); sum+=numbers.into_iter().sum::(); } println!("{}", sum); assert_eq!((0..100).sum::(),sum); } #[tokio::test] async fn stream_gating() { let mut stream = stream::iter(0i32..100) .gate(16) .lag(Duration::from_millis(10)); let mut sum = 0i32; while let Some(numbers) = stream.next().await { eprintln!("{}: {:?}", numbers.len(), numbers); sum+=numbers.into_iter().sum::(); } println!("{}", sum); assert_eq!((0..100).sum::(),sum); } }