You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
dirstat/src/ext.rs

498 lines
12 KiB

use super::*;
use futures::prelude::*;
use std::sync::atomic::{Ordering, AtomicBool};
use std::mem;
use tokio::time::{
Duration,
Interval,
self,
};
use std::{
pin::Pin,
task::{Poll, Context},
};
use std::{fmt, error};
pub mod prelude
{
pub use super::StreamGateExt as _;
pub use super::StreamLagExt as _;
pub use super::INodeExt as _;
pub use super::async_write_ext::{
EitherWrite,
DeadSink,
};
}
/// Extension trait exposing the inode of a filesystem object as a `data::INode`.
///
/// Implemented in this file for `std::fs::Metadata` and `tokio::fs::DirEntry`.
pub trait INodeExt
{
/// Get the `ino` of this fs object, wrapped as a `data::INode`.
fn inode(&self) -> data::INode;
}
/// Inode lookup for standard-library metadata, built through `data::INode::new`.
impl INodeExt for std::fs::Metadata {
    #[inline]
    fn inode(&self) -> data::INode {
        let metadata: &Self = self;
        data::INode::new(metadata)
    }
}
/// Inode lookup for an async directory entry, read via the Unix `DirEntryExt::ino`.
impl INodeExt for tokio::fs::DirEntry {
    #[inline]
    fn inode(&self) -> data::INode {
        use std::os::unix::fs::DirEntryExt as _;

        // Reading the raw inode number is a safe call; only the constructor is unsafe.
        let raw_ino = self.ino();
        // NOTE(review): the safety contract of `INode::new_unchecked` is not visible
        // here; presumably a value reported by the OS for a real entry satisfies
        // it -- confirm against `data::INode`.
        unsafe { data::INode::new_unchecked(raw_ino) }
    }
}
/// A gated stream that releases every N items from the backing stream.
///
/// Items are yielded as `Vec` blocks of `release_at` elements. A shorter block is
/// yielded when the backing stream ends with items still buffered, or when a
/// release was requested through `force_release()` while the buffer is non-empty.
#[pin_project]
#[derive(Debug)]
pub struct GatedStream<S,T>
{
// The wrapped stream; it is fused (`.fuse()`) when the gate is constructed.
#[pin] backing: stream::Fuse<S>,
// Items collected so far for the next block.
buffer: Vec<T>,
// Block size: the buffer is released once it holds this many items.
release_at: usize,
// Set by `force_release()`; makes the next poll release a non-empty buffer early.
force_release: AtomicBool,
}
/// A gated stream that also releases on a timeout.
///
/// On every poll the `interval` is polled first; when it reports ready, a forced
/// release is requested on the inner gate before the gate itself is polled.
#[pin_project]
#[derive(Debug)]
pub struct TimedGatedStream<S, T>
{
// The size-based gate that performs the actual buffering.
#[pin] backing: GatedStream<S, T>,
interval: Interval, // no need to Pin this, it's Unpin, we call `poll_next_unpin`.
}
/// Extension trait adding `lag()` to streams.
///
/// A blanket implementation in this file covers every `Stream`.
pub trait StreamLagExt: Sized
{
/// Throttle this stream with this duration.
///
/// Returns `self` wrapped in a `tokio::time::Throttle`.
/// NOTE(review): presumably `timeout` is the minimum delay enforced between
/// items -- confirm against the tokio `throttle` documentation.
fn lag(self, timeout: Duration) -> time::Throttle<Self>;
}
/// Blanket implementation: every stream can be throttled with `lag()`.
impl<S> StreamLagExt for S
where
    S: Stream,
{
    #[inline]
    fn lag(self, timeout: Duration) -> time::Throttle<Self> {
        // Thin wrapper over tokio's throttle combinator.
        let delay = timeout;
        time::throttle(delay, self)
    }
}
/// Extension trait adding block-wise gating to streams.
///
/// `T` is the item type collected into each released block; the blanket
/// implementation in this file uses the stream's own `Item`.
pub trait StreamGateExt<T>: Sized
{
/// Gate this stream every `n` elements.
///
/// The returned stream yields `Vec`s of buffered items instead of single items.
///
/// # Panics
/// If `n` is 0.
fn gate(self, n: usize) -> GatedStream<Self, T>;
/// Gate this stream every `n` elements or after `timeout` completes.
///
/// The timeout is driven by a repeating `tokio::time::interval(timeout)`.
///
/// # Panics
/// If `n` is 0.
/// NOTE(review): `tokio::time::interval` may impose its own requirements on
/// `timeout` (e.g. non-zero) -- confirm against the tokio documentation.
fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream<Self, T>;
}
/// Blanket implementation: every stream can be gated into blocks of its own items.
impl<S> StreamGateExt<S::Item> for S
where
    S: Stream,
{
    /// Wrap `self` in a gate releasing blocks of `n` items; panics when `n` is 0.
    fn gate(self, n: usize) -> GatedStream<Self, S::Item> {
        assert!(n > 0, "Size of gate must be above 0");

        let backing = self.fuse();
        // Pre-size the buffer for one full block.
        let buffer = Vec::with_capacity(n);
        let force_release = AtomicBool::new(false);
        GatedStream {
            backing,
            buffer,
            release_at: n,
            force_release,
        }
    }

    /// Like `gate`, additionally pairing the gate with a repeating interval of `timeout`.
    fn gate_with_timeout(self, n: usize, timeout: Duration) -> TimedGatedStream<Self, S::Item> {
        // Build the gate first so the `n > 0` assertion fires before the timer is created.
        let backing = self.gate(n);
        let interval = tokio::time::interval(timeout);
        TimedGatedStream { backing, interval }
    }
}
impl<S> GatedStream<S, S::Item>
where
    S: Stream,
{
    /// Request that the next block be released even if it is not full yet.
    ///
    /// Only raises a flag; the release itself happens on a later poll.
    pub fn force_release(&self) {
        let flag = &self.force_release;
        flag.store(true, Ordering::SeqCst);
    }

    /// Tear the gate apart into the wrapped stream and the items buffered so far.
    pub fn into_inner(self) -> (S, Vec<S::Item>) {
        let Self { backing, buffer, .. } = self;
        let stream = backing.into_inner();
        (stream, buffer)
    }

    /// Number of items that make up a full block.
    pub fn block_size(&self) -> usize {
        let size = self.release_at;
        size
    }
}
/// Yields the backing stream's items grouped into `Vec` blocks.
///
/// A block is released when it holds `release_at` items, when the backing
/// stream ends with items still buffered, or when a forced release was
/// requested and the buffer is non-empty.
impl<S> Stream for GatedStream<S, S::Item>
where
    S: Stream,
{
    type Item = Vec<S::Item>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Pull items until the block is full, the backing stream stalls, or it ends.
        while self.buffer.len() < self.release_at {
            let this = self.as_mut().project();
            match this.backing.poll_next(cx) {
                Poll::Pending => break,
                Poll::Ready(None) => {
                    // Backing stream finished: flush whatever is left, then end.
                    // (`backing` is fused, so polling it again later stays `None`.)
                    return if this.buffer.is_empty() {
                        Poll::Ready(None)
                    } else {
                        let this = self.project();
                        Poll::Ready(Some(mem::replace(
                            this.buffer,
                            Vec::with_capacity(*this.release_at),
                        )))
                    };
                }
                Poll::Ready(Some(item)) => this.buffer.push(item),
            }
        }

        if !self.buffer.is_empty() && self.force_release.load(Ordering::Acquire) {
            // Forced release of a partial (or full) block; the flag is consumed.
            let this = self.project();
            let output = mem::replace(this.buffer, Vec::with_capacity(*this.release_at));
            this.force_release.store(false, Ordering::Release);
            Poll::Ready(Some(output))
        } else if self.buffer.len() == self.release_at {
            // Regular release of a full block.
            let this = self.project();
            Poll::Ready(Some(mem::replace(
                this.buffer,
                Vec::with_capacity(*this.release_at),
            )))
        } else {
            // Not enough items yet; the backing stream registered the waker.
            Poll::Pending
        }
    }

    /// Bounds on the number of *blocks* still to be yielded.
    ///
    /// * Lower bound: every block holds at most `release_at` items, so at least
    ///   `ceil((backing_lower + buffered) / release_at)` blocks remain.
    /// * Upper bound: a forced release may emit a block with a single item, so
    ///   in the worst case the current buffer goes out as one block and every
    ///   remaining backing item as a block of its own.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, upper) = self.backing.size_hint();
        let buffered = self.buffer.len();
        let block = self.release_at;

        // Saturating keeps this a valid (possibly loose) lower bound on overflow.
        let min_items = lower.saturating_add(buffered);
        let min_blocks = min_items / block + usize::from(min_items % block != 0);

        // `None` (unbounded) if the addition would overflow.
        let max_blocks = upper.and_then(|items| items.checked_add(usize::from(buffered != 0)));

        (min_blocks, max_blocks)
    }
}
/// Yields the same blocks as the inner gate, with timer-driven forced releases.
impl<S> Stream for TimedGatedStream<S, S::Item>
where
    S: Stream,
{
    type Item = Vec<S::Item>;

    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let projected = self.project();

        // A ready tick only flags the gate; the gate decides what to release below.
        if let Poll::Ready(_) = projected.interval.poll_next_unpin(cx) {
            projected.backing.force_release();
        }

        projected.backing.poll_next(cx)
    }

    /// Delegates to the inner gate's hint.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let gate = &self.backing;
        gate.size_hint()
    }
}
/// Create a `std::time::Duration` from a literal with time suffix `h`, `m`, `s`, `ms` or `ns`.
///
/// A leading literal `0` yields a zero duration whatever single token follows it.
///
/// # Combination
/// These can also be combined as a comma-separated list; the parts are summed.
/// ```
/// # use crate::duration;
/// duration!(1 h, 20 m, 30 s);
/// ```
#[macro_export]
macro_rules! duration {
    // Zero needs no unit conversion.
    (0 $($_any:tt)?) => (::std::time::Duration::from_secs(0));
    ($dur:literal ms) => (::std::time::Duration::from_millis($dur));
    ($dur:literal ns) => (::std::time::Duration::from_nanos($dur));
    ($dur:literal s) => (::std::time::Duration::from_secs($dur));
    ($dur:literal m) => (::std::time::Duration::from_secs($dur * 60));
    ($dur:literal h) => (::std::time::Duration::from_secs($dur * 60 * 60));
    // Combination: sum every part onto a zero duration.
    // The recursive calls go through `$crate::` so they resolve even when the
    // macro is invoked by path from a scope where `duration!` is not visible.
    ($($dur:literal $unit:tt),*) => {
        $crate::duration!(0 s) $(
            + $crate::duration!($dur $unit)
        )*
    };
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Adding single-unit durations must equal the combined macro form.
    #[test]
    fn duration() {
        let piecewise = duration!(1 h)
            + duration!(10 m)
            + duration!(1 s)
            + duration!(10 ms, 2 ns);
        println!("{:?}", piecewise);

        let combined = duration!(1 h, 10 m, 1 s, 10 ms, 2 ns);
        println!("{:?}", combined);

        assert_eq!(piecewise, combined);
    }

    /// No item may be lost or duplicated when blocks are released by size or timer.
    #[tokio::test]
    async fn stream_gating_with_timeout() {
        let mut gated = stream::iter(0i32..100)
            .gate_with_timeout(16, Duration::from_millis(100))
            .lag(Duration::from_millis(10));

        let mut total = 0i32;
        while let Some(batch) = gated.next().await {
            eprintln!("{}: {:?}", batch.len(), batch);
            total += batch.into_iter().sum::<i32>();
        }

        println!("{}", total);
        assert_eq!((0..100).sum::<i32>(), total);
    }

    /// No item may be lost or duplicated when blocks are released by size only.
    #[tokio::test]
    async fn stream_gating() {
        let mut gated = stream::iter(0i32..100)
            .gate(16)
            .lag(Duration::from_millis(10));

        let mut total = 0i32;
        while let Some(batch) = gated.next().await {
            eprintln!("{}: {:?}", batch.len(), batch);
            total += batch.into_iter().sum::<i32>();
        }

        println!("{}", total);
        assert_eq!((0..100).sum::<i32>(), total);
    }
}
/// Explicitly drop this item.
///
/// With the `defer-drop` feature enabled the value is handed to
/// `defer_drop::drop_vec_sync` / `defer_drop::drop_vec` (the latter is awaited);
/// without it the value is dropped in place with the ordinary `drop` function.
///
/// # Specialisations
/// `Vec<T>` values are handled through the `vec` form.
/// ```
/// let large_vec = vec![String::from("Hello world"); 1000];
/// drop!(vec large_vec);
/// ```
/// The `async vec` form awaits the background dropping task, so it must be
/// used inside an async context when `defer-drop` is enabled.
/// ```
/// let large_vec = vec![String::from("Hello world"); 1000];
/// drop!(async vec large_vec);
/// ```
#[macro_export]
macro_rules! drop {
    (vec $item:expr) => {
        #[cfg(not(feature = "defer-drop"))]
        {
            drop($item);
        }
        #[cfg(feature = "defer-drop")]
        {
            $crate::defer_drop::drop_vec_sync($item);
        }
        ()
    };
    (async vec $item:expr) => {
        #[cfg(not(feature = "defer-drop"))]
        {
            drop($item);
        }
        #[cfg(feature = "defer-drop")]
        {
            $crate::defer_drop::drop_vec($item).await;
        }
        ()
    };
}
/// Base error produced by the `eyre_assert` macro when its condition is false.
///
/// Carries no data; it always displays as `Assertion failed`.
#[derive(Debug)]
pub struct SoftAssertionFailedError;

