use super::*; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::sync::{ RwLock, mpsc, }; use generational_arena::{ Arena,Index, }; use futures::prelude::*; use user::{User, UserID}; use post::{Post, PostID}; mod freeze; pub use freeze::*; /// An `Arc>` wrapper type SharedMut = Arc>; /// A shared pointer to a post pub type SharedPost = SharedMut; /// A shared pointer to a user pub type SharedUser = SharedMut; #[derive(Debug)] struct Oneesan { users: Arena>>, posts: Arena>>, /// Maps `UserID`s to indexes in the `users` arena. users_map: HashMap, /// Maps `PostID`s to indexies in the `posts` arena. posts_map: HashMap, /// Maps `UserID`s to the user's owned posts in the `posts` arena. posts_user_map: HashMap>, } #[derive(Debug)] struct Inner { /// The posts and user state. oneesan: RwLock, } /// Contains all posts and users #[derive(Debug, Clone)] pub struct State(Arc); impl State { /// Create a new empty state pub fn new() -> Self { Self(Arc::new( Inner { oneesan: RwLock::new(Oneesan { users: Arena::new(), posts: Arena::new(), users_map: HashMap::new(), posts_map: HashMap::new(), posts_user_map: HashMap::new(), }) } )) } /// Insert a new post pub async fn insert_new_post(&self, post: Post) -> SharedPost { let mut onee = self.0.oneesan.write().await; let id = *post.post_id(); if let Some(&owner) = post.owner_id() { //TODO: Add `index` to `posts_user_map`. (how do we get `index` before this) } let post = Arc::new(RwLock::new(post)); //TODO: If post exists with the same ID, remove it first. let index = onee.posts.insert(Arc::clone(&post)); onee.posts_map.insert(id, index); post } /// Consume into a stream that yields all users lazily. pub fn all_users_stream(self: Arc) -> impl Stream { let (mut tx, rx) = mpsc::channel(1); tokio::spawn(async move { let onee = self.0.oneesan.read().await; for (_, user) in onee.users.iter() { if tx.send(user.clone()).await.is_err() { break; } } }); rx } /// Consume into a stream that yields all posts lazily. pub fn all_posts_stream(self: Arc) -> impl Stream { let (mut tx, rx) = mpsc::channel(1); tokio::spawn(async move { let onee = self.0.oneesan.read().await; for (_, post) in onee.posts.iter() { if tx.send(post.clone()).await.is_err() { break; } } }); rx } }