inode cache works

redo-gragh
Avril 4 years ago
parent 99e740eb97
commit 68d432710f
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,5 +1,13 @@
//! Datatypes for the program
use super::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{
RwLock,
mpsc,
watch,
};
use std::fs::Metadata;
/// A raw file or directory inode number
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
@ -14,9 +22,127 @@ impl INode
Self(ino)
}
/// Get `ino` from an `fs::Metadata` object.
#[inline] pub fn new(meta: &Metadata) -> Self
{
use std::os::unix::fs::MetadataExt as _;
Self(meta.ino())
}
/// Convert into raw `u64` inode number.
#[inline] pub const fn into_inner(self) -> u64
{
self.0
}
}
/// A valid file system info
///
/// # Note
/// This does not contains the INode of the fs object itself, that is considered that key to the table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FsInfo
{
File(u64, INode), //Size, parent dir inode
Directory,
}
#[derive(Debug)]
struct INodeCache
{
cache: RwLock<HashMap<INode, FsInfo>>,
}
/// Cache of `INode`s and their associated infos
#[derive(Debug, Clone)]
pub struct Cache(Arc<INodeCache>, mpsc::Sender<(INode, FsInfo)>, watch::Receiver<bool>);
impl Cache
{
/// Lookup this `INode` in the cache.
///
/// # Usage
/// Should be used to check if directory `INode`s have already been added, if so, they don't need to be walked.
pub async fn get(&self, node: &INode) -> Option<FsInfo>
{
self.0.cache.read().await.get(node).map(Clone::clone)
}
/// Insert an `INode`'s information into the cache.
pub async fn insert(&mut self, node: INode, val: FsInfo)
{
self.1.send((node, val)).await.expect("Failed to send to caching task: Receiver dropped or closed");
}
/// Create a new, empty, cache.
pub fn new() -> Self
{
let inner = Arc::new(INodeCache {
cache: RwLock::new(HashMap::new()),
});
let (tx, rx) = mpsc::channel(24);
let (e_tx, e_rx) = watch::channel(true);
tokio::spawn({
let cache = Arc::clone(&inner);
async move {
use futures::prelude::*;
{
let mut rx = rx
.gate_with_timeout(60, duration!(100 ms))
.lag(duration!(10 ms));
while let Some(elements) = rx.next().await
{
let mut writer = cache.cache.write().await;
writer.extend(elements.into_iter());
}
drop(cache); // drop the Arc before broadcasting we're done
}
let _ = e_tx.broadcast(false);
}
});
Self(inner, tx, e_rx)
}
/// Clone the current cache table.
///
/// # Notes
/// This table may be out of date. Use `try_complete` to wait for a completed version of the table and return that without cloning.
pub async fn clone_current_table(&self) -> HashMap<INode, FsInfo>
{
self.0.cache.read().await.clone()
}
/// Consume into a `Future` that returns the completely filled `HashMap` when background task has finished inserting elements.
///
/// # Note
/// If there are shared references to the `Cache` other than this one, the future will return this object as `Err`.
pub fn try_complete(self) -> impl futures::Future<Output=Result<HashMap<INode, FsInfo>, Self>> + 'static
{
async move {
if Arc::strong_count(&self.0) > 2 {
return Err(self); // there is another other than the background task holding the shared cache reference
}
let Cache(arc, sender, mut watch) = self;
drop(sender); // drop sender, allowing background task to finish.
while Arc::strong_count(&arc) > 1 && *watch.borrow() {
match watch.recv().await {
Some(false) | None => {
// background task has closed
drop(watch);
break;
},
Some(true) => continue,
}
}
match Arc::try_unwrap(arc)
{
Ok(inner) => {
Ok(inner.cache.into_inner())
},
#[cold] Err(_) => unreachable!("Arc should have only 1 strong reference now. This should never happend"),
}
}
}
}

@ -18,6 +18,18 @@ pub mod prelude
pub use super::StreamLagExt as _;
}
pub trait INodeExt
{
/// Get the `ino` of this fs object metadata.
fn inode(&self) -> data::INode;
}
impl INodeExt for std::fs::Metadata
{
#[inline] fn inode(&self) -> data::INode {
data::INode::new(self)
}
}
/// A gated stream that releases every N items from the backing stream.
#[pin_project]
@ -116,7 +128,7 @@ where S: Stream
impl<S> Stream for GatedStream<S, S::Item>
where S: Stream
{
type Item = Box<[S::Item]>;
type Item = Vec<S::Item>;//Box<[S::Item]>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while self.buffer.len() < self.release_at
@ -162,7 +174,7 @@ where S: Stream
impl<S> Stream for TimedGatedStream<S, S::Item>
where S: Stream
{
type Item = Box<[S::Item]>;
type Item = Vec<S::Item>;//Box<[S::Item]>;
#[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.interval.poll_next_unpin(cx)

Loading…
Cancel
Save