You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
dirstat/src/data/cache.rs

116 lines
3.1 KiB

use super::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{
RwLock,
mpsc,
watch,
};
/// Shared state behind a `Cache`: the table of inode information, guarded by an async read/write lock.
///
/// Kept private; it is only ever handled through an `Arc` so that the `Cache` handles and the
/// background task spawned in `Cache::new` can all refer to the same table.
#[derive(Debug)]
struct INodeCache
{
/// Maps each `INode` to the `FsInfo` recorded for it.
cache: RwLock<HashMap<INode, FsInfo>>,
}
/// Cache of `INode`s and their associated infos
///
/// Cloning a `Cache` produces another handle to the same underlying table (the table itself is behind an `Arc`).
///
/// # Fields
/// * `0` - The shared table. A second reference to it is held by the background task spawned in `new`.
/// * `1` - Sender used by `insert` to hand entries to the background task.
/// * `2` - Watch channel that starts out as `true`; the background task broadcasts `false` when it has finished.
#[derive(Debug, Clone)]
pub struct Cache(Arc<INodeCache>, mpsc::Sender<(INode, FsInfo)>, watch::Receiver<bool>);
impl Cache
{
/// Look up the information stored for `node`, if any.
///
/// Returns a clone of the cached `FsInfo`, or `None` when `node` has no entry in the table.
///
/// # Usage
/// Intended for checking whether a directory `INode` has already been added; if so, it doesn't need to be walked again.
///
/// # Notes
/// Entries passed to `insert` are written to the table by a background task, so they may not be visible here immediately.
pub async fn get(&self, node: &INode) -> Option<FsInfo>
{
    // Hold the read lock only for the duration of the lookup and clone.
    let table = self.0.cache.read().await;
    table.get(node).cloned()
}
/// Queue `val` to be stored in the cache under `node`.
///
/// The pair is handed to the background caching task over a channel rather than being written to the table directly.
///
/// # Panics
/// Panics if the send fails, i.e. the receiving side of the channel has been dropped or closed.
pub async fn insert(&mut self, node: INode, val: FsInfo)
{
    let entry = (node, val);
    let sent = self.1.send(entry).await;
    sent.expect("Failed to send to caching task: Receiver dropped or closed");
}
/// Create a new, empty, cache.
///
/// Besides building the empty table, this spawns a background task (via `tokio::spawn`) that
/// receives the entries sent by `insert`, in batches, and writes each batch into the table.
/// Once the channel is exhausted the task releases its reference to the table and then
/// broadcasts `false` on the watch channel to signal completion.
///
/// # Notes
/// Because it calls `tokio::spawn`, this presumably has to be called from within a running Tokio runtime.
pub fn new() -> Self
{
    let shared = Arc::new(INodeCache {
        cache: RwLock::new(HashMap::new()),
    });
    let (tx, rx) = mpsc::channel(CACHE_SEND_BUFFER_SIZE);
    let (done_tx, done_rx) = watch::channel(true);

    // The background task gets its own handle to the table.
    let task_cache = Arc::clone(&shared);
    tokio::spawn(async move {
        use futures::prelude::*;
        {
            let mut batches = rx
                .gate_with_timeout(CACHE_GATE_BUFFER_SIZE, CACHE_GATE_TIMEOUT)
                .lag(CACHE_GATED_LAG);
            while let Some(batch) = batches.next().await
            {
                // Take the write lock once per batch; it is released at the end of the statement.
                task_cache.cache.write().await.extend(batch.into_iter());
            }
            // Release the task's reference *before* announcing completion, so that a waiter
            // woken by the broadcast sees the reduced reference count.
            drop(task_cache);
        }
        // Nobody listening any more is not an error.
        let _ = done_tx.broadcast(false);
    });

    Self(shared, tx, done_rx)
}
/// Take a snapshot of the cache table as it is right now.
///
/// The table is copied while holding the read lock, and the copy is returned.
///
/// # Notes
/// The snapshot may be out of date, since entries can still be in flight to the background task.
/// Use `try_complete` to wait for the finished table and obtain it without cloning.
pub async fn clone_current_table(&self) -> HashMap<INode, FsInfo>
{
    let table = self.0.cache.read().await;
    (*table).clone()
}
/// Consume into a `Future` that returns the completely filled `HashMap` when the background task has finished inserting elements.
///
/// The future drops this handle's sender, waits for the background task to signal completion
/// (or to release its reference to the table), and then takes the table out of the shared `Arc`.
///
/// # Note
/// If there are shared references to the `Cache` other than this one, the future will return this object as `Err`.
///
/// # Panics
/// The future panics (through `unreachable!`) if, after waiting, the `Arc` still has more than one strong reference.
pub fn try_complete(self) -> impl futures::Future<Output=Result<HashMap<INode, FsInfo>, Self>> + 'static
{
// Out-of-line, cold panic path for the "cannot happen" case below.
#[inline(never)]
#[cold]
fn only_one() -> !
{
unreachable!("Arc should have only 1 strong reference now. This should never happend")
}
async move {
// This handle plus the background task spawned in `new` account for at most two strong references.
if Arc::strong_count(&self.0) > 2 {
return Err(self); // something other than the background task is also holding the shared cache reference
}
let Cache(arc, sender, mut watch) = self;
drop(sender); // drop sender, allowing background task to finish.
// Keep waiting while the background task still holds its reference and has not yet broadcast `false`.
while Arc::strong_count(&arc) > 1 && *watch.borrow() {
match watch.recv().await {
Some(false) | None => {
// background task has finished (`false`), or the watch sender is gone (`None`)
drop(watch);
break;
},
// NOTE(review): presumably this arm is hit when `recv` yields the initial `true` value — confirm against the tokio version in use.
Some(true) => continue,
}
}
// The background task drops its `Arc` before broadcasting, so this handle is expected to be the sole owner here.
match Arc::try_unwrap(arc)
{
Ok(inner) => {
Ok(inner.cache.into_inner())
},
Err(_) => only_one(),
}
}
}
}