coalesce and deduplicate incoming client interest streams
authorGeoffrey Allott <geoffrey@allott.email>
Wed, 3 Mar 2021 19:46:14 +0000 (19:46 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Wed, 3 Mar 2021 19:46:14 +0000 (19:46 +0000)
site/modules/poker.js
src/client.rs
src/dealer.rs
src/game/action.rs
src/game/poker/holdem.rs
src/game/whist.rs
src/main.rs

index 68892cc15ce84dde6d96669fe64789d330f76cb7..08fda651dd5f3c8977b6a8a15f339ecdcacda888 100644 (file)
@@ -318,6 +318,7 @@ export class TexasHoldEm {
                 this.redraw_players();
                 break;
             case "Fold":
+            case "TimeoutFold":
                 for (const card of this.hands.get(user_action.username)) {
                     this.svg.removeChild(card.image);
                 }
index 2ff9f91c4d57a2bf00ba65356b24ce4836977e24..ac1cec4a52f964d8d7e47769e16fd0c700c3525b 100644 (file)
@@ -30,6 +30,10 @@ pub enum ClientInterest {
     Timeout { id: i64 },
 }
 
+impl ClientInterest {
+    pub const STREAM_COALESCE_MAX: usize = 16;
+}
+
 #[derive(Debug, Clone)]
 pub enum LoggedInState {
     Idle,
index ad4e42caf6a31d83166ef1d1c755a6503fe4ff08..20f3570e11b738470d4f0a7a48fed4121d3ca24d 100644 (file)
@@ -1,7 +1,6 @@
 use std::collections::HashSet;
 
-use async_std::stream::StreamExt;
-use futures::channel::mpsc::Receiver;
+use futures::{channel::mpsc::Receiver, StreamExt};
 use redis::{ErrorKind, RedisError, RedisResult};
 
 use crate::client::ClientInterest;
@@ -37,7 +36,8 @@ impl Dealer {
         Ok(dealer)
     }
 
-    pub async fn start(mut self, mut update_stream: Receiver<ClientInterest>) {
+    pub async fn start(mut self, update_stream: Receiver<ClientInterest>) {
+        let mut update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
         while let Some(_) = update_stream.next().await {
             match self.retrieve_updates().await {
                 Ok(Termination::Continue) => continue,
index e7a97537bd927d94d882b145ce487260b7206cf2..c149934acce48cbb9f67acc2e601cd6b2f51aece 100644 (file)
@@ -47,6 +47,7 @@ pub enum Action {
     PlayCard { card: Card },
     ChooseTrumps { suit: Suit },
     Fold,
+    TimeoutFold,
     Bet { chips: u64 },
     PostBlind { chips: u64 },
     WinTrick,
index 22ad26e0cf83e45c1558fc8ec38be3dee66eccba..4ccd4290e21e002267a69d3799b17d4d13b94064 100644 (file)
@@ -289,7 +289,7 @@ impl Game for TexasHoldEm {
                 }
                 Ok(())
             }
-            (_, Action::Fold) => {
+            (_, Action::Fold) | (_, Action::TimeoutFold) => {
                 self.pot += *self.bets.entry(username).or_default();
                 self.bets.remove(&username);
                 self.players.remove(&username);
@@ -469,7 +469,7 @@ impl Game for TexasHoldEm {
                     let timeout_time = last_time.plus_millis(timeout);
                     if timestamp >= timeout_time {
                         DealerAction::TakeAction(
-                            ValidatedUserAction(UserAction{timestamp, username, action: Action::Fold})
+                            ValidatedUserAction(UserAction{timestamp, username, action: Action::TimeoutFold})
                         )
                     } else {
                         DealerAction::WaitUntil(timeout_time)
index 80c800c3046debb66fabc4fba3481c66543b0b80..ea3ac4d93bce4a739d154da1aabe4d1be5f43c13 100644 (file)
@@ -116,7 +116,7 @@ impl Game for KnockOutWhist {
 
     fn validate_action(&self, UserAction{timestamp, username, action}: UserAction) -> Result<ValidatedUserAction, ActionError> {
         match (self.state, action) {
-            (_, Action::AddOn{..}) | (_, Action::RevealCard{..}) | (_, Action::Fold) | (_, Action::Bet{..}) => {
+            (_, Action::AddOn{..}) | (_, Action::RevealCard{..}) | (_, Action::Fold) | (_, Action::TimeoutFold) | (_, Action::Bet{..}) => {
                 Err(ActionError::InvalidActionForGameType)
             }
             (State::NotStarted, Action::Join{seat, ..}) => {
@@ -169,7 +169,7 @@ impl Game for KnockOutWhist {
         self.actions_len += 1;
         self.rng.advance();
         match (self.state, action) {
-            (_, Action::AddOn{..}) | (_, Action::Fold) | (_, Action::Bet{..}) => {
+            (_, Action::AddOn{..}) | (_, Action::Fold) | (_, Action::TimeoutFold) | (_, Action::Bet{..}) => {
                 Err(ActionError::InvalidActionForGameType)
             }
             (State::NotStarted, Action::Join{seat, ..}) => {
index f4d2b0e92f5c2444dbc4fcc1ac928e389a7a5714..83d0fc52235bf7bd30beccc523c3cf49429c2244 100644 (file)
@@ -193,9 +193,10 @@ async fn handle_new_games(server: Server) -> Result<(), std::io::Error> {
 pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
     let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
     let mut client = ConnectionState::new(server_state);
+    let update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
     let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
     while let Some(message) = combined.next().await {
-        let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
+        let message: Either<Result<Message, tide_websockets::Error>, Vec<ClientInterest>> = message;
         match message {
             Either::Left(Ok(Message::Text(input))) => {
                 let response = match serde_json::from_str(&input) {
@@ -204,11 +205,14 @@ pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocke
                 };
                 stream.send_json(&response).await?;
             }
-            Either::Right(update) => {
-                debug!("client received update: {:?}", update);
-                let mut responses = client.retrieve_updates(update).await;
-                while let Some(response) = responses.next().await {
-                    stream.send_json(&response).await?
+            Either::Right(mut updates) => {
+                updates.dedup();
+                for update in updates {
+                    debug!("client received update: {:?}", update);
+                    let mut responses = client.retrieve_updates(update).await;
+                    while let Some(response) = responses.next().await {
+                        stream.send_json(&response).await?
+                    }
                 }
             }
             Either::Left(Ok(Message::Close(_))) => {