|
|
|
@ -92,9 +92,8 @@ impl From<()> for Options
|
|
|
|
|
///
|
|
|
|
|
/// # Returns
|
|
|
|
|
/// The input stream and output stream `(tx, rx)`.
|
|
|
|
|
pub fn spawn_with_cancel(opt: impl Into<Options>, cancel: impl Future<Output = ()> + 'static + Send + Unpin) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
|
|
|
|
|
pub fn spawn_with_cancel(opt: Options, cancel: impl Future<Output = ()> + 'static + Send + Unpin) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
|
|
|
|
|
{
|
|
|
|
|
let opt = opt.into();
|
|
|
|
|
let (tx, rx) = mpsc::channel(opt.forward_pressure.into());
|
|
|
|
|
let rx = file_handler(rx,cancel , match opt.max_operations {
|
|
|
|
|
Some(n) => n.into(),
|
|
|
|
@ -108,7 +107,7 @@ pub fn spawn_with_cancel(opt: impl Into<Options>, cancel: impl Future<Output = (
|
|
|
|
|
///
|
|
|
|
|
/// # Returns
|
|
|
|
|
/// The input stream and output stream `(tx, rx)`.
|
|
|
|
|
pub fn spawn(opt: impl Into<Options>) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
|
|
|
|
|
pub fn spawn(opt: Options) -> (mpsc::Sender<PathBuf>, impl Stream<Item = (PathBuf, io::Result<sha256::Sha256Hash>)> + Send + Sync + 'static)
|
|
|
|
|
{
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
struct NeverFuture;
|
|
|
|
@ -120,7 +119,7 @@ pub fn spawn(opt: impl Into<Options>) -> (mpsc::Sender<PathBuf>, impl Stream<Ite
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
spawn_with_cancel(opt.into(), NeverFuture.map(|_| ()))
|
|
|
|
|
spawn_with_cancel(opt, NeverFuture.map(|_| ()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Raw handler for io_uring file hashing.
|
|
|
|
@ -149,21 +148,25 @@ where C: Future<Output = ()> + 'static + Unpin + Send
|
|
|
|
|
while let Some(path) = tokio::select!{
|
|
|
|
|
n = recv.recv() => n,
|
|
|
|
|
_ = &mut cancel => None,
|
|
|
|
|
} { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks.
|
|
|
|
|
} {
|
|
|
|
|
let ret = r_tx.clone();
|
|
|
|
|
let sem = sem.clone();
|
|
|
|
|
let h_tx = h_tx.clone();
|
|
|
|
|
tokio_uring::spawn(async move {
|
|
|
|
|
let _h_tx = h_tx;
|
|
|
|
|
|
|
|
|
|
let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await {
|
|
|
|
|
Some(Err(_)) => return, // Semaphore has been closed.
|
|
|
|
|
Some(Err(_e)) => return, // Semaphore has been closed.
|
|
|
|
|
Some(Ok(v)) => Some(v),
|
|
|
|
|
None => None,
|
|
|
|
|
};
|
|
|
|
|
let _h_tx = h_tx;
|
|
|
|
|
|
|
|
|
|
eprintln!("Got file: {:?}", path);
|
|
|
|
|
|
|
|
|
|
let mut file = OpenOptions::new()
|
|
|
|
|
.read(true)
|
|
|
|
|
.open(&path).await.unwrap();
|
|
|
|
|
eprintln!("Opened file {:?}",path);
|
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
|
let ring_res = uring_read(&mut file, |buffer| {
|
|
|
|
|
if ret.is_closed() {
|
|
|
|
@ -196,21 +199,31 @@ where C: Future<Output = ()> + 'static + Unpin + Send
|
|
|
|
|
}
|
|
|
|
|
];
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
//Yield the current task to allow the newly spawned one to acquire the semaphore.
|
|
|
|
|
//XXX: Is this a safe way of passing the semaphore to the task?
|
|
|
|
|
tokio::task::yield_now().await;
|
|
|
|
|
}
|
|
|
|
|
// --- End of new inputs
|
|
|
|
|
//XXX: FUUUUUUUCK why can't i just acquire_owned() without using Arc? Fucking hell...
|
|
|
|
|
|
|
|
|
|
//let _sem = sem.as_ref().map(|x| x.try_acquire_many(x.available_permits() as u32).unwrap());
|
|
|
|
|
|
|
|
|
|
// Drop the master refcount of `h_tx`.
|
|
|
|
|
drop(h_tx);
|
|
|
|
|
|
|
|
|
|
// Drop sender that we're cloning from
|
|
|
|
|
drop(r_tx);
|
|
|
|
|
// Drop the semaphore source refcount
|
|
|
|
|
if let Some(sem) = sem {
|
|
|
|
|
sem.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// When the sender is dropped (i.e. after the thread completes), this will stop waiting and return an error.
|
|
|
|
|
let _ = h_rx.await;
|
|
|
|
|
|
|
|
|
|
// Close and drop the semaphore source refcount.
|
|
|
|
|
if let Some(sem) = &sem {
|
|
|
|
|
sem.close();
|
|
|
|
|
}
|
|
|
|
|
drop(_sem);
|
|
|
|
|
drop(sem);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
ReceiverStream::new(r_rx)
|
|
|
|
|