From 7450cedce58c0ffec0f11fca3d6fb826caca716e Mon Sep 17 00:00:00 2001 From: Geoffrey Allott Date: Wed, 3 Mar 2021 19:46:14 +0000 Subject: [PATCH] coalesce and deduplicate incoming client interest streams --- site/modules/poker.js | 1 + src/client.rs | 4 ++++ src/dealer.rs | 6 +++--- src/game/action.rs | 1 + src/game/poker/holdem.rs | 4 ++-- src/game/whist.rs | 4 ++-- src/main.rs | 16 ++++++++++------ 7 files changed, 23 insertions(+), 13 deletions(-) diff --git a/site/modules/poker.js b/site/modules/poker.js index 68892cc..08fda65 100644 --- a/site/modules/poker.js +++ b/site/modules/poker.js @@ -318,6 +318,7 @@ export class TexasHoldEm { this.redraw_players(); break; case "Fold": + case "TimeoutFold": for (const card of this.hands.get(user_action.username)) { this.svg.removeChild(card.image); } diff --git a/src/client.rs b/src/client.rs index 2ff9f91..ac1cec4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,6 +30,10 @@ pub enum ClientInterest { Timeout { id: i64 }, } +impl ClientInterest { + pub const STREAM_COALESCE_MAX: usize = 16; +} + #[derive(Debug, Clone)] pub enum LoggedInState { Idle, diff --git a/src/dealer.rs b/src/dealer.rs index ad4e42c..20f3570 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; -use async_std::stream::StreamExt; -use futures::channel::mpsc::Receiver; +use futures::{channel::mpsc::Receiver, StreamExt}; use redis::{ErrorKind, RedisError, RedisResult}; use crate::client::ClientInterest; @@ -37,7 +36,8 @@ impl Dealer { Ok(dealer) } - pub async fn start(mut self, mut update_stream: Receiver) { + pub async fn start(mut self, update_stream: Receiver) { + let mut update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX); while let Some(_) = update_stream.next().await { match self.retrieve_updates().await { Ok(Termination::Continue) => continue, diff --git a/src/game/action.rs b/src/game/action.rs index e7a9753..c149934 100644 --- a/src/game/action.rs +++ b/src/game/action.rs @@ -47,6 +47,7 @@ pub enum Action { PlayCard { card: Card }, ChooseTrumps { suit: Suit }, Fold, + TimeoutFold, Bet { chips: u64 }, PostBlind { chips: u64 }, WinTrick, diff --git a/src/game/poker/holdem.rs b/src/game/poker/holdem.rs index 22ad26e..4ccd429 100644 --- a/src/game/poker/holdem.rs +++ b/src/game/poker/holdem.rs @@ -289,7 +289,7 @@ impl Game for TexasHoldEm { } Ok(()) } - (_, Action::Fold) => { + (_, Action::Fold) | (_, Action::TimeoutFold) => { self.pot += *self.bets.entry(username).or_default(); self.bets.remove(&username); self.players.remove(&username); @@ -469,7 +469,7 @@ impl Game for TexasHoldEm { let timeout_time = last_time.plus_millis(timeout); if timestamp >= timeout_time { DealerAction::TakeAction( - ValidatedUserAction(UserAction{timestamp, username, action: Action::Fold}) + ValidatedUserAction(UserAction{timestamp, username, action: Action::TimeoutFold}) ) } else { DealerAction::WaitUntil(timeout_time) diff --git a/src/game/whist.rs b/src/game/whist.rs index 80c800c..ea3ac4d 100644 --- a/src/game/whist.rs +++ b/src/game/whist.rs @@ -116,7 +116,7 @@ impl Game for KnockOutWhist { fn validate_action(&self, UserAction{timestamp, username, action}: UserAction) -> Result { match (self.state, action) { - (_, Action::AddOn{..}) | (_, Action::RevealCard{..}) | (_, Action::Fold) | (_, Action::Bet{..}) => { + (_, Action::AddOn{..}) | (_, Action::RevealCard{..}) | (_, Action::Fold) | (_, Action::TimeoutFold) | (_, Action::Bet{..}) => { Err(ActionError::InvalidActionForGameType) } (State::NotStarted, Action::Join{seat, ..}) => { @@ -169,7 +169,7 @@ impl Game for KnockOutWhist { self.actions_len += 1; self.rng.advance(); match (self.state, action) { - (_, Action::AddOn{..}) | (_, Action::Fold) | (_, Action::Bet{..}) => { + (_, Action::AddOn{..}) | (_, Action::Fold) | (_, Action::TimeoutFold) | (_, Action::Bet{..}) => { Err(ActionError::InvalidActionForGameType) } (State::NotStarted, Action::Join{seat, ..}) => { diff --git a/src/main.rs b/src/main.rs index f4d2b0e..83d0fc5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -193,9 +193,10 @@ async fn handle_new_games(server: Server) -> Result<(), std::io::Error> { pub async fn handle_websocket_request(request: Request, stream: WebSocketConnection) -> Result<(), Error> { let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await; let mut client = ConnectionState::new(server_state); + let update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX); let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right)); while let Some(message) = combined.next().await { - let message: Either, ClientInterest> = message; + let message: Either, Vec> = message; match message { Either::Left(Ok(Message::Text(input))) => { let response = match serde_json::from_str(&input) { @@ -204,11 +205,14 @@ pub async fn handle_websocket_request(request: Request, stream: WebSocke }; stream.send_json(&response).await?; } - Either::Right(update) => { - debug!("client received update: {:?}", update); - let mut responses = client.retrieve_updates(update).await; - while let Some(response) = responses.next().await { - stream.send_json(&response).await? + Either::Right(mut updates) => { + updates.dedup(); + for update in updates { + debug!("client received update: {:?}", update); + let mut responses = client.retrieve_updates(update).await; + while let Some(response) = responses.next().await { + stream.send_json(&response).await? + } } } Either::Left(Ok(Message::Close(_))) => { -- 2.34.1