diff --git a/src/data.rs b/src/data.rs index 5c708bc..b52cc17 100644 --- a/src/data.rs +++ b/src/data.rs @@ -7,8 +7,18 @@ use tokio::sync::{ mpsc, watch, }; +use tokio::time::Duration; use std::fs::Metadata; +/// How many unprocessed insertion requests are allowed in the backlog before the insertion stream backpressures? +pub const CACHE_SEND_BUFFER_SIZE: usize = 30; +/// How many insertion requests should be grouped before emitting them all as blocks downstream. +pub const CACHE_GATE_BUFFER_SIZE: usize = 80; +/// How long should a partial block be kept in the gate buffer with an open upstream before being sent by force. +pub const CACHE_GATE_TIMEOUT: Duration = duration!(100 ms); +/// How long should the insertion stream be forced to wait before accepting new blocks to insert. +pub const CACHE_GATED_LAG: Duration = duration!(10 ms); + /// A raw file or directory inode number /// /// Ususally created from the `.inode()` extension method on `fs::Metadata` found in prelude. @@ -99,7 +109,7 @@ impl Cache let inner = Arc::new(INodeCache { cache: RwLock::new(HashMap::new()), }); - let (tx, rx) = mpsc::channel(24); + let (tx, rx) = mpsc::channel(CACHE_SEND_BUFFER_SIZE); let (e_tx, e_rx) = watch::channel(true); tokio::spawn({ @@ -108,8 +118,8 @@ impl Cache use futures::prelude::*; { let mut rx = rx - .gate_with_timeout(60, duration!(100 ms)) - .lag(duration!(10 ms)); + .gate_with_timeout(CACHE_GATE_BUFFER_SIZE, CACHE_GATE_TIMEOUT) + .lag(CACHE_GATED_LAG); while let Some(elements) = rx.next().await { let mut writer = cache.cache.write().await;