this.redraw_players();
break;
case "Fold":
+ case "TimeoutFold":
for (const card of this.hands.get(user_action.username)) {
this.svg.removeChild(card.image);
}
Timeout { id: i64 },
}
+impl ClientInterest {
+ pub const STREAM_COALESCE_MAX: usize = 16;
+}
+
#[derive(Debug, Clone)]
pub enum LoggedInState {
Idle,
use std::collections::HashSet;
-use async_std::stream::StreamExt;
-use futures::channel::mpsc::Receiver;
+use futures::{channel::mpsc::Receiver, StreamExt};
use redis::{ErrorKind, RedisError, RedisResult};
use crate::client::ClientInterest;
Ok(dealer)
}
- pub async fn start(mut self, mut update_stream: Receiver<ClientInterest>) {
+ pub async fn start(mut self, update_stream: Receiver<ClientInterest>) {
+ let mut update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
while let Some(_) = update_stream.next().await {
match self.retrieve_updates().await {
Ok(Termination::Continue) => continue,
PlayCard { card: Card },
ChooseTrumps { suit: Suit },
Fold,
+ TimeoutFold,
Bet { chips: u64 },
PostBlind { chips: u64 },
WinTrick,
}
Ok(())
}
- (_, Action::Fold) => {
+ (_, Action::Fold) | (_, Action::TimeoutFold) => {
self.pot += *self.bets.entry(username).or_default();
self.bets.remove(&username);
self.players.remove(&username);
let timeout_time = last_time.plus_millis(timeout);
if timestamp >= timeout_time {
DealerAction::TakeAction(
- ValidatedUserAction(UserAction{timestamp, username, action: Action::Fold})
+ ValidatedUserAction(UserAction{timestamp, username, action: Action::TimeoutFold})
)
} else {
DealerAction::WaitUntil(timeout_time)
fn validate_action(&self, UserAction{timestamp, username, action}: UserAction) -> Result<ValidatedUserAction, ActionError> {
match (self.state, action) {
- (_, Action::AddOn{..}) | (_, Action::RevealCard{..}) | (_, Action::Fold) | (_, Action::Bet{..}) => {
+ (_, Action::AddOn{..}) | (_, Action::RevealCard{..}) | (_, Action::Fold) | (_, Action::TimeoutFold) | (_, Action::Bet{..}) => {
Err(ActionError::InvalidActionForGameType)
}
(State::NotStarted, Action::Join{seat, ..}) => {
self.actions_len += 1;
self.rng.advance();
match (self.state, action) {
- (_, Action::AddOn{..}) | (_, Action::Fold) | (_, Action::Bet{..}) => {
+ (_, Action::AddOn{..}) | (_, Action::Fold) | (_, Action::TimeoutFold) | (_, Action::Bet{..}) => {
Err(ActionError::InvalidActionForGameType)
}
(State::NotStarted, Action::Join{seat, ..}) => {
pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
let mut client = ConnectionState::new(server_state);
+ let update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
while let Some(message) = combined.next().await {
- let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
+ let message: Either<Result<Message, tide_websockets::Error>, Vec<ClientInterest>> = message;
match message {
Either::Left(Ok(Message::Text(input))) => {
let response = match serde_json::from_str(&input) {
};
stream.send_json(&response).await?;
}
- Either::Right(update) => {
- debug!("client received update: {:?}", update);
- let mut responses = client.retrieve_updates(update).await;
- while let Some(response) = responses.next().await {
- stream.send_json(&response).await?
+ Either::Right(mut updates) => {
+ updates.dedup();
+ for update in updates {
+ debug!("client received update: {:?}", update);
+ let mut responses = client.retrieve_updates(update).await;
+ while let Some(response) = responses.next().await {
+ stream.send_json(&response).await?
+ }
}
}
Either::Left(Ok(Message::Close(_))) => {