use super::*; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::sync::{ RwLock, mpsc, }; use generational_arena::{ Arena,Index, }; use futures::prelude::*; use user::{User, UserID}; use post::{Post, PostID}; mod freeze; pub use freeze::*; /// An `Arc>` wrapper type SharedMut = Arc>; /// A shared pointer to a post pub type SharedPost = SharedMut; /// A shared pointer to a user pub type SharedUser = SharedMut; #[derive(Debug)] struct Oneesan { users: Arena>>, posts: Arena>>, /// Maps `UserID`s to indexes in the `users` arena. users_map: HashMap, /// Maps `PostID`s to indexies in the `posts` arena. posts_map: HashMap, /// Maps `UserID`s to the user's owned posts in the `posts` arena. posts_user_map: HashMap>, } #[derive(Debug)] struct Inner { /// The posts and user state. oneesan: RwLock, } /// Contains all posts and users #[derive(Debug, Clone)] pub struct State(Arc); impl State { /// Create a new empty state pub fn new() -> Self { Self(Arc::new( Inner { oneesan: RwLock::new(Oneesan { users: Arena::new(), posts: Arena::new(), users_map: HashMap::new(), posts_map: HashMap::new(), posts_user_map: HashMap::new(), }) } )) } /// Insert a new user into state. /// /// # Locks /// This function holds the state write lock while performing insertions. pub async fn insert_user(&self, user: User) -> SharedUser { let user_id = *user.id(); let nuser = Arc::new(RwLock::new(user)); let mut write = self.0.oneesan.write().await; let idx = write.users.insert(Arc::clone(&nuser)); write.users_map.insert(user_id, idx); nuser } /// Insert a new post into state. /// /// # Locks /// This function holds the state write lock while performing insertions. pub async fn insert_post(&self, owner: UserID, post: Post) -> SharedPost { let post_id =*post.post_id(); let npost = Arc::new(RwLock::new(post)); let mut write = self.0.oneesan.write().await; let idx = write.posts.insert(Arc::clone(&npost)); write.posts_map.insert(post_id, idx); write.posts_user_map.entry(owner).or_insert_with(|| MaybeVec::new()).push(idx); npost } /// Get a shared reference to a user by this ID, if it exists. /// /// # Locks /// This functions holds the state read lock while performing lookups. /// # Panics /// If the internal ID mappings are invalid pub async fn get_user_by_id(&self, id: UserID) -> Option { let read = self.0.oneesan.read().await; read.users_map.get(&id).map(|&idx| read.users.get(idx).unwrap().clone()) } /// Get a shared reference to a post by this ID, if it exists. /// /// # Locks /// This functions holds the state read lock while performing lookups. /// # Panics /// If the internal ID mappings are invalid pub async fn get_post_by_id(&self, id: PostID) -> Option { let read = self.0.oneesan.read().await; read.posts_map.get(&id).map(|&idx| read.posts.get(idx).unwrap().clone()) } /// Consume into a stream that yields all users lazily. /// /// # Locks /// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped. pub fn all_users_stream(self: Arc) -> impl Stream { let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE); tokio::spawn(async move { let onee = self.0.oneesan.read().await; for (_, user) in onee.users.iter() { if tx.send(user.clone()).await.is_err() { break; } } }); rx } /// Consume into a stream that yields all posts created by this user lazily. /// /// # Locks /// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped. /// /// # Panics /// The background task will panic and drop the producer if the internal ID mappings are invalid pub fn all_posts_by_user_stream(self: Arc, user: UserID) -> impl Stream { let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE); tokio::spawn(async move { let onee = self.0.oneesan.read().await; if let Some(map) = onee.posts_user_map.get(&user) { for &post in map.iter() { if tx.send(onee.posts.get(post).unwrap().clone()).await.is_err() { break; } } } }); rx } /// Consume into a stream that yields all posts lazily. /// /// # Locks /// The stream producer holds the Oneesan *read* lock until the stream is consumed or dropped. pub fn all_posts_stream(self: Arc) -> impl Stream { let (mut tx, rx) = mpsc_channel!(defaults::STATE_STREAM_BUFFER_SIZE); tokio::spawn(async move { let onee = self.0.oneesan.read().await; for (_, post) in onee.posts.iter() { if tx.send(post.clone()).await.is_err() { break; } } }); rx } }