From 7142f865f0577cfb05a505d9e49b716cf1608f2d Mon Sep 17 00:00:00 2001 From: Geoffrey Allott Date: Fri, 5 Feb 2021 00:47:27 +0000 Subject: [PATCH] minimal chat server working --- src/api.rs | 2 +- src/auth.rs | 2 +- src/client.rs | 47 ++++++++++---- src/game.rs | 7 ++- src/gamestate.rs | 14 +++++ src/main.rs | 159 ++++++++++++++++++++++++++++++++++++++++++----- src/server.rs | 102 ++++++++++++++++++++++++++---- 7 files changed, 292 insertions(+), 41 deletions(-) diff --git a/src/api.rs b/src/api.rs index 17cb91d..8daac46 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,7 +2,7 @@ use crate::auth::Auth; use crate::game::{Action, Game, GameSettings, GameSummary, UserAction}; #[derive(Debug, Clone, Deserialize)] -enum Scope { +pub enum Scope { Global, Game { id: u32 }, Hand { id: u32 }, diff --git a/src/auth.rs b/src/auth.rs index c25f8d2..fd3049a 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -6,7 +6,7 @@ pub enum Auth { } impl Auth { - pub fn verify(&self, challenge: &str, signature: &str) -> bool { + pub fn verify(&self, _challenge: &str, signature: &str) -> bool { match self { Auth::NoLogin => false, Auth::Plain{password} => signature == password, diff --git a/src/client.rs b/src/client.rs index f2006a1..2bb2e39 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, RwLock}; +use std::collections::HashSet; use futures::stream::{Stream, StreamExt, empty, iter, once}; @@ -21,10 +21,11 @@ pub enum ClientState { }, } -pub enum ClientUpdate { +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ClientInterest { GameList, - Game, - Users, + Game { id: u32 }, + User { username: String }, } #[derive(Debug, Clone)] @@ -41,11 +42,11 @@ impl ConnectionState { } } - pub async fn retrieve_updates(&mut self, update: ClientUpdate) -> impl Stream { + pub async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream { match update { - ClientUpdate::GameList => empty().boxed(), // TODO - ClientUpdate::Game => match &mut self.client { - ClientState::LoggedIn{ref username, state: LoggedInState::InGame{ref mut game}} => { + ClientInterest::GameList => empty().boxed(), // TODO + ClientInterest::Game{id} => match &mut self.client { + ClientState::LoggedIn{ref username, state: LoggedInState::InGame{ref mut game}} if game.id() == id => { let from = game.actions_len(); match self.server.update_game_state(game).await { Ok(()) => iter(game.update_view_for(username, from)).map(|action| ServerMessage::NewAction{action}).boxed(), @@ -54,10 +55,27 @@ impl ConnectionState { } _ => empty().boxed(), } - ClientUpdate::Users => empty().boxed(), // TODO + ClientInterest::User{..} => empty().boxed(), // TODO } } + pub fn interests(&self) -> HashSet { + let mut ret = HashSet::new(); + if let ClientState::LoggedIn{ref username, ref state} = &self.client { + let username = username.to_string(); + ret.insert(ClientInterest::User{username}); + ret.insert(ClientInterest::GameList); + if let LoggedInState::InGame{ref game} = state { + ret.insert(ClientInterest::Game{id: game.id()}); + for username in game.players() { + let username = username.to_string(); + ret.insert(ClientInterest::User{username}); + } + } + } + ret + } + pub async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage { match (&mut self.client, message) { (_, ClientMessage::CreateUser{username, auth, nickname}) => { @@ -74,19 +92,20 @@ impl ConnectionState { (ClientState::LoginAuthIssued{username, challenge}, ClientMessage::LoginAuthResponse{signature}) => { if self.server.verify(&username, &challenge, &signature).await { self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InLobby}; + self.server.register_interests(self.interests()).await; ServerMessage::LoginSuccess } else { self.client = ClientState::Connected; ServerMessage::LoginFailure{reason: "Invalid username or password".to_string()} } } - (ClientState::LoggedIn{username, ..}, ClientMessage::GetGameList{tags}) => { + (ClientState::LoggedIn{..}, ClientMessage::GetGameList{..}) => { match self.server.get_game_list().await { Ok(games) => ServerMessage::GameList{games}, Err(err) => ServerMessage::GameListFailure{reason: err.to_string()}, } } - (ClientState::LoggedIn{username, ..}, ClientMessage::CreateGame{settings}) => { + (ClientState::LoggedIn{..}, ClientMessage::CreateGame{settings}) => { match self.server.create_game(settings).await { Ok(id) => ServerMessage::CreateGameSuccess{id}, Err(err) => ServerMessage::CreateGameFailure{reason: err.to_string()}, @@ -97,6 +116,7 @@ impl ConnectionState { Ok(game) => { let game_view = game.view_for(&username); self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InGame{game}}; + self.server.register_interests(self.interests()).await; ServerMessage::JoinGameSuccess{game: game_view} } Err(err) => ServerMessage::JoinGameFailure{reason: err.to_string()}, @@ -115,6 +135,7 @@ impl ConnectionState { Err(err) => return ServerMessage::TakeActionFailure{reason: err.to_string()}, } Ok(ActionStatus::Interrupted) => { + debug!("Action {:?} was interrupted - updating game state", action); if let Err(err) = self.server.update_game_state(game).await { return ServerMessage::TakeActionFailure{reason: err.to_string()}; } @@ -125,10 +146,12 @@ impl ConnectionState { } (ClientState::LoggedIn{username, state: LoggedInState::InGame{..}}, ClientMessage::LeaveGame) => { self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InLobby}; + self.server.register_interests(self.interests()).await; ServerMessage::LeaveGameSuccess } - (ClientState::LoggedIn{username, ..}, ClientMessage::Logout) => { + (ClientState::LoggedIn{..}, ClientMessage::Logout) => { self.client = ClientState::Connected; + self.server.register_interests(self.interests()).await; ServerMessage::LogoutSuccess } (ClientState::LoggedIn{username, ..}, ClientMessage::ChangeAuth{auth}) => { diff --git a/src/game.rs b/src/game.rs index ce72da5..b4d8f07 100644 --- a/src/game.rs +++ b/src/game.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::fmt::Debug; use crate::card::Card; @@ -109,6 +110,10 @@ impl Game { self.state.actions_len() } + pub fn players(&self) -> HashSet<&str> { + self.state.players() + } + pub fn view_for(&self, username: &str) -> Game { Game { summary: self.summary.clone(), @@ -127,7 +132,7 @@ impl Game { Action::Join{seat: 0, chips: 0} if self.state.user_has_joined(username) => Err(ActionError::AlreadyJoined), Action::Join{seat: 0, chips: 0} if self.state.num_players() + 1 > max_users => Err(ActionError::NoSeatAvailable), Action::Join{seat: 0, chips: 0} => Ok(()), - Action::Message{ref message} if self.state.user_has_joined(username) => Ok(()), + Action::Message{..} if self.state.user_has_joined(username) => Ok(()), Action::Message{..} => Err(ActionError::NotAuthorised), Action::Leave if self.state.user_has_joined(username) => Ok(()), Action::Leave => Err(ActionError::NotAuthorised), diff --git a/src/gamestate.rs b/src/gamestate.rs index c1d1350..b767611 100644 --- a/src/gamestate.rs +++ b/src/gamestate.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use crate::game::{Action, UserAction}; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -38,6 +40,18 @@ impl GameState { logged_in } + pub fn players(&self) -> HashSet<&str> { + let mut players = HashSet::new(); + for action in &self.actions { + match action.action { + Action::Join{..} => players.insert(&*action.username), + Action::Leave => players.remove(&*action.username), + _ => continue, + }; + } + players + } + pub fn num_players(&self) -> u32 { let mut num_players = 0; for action in &self.actions { diff --git a/src/main.rs b/src/main.rs index 25386be..4fe04b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; -use std::pin::Pin; +use std::collections::HashSet; +use std::convert::TryFrom; +use std::mem::swap; -use async_std::prelude::*; -use futures::{future::select, future::Either, pin_mut, Stream, StreamExt, stream}; +use futures::{channel::mpsc::{channel, Receiver}, future::{Either, select}, select, FutureExt, SinkExt, StreamExt, pin_mut, stream::{self, FuturesUnordered, pending}}; +use redis::{aio::PubSub, Client, Msg}; use signal_hook::consts::signal::*; use signal_hook_async_std::Signals; use tide::{Body, Request, Error, Response, StatusCode}; @@ -21,16 +23,127 @@ mod gamestate; mod server; use crate::api::ServerMessage; -use crate::server::ServerState; -use crate::client::{ClientUpdate, ConnectionState}; +use crate::server::{ClientInterestSender, Server, ServerState}; +use crate::client::{ClientInterest, ConnectionState}; + +pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver) -> Result<(), std::io::Error> { + #[derive(Debug)] + struct Client { + sender: ClientInterestSender, + interests: HashSet, + } + debug!("handle_client_interest: init"); + let mut clients: Vec = Vec::new(); + loop { + enum Action { + AddClient { sender: ClientInterestSender }, + RegisterInterest { index: usize, client_interests: HashSet }, + SendInterest { interest: Msg }, + RemoveClient { index: usize }, + NoNewClients, + ConnectionClosed, + } + match { + let mut connection_on_message = connection.on_message(); + let mut next_interest = connection_on_message.next().fuse(); + let mut next_client = new_clients.next().fuse(); + let mut next_client_interest = FuturesUnordered::new(); + for (index, Client{ref mut sender, ..}) in clients.iter_mut().enumerate() { + next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest))); + } + let mut next_client_interest = next_client_interest.select_next_some(); + let mut action = Action::ConnectionClosed; + select! { + interest = next_interest => { + if let Some(interest) = interest { + action = Action::SendInterest{interest}; + } else { + action = Action::ConnectionClosed; + } + } + sender = next_client => { + if let Some(sender) = sender { + action = Action::AddClient{sender}; + } else { + action = Action::NoNewClients; + } + } + client_interests = next_client_interest => { + match client_interests { + (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests}, + (index, None) => action = Action::RemoveClient{index}, + } + } + } + action + } { + Action::AddClient{sender} => { + debug!("handle_client_interest: Action::AddClient {{ sender: {:?} }}", sender); + clients.push(Client{sender, interests: HashSet::new()}); + } + Action::RegisterInterest{index, mut client_interests} => { + debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index); + let interests_prior: HashSet = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect(); + swap(&mut clients[index].interests, &mut client_interests); + let interests_after: HashSet = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect(); + for interest in &interests_after - &interests_prior { + debug!("handle_client_interest: Subscribing to {:?}", interest); + if let Err(err) = connection.subscribe(interest).await { + error!("handle_client_interest: Subscribe failed: {}", err); + } + } + for interest in &interests_prior - &interests_after { + debug!("handle_client_interest: Unsubscribing from {:?}", interest); + if let Err(err) = connection.unsubscribe(interest).await { + error!("handle_client_interest: Unsubscribe failed: {}", err); + } + } + } + Action::SendInterest{interest} => { + debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest); + if let Ok(interest) = TryFrom::try_from(interest) { + for Client{sender, interests} in &mut clients { + if interests.contains(&interest) { + debug!("handle_client_interest: Sending {:?} to {:?}", interest, sender); + if let Err(err) = sender.interest.send(interest.clone()).await { + error!("handle_client_interest: Send failed: {}", err); + } + } + } + } + } + Action::RemoveClient{index} => { + debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index); + let interests_prior: HashSet = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect(); + clients.remove(index); + let interests_after: HashSet = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect(); + for interest in &interests_prior - &interests_after { + debug!("handle_client_interest: Unsubscribing from {:?}", interest); + if let Err(err) = connection.unsubscribe(interest).await { + error!("handle_client_interest: Unsubscribe failed: {}", err); + } + } + } + Action::NoNewClients => { + debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients); + if clients.is_empty() { + return Ok(()); + } + } + Action::ConnectionClosed => { + debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients); + return Ok(()); + } + } + } +} -pub async fn handle_websocket_request(request: Request, mut stream: WebSocketConnection) -> Result<(), Error> { - let mut client = ConnectionState::new(request.state().clone()); - let update_stream: Pin + Send>>; - update_stream = stream::empty().boxed(); // TODO +pub async fn handle_websocket_request(request: Request, stream: WebSocketConnection) -> Result<(), Error> { + let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await; + let mut client = ConnectionState::new(server_state); let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right)); while let Some(message) = combined.next().await { - let message: Either, ClientUpdate> = message; + let message: Either, ClientInterest> = message; match message { Either::Left(Ok(Message::Text(input))) => { let response = match serde_json::from_str(&input) { @@ -40,15 +153,23 @@ pub async fn handle_websocket_request(request: Request, mut stream: stream.send_json(&response).await?; } Either::Right(update) => { + debug!("client received update: {:?}", update); let mut responses = client.retrieve_updates(update).await; while let Some(response) = responses.next().await { stream.send_json(&response).await? } } - Either::Left(Ok(_)) => { - error!("Websocket received non-text"); + Either::Left(Ok(Message::Close(_))) => { + info!("Websocket closed"); + break; + } + Either::Left(Ok(Message::Binary(_))) => { + error!("Websocket received unexpected binary data"); break; } + Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => { + continue; + } Either::Left(Err(err)) => { error!("Websocket error: {}", err); break; @@ -79,7 +200,13 @@ async fn serve_404(response: Response) -> Result { async fn main() -> Result<(), Error> { env_logger::init(); - let mut app = tide::with_state(ServerState::new("redis://localhost/").await?); + const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128; + let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER); + + let client = Client::open("redis://localhost/")?; + let connection = client.get_multiplexed_async_std_connection().await?; + let pubsub = client.get_async_std_connection().await?.into_pubsub(); + let mut app = tide::with_state(Server::new(connection, register_update_stream_tx)); app.at("/").serve_dir("site/")?; app.at("/").serve_file("site/index.html")?; @@ -89,15 +216,17 @@ async fn main() -> Result<(), Error> { let signals = Signals::new(&[SIGINT])?; let signal_handler = handle_signals(signals); + let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx); + let listener = TlsListener::build() .addrs("localhost:4433") .cert("cert/cert.pem") .key("cert/key.pem"); let app = app.listen(listener); - pin_mut!(signal_handler, app); + pin_mut!(app, handle_client_interest, signal_handler); - select(app, signal_handler).await.factor_first().0?; + select(select(app, handle_client_interest).map(|f| f.factor_first().0), signal_handler).await.factor_first().0?; info!("Pokerwave shut down gracefully."); diff --git a/src/server.rs b/src/server.rs index 54d5f03..155c0c0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,14 +1,94 @@ -use futures::Stream; -use redis::{AsyncCommands, Client, ErrorKind, FromRedisValue, IntoConnectionInfo, Msg, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value, aio::MultiplexedConnection}; +use std::collections::HashSet; +use std::convert::TryFrom; + +use futures::{channel::mpsc::{Receiver, Sender, channel}, SinkExt}; +use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value, aio::MultiplexedConnection}; use serde::{Serialize, Deserialize}; use crate::auth::Auth; +use crate::client::ClientInterest; use crate::game::{Game, GameSettings, GameSummary, UserAction}; #[derive(Clone)] +pub struct Server { + redis: MultiplexedConnection, + register_update_stream: Sender, +} + +#[derive(Debug)] +pub struct ClientInterestSender { + pub register_interest: Receiver>, + pub interest: Sender, +} + +#[derive(Debug)] +pub struct ClientInterestReceiver { + pub register_interest: Sender>, + pub interest: Receiver, +} + +fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) { + const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128; + const INTEREST_CHANNEL_BUFFER: usize = 1024; + let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER); + let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER); + let tx = ClientInterestSender{register_interest: register_interest_rx, interest: interest_tx}; + let rx = ClientInterestReceiver{register_interest: register_interest_tx, interest: interest_rx}; + (tx, rx) +} + +pub enum ClientInterestFromMsgError { + InvalidChannelName{channel_name: String}, +} + +impl TryFrom for ClientInterest { + type Error = ClientInterestFromMsgError; + fn try_from(msg: Msg) -> Result { + let channel_name = msg.get_channel_name(); + if channel_name == "__keyspace@0__:games" { + Ok(ClientInterest::GameList) + } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") { + Ok(ClientInterest::User{username: username.to_string()}) + } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").map(str::parse) { + Ok(ClientInterest::Game{id}) + } else { + Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}) + } + } +} + +impl ToRedisArgs for ClientInterest { + fn write_redis_args(&self, out: &mut W) + where W: ?Sized + RedisWrite + { + match self { + ClientInterest::GameList => out.write_arg(b"__keyspace@0__:games"), + ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}", id)), + ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)), + } + } +} + +impl Server { + pub fn new(redis: MultiplexedConnection, register_update_stream: Sender) -> Self { + Self { + redis, + register_update_stream + } + } + + pub async fn new_state(&self) -> (ServerState, Receiver) { + let (client_interest_sender, ClientInterestReceiver{register_interest, interest}) = client_interest_channel(); + if let Err(err) = self.register_update_stream.clone().send(client_interest_sender).await { + error!("new_state: Send failed: {}", err); + } + (ServerState{redis: self.redis.clone(), register_interest}, interest) + } +} + pub struct ServerState { - client: Client, redis: MultiplexedConnection, + register_interest: Sender>, } fn user_key(username: &str) -> String { @@ -33,18 +113,12 @@ const TAKE_ACTION_LUA_SCRIPT: &'static str = r#" "#; impl ServerState { - pub async fn new(params: T) -> RedisResult { - let client = Client::open(params)?; - let redis = client.get_multiplexed_async_std_connection().await?; - Ok(Self{client, redis}) - } - pub async fn create_user(&mut self, username: &str, auth: Auth, nickname: &str) -> RedisResult<()> { let key = user_key(username); if self.redis.hset_nx::<_, _, _, i32>(&key, "auth", AsJson(auth)).await? == 0 { return Err(RedisError::from((ErrorKind::ResponseError, "User already exists"))); } - self.redis.hset(&key, "nickname", username).await + self.redis.hset(&key, "nickname", nickname).await } pub async fn set_user_auth(&mut self, username: &str, auth: Auth) -> RedisResult<()> { @@ -93,7 +167,7 @@ impl ServerState { let actions: Vec> = self.redis.lrange(&key, i, i + GAME_ACTION_BLOCK_SIZE).await?; if actions.is_empty() { break; } for AsJson(action) in actions { - game.take_action(action).map_err(|err| RedisError::from((ErrorKind::ResponseError, "Invalid action")))?; + game.take_action(action).map_err(|err| RedisError::from((ErrorKind::ResponseError, "Invalid action", err.to_string())))?; } } Ok(()) @@ -111,6 +185,12 @@ impl ServerState { debug!("redis: executing: EVAL {:?} 1 {:?} {} {:?}", TAKE_ACTION_LUA_SCRIPT, key, len, serde_json::to_string(&action).unwrap()); redis::cmd("EVAL").arg(TAKE_ACTION_LUA_SCRIPT).arg(1).arg(key).arg(len).arg(AsJson(action)).query_async(&mut self.redis).await } + + pub async fn register_interests(&mut self, interests: HashSet) { + if let Err(err) = self.register_interest.send(interests).await { + error!("register_interests: could not register interest: {}", err); + } + } } pub enum ActionStatus { -- 2.34.1