You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

146 lines
4.1 KiB

//! Async operations
use super::*;
use std::{
num::NonZeroUsize,
convert::{TryFrom, TryInto,},
path::Path,
sync::Arc,
};
use futures::{
future::{
OptionFuture,
FutureExt,
join_all,
},
};
use tokio::{
sync::{
Semaphore,
},
fs::{
OpenOptions,
File,
self,
},
};
use error::{Error, ErrorKind};
cfg_if!{
if #[cfg(feature="limit-concurrency")] {
pub const MAX_WORKERS: Option<NonZeroUsize> = Some(unsafe {NonZeroUsize::new_unchecked(4096)});
} else {
pub const MAX_WORKERS: Option<NonZeroUsize> = None;
}
}
fn gensem() -> Option<Arc<Semaphore>>
{
trace!("Limiting concurrency to {:?}", MAX_WORKERS);
match MAX_WORKERS {
Some(nz) => Some(Arc::new(Semaphore::new(nz.into()))),
None => None,
}
}
async fn unlink(path: &Path) -> Result<(), Error>
{
let tmp = temp::TempFile::new_in(path.parent().unwrap());
fs::copy(path, &tmp).await.map_err(|e| Error::new(ErrorKind::Copy(e), path.to_owned()))?;
fs::remove_file(path).await.map_err(|e| Error::new(ErrorKind::Unlink(e), path.to_owned()))?;
fs::rename(&tmp, path).await.map_err(|e| Error::new(ErrorKind::Move(e), path.to_owned()))?;
tmp.release(); // file no longer exists, so no need to drop;
Ok(())
}
async fn work<P: AsRef<Path>>(apath: P, sem: Option<Arc<Semaphore>>) -> Result<(P, bool), Error>
{
let path = apath.as_ref();
let _lock = OptionFuture::from(sem.map(Semaphore::acquire_owned)).await;
let file = OpenOptions::new()
.read(true)
.open(path).await
.map_err(|e| (ErrorKind::Open(e), path))?;
let meta = match file.metadata().await {
Ok(meta) => meta,
Err(err) => {
debug!("Failed to stat file: {}", err);
warn!("Failed to stat {:?}, skipping", path);
return Err((ErrorKind::Stat(err), path).into());
},
};
use std::os::unix::fs::MetadataExt;
let nlink = meta.nlink();
debug!("<{:?}> has {} links", path, nlink);
if nlink > 1 {
//todo work i guess fuck it
unlink(path).await?;
Ok((apath, true))
} else {
Ok((apath, false))
}
}
pub async fn main<I: IntoIterator<Item=String>>(list: I) -> eyre::Result<()>
{
let sem = gensem();
let list = list.into_iter();
let mut failures = match list.size_hint() {
(0, Some(0)) | (0, None) => Vec::new(),
(x, None) | (_, Some(x)) => Vec::with_capacity(x),
};
let mut done = 0usize;
for (i, res) in (0usize..).zip(join_all(list.map(|file| tokio::spawn(work(file, sem.clone()))))
.map(|x| {trace!("--- {} Finished ---", x.len()); x}).await)
{
//trace!("Done on {:?}", res);
match res {
Ok(Ok((path, true))) => info!("<{:?}> OK (processed)", path),
Ok(Ok((path, false))) => info!("<{:?}> OK (skipped)", path),
Err(e) => {
trace!("child {} cancelled by {}", i, if e.is_panic(){"panic"} else {"cancel"});
if e.is_panic() {
return Err(eyre!("Child {} panic", i))
.with_error(move || e)
.with_warning(|| "This suggests a bug in the program");
} else {
warn!("Child {} cancelled", i);
return Ok(());
}
},
Ok(Err(kind)) if !kind.kind().is_skippable() => {
failures.push((kind.path().to_owned(), kind.to_string()));
let fuck = format!("{:?}", kind.path());
let sug = kind.kind().suggestion();
let err = Err::<std::convert::Infallible, _>(kind)
.wrap_err_with(|| eyre!("<{}> Failed", fuck))
.with_section(move || fuck.header("Path was"))
.with_suggestion(|| sug)
.unwrap_err();
error!("{}", err);
debug!("Error: {:?}", err);
},
Ok(Err(k)) => {
failures.push((k.path().to_owned(), k.to_string()));
trace!("<{:?}> Failed (skipped)", k.path());
},
}
done+=1;
}
if failures.len() > 0 {
return Err(eyre!("{}/{} tasks failed to complete successfullly", failures.len(), done))
.with_section(|| failures.into_iter()
.map(|(x, err)| format!("{}: {}", x.into_os_string()
.into_string()
.unwrap_or_else(|os| os.to_string_lossy().into_owned()), err))
.join("\n")
.header("Failed tasks:"))
.with_suggestion(|| "Run with `RUST_LOG=debug` or `RUST_LOG=trace` for verbose error reporting");
}
Ok(())
}