impl fmt::Display for SoftAssertionFailedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Assertion failed")
    }
}

impl error::Error for SoftAssertionFailedError {}
/// A soft assertion that yields an `Err(eyre::Report)` if the condition fails.
///
/// Usage: `eyre_assert!(cond)` or `eyre_assert!(cond; "message literal")`.
/// Evaluates to `Ok(())` when `cond` holds. Otherwise evaluates to an error built
/// from `SoftAssertionFailedError`, wrapped with the optional message and carrying
/// the stringified condition in a section headed "Expression was".
///
/// The expansion calls `eyre!`, `.wrap_err(..)`, `.with_section(..)` and `.header(..)`
/// unqualified, so those must be in scope at the call site.
/// NOTE(review): presumably these come from `eyre` / `color_eyre` -- confirm.
#[macro_export] macro_rules! eyre_assert {
($cond:expr $(; $message:literal)?) => {
if !$cond {
// Failure path: base error, optional message wrapper, then the source expression.
Err($crate::ext::SoftAssertionFailedError)
$(.wrap_err(eyre!($message)))?
.with_section(|| stringify!($cond).header("Expression was"))
} else {
Ok(())
}
};
}
mod async_write_ext {
use std::ops::{Deref, DerefMut};
use tokio::io::{
self,
AsyncWrite, AsyncRead,
};
use std::{
pin::Pin,
task::{Poll, Context},
};
use std::marker::PhantomData;
/// One of two writer types, selected at runtime.
///
/// Dereferences to `dyn AsyncWrite + Unpin` when both `T` and `U` are
/// `AsyncWrite + Unpin`, and can be built from a `Result<T, U>`
/// (`Ok` becomes `First`, `Err` becomes `Second`).
#[derive(Debug, Clone)]
pub enum EitherWrite<'a, T,U>
{
/// The first alternative.
First(T),
/// The second alternative; the `PhantomData` only ties the `'a` lifetime to the type.
Second(U, PhantomData<&'a mut U>),
}
impl<'a, T, U> Deref for EitherWrite<'a, T,U>
where T: AsyncWrite + Unpin + 'a,
U: AsyncWrite + Unpin + 'a
{
type Target = dyn AsyncWrite + Unpin + 'a;
fn deref(&self) -> &Self::Target {
match self {
Self::First(t) => t,
Self::Second(u, _) => u,
}
}
}
impl<'a, T, U> DerefMut for EitherWrite<'a, T,U>
where T: AsyncWrite + Unpin + 'a,
U: AsyncWrite + Unpin + 'a
{
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Self::First(t) => t,
Self::Second(u, _) => u,
}
}
}
/// Builds an `EitherWrite` from a `Result`: `Ok` maps to `First`, `Err` to `Second`.
impl<'a, T, U> From<Result<T, U>> for EitherWrite<'a, T, U>
where
    T: AsyncWrite + Unpin + 'a,
    U: AsyncWrite + Unpin + 'a,
{
    #[inline]
    fn from(from: Result<T, U>) -> Self {
        let second = match from {
            Ok(first) => return Self::First(first),
            Err(second) => second,
        };
        Self::Second(second, PhantomData)
    }
}
impl<'a, T> EitherWrite<'a, T, DeadSink> {
    /// Borrow the `First` value of an `EitherWrite` whose `Second` side is `DeadSink`.
    ///
    /// `DeadSink` has no variants, so a `Second` value can never exist. The empty
    /// match below lets the compiler verify that, instead of asserting it
    /// through `unsafe`.
    #[inline]
    fn as_first_infallible(&mut self) -> &mut T {
        match self {
            Self::First(first) => first,
            // Uninhabited: an empty match on the `DeadSink` is exhaustive.
            Self::Second(never, _) => match *never {},
        }
    }
}
impl<'a, U> EitherWrite<'a, DeadSink, U> {
    /// Borrow the `Second` value of an `EitherWrite` whose `First` side is `DeadSink`.
    ///
    /// `DeadSink` has no variants, so a `First` value can never exist. The empty
    /// match below lets the compiler verify that, instead of asserting it
    /// through `unsafe`.
    #[inline]
    fn as_second_infallible(&mut self) -> &mut U {
        match self {
            // Uninhabited: an empty match on the `DeadSink` is exhaustive.
            Self::First(never) => match *never {},
            Self::Second(second, _) => second,
        }
    }
}
/* impl<'a, T> AsyncWrite for EitherWrite<'a, T, DeadSink>
where T: AsyncWrite + Unpin + 'a
{
#[inline] fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let this = unsafe { self.map_unchecked_mut(|x| x.as_first_infallible()) };
this.poll_write(cx, buf)
}
#[inline] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = unsafe { self.map_unchecked_mut(|x| x.as_first_infallible()) };
this.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = unsafe { self.map_unchecked_mut(|x| x.as_first_infallible()) };
this.poll_shutdown(cx)
}
}*/
impl<'a, U> AsyncWrite for EitherWrite<'a, DeadSink, U>
where U: AsyncWrite + Unpin + 'a
{
#[inline] fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let this = unsafe { self.map_unchecked_mut(|x| x.as_second_infallible()) };
this.poll_write(cx, buf)
}
#[inline] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = unsafe { self.map_unchecked_mut(|x| x.as_second_infallible()) };
this.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = unsafe { self.map_unchecked_mut(|x| x.as_second_infallible()) };
this.poll_shutdown(cx)
}
}
/// An `Infallible` type for `AsyncWrite` & `AsyncRead`
///
/// The enum has no variants, so no value of it can ever be constructed. It is
/// used as a type parameter of `EitherWrite` to rule out one of its variants.
#[derive(Debug)]
pub enum DeadSink { }
impl AsyncWrite for DeadSink
{
#[inline] fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
unreachable!();
}
#[inline] fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, _buf: &[u8]) -> Poll<Result<usize, io::Error>> {
unreachable!();
}
#[inline] fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
unreachable!();
}
}
/// `DeadSink` cannot be instantiated, so this method can never run; the body
/// is an empty match over the uninhabited receiver.
impl AsyncRead for DeadSink {
    #[inline]
    fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, _buf: &mut [u8]) -> Poll<io::Result<usize>> {
        match *self {}
    }
}
}