diff --git a/src/handle.rs b/src/handle.rs index 1c379f2..698942c 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -92,9 +92,8 @@ impl From<()> for Options /// /// # Returns /// The input stream and output stream `(tx, rx)`. -pub fn spawn_with_cancel(opt: impl Into, cancel: impl Future + 'static + Send + Unpin) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) +pub fn spawn_with_cancel(opt: Options, cancel: impl Future + 'static + Send + Unpin) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) { - let opt = opt.into(); let (tx, rx) = mpsc::channel(opt.forward_pressure.into()); let rx = file_handler(rx,cancel , match opt.max_operations { Some(n) => n.into(), @@ -108,7 +107,7 @@ pub fn spawn_with_cancel(opt: impl Into, cancel: impl Future) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) +pub fn spawn(opt: Options) -> (mpsc::Sender, impl Stream)> + Send + Sync + 'static) { #[derive(Debug)] struct NeverFuture; @@ -120,7 +119,7 @@ pub fn spawn(opt: impl Into) -> (mpsc::Sender, impl Stream + 'static + Unpin + Send while let Some(path) = tokio::select!{ n = recv.recv() => n, _ = &mut cancel => None, - } { //TODO: We can add a cancellation mechanism here, since the semaphore is closed whenever this loop breaks. + } { let ret = r_tx.clone(); let sem = sem.clone(); let h_tx = h_tx.clone(); tokio_uring::spawn(async move { - let _h_tx = h_tx; + let _sem = match OptionFuture::from(sem.as_ref().map(|x| Semaphore::acquire(x))).await { - Some(Err(_)) => return, // Semaphore has been closed. + Some(Err(_e)) => return, // Semaphore has been closed. Some(Ok(v)) => Some(v), None => None, }; + let _h_tx = h_tx; + eprintln!("Got file: {:?}", path); + let mut file = OpenOptions::new() .read(true) .open(&path).await.unwrap(); + eprintln!("Opened file {:?}",path); let mut hasher = Sha256::new(); let ring_res = uring_read(&mut file, |buffer| { if ret.is_closed() { @@ -196,21 +199,31 @@ where C: Future + 'static + Unpin + Send } ]; }); + + //Yield the current task to allow the newly spawned one to acquire the semaphore. + //XXX: Is this a safe way of passing the semaphore to the task? + tokio::task::yield_now().await; } // --- End of new inputs + //XXX: FUUUUUUUCK why can't i just acquire_owned() without using Arc? Fucking hell... + + //let _sem = sem.as_ref().map(|x| x.try_acquire_many(x.available_permits() as u32).unwrap()); // Drop the master refcount of `h_tx`. drop(h_tx); // Drop sender that we're cloning from drop(r_tx); - // Drop the semaphore source refcount - if let Some(sem) = sem { - sem.close(); - } - + // When the sender is dropped (i.e. after the thread completes), this will stop waiting and return an error. let _ = h_rx.await; + + // Close and drop the semaphore source refcount. + if let Some(sem) = &sem { + sem.close(); + } + drop(_sem); + drop(sem); }); }); ReceiverStream::new(r_rx) diff --git a/src/main.rs b/src/main.rs index 9d47c4b..5c474a7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,23 @@ use std::path::PathBuf; mod handle; -fn main() { - println!("Hello, world!"); +#[tokio::main] +async fn main() { + let (tx, mut rx) = handle::spawn(Default::default()); + let _res = tokio::join![ + tokio::spawn(async move { + for file in ["./src/main.rs", "./src/handle.rs"] + { + eprintln!("Sending {}", file); + tx.send(std::path::Path::new(file).into()).await.unwrap(); + } + }), + tokio::spawn(async move { + use futures::prelude::*; + while let Some((file, hash)) = rx.next().await { + println!("Got file hash for {:?}: {:?}", file, hash); + } + }), + ]; + eprintln!("Done: {:?}", _res); }