LeaveLobbySuccess,
LeaveLobbyFailure { reason: String },
ProtocolError { reason: String },
+ InternalError { reason: String },
}
use std::collections::HashSet;
-use std::time::SystemTime;
use futures::stream::{Stream, StreamExt, empty, iter, once};
GameList,
Game { id: i64 },
User { username: Username },
- Timeout { timestamp: i64 },
+ Timeout { id: i64 },
}
#[derive(Debug, Clone)]
}
}
(&mut ClientState::LoggedIn{username, state: LoggedInState::InGame{ref mut game}}, ClientMessage::TakeAction{action}) => {
- let timestamp = SystemTime::now(); // TODO use time from db?
+ let timestamp = match self.server.now().await {
+ Ok(timestamp) => timestamp,
+ Err(err) => return ServerMessage::InternalError{reason: err.to_string()},
+ };
let action = UserAction{timestamp, username, action};
let id = game.id();
loop {
use std::collections::HashSet;
-use std::time::SystemTime;
use async_std::stream::StreamExt;
use futures::channel::mpsc::Receiver;
use crate::client::ClientInterest;
use crate::game::{Game, DealerAction, ValidatedUserAction};
use crate::server::{ActionStatus, ServerState};
-use crate::util::millis::to_millis;
pub struct Dealer {
server: ServerState,
pub async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
let mut interests = HashSet::new();
interests.insert(ClientInterest::Game{id});
+ interests.insert(ClientInterest::Timeout{id});
server.register_interests(interests).await;
let summary = server.game_summary(id).await?;
let seed = server.game_seed(id).await?;
debug!("Dealer: Game state: {:#?}", self.dealer.game);
}
'take_action: loop {
- let timestamp = SystemTime::now();
+ let timestamp = self.server.now().await?;
match self.dealer.game.next_dealer_action(timestamp) {
DealerAction::TakeAction(action) => match self.take_action(action).await {
Ok(ActionStatus::Committed) => {
Err(err) => return Err(err),
},
DealerAction::WaitUntil(timestamp) => {
- let timestamp = to_millis(×tamp);
- self.server.create_timeout(timestamp).await?;
- let mut interests = HashSet::new();
- interests.insert(ClientInterest::Game{id});
- interests.insert(ClientInterest::Timeout{timestamp});
- self.server.register_interests(interests).await;
+ self.server.create_timeout(id, timestamp).await?;
return Ok(Termination::Continue);
}
DealerAction::WaitForPlayer => return Ok(Termination::Continue),
use std::cmp::PartialEq;
use std::fmt::{Debug, Display, Formatter};
-use std::time::SystemTime;
use crate::card::{Card, Suit};
use crate::username::Username;
+use crate::util::timestamp::Timestamp;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserAction {
- #[serde(with = "crate::util::millis")]
- pub timestamp: SystemTime,
+ pub timestamp: Timestamp,
pub username: Username,
pub action: Action,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DealerAction {
TakeAction(ValidatedUserAction),
- WaitUntil(SystemTime),
+ WaitUntil(Timestamp),
WaitForPlayer,
Leave,
}
use std::collections::HashSet;
-use std::time::SystemTime;
use crate::username::{DEALER, Username};
use crate::game::{Action, ActionError, DealerAction};
+use crate::util::timestamp::Timestamp;
use super::{Game, UserAction};
use super::ValidatedUserAction;
#[derive(Debug, Clone)]
struct ChatroomUserAction {
- timestamp: SystemTime,
+ timestamp: Timestamp,
username: Username,
action: ChatroomAction,
}
}
}
- fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction {
+ fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction {
match self.messages.len() {
n if n % 10 == 0 => DealerAction::TakeAction(
ValidatedUserAction(UserAction{
use std::collections::HashSet;
use std::fmt::Debug;
-use std::time::SystemTime;
use crate::rng::Seed;
use crate::username::Username;
+use crate::util::timestamp::Timestamp;
use self::chatroom::{Chatroom, ChatroomSettings};
use self::whist::{KnockOutWhist, KnockOutWhistSettings};
fn actions_len(&self) -> usize;
fn validate_action(&self, action: UserAction) -> Result<ValidatedUserAction, ActionError>;
fn take_action(&mut self, action: ValidatedUserAction) -> Result<(), ActionError>;
- fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction;
+ fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction;
}
pub trait CloneBoxGame {
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
-use std::time::{Duration, SystemTime};
use itertools::Itertools;
use crate::card::{Card, FIFTY_TWO_CARD_DECK};
use crate::seats::Seats;
-use crate::username::Username;
-use crate::util::max::IteratorMaxItems;
use crate::rng::{Seed, WaveRng};
+use crate::username::Username;
+use crate::util::{max::IteratorMaxItems, timestamp::Timestamp};
use super::super::{Action, ActionError, DealerAction, Game, UserAction, ValidatedUserAction};
max_players: u32,
small_blind: u64,
starting_stack: u64,
- action_timeout_secs: Option<u64>,
+ action_timeout: Option<i64>,
}
#[derive(Clone, Debug)]
settings: TexasHoldEmSettings,
rng: WaveRng,
actions_len: usize,
- last_action_time: Option<SystemTime>,
+ last_action_time: Option<Timestamp>,
state: State,
seats: Seats,
stacks: HashMap<Username, u64>,
}
}
- fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction {
+ fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction {
let mut rng = self.rng.clone();
match self.state {
State::NotStarted => {
error!("Logic error: no dealer could be chosen: {:#?}", self);
DealerAction::Leave
}
- } else if let (Some(last_time), Some(username), Some(timeout)) = (self.last_action_time, self.active, self.settings.action_timeout_secs) {
- let timeout_time = last_time + Duration::from_secs(timeout);
+ } else if let (Some(last_time), Some(username), Some(timeout)) = (self.last_action_time, self.active, self.settings.action_timeout) {
+ let timeout_time = last_time.plus_millis(timeout);
if timestamp >= timeout_time {
DealerAction::TakeAction(
ValidatedUserAction(UserAction{timestamp, username, action: Action::Fold})
mod tests {
use super::*;
- use std::time::SystemTime;
-
fn test_game(actions: Vec<UserAction>, settings: TexasHoldEmSettings, seed: Seed) {
let mut game = TexasHoldEm::new(0, settings, seed);
for action in actions {
game.take_action(validated).unwrap();
}
_ => {
- let dealer_action = game.next_dealer_action(SystemTime::UNIX_EPOCH);
+ let dealer_action = game.next_dealer_action(Timestamp::TEST_TIME);
if let DealerAction::TakeAction(ValidatedUserAction(dealer_action)) = dealer_action {
assert_eq!(action, dealer_action);
game.take_action(ValidatedUserAction(action)).unwrap();
]"#;
let actions = serde_json::from_str(actions).unwrap();
- let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout_secs":null}"#;
+ let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout":null}"#;
let settings = serde_json::from_str(settings).unwrap();
let seed = r#"{"rng":"ChaCha20","seed":"e0355d5c6c63ef757d1b874b0392a3deec73cadfb0a2aa7947a04db651bf9269"}"#;
]"#;
let actions = serde_json::from_str(actions).unwrap();
- let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":25,"starting_stack":1000,"action_timeout_secs":null}"#;
+ let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":25,"starting_stack":1000,"action_timeout":null}"#;
let settings = serde_json::from_str(settings).unwrap();
let seed = r#"{"rng":"ChaCha20","seed":"f05dc83bdce966e72a3a81b19ccded2e70387eb68deacf60ed8de1ee78b9ff0e"}"#;
]"#;
let actions = serde_json::from_str(actions).unwrap();
- let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout_secs":null}"#;
+ let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout":null}"#;
let settings = serde_json::from_str(settings).unwrap();
let seed = r#"{"rng":"ChaCha20","seed":"fd87ec4b51fcaf056ef53c0460322e1fa5261cf2801d005065c9add8ec541bb4"}"#;
use std::collections::{HashMap, HashSet};
-use std::time::SystemTime;
use crate::card::{Card, Suit, FIFTY_TWO_CARD_DECK};
use crate::rng::{Seed, WaveRng};
use crate::seats::Seats;
use crate::username::Username;
-use crate::util::max::IteratorMaxItems;
+use crate::util::{max::IteratorMaxItems, timestamp::Timestamp};
use super::{Action, ActionError, DealerAction, Game, UserAction, ValidatedUserAction};
}
}
- fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction {
+ fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction {
let mut rng = self.rng.clone();
match self.state {
State::NotStarted => {
mod tests {
use super::*;
- use std::time::SystemTime;
-
fn test_game(actions: Vec<UserAction>, settings: KnockOutWhistSettings, seed: Seed) {
let mut game = KnockOutWhist::new(0, settings, seed);
for action in actions {
game.take_action(validated).unwrap();
}
_ => {
- let dealer_action = game.next_dealer_action(SystemTime::UNIX_EPOCH);
+ let dealer_action = game.next_dealer_action(Timestamp::TEST_TIME);
if let DealerAction::TakeAction(ValidatedUserAction(dealer_action)) = dealer_action {
assert_eq!(action, dealer_action);
game.take_action(ValidatedUserAction(action)).unwrap();
use crate::client::{ClientInterest, ConnectionState};
use crate::dealer::Dealer;
use crate::game::GameList;
-use crate::server::{ClientInterestSender, Server, ServerState};
+use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState};
pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
#[derive(Debug)]
}
Action::SendInterest{interest} => {
debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
- if let Ok(interest) = TryFrom::try_from(interest) {
- for (index, Client{sender, interests}) in clients.iter_mut().enumerate() {
- if interests.contains(&interest) {
- debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
- if let Err(err) = sender.interest.send(interest.clone()).await {
- error!("handle_client_interest: Send failed: {}", err);
+ match TryFrom::try_from(interest) {
+ Ok(interest) => {
+ for (index, Client{sender, interests}) in clients.iter_mut().enumerate() {
+ if interests.contains(&interest) {
+ debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
+ if let Err(err) = sender.interest.send(interest.clone()).await {
+ error!("handle_client_interest: Send failed: {}", err);
+ }
}
}
+ },
+ Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {},
+ Err(ClientInterestFromMsgError::InvalidChannelName{channel_name}) => {
+ error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
}
- } else {
- error!("handle_client_interest: Failed to convert interest to ClientInterest");
}
}
Action::RemoveClient{index} => {
use crate::game::{GameSettings, GameSummary, ValidatedUserAction};
use crate::rng::Seed;
use crate::username::Username;
+use crate::util::timestamp::Timestamp;
#[derive(Clone)]
pub struct Server {
(tx, rx)
}
+#[derive(Debug)]
pub enum ClientInterestFromMsgError {
InvalidChannelName{channel_name: String},
- UsernameParseError(&'static str),
+ TimeoutMessageWasNotExpired,
}
impl TryFrom<Msg> for ClientInterest {
if channel_name == "__keyspace@0__:game:list" {
Ok(ClientInterest::GameList)
} else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
- username.parse().map_err(ClientInterestFromMsgError::UsernameParseError)
- .map(|username| ClientInterest::User{username})
- } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").and_then(|str| str.strip_suffix(":actions")).map(str::parse) {
- Ok(ClientInterest::Game{id})
- } else if let Some(Ok(timestamp)) = channel_name.strip_prefix("__keyspace@0__:timeout:").map(str::parse) {
- Ok(ClientInterest::Timeout{timestamp})
+ match username.parse() {
+ Ok(username) => Ok(ClientInterest::User{username}),
+ Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}),
+ }
+ } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") {
+ if let Some(id) = key.strip_suffix(":actions") {
+ match id.parse() {
+ Ok(id) => Ok(ClientInterest::Game{id}),
+ Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}),
+ }
+ } else if let Some(id) = key.strip_suffix(":timeout") {
+ match msg.get_payload::<String>() {
+ Ok(str) if str == "expired" => match id.parse() {
+ Ok(id) => Ok(ClientInterest::Timeout{id}),
+ Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}),
+ },
+ _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired),
+ }
+ } else {
+ Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
+ }
} else {
Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
}
ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
- ClientInterest::Timeout{timestamp} => out.write_arg_fmt(format!("__keyspace@0__:timeout:{}", timestamp)),
+ ClientInterest::Timeout{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)),
}
}
}
format!("game:{}:actions", id)
}
-fn timeout_key(timestamp: i64) -> String {
- format!("timeout:{}", timestamp)
+fn timeout_key(id: i64) -> String {
+ format!("game:{}:timeout", id)
}
impl ServerState {
self.take_action_script.key(key).arg(len).arg(AsJson(action)).invoke_async(&mut self.redis).await
}
- pub async fn create_timeout(&mut self, timestamp: i64) -> RedisResult<()> {
- let key = timeout_key(timestamp);
- debug!("Creating timeout {}", key);
- cmd("SET").arg(key).arg("").arg("PXAT").arg(timestamp).arg("NX").query_async(&mut self.redis).await
+ pub async fn now(&mut self) -> RedisResult<Timestamp> {
+ cmd("TIME").query_async(&mut self.redis).await
+ }
+
+ pub async fn create_timeout(&mut self, id: i64, timestamp: Timestamp) -> RedisResult<()> {
+ let key = timeout_key(id);
+ debug!("Creating timeout {} at {}", key, timestamp);
+ cmd("SET").arg(key).arg(timestamp).arg("PXAT").arg(timestamp).query_async(&mut self.redis).await
}
pub async fn register_interests(&mut self, interests: HashSet<ClientInterest>) {
true
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use redis::{Msg, Value};
+
+ #[test]
+ fn convert_channel_names_to_interest() {
+ let msg = Value::Bulk(vec![
+ Value::Data(b"message".to_vec()),
+ Value::Data(b"__keyspace@0__:game:list".to_vec()),
+ Value::Data(b"expire".to_vec()),
+ ]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::GameList, interest);
+
+ let msg = Value::Bulk(vec![
+ Value::Data(b"message".to_vec()),
+ Value::Data(b"__keyspace@0__:user:player1".to_vec()),
+ Value::Data(b"set".to_vec()),
+ ]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::User{username: "player1".parse().unwrap()}, interest);
+
+ let msg = Value::Bulk(vec![
+ Value::Data(b"message".to_vec()),
+ Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()),
+ Value::Data(b"rpush".to_vec()),
+ ]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::Game{id: 12345}, interest);
+
+ let msg = Value::Bulk(vec![
+ Value::Data(b"message".to_vec()),
+ Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()),
+ Value::Data(b"expired".to_vec()),
+ ]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::Timeout{id: 12345}, interest);
+ }
+
+ #[test]
+ fn ignores_timeout_key_set() {
+ let msg = Value::Bulk(vec![
+ Value::Data(b"message".to_vec()),
+ Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()),
+ Value::Data(b"set".to_vec()),
+ ]);
+ let msg = Msg::from_value(&msg).unwrap();
+ assert!(ClientInterest::try_from(msg).is_err());
+ }
+}
+++ /dev/null
-use std::fmt;
-use std::time::{Duration, SystemTime};
-
-use serde::{Serializer, Deserializer, de::{self, Visitor}};
-
-pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
-where
- D: Deserializer<'de>
-{
- struct SystemTimeVisitor;
-
- impl<'de> Visitor<'de> for SystemTimeVisitor {
- type Value = SystemTime;
-
- fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
- formatter.write_str("a timestamp in milliseconds since the unix epoch")
- }
-
- fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
- where
- E: de::Error
- {
- Ok(from_millis(value))
- }
-
- fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
- where
- E: de::Error
- {
- Ok(SystemTime::UNIX_EPOCH + Duration::from_millis(value))
- }
- }
-
- deserializer.deserialize_i64(SystemTimeVisitor)
-}
-
-pub fn serialize<S>(timestamp: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
-where
- S: Serializer
-{
- serializer.serialize_i64(to_millis(timestamp))
-}
-
-pub fn to_millis(timestamp: &SystemTime) -> i64 {
- match timestamp.duration_since(SystemTime::UNIX_EPOCH) {
- Ok(duration) => {
- duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64
- }
- Err(err) => {
- let duration = err.duration();
- -(duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64)
- }
- }
-}
-
-pub fn from_millis(millis: i64) -> SystemTime {
- if millis >= 0 {
- SystemTime::UNIX_EPOCH + Duration::from_millis(millis as u64)
- } else {
- SystemTime::UNIX_EPOCH - Duration::from_millis((-millis) as u64)
- }
-}
pub mod max;
-pub mod millis;
+pub mod timestamp;
--- /dev/null
+use std::fmt::{self, Display, Formatter};
+
+use redis::{FromRedisValue, Value, RedisResult, ToRedisArgs, RedisWrite, NumericBehavior, RedisError, ErrorKind};
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct Timestamp(i64);
+
+impl Timestamp {
+ #[cfg(test)]
+ pub const TEST_TIME: Timestamp = Timestamp(0);
+
+ pub fn plus_millis(self, millis: i64) -> Self {
+ Timestamp(self.0.saturating_add(millis))
+ }
+}
+
+impl Display for Timestamp {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
+impl FromRedisValue for Timestamp {
+ fn from_redis_value(value: &Value) -> RedisResult<Self> {
+ match value {
+ Value::Bulk(values) => match &**values {
+ [secs, micros] => {
+ let secs = i64::from_redis_value(secs)?;
+ let micros = i64::from_redis_value(micros)?;
+ Ok(Timestamp(secs * 1000 + micros / 1000))
+ }
+ _ => Err(RedisError::from((ErrorKind::TypeError, "Expected exactly 2 integers, secs and micros"))),
+ }
+ _ => Err(RedisError::from((ErrorKind::TypeError, "Expected exactly 2 integers, secs and micros"))),
+ }
+ }
+}
+
+impl ToRedisArgs for Timestamp {
+ fn write_redis_args<W>(&self, out: &mut W)
+ where W: ?Sized + RedisWrite
+ {
+ self.0.write_redis_args(out)
+ }
+
+ fn describe_numeric_behavior(&self) -> NumericBehavior {
+ NumericBehavior::NumberIsInteger
+ }
+
+ fn is_single_arg(&self) -> bool {
+ true
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn from_redis_time_command() {
+ let value = Value::Bulk(vec![
+ Value::Data(b"1614792013".to_vec()),
+ Value::Data(b"938726".to_vec()),
+ ]);
+ let timestamp = Timestamp::from_redis_value(&value).unwrap();
+ assert_eq!(Timestamp(1614792013938), timestamp);
+ }
+}