fix dedup in rec parallel

master
Avril 4 years ago
parent 7a1228e025
commit ca2620aea1
Signed by: flanchan
GPG Key ID: 284488987C31F630

1
Cargo.lock generated

@ -586,6 +586,7 @@ dependencies = [
"futures", "futures",
"lazy_static", "lazy_static",
"log", "log",
"pin-project",
"pretty_env_logger", "pretty_env_logger",
"recolored", "recolored",
"rustc_version", "rustc_version",

@ -1,7 +1,7 @@
[package] [package]
name = "sever" name = "sever"
description = "Coerce hardlinks into new files" description = "Coerce hardlinks into new files"
version = "1.0.1" version = "1.0.2"
authors = ["Avril <flanchan@cumallover.me>"] authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018" edition = "2018"
readme = "README.org" readme = "README.org"
@ -18,7 +18,7 @@ limit-concurrency = ["parallel"]
recursive = [] recursive = []
limit-recursion = ["recursive"] limit-recursion = ["recursive"]
splash = [] splash = []
parallel = ["tokio", "futures"] parallel = ["tokio", "futures", "pin-project"]
threads = ["parallel", "tokio/rt-threaded"] threads = ["parallel", "tokio/rt-threaded"]
# use PRETTY_ENV_LOGGER I guess # use PRETTY_ENV_LOGGER I guess
@ -35,6 +35,7 @@ futures = {version = "0.3.5", optional = true}
lazy_static = "1.4.0" lazy_static = "1.4.0"
uuid = {version = "0.8.1", features = ["v4"]} uuid = {version = "0.8.1", features = ["v4"]}
recolored = "1.9.3" recolored = "1.9.3"
pin-project = {version = "0.4.26", optional=true}
[build-dependencies] [build-dependencies]
rustc_version = "0.2" rustc_version = "0.2"

@ -23,3 +23,60 @@ where I: IntoIterator<Item=T>,
string string
} }
} }
/// Stream de-duplication helpers, only built with the `parallel` feature.
#[cfg(feature="parallel")]
mod para
{
    use super::*;
    use std::{
        collections::HashSet,
        task::{Poll, Context,},
        pin::Pin,
        marker::PhantomData,
        hash::Hash,
    };
    use futures::{
        stream::{
            Stream,
        },
    };

    /// A stream adaptor that yields each item of the inner stream at most once.
    ///
    /// Items are not stored; only the `map::HashOutput` produced by `map::compute`
    /// for each yielded item is remembered. Two distinct items whose hash outputs
    /// are equal are therefore treated as duplicates and the later one is dropped.
    ///
    /// Fields: `0` is the pinned inner stream, `1` is the set of hash outputs
    /// already seen, `2` ties the item type `T` to the struct.
    #[pin_project]
    pub struct DedupStream<I, T>(#[pin] I, HashSet<map::HashOutput>, PhantomData<T>);

    impl<I: Stream<Item=T>, T: Hash> Stream for DedupStream<I, T>
    {
        type Item = T;

        /// Poll the inner stream until it produces an item not seen before,
        /// ends, or is pending.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let mut this = self.project();
            // Loop instead of recursing: a long run of consecutive duplicates from
            // an always-ready inner stream must not grow the call stack.
            loop {
                match this.0.as_mut().poll_next(cx) {
                    Poll::Ready(Some(x)) => {
                        // `insert` returns `true` only when the hash was not yet present.
                        if this.1.insert(map::compute(&x)) {
                            return Poll::Ready(Some(x));
                        }
                        // Duplicate: drop it and poll the inner stream again.
                    },
                    Poll::Ready(None) => return Poll::Ready(None),
                    Poll::Pending => return Poll::Pending,
                }
            }
        }

        /// Every remaining item may be a duplicate, so the lower bound is 0;
        /// the upper bound is whatever the inner stream reports.
        fn size_hint(&self) -> (usize, Option<usize>) {
            (0, self.0.size_hint().1)
        }
    }

    /// Extension trait adding `.dedup()` to streams.
    pub trait DedupStreamExt: Stream+ Sized
    {
        /// Wrap this stream so that repeated items (by hash) are yielded only once.
        fn dedup(self) -> DedupStream<Self, Self::Item>;
    }

    impl<T: Stream> DedupStreamExt for T
    where T::Item: Hash
    {
        fn dedup(self) -> DedupStream<Self, Self::Item>
        {
            // Start with an empty "seen" set.
            DedupStream(self, HashSet::new(), PhantomData)
        }
    }
}
// Gate the re-export the same way as the module itself; otherwise the import is
// unresolved when the `parallel` feature is disabled.
#[cfg(feature="parallel")]
pub use para::*;

@ -1,6 +1,7 @@
#![allow(dead_code)] #![allow(dead_code)]
#![allow(unused_imports)] #![allow(unused_imports)]
#[cfg(feature="parallel")] #[macro_use] extern crate pin_project;
#[macro_use] extern crate log; #[macro_use] extern crate log;
#[macro_use] mod macros; #[macro_use] mod macros;
@ -73,7 +74,9 @@ async fn main() -> eyre::Result<()> {
async move { async move {
Some(parallel::expand_dir(file).await) //TODO: We gotta in here, too Some(parallel::expand_dir(file).await) //TODO: We gotta in here, too
} }
}).flatten()).await, })
.flatten()
.dedup()).await,
"Jobs failed") "Jobs failed")
} }

@ -7,9 +7,9 @@ use std::{
}; };
//TODO: Feature flag for SHA256 hashing //TODO: Feature flag for SHA256 hashing
type HashOutput = u64; pub type HashOutput = u64;
fn compute<H: Hash>(what: &H) -> HashOutput pub fn compute<H: Hash>(what: &H) -> HashOutput
{ {
use std::hash::Hasher; use std::hash::Hasher;
let mut hasher = std::collections::hash_map::DefaultHasher::new(); let mut hasher = std::collections::hash_map::DefaultHasher::new();

@ -213,7 +213,8 @@ pub async fn expand_dir(p: String) -> impl Stream<Item=String>
tx.send(p).await.unwrap(); tx.send(p).await.unwrap();
} }
}); });
rx //TODO: map this to dedup rx
//DedupStream(rx, HashSet::new())
} else { } else {
stream::iter(iter::once(p).filter_map(|p| { stream::iter(iter::once(p).filter_map(|p| {
if Path::new(&p).is_dir() { if Path::new(&p).is_dir() {

Loading…
Cancel
Save