From 72e6eee2579130c515496421877b66c00cd6655b Mon Sep 17 00:00:00 2001 From: Geoffrey Allott Date: Wed, 3 Mar 2021 18:24:10 +0000 Subject: [PATCH] fix timestamps to use simple newtype around millis --- src/api.rs | 1 + src/client.rs | 8 +-- src/dealer.rs | 12 ++--- src/game/action.rs | 7 ++- src/game/chatroom.rs | 6 +-- src/game/mod.rs | 4 +- src/game/poker/holdem.rs | 25 ++++----- src/game/whist.rs | 9 ++-- src/main.rs | 22 ++++---- src/server.rs | 106 +++++++++++++++++++++++++++++++++------ src/util/millis.rs | 62 ----------------------- src/util/mod.rs | 2 +- src/util/timestamp.rs | 69 +++++++++++++++++++++++++ 13 files changed, 206 insertions(+), 127 deletions(-) delete mode 100644 src/util/millis.rs create mode 100644 src/util/timestamp.rs diff --git a/src/api.rs b/src/api.rs index 36d3ee3..1f9e581 100644 --- a/src/api.rs +++ b/src/api.rs @@ -57,4 +57,5 @@ pub enum ServerMessage { LeaveLobbySuccess, LeaveLobbyFailure { reason: String }, ProtocolError { reason: String }, + InternalError { reason: String }, } diff --git a/src/client.rs b/src/client.rs index 23a8a44..2ff9f91 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,4 @@ use std::collections::HashSet; -use std::time::SystemTime; use futures::stream::{Stream, StreamExt, empty, iter, once}; @@ -28,7 +27,7 @@ pub enum ClientInterest { GameList, Game { id: i64 }, User { username: Username }, - Timeout { timestamp: i64 }, + Timeout { id: i64 }, } #[derive(Debug, Clone)] @@ -176,7 +175,10 @@ impl ConnectionState { } } (&mut ClientState::LoggedIn{username, state: LoggedInState::InGame{ref mut game}}, ClientMessage::TakeAction{action}) => { - let timestamp = SystemTime::now(); // TODO use time from db? + let timestamp = match self.server.now().await { + Ok(timestamp) => timestamp, + Err(err) => return ServerMessage::InternalError{reason: err.to_string()}, + }; let action = UserAction{timestamp, username, action}; let id = game.id(); loop { diff --git a/src/dealer.rs b/src/dealer.rs index c9cb93a..ad4e42c 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -1,5 +1,4 @@ use std::collections::HashSet; -use std::time::SystemTime; use async_std::stream::StreamExt; use futures::channel::mpsc::Receiver; @@ -8,7 +7,6 @@ use redis::{ErrorKind, RedisError, RedisResult}; use crate::client::ClientInterest; use crate::game::{Game, DealerAction, ValidatedUserAction}; use crate::server::{ActionStatus, ServerState}; -use crate::util::millis::to_millis; pub struct Dealer { server: ServerState, @@ -29,6 +27,7 @@ impl Dealer { pub async fn new(mut server: ServerState, id: i64) -> RedisResult { let mut interests = HashSet::new(); interests.insert(ClientInterest::Game{id}); + interests.insert(ClientInterest::Timeout{id}); server.register_interests(interests).await; let summary = server.game_summary(id).await?; let seed = server.game_seed(id).await?; @@ -66,7 +65,7 @@ impl Dealer { debug!("Dealer: Game state: {:#?}", self.dealer.game); } 'take_action: loop { - let timestamp = SystemTime::now(); + let timestamp = self.server.now().await?; match self.dealer.game.next_dealer_action(timestamp) { DealerAction::TakeAction(action) => match self.take_action(action).await { Ok(ActionStatus::Committed) => { @@ -77,12 +76,7 @@ impl Dealer { Err(err) => return Err(err), }, DealerAction::WaitUntil(timestamp) => { - let timestamp = to_millis(×tamp); - self.server.create_timeout(timestamp).await?; - let mut interests = HashSet::new(); - interests.insert(ClientInterest::Game{id}); - interests.insert(ClientInterest::Timeout{timestamp}); - self.server.register_interests(interests).await; + self.server.create_timeout(id, timestamp).await?; return Ok(Termination::Continue); } DealerAction::WaitForPlayer => return Ok(Termination::Continue), diff --git a/src/game/action.rs b/src/game/action.rs index 05a6669..e7a9753 100644 --- a/src/game/action.rs +++ b/src/game/action.rs @@ -1,14 +1,13 @@ use std::cmp::PartialEq; use std::fmt::{Debug, Display, Formatter}; -use std::time::SystemTime; use crate::card::{Card, Suit}; use crate::username::Username; +use crate::util::timestamp::Timestamp; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct UserAction { - #[serde(with = "crate::util::millis")] - pub timestamp: SystemTime, + pub timestamp: Timestamp, pub username: Username, pub action: Action, } @@ -30,7 +29,7 @@ impl ValidatedUserAction { #[derive(Debug, Clone, PartialEq, Eq)] pub enum DealerAction { TakeAction(ValidatedUserAction), - WaitUntil(SystemTime), + WaitUntil(Timestamp), WaitForPlayer, Leave, } diff --git a/src/game/chatroom.rs b/src/game/chatroom.rs index 66934a8..6cd8399 100644 --- a/src/game/chatroom.rs +++ b/src/game/chatroom.rs @@ -1,8 +1,8 @@ use std::collections::HashSet; -use std::time::SystemTime; use crate::username::{DEALER, Username}; use crate::game::{Action, ActionError, DealerAction}; +use crate::util::timestamp::Timestamp; use super::{Game, UserAction}; use super::ValidatedUserAction; @@ -16,7 +16,7 @@ enum ChatroomAction { #[derive(Debug, Clone)] struct ChatroomUserAction { - timestamp: SystemTime, + timestamp: Timestamp, username: Username, action: ChatroomAction, } @@ -90,7 +90,7 @@ impl Game for Chatroom { } } - fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction { + fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction { match self.messages.len() { n if n % 10 == 0 => DealerAction::TakeAction( ValidatedUserAction(UserAction{ diff --git a/src/game/mod.rs b/src/game/mod.rs index e4ad5b1..e7088b7 100644 --- a/src/game/mod.rs +++ b/src/game/mod.rs @@ -5,10 +5,10 @@ mod whist; use std::collections::HashSet; use std::fmt::Debug; -use std::time::SystemTime; use crate::rng::Seed; use crate::username::Username; +use crate::util::timestamp::Timestamp; use self::chatroom::{Chatroom, ChatroomSettings}; use self::whist::{KnockOutWhist, KnockOutWhistSettings}; @@ -22,7 +22,7 @@ pub trait Game : Debug + CloneBoxGame + Send + Sync { fn actions_len(&self) -> usize; fn validate_action(&self, action: UserAction) -> Result; fn take_action(&mut self, action: ValidatedUserAction) -> Result<(), ActionError>; - fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction; + fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction; } pub trait CloneBoxGame { diff --git a/src/game/poker/holdem.rs b/src/game/poker/holdem.rs index 5fdeaf2..22ad26e 100644 --- a/src/game/poker/holdem.rs +++ b/src/game/poker/holdem.rs @@ -1,14 +1,13 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryInto; -use std::time::{Duration, SystemTime}; use itertools::Itertools; use crate::card::{Card, FIFTY_TWO_CARD_DECK}; use crate::seats::Seats; -use crate::username::Username; -use crate::util::max::IteratorMaxItems; use crate::rng::{Seed, WaveRng}; +use crate::username::Username; +use crate::util::{max::IteratorMaxItems, timestamp::Timestamp}; use super::super::{Action, ActionError, DealerAction, Game, UserAction, ValidatedUserAction}; @@ -37,7 +36,7 @@ pub struct TexasHoldEmSettings { max_players: u32, small_blind: u64, starting_stack: u64, - action_timeout_secs: Option, + action_timeout: Option, } #[derive(Clone, Debug)] @@ -46,7 +45,7 @@ pub struct TexasHoldEm { settings: TexasHoldEmSettings, rng: WaveRng, actions_len: usize, - last_action_time: Option, + last_action_time: Option, state: State, seats: Seats, stacks: HashMap, @@ -352,7 +351,7 @@ impl Game for TexasHoldEm { } } - fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction { + fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction { let mut rng = self.rng.clone(); match self.state { State::NotStarted => { @@ -466,8 +465,8 @@ impl Game for TexasHoldEm { error!("Logic error: no dealer could be chosen: {:#?}", self); DealerAction::Leave } - } else if let (Some(last_time), Some(username), Some(timeout)) = (self.last_action_time, self.active, self.settings.action_timeout_secs) { - let timeout_time = last_time + Duration::from_secs(timeout); + } else if let (Some(last_time), Some(username), Some(timeout)) = (self.last_action_time, self.active, self.settings.action_timeout) { + let timeout_time = last_time.plus_millis(timeout); if timestamp >= timeout_time { DealerAction::TakeAction( ValidatedUserAction(UserAction{timestamp, username, action: Action::Fold}) @@ -550,8 +549,6 @@ impl Game for TexasHoldEm { mod tests { use super::*; - use std::time::SystemTime; - fn test_game(actions: Vec, settings: TexasHoldEmSettings, seed: Seed) { let mut game = TexasHoldEm::new(0, settings, seed); for action in actions { @@ -562,7 +559,7 @@ mod tests { game.take_action(validated).unwrap(); } _ => { - let dealer_action = game.next_dealer_action(SystemTime::UNIX_EPOCH); + let dealer_action = game.next_dealer_action(Timestamp::TEST_TIME); if let DealerAction::TakeAction(ValidatedUserAction(dealer_action)) = dealer_action { assert_eq!(action, dealer_action); game.take_action(ValidatedUserAction(action)).unwrap(); @@ -605,7 +602,7 @@ mod tests { ]"#; let actions = serde_json::from_str(actions).unwrap(); - let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout_secs":null}"#; + let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout":null}"#; let settings = serde_json::from_str(settings).unwrap(); let seed = r#"{"rng":"ChaCha20","seed":"e0355d5c6c63ef757d1b874b0392a3deec73cadfb0a2aa7947a04db651bf9269"}"#; @@ -651,7 +648,7 @@ mod tests { ]"#; let actions = serde_json::from_str(actions).unwrap(); - let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":25,"starting_stack":1000,"action_timeout_secs":null}"#; + let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":25,"starting_stack":1000,"action_timeout":null}"#; let settings = serde_json::from_str(settings).unwrap(); let seed = r#"{"rng":"ChaCha20","seed":"f05dc83bdce966e72a3a81b19ccded2e70387eb68deacf60ed8de1ee78b9ff0e"}"#; @@ -734,7 +731,7 @@ mod tests { ]"#; let actions = serde_json::from_str(actions).unwrap(); - let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout_secs":null}"#; + let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout":null}"#; let settings = serde_json::from_str(settings).unwrap(); let seed = r#"{"rng":"ChaCha20","seed":"fd87ec4b51fcaf056ef53c0460322e1fa5261cf2801d005065c9add8ec541bb4"}"#; diff --git a/src/game/whist.rs b/src/game/whist.rs index be75887..80c800c 100644 --- a/src/game/whist.rs +++ b/src/game/whist.rs @@ -1,11 +1,10 @@ use std::collections::{HashMap, HashSet}; -use std::time::SystemTime; use crate::card::{Card, Suit, FIFTY_TWO_CARD_DECK}; use crate::rng::{Seed, WaveRng}; use crate::seats::Seats; use crate::username::Username; -use crate::util::max::IteratorMaxItems; +use crate::util::{max::IteratorMaxItems, timestamp::Timestamp}; use super::{Action, ActionError, DealerAction, Game, UserAction, ValidatedUserAction}; @@ -303,7 +302,7 @@ impl Game for KnockOutWhist { } } - fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction { + fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction { let mut rng = self.rng.clone(); match self.state { State::NotStarted => { @@ -416,8 +415,6 @@ impl Game for KnockOutWhist { mod tests { use super::*; - use std::time::SystemTime; - fn test_game(actions: Vec, settings: KnockOutWhistSettings, seed: Seed) { let mut game = KnockOutWhist::new(0, settings, seed); for action in actions { @@ -428,7 +425,7 @@ mod tests { game.take_action(validated).unwrap(); } _ => { - let dealer_action = game.next_dealer_action(SystemTime::UNIX_EPOCH); + let dealer_action = game.next_dealer_action(Timestamp::TEST_TIME); if let DealerAction::TakeAction(ValidatedUserAction(dealer_action)) = dealer_action { assert_eq!(action, dealer_action); game.take_action(ValidatedUserAction(action)).unwrap(); diff --git a/src/main.rs b/src/main.rs index 1e6697c..f4d2b0e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ use crate::api::ServerMessage; use crate::client::{ClientInterest, ConnectionState}; use crate::dealer::Dealer; use crate::game::GameList; -use crate::server::{ClientInterestSender, Server, ServerState}; +use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState}; pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver) -> Result<(), std::io::Error> { #[derive(Debug)] @@ -116,17 +116,21 @@ pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Rec } Action::SendInterest{interest} => { debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest); - if let Ok(interest) = TryFrom::try_from(interest) { - for (index, Client{sender, interests}) in clients.iter_mut().enumerate() { - if interests.contains(&interest) { - debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index); - if let Err(err) = sender.interest.send(interest.clone()).await { - error!("handle_client_interest: Send failed: {}", err); + match TryFrom::try_from(interest) { + Ok(interest) => { + for (index, Client{sender, interests}) in clients.iter_mut().enumerate() { + if interests.contains(&interest) { + debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index); + if let Err(err) = sender.interest.send(interest.clone()).await { + error!("handle_client_interest: Send failed: {}", err); + } } } + }, + Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {}, + Err(ClientInterestFromMsgError::InvalidChannelName{channel_name}) => { + error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name); } - } else { - error!("handle_client_interest: Failed to convert interest to ClientInterest"); } } Action::RemoveClient{index} => { diff --git a/src/server.rs b/src/server.rs index 9556f3c..3f804c7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,6 +10,7 @@ use crate::client::ClientInterest; use crate::game::{GameSettings, GameSummary, ValidatedUserAction}; use crate::rng::Seed; use crate::username::Username; +use crate::util::timestamp::Timestamp; #[derive(Clone)] pub struct Server { @@ -39,9 +40,10 @@ fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) { (tx, rx) } +#[derive(Debug)] pub enum ClientInterestFromMsgError { InvalidChannelName{channel_name: String}, - UsernameParseError(&'static str), + TimeoutMessageWasNotExpired, } impl TryFrom for ClientInterest { @@ -51,12 +53,27 @@ impl TryFrom for ClientInterest { if channel_name == "__keyspace@0__:game:list" { Ok(ClientInterest::GameList) } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") { - username.parse().map_err(ClientInterestFromMsgError::UsernameParseError) - .map(|username| ClientInterest::User{username}) - } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").and_then(|str| str.strip_suffix(":actions")).map(str::parse) { - Ok(ClientInterest::Game{id}) - } else if let Some(Ok(timestamp)) = channel_name.strip_prefix("__keyspace@0__:timeout:").map(str::parse) { - Ok(ClientInterest::Timeout{timestamp}) + match username.parse() { + Ok(username) => Ok(ClientInterest::User{username}), + Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}), + } + } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") { + if let Some(id) = key.strip_suffix(":actions") { + match id.parse() { + Ok(id) => Ok(ClientInterest::Game{id}), + Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}), + } + } else if let Some(id) = key.strip_suffix(":timeout") { + match msg.get_payload::() { + Ok(str) if str == "expired" => match id.parse() { + Ok(id) => Ok(ClientInterest::Timeout{id}), + Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}), + }, + _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired), + } + } else { + Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}) + } } else { Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}) } @@ -71,7 +88,7 @@ impl ToRedisArgs for ClientInterest { ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"), ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)), ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)), - ClientInterest::Timeout{timestamp} => out.write_arg_fmt(format!("__keyspace@0__:timeout:{}", timestamp)), + ClientInterest::Timeout{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)), } } } @@ -129,8 +146,8 @@ fn game_actions_key(id: i64) -> String { format!("game:{}:actions", id) } -fn timeout_key(timestamp: i64) -> String { - format!("timeout:{}", timestamp) +fn timeout_key(id: i64) -> String { + format!("game:{}:timeout", id) } impl ServerState { @@ -209,10 +226,14 @@ impl ServerState { self.take_action_script.key(key).arg(len).arg(AsJson(action)).invoke_async(&mut self.redis).await } - pub async fn create_timeout(&mut self, timestamp: i64) -> RedisResult<()> { - let key = timeout_key(timestamp); - debug!("Creating timeout {}", key); - cmd("SET").arg(key).arg("").arg("PXAT").arg(timestamp).arg("NX").query_async(&mut self.redis).await + pub async fn now(&mut self) -> RedisResult { + cmd("TIME").query_async(&mut self.redis).await + } + + pub async fn create_timeout(&mut self, id: i64, timestamp: Timestamp) -> RedisResult<()> { + let key = timeout_key(id); + debug!("Creating timeout {} at {}", key, timestamp); + cmd("SET").arg(key).arg(timestamp).arg("PXAT").arg(timestamp).query_async(&mut self.redis).await } pub async fn register_interests(&mut self, interests: HashSet) { @@ -281,3 +302,60 @@ impl ToRedisArgs for AsJson true } } + +#[cfg(test)] +mod tests { + use super::*; + + use redis::{Msg, Value}; + + #[test] + fn convert_channel_names_to_interest() { + let msg = Value::Bulk(vec![ + Value::Data(b"message".to_vec()), + Value::Data(b"__keyspace@0__:game:list".to_vec()), + Value::Data(b"expire".to_vec()), + ]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::GameList, interest); + + let msg = Value::Bulk(vec![ + Value::Data(b"message".to_vec()), + Value::Data(b"__keyspace@0__:user:player1".to_vec()), + Value::Data(b"set".to_vec()), + ]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::User{username: "player1".parse().unwrap()}, interest); + + let msg = Value::Bulk(vec![ + Value::Data(b"message".to_vec()), + Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), + Value::Data(b"rpush".to_vec()), + ]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::Game{id: 12345}, interest); + + let msg = Value::Bulk(vec![ + Value::Data(b"message".to_vec()), + Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), + Value::Data(b"expired".to_vec()), + ]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::Timeout{id: 12345}, interest); + } + + #[test] + fn ignores_timeout_key_set() { + let msg = Value::Bulk(vec![ + Value::Data(b"message".to_vec()), + Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), + Value::Data(b"set".to_vec()), + ]); + let msg = Msg::from_value(&msg).unwrap(); + assert!(ClientInterest::try_from(msg).is_err()); + } +} diff --git a/src/util/millis.rs b/src/util/millis.rs deleted file mode 100644 index 08bf84a..0000000 --- a/src/util/millis.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::fmt; -use std::time::{Duration, SystemTime}; - -use serde::{Serializer, Deserializer, de::{self, Visitor}}; - -pub fn deserialize<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de> -{ - struct SystemTimeVisitor; - - impl<'de> Visitor<'de> for SystemTimeVisitor { - type Value = SystemTime; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a timestamp in milliseconds since the unix epoch") - } - - fn visit_i64(self, value: i64) -> Result - where - E: de::Error - { - Ok(from_millis(value)) - } - - fn visit_u64(self, value: u64) -> Result - where - E: de::Error - { - Ok(SystemTime::UNIX_EPOCH + Duration::from_millis(value)) - } - } - - deserializer.deserialize_i64(SystemTimeVisitor) -} - -pub fn serialize(timestamp: &SystemTime, serializer: S) -> Result -where - S: Serializer -{ - serializer.serialize_i64(to_millis(timestamp)) -} - -pub fn to_millis(timestamp: &SystemTime) -> i64 { - match timestamp.duration_since(SystemTime::UNIX_EPOCH) { - Ok(duration) => { - duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64 - } - Err(err) => { - let duration = err.duration(); - -(duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64) - } - } -} - -pub fn from_millis(millis: i64) -> SystemTime { - if millis >= 0 { - SystemTime::UNIX_EPOCH + Duration::from_millis(millis as u64) - } else { - SystemTime::UNIX_EPOCH - Duration::from_millis((-millis) as u64) - } -} diff --git a/src/util/mod.rs b/src/util/mod.rs index 0fe572d..34aac28 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,2 +1,2 @@ pub mod max; -pub mod millis; +pub mod timestamp; diff --git a/src/util/timestamp.rs b/src/util/timestamp.rs new file mode 100644 index 0000000..a2e8461 --- /dev/null +++ b/src/util/timestamp.rs @@ -0,0 +1,69 @@ +use std::fmt::{self, Display, Formatter}; + +use redis::{FromRedisValue, Value, RedisResult, ToRedisArgs, RedisWrite, NumericBehavior, RedisError, ErrorKind}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct Timestamp(i64); + +impl Timestamp { + #[cfg(test)] + pub const TEST_TIME: Timestamp = Timestamp(0); + + pub fn plus_millis(self, millis: i64) -> Self { + Timestamp(self.0.saturating_add(millis)) + } +} + +impl Display for Timestamp { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl FromRedisValue for Timestamp { + fn from_redis_value(value: &Value) -> RedisResult { + match value { + Value::Bulk(values) => match &**values { + [secs, micros] => { + let secs = i64::from_redis_value(secs)?; + let micros = i64::from_redis_value(micros)?; + Ok(Timestamp(secs * 1000 + micros / 1000)) + } + _ => Err(RedisError::from((ErrorKind::TypeError, "Expected exactly 2 integers, secs and micros"))), + } + _ => Err(RedisError::from((ErrorKind::TypeError, "Expected exactly 2 integers, secs and micros"))), + } + } +} + +impl ToRedisArgs for Timestamp { + fn write_redis_args(&self, out: &mut W) + where W: ?Sized + RedisWrite + { + self.0.write_redis_args(out) + } + + fn describe_numeric_behavior(&self) -> NumericBehavior { + NumericBehavior::NumberIsInteger + } + + fn is_single_arg(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_redis_time_command() { + let value = Value::Bulk(vec![ + Value::Data(b"1614792013".to_vec()), + Value::Data(b"938726".to_vec()), + ]); + let timestamp = Timestamp::from_redis_value(&value).unwrap(); + assert_eq!(Timestamp(1614792013938), timestamp); + } +} -- 2.34.1