use super::*; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::sync::{ RwLock, mpsc, }; use generational_arena::{ Arena,Index, }; use futures::prelude::*; use user::{User, UserID}; use post::{Post, PostID}; mod freeze; pub use freeze::*; /// An `Arc>` wrapper type SharedMut = Arc>; /// A shared pointer to a post pub type SharedPost = SharedMut; /// A shared pointer to a user pub type SharedUser = SharedMut; #[derive(Debug)] struct Oneesan { users: Arena>>, posts: Arena>>, /// Maps `UserID`s to indexes in the `users` arena. users_map: HashMap, /// Maps `PostID`s to indexies in the `posts` arena. posts_map: HashMap, /// Maps `UserID`s to the user's owned posts in the `posts` arena. posts_user_map: HashMap>, } #[derive(Debug)] struct Inner { /// The posts and user state. oneesan: RwLock, } /// Contains all posts and users #[derive(Debug, Clone)] pub struct State(Arc); impl State { /// Create a new empty state pub fn new() -> Self { Self(Arc::new( Inner { oneesan: RwLock::new(Oneesan { users: Arena::new(), posts: Arena::new(), users_map: HashMap::new(), posts_map: HashMap::new(), posts_user_map: HashMap::new(), }) } )) } /// Get a shared reference to a user by this ID, if it exists. /// /// # Locks /// This functions holds the state read lock while performing lookups. /// # Panics /// If the internal ID mappings are invalid pub async fn get_user_by_id(&self, id: UserID) -> Option { let read = self.0.oneesan.read().await; read.users_map.get(&id).map(|&idx| read.users.get(idx).unwrap().clone()) } /// Get a shared reference to a post by this ID, if it exists. /// /// # Locks /// This functions holds the state read lock while performing lookups. /// # Panics /// If the internal ID mappings are invalid pub async fn get_post_by_id(&self, id: PostID) -> Option { let read = self.0.oneesan.read().await; read.posts_map.get(&id).map(|&idx| read.posts.get(idx).unwrap().clone()) } /// Consume into a stream that yields all users lazily. /// /// # Locks /// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped. pub fn all_users_stream(self: Arc) -> impl Stream { let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE); tokio::spawn(async move { let onee = self.0.oneesan.read().await; for (_, user) in onee.users.iter() { if tx.send(user.clone()).await.is_err() { break; } } }); rx } /// Consume into a stream that yields all posts created by this user lazily. /// /// # Locks /// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped. /// /// # Panics /// The background task will panic and drop the producer if the internal ID mappings are invalid pub fn all_posts_by_user_stream(self: Arc, user: UserID) -> impl Stream { let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE); tokio::spawn(async move { let onee = self.0.oneesan.read().await; if let Some(map) = onee.posts_user_map.get(&user) { for &post in map.iter() { if tx.send(onee.posts.get(post).unwrap().clone()).await.is_err() { break; } } } }); rx } /// Consume into a stream that yields all posts lazily. /// /// # Locks /// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped. pub fn all_posts_stream(self: Arc) -> impl Stream { let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE); tokio::spawn(async move { let onee = self.0.oneesan.read().await; for (_, post) in onee.posts.iter() { if tx.send(post.clone()).await.is_err() { break; } } }); rx } }