use super::*; use futures::prelude::*; use std::sync::atomic::{Ordering, AtomicBool}; use std::mem; use tokio::time::{ Duration, Interval, self, }; use std::{ pin::Pin, task::{Poll, Context}, }; use std::{fmt, error}; pub mod prelude { pub use super::StreamGateExt as _; pub use super::StreamLagExt as _; pub use super::INodeExt as _; pub use super::OptionIterator; pub use super::MaybeVec; } pub trait INodeExt { /// Get the `ino` of this fs object metadata. fn inode(&self) -> data::INode; } impl INodeExt for std::fs::Metadata { #[inline] fn inode(&self) -> data::INode { data::INode::new(self) } } impl INodeExt for tokio::fs::DirEntry { #[inline] fn inode(&self) -> data::INode { use std::os::unix::fs::DirEntryExt as _; unsafe { data::INode::new_unchecked(self.ino()) } } } /// A gated stream that releases every N items from the backing stream. #[pin_project] #[derive(Debug)] pub struct GatedStream { #[pin] backing: stream::Fuse, buffer: Vec, release_at: usize, force_release: AtomicBool, } /// A gated stream that also releases on a timeout. #[pin_project] #[derive(Debug)] pub struct TimedGatedStream { #[pin] backing: GatedStream, interval: Interval, // no need to Pin this, it's Unpin, we call `poll_next_unpin`. } pub trait StreamLagExt: Sized { /// Throttle this stream with this duration. fn lag(self, timeout: Duration) -> time::Throttle; } impl StreamLagExt for S where S: Stream { #[inline] fn lag(self, timeout: Duration) -> time::Throttle { time::throttle(timeout, self) } } pub trait StreamGateExt: Sized { /// Gate this stream every `n` elements. /// /// # Panics /// If `n` is 0. fn gate(self, n: usize) -> GatedStream; /// Gate this stream every `n` elements or after `timeout` completes. /// /// # Panics /// If `n` is 0. fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream; } impl StreamGateExt for S where S: Stream, { fn gate(self, n: usize) -> GatedStream { assert!(n > 0, "Size of gate must be above 0"); GatedStream { backing: self.fuse(), buffer: Vec::with_capacity(n), release_at: n, force_release: AtomicBool::new(false), } } fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream { TimedGatedStream { backing: self.gate(n), interval: tokio::time::interval(timeout), } } } impl GatedStream where S: Stream { /// Force release of next block whether its full or not pub fn force_release(&self) { self.force_release.store(true, Ordering::SeqCst); } /// Consume into the inner stream and the current buffer. pub fn into_inner(self) -> (S, Vec) { (self.backing.into_inner(), self.buffer) } /// Size of the gated block pub fn block_size(&self) -> usize { self.release_at } } impl Stream for GatedStream where S: Stream { type Item = Vec;//Box<[S::Item]>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { while self.buffer.len() < self.release_at { let this = self.as_mut().project(); match this.backing.poll_next(cx) { Poll::Pending => break, Poll::Ready(None) => { return if this.buffer.len() == 0 { Poll::Ready(None) } else { let this = self.project(); Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into())) }; }, Poll::Ready(Some(item)) => this.buffer.push(item), } } if self.buffer.len() > 0 && self.force_release.load(Ordering::Acquire) { let this = self.project(); let output = mem::replace(this.buffer, Vec::with_capacity(*this.release_at)); this.force_release.store(false, Ordering::Release); Poll::Ready(Some(output.into())) } else if self.buffer.len() == self.release_at { let this = self.project(); Poll::Ready(Some(mem::replace(this.buffer, Vec::with_capacity(*this.release_at)).into())) } else { Poll::Pending } } fn size_hint(&self) -> (usize, Option) { let bshint = self.backing.size_hint(); (bshint.0 / self.release_at, bshint.1.map(|x| x / self.release_at)) } } impl Stream for TimedGatedStream where S: Stream { type Item = Vec;//Box<[S::Item]>; #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); match this.interval.poll_next_unpin(cx) { Poll::Ready(_) => this.backing.force_release(), _ => (), } this.backing.poll_next(cx) } #[inline] fn size_hint(&self) -> (usize, Option) { self.backing.size_hint() } } /// An iterator that can be constructed from an `Option`. #[derive(Debug, Clone)] pub struct OptionIterator(Option); impl OptionIterator { /// Consume into the inner `Option`. #[inline] pub fn into_inner(self) -> Option { self.0 } /// Does this `OptionIterator` have a value? pub fn is_some(&self) -> bool { self.0.is_some() } } impl From> for OptionIterator { fn from(from: Option) -> Self { Self(from) } } impl Iterator for OptionIterator { type Item = I::Item; fn next(&mut self) -> Option { self.0.map(|x| x.next()).flatten() } fn size_hint(&self) -> (usize, Option) { match self.0 { Some(i) => i.size_hint(), _ => (0, Some(0)), } } } impl std::iter::FusedIterator for OptionIterator where I: std::iter::FusedIterator{} impl ExactSizeIterator for OptionIterator where I: ExactSizeIterator{} impl DoubleEndedIterator for OptionIterator where I: DoubleEndedIterator { fn next_back(&mut self) -> Option { self.0.map(|x| x.next_back()).flatten() } } /// Create a duration with time suffix `h`, `m`, `s`, `ms` or `ns`. /// /// # Combination /// These can also be combined. /// ``` /// #use crate::duration; /// duration!(1 h, 20 m, 30 s); /// ``` #[macro_export ]macro_rules! duration { (0 $($_any:tt)?) => (::std::time::Duration::from_secs(0)); ($dur:literal ms) => (::std::time::Duration::from_millis($dur)); ($dur:literal ns) => (::std::time::Duration::from_nanos($dur)); ($dur:literal s) => (::std::time::Duration::from_secs($dur)); ($dur:literal m) => (::std::time::Duration::from_secs($dur * 60)); ($dur:literal h) => (::std::time::Duration::from_secs($dur * 60 * 60)); ( $($dur:literal $unit:tt),*)=> { duration!(0 s) $( + duration!($dur $unit) )* }; } #[cfg(test)] mod tests { use super::*; #[test] fn duration() { let dur = duration!(1 h) + duration!(10 m) + duration!(1 s) + duration!(10 ms, 2 ns); println!("{:?}", dur); let dur2 = duration!( 1 h, 10 m, 1 s, 10 ms, 2 ns ); println!("{:?}", dur2); assert_eq!(dur, dur2); } #[tokio::test] async fn stream_gating_with_timeout() { let mut stream = stream::iter(0i32..100) .gate_with_timeout(16, Duration::from_millis(100)) // .gate(16) .lag(Duration::from_millis(10)); let mut sum = 0i32; while let Some(numbers) = stream.next().await { eprintln!("{}: {:?}", numbers.len(), numbers); sum+=numbers.into_iter().sum::(); } println!("{}", sum); assert_eq!((0..100).sum::(),sum); } #[tokio::test] async fn stream_gating() { let mut stream = stream::iter(0i32..100) .gate(16) .lag(Duration::from_millis(10)); let mut sum = 0i32; while let Some(numbers) = stream.next().await { eprintln!("{}: {:?}", numbers.len(), numbers); sum+=numbers.into_iter().sum::(); } println!("{}", sum); assert_eq!((0..100).sum::(),sum); } } /// Explicitly drop this item. /// /// If `defer-drop` feature is enabled, this may move the object to the background collector thread. /// /// # Speicialisations /// There can be special handling for `Vec` types in this way. /// ``` /// let large_vec = vec![String::from("Hello world"); 1000]; /// drop!(vec large_vec); /// ``` /// It also has an `async` variant, that lets you await the background dropping task. /// ``` /// let large_vec = vec![String::from("Hello world"); 1000]; /// drop!(async vec large_vec); /// ``` #[macro_export] macro_rules! drop { (async vec $item:expr) => { #[cfg(feature="defer-drop")] { $crate::defer_drop::drop_vec($item).await; } #[cfg(not(feature="defer-drop"))] { drop($item); } () }; (vec $item:expr) => { #[cfg(feature="defer-drop")] { $crate::defer_drop::drop_vec_sync($item); } #[cfg(not(feature="defer-drop"))] { drop($item); } () } } /// Base type from macro `eyre_assert`. #[derive(Debug)] pub struct SoftAssertionFailedError; impl error::Error for SoftAssertionFailedError{} impl fmt::Display for SoftAssertionFailedError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Assertion failed") } } /// A soft assertion that yields an `Err(eyre::Report)` if the condition fails. #[macro_export] macro_rules! eyre_assert { ($cond:expr $(; $message:literal)?) => { if !$cond { Err($crate::ext::SoftAssertionFailedError) $(.wrap_err(eyre!($message)))? .with_section(|| stringify!($cond).header("Expression was")) } else { Ok(()) } }; } pub type MaybeVec = smallvec::SmallVec<[T; 1]>;