added configurable message passing buffer size

new-idea
Avril 4 years ago
parent 519cd814eb
commit 85e6df6dc5
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -8,9 +8,11 @@ edition = "2018"
[features]
default = ["nightly", "short-mnemonics"]
short-mnemonics = []
nightly = ["smallvec/const_generics"]
# Use shortened mnemonics for post IDs
short-mnemonics = []
[dependencies]
ad-hoc-iter = "0.2.3"
base64 = "0.13.0"

@ -1,4 +1,5 @@
use crate::mnemonic::MnemonicSaltKind;
use std::num::NonZeroUsize;
/// Default anonymous name
pub const ANON_NAME: &'static str = "名無し";
@ -45,3 +46,9 @@ pub const MNEMONIC_SALT: MnemonicSaltKind = MnemonicSaltKind::Random;
///
/// Set to `0` for unlimited.
pub const MAX_IMAGE_READ_SIZE: usize = (1024 * 1024 * 1024) * 3; // 3GB
/// The size of state stream message passing buffers.
///
/// Must be 1 or larger.
pub const STATE_STREAM_BUFFER_SIZE: usize = 1;
static_assert!(STATE_STREAM_BUFFER_SIZE > 0);

@ -814,3 +814,27 @@ pub use cancel::{
}
}
}
cfg_if!{
if #[cfg(feature="unlimited-buffers")] {
//TODO: Create mpsc wrapper for unlimited channel that works like normal mpsc channel
#[macro_export] macro_rules! mpsc_channel {
($sz:expr) => (::tokio::sync::mpsc::channel($sz));
}
} else {
/// Create an mpsc channel
///
/// # Intended for state
#[macro_export] macro_rules! mpsc_channel {
($sz:expr) => (::tokio::sync::mpsc::channel($sz));
}
}
}
/// A static assertions
#[macro_export] macro_rules! static_assert {
($expr:expr) => {
const _:[();1] = [(); (($expr) as bool) as usize];
}
}

@ -73,6 +73,7 @@ impl State
/// Insert a new post
pub async fn insert_new_post(&self, post: Post) -> SharedPost
{
todo!("Rewrite");
let mut onee = self.0.oneesan.write().await;
let id = *post.post_id();
@ -90,9 +91,12 @@ impl State
}
/// Consume into a stream that yields all users lazily.
///
/// # Locks
/// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped.
pub fn all_users_stream(self: Arc<Self>) -> impl Stream<Item=SharedUser>
{
let (mut tx, rx) = mpsc::channel(1);
let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE);
tokio::spawn(async move {
let onee = self.0.oneesan.read().await;
for (_, user) in onee.users.iter()
@ -106,9 +110,12 @@ impl State
}
/// Consume into a stream that yields all posts lazily.
///
/// # Locks
/// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped.
pub fn all_posts_stream(self: Arc<Self>) -> impl Stream<Item=SharedPost>
{
let (mut tx, rx) = mpsc::channel(1);
let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE);
tokio::spawn(async move {
let onee = self.0.oneesan.read().await;
for (_, post) in onee.posts.iter()

Loading…
Cancel
Save