fix timestamps to use simple newtype around millis
authorGeoffrey Allott <geoffrey@allott.email>
Wed, 3 Mar 2021 18:24:10 +0000 (18:24 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Wed, 3 Mar 2021 18:24:10 +0000 (18:24 +0000)
13 files changed:
src/api.rs
src/client.rs
src/dealer.rs
src/game/action.rs
src/game/chatroom.rs
src/game/mod.rs
src/game/poker/holdem.rs
src/game/whist.rs
src/main.rs
src/server.rs
src/util/millis.rs [deleted file]
src/util/mod.rs
src/util/timestamp.rs [new file with mode: 0644]

index 36d3ee3cd4cb18d22aaacf5430beaa1ae3be53f0..1f9e581622cf95a0a951740ba6c255652c3219a0 100644 (file)
@@ -57,4 +57,5 @@ pub enum ServerMessage {
     LeaveLobbySuccess,
     LeaveLobbyFailure { reason: String },
     ProtocolError { reason: String },
+    InternalError { reason: String },
 }
index 23a8a4483b84f0a251af123bf68b2b38c0952950..2ff9f91c4d57a2bf00ba65356b24ce4836977e24 100644 (file)
@@ -1,5 +1,4 @@
 use std::collections::HashSet;
-use std::time::SystemTime;
 
 use futures::stream::{Stream, StreamExt, empty, iter, once};
 
@@ -28,7 +27,7 @@ pub enum ClientInterest {
     GameList,
     Game { id: i64 },
     User { username: Username },
-    Timeout { timestamp: i64 },
+    Timeout { id: i64 },
 }
 
 #[derive(Debug, Clone)]
@@ -176,7 +175,10 @@ impl ConnectionState {
                 }
             }
             (&mut ClientState::LoggedIn{username, state: LoggedInState::InGame{ref mut game}}, ClientMessage::TakeAction{action}) => {
-                let timestamp = SystemTime::now(); // TODO use time from db?
+                let timestamp = match self.server.now().await {
+                    Ok(timestamp) => timestamp,
+                    Err(err) => return ServerMessage::InternalError{reason: err.to_string()},
+                };
                 let action = UserAction{timestamp, username, action};
                 let id = game.id();
                 loop {
index c9cb93ab66e515a47a6038c6b2141dfdfcb49081..ad4e42caf6a31d83166ef1d1c755a6503fe4ff08 100644 (file)
@@ -1,5 +1,4 @@
 use std::collections::HashSet;
-use std::time::SystemTime;
 
 use async_std::stream::StreamExt;
 use futures::channel::mpsc::Receiver;
@@ -8,7 +7,6 @@ use redis::{ErrorKind, RedisError, RedisResult};
 use crate::client::ClientInterest;
 use crate::game::{Game, DealerAction, ValidatedUserAction};
 use crate::server::{ActionStatus, ServerState};
-use crate::util::millis::to_millis;
 
 pub struct Dealer {
     server: ServerState,
@@ -29,6 +27,7 @@ impl Dealer {
     pub async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
         let mut interests = HashSet::new();
         interests.insert(ClientInterest::Game{id});
+        interests.insert(ClientInterest::Timeout{id});
         server.register_interests(interests).await;
         let summary = server.game_summary(id).await?;
         let seed = server.game_seed(id).await?;
@@ -66,7 +65,7 @@ impl Dealer {
                 debug!("Dealer: Game state: {:#?}", self.dealer.game);
             }
             'take_action: loop {
-                let timestamp = SystemTime::now();
+                let timestamp = self.server.now().await?;
                 match self.dealer.game.next_dealer_action(timestamp) {
                     DealerAction::TakeAction(action) => match self.take_action(action).await {
                         Ok(ActionStatus::Committed) => {
@@ -77,12 +76,7 @@ impl Dealer {
                         Err(err) => return Err(err),
                     },
                     DealerAction::WaitUntil(timestamp) => {
-                        let timestamp = to_millis(&timestamp);
-                        self.server.create_timeout(timestamp).await?;
-                        let mut interests = HashSet::new();
-                        interests.insert(ClientInterest::Game{id});
-                        interests.insert(ClientInterest::Timeout{timestamp});
-                        self.server.register_interests(interests).await;
+                        self.server.create_timeout(id, timestamp).await?;
                         return Ok(Termination::Continue);
                     }
                     DealerAction::WaitForPlayer => return Ok(Termination::Continue),
index 05a666964c92464f214e53120726587ad2b1ab12..e7a97537bd927d94d882b145ce487260b7206cf2 100644 (file)
@@ -1,14 +1,13 @@
 use std::cmp::PartialEq;
 use std::fmt::{Debug, Display, Formatter};
-use std::time::SystemTime;
 
 use crate::card::{Card, Suit};
 use crate::username::Username;
+use crate::util::timestamp::Timestamp;
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct UserAction {
-    #[serde(with = "crate::util::millis")]
-    pub timestamp: SystemTime,
+    pub timestamp: Timestamp,
     pub username: Username,
     pub action: Action,
 }
@@ -30,7 +29,7 @@ impl ValidatedUserAction {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum DealerAction {
     TakeAction(ValidatedUserAction),
-    WaitUntil(SystemTime),
+    WaitUntil(Timestamp),
     WaitForPlayer,
     Leave,
 }
index 66934a86b5c0c64074b9aaaf41fd38c564df5681..6cd839938937a49caebf711ecd4ddedda58ebe6e 100644 (file)
@@ -1,8 +1,8 @@
 use std::collections::HashSet;
-use std::time::SystemTime;
 
 use crate::username::{DEALER, Username};
 use crate::game::{Action, ActionError, DealerAction};
+use crate::util::timestamp::Timestamp;
 
 use super::{Game, UserAction};
 use super::ValidatedUserAction;
@@ -16,7 +16,7 @@ enum ChatroomAction {
 
 #[derive(Debug, Clone)]
 struct ChatroomUserAction {
-    timestamp: SystemTime,
+    timestamp: Timestamp,
     username: Username,
     action: ChatroomAction,
 }
@@ -90,7 +90,7 @@ impl Game for Chatroom {
         }
     }
 
-    fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction {
+    fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction {
         match self.messages.len() {
             n if n % 10 == 0 => DealerAction::TakeAction(
                 ValidatedUserAction(UserAction{
index e4ad5b1b50b15025f3ffd874b72a8aa3f3b4bc1a..e7088b76820deed3f219ac77c2750ba35984fe93 100644 (file)
@@ -5,10 +5,10 @@ mod whist;
 
 use std::collections::HashSet;
 use std::fmt::Debug;
-use std::time::SystemTime;
 
 use crate::rng::Seed;
 use crate::username::Username;
+use crate::util::timestamp::Timestamp;
 
 use self::chatroom::{Chatroom, ChatroomSettings};
 use self::whist::{KnockOutWhist, KnockOutWhistSettings};
@@ -22,7 +22,7 @@ pub trait Game : Debug + CloneBoxGame + Send + Sync {
     fn actions_len(&self) -> usize;
     fn validate_action(&self, action: UserAction) -> Result<ValidatedUserAction, ActionError>;
     fn take_action(&mut self, action: ValidatedUserAction) -> Result<(), ActionError>;
-    fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction;
+    fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction;
 }
 
 pub trait CloneBoxGame {
index 5fdeaf23916b43af7fa9774067b9ec818585eeea..22ad26e0cf83e45c1558fc8ec38be3dee66eccba 100644 (file)
@@ -1,14 +1,13 @@
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
-use std::time::{Duration, SystemTime};
 
 use itertools::Itertools;
 
 use crate::card::{Card, FIFTY_TWO_CARD_DECK};
 use crate::seats::Seats;
-use crate::username::Username;
-use crate::util::max::IteratorMaxItems;
 use crate::rng::{Seed, WaveRng};
+use crate::username::Username;
+use crate::util::{max::IteratorMaxItems, timestamp::Timestamp};
 
 use super::super::{Action, ActionError, DealerAction, Game, UserAction, ValidatedUserAction};
 
@@ -37,7 +36,7 @@ pub struct TexasHoldEmSettings {
     max_players: u32,
     small_blind: u64,
     starting_stack: u64,
-    action_timeout_secs: Option<u64>,
+    action_timeout: Option<i64>,
 }
 
 #[derive(Clone, Debug)]
@@ -46,7 +45,7 @@ pub struct TexasHoldEm {
     settings: TexasHoldEmSettings,
     rng: WaveRng,
     actions_len: usize,
-    last_action_time: Option<SystemTime>,
+    last_action_time: Option<Timestamp>,
     state: State,
     seats: Seats,
     stacks: HashMap<Username, u64>,
@@ -352,7 +351,7 @@ impl Game for TexasHoldEm {
         }
     }
 
-    fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction {
+    fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction {
         let mut rng = self.rng.clone();
         match self.state {
             State::NotStarted => {
@@ -466,8 +465,8 @@ impl Game for TexasHoldEm {
                         error!("Logic error: no dealer could be chosen: {:#?}", self);
                         DealerAction::Leave
                     }
-                } else if let (Some(last_time), Some(username), Some(timeout)) = (self.last_action_time, self.active, self.settings.action_timeout_secs) {
-                    let timeout_time = last_time + Duration::from_secs(timeout);
+                } else if let (Some(last_time), Some(username), Some(timeout)) = (self.last_action_time, self.active, self.settings.action_timeout) {
+                    let timeout_time = last_time.plus_millis(timeout);
                     if timestamp >= timeout_time {
                         DealerAction::TakeAction(
                             ValidatedUserAction(UserAction{timestamp, username, action: Action::Fold})
@@ -550,8 +549,6 @@ impl Game for TexasHoldEm {
 mod tests {
     use super::*;
 
-    use std::time::SystemTime;
-
     fn test_game(actions: Vec<UserAction>, settings: TexasHoldEmSettings, seed: Seed) {
         let mut game = TexasHoldEm::new(0, settings, seed);
         for action in actions {
@@ -562,7 +559,7 @@ mod tests {
                     game.take_action(validated).unwrap();
                 }
                 _ => {
-                    let dealer_action = game.next_dealer_action(SystemTime::UNIX_EPOCH);
+                    let dealer_action = game.next_dealer_action(Timestamp::TEST_TIME);
                     if let DealerAction::TakeAction(ValidatedUserAction(dealer_action)) = dealer_action {
                         assert_eq!(action, dealer_action);
                         game.take_action(ValidatedUserAction(action)).unwrap();
@@ -605,7 +602,7 @@ mod tests {
         ]"#;
         let actions = serde_json::from_str(actions).unwrap();
 
-        let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout_secs":null}"#;
+        let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout":null}"#;
         let settings = serde_json::from_str(settings).unwrap();
 
         let seed = r#"{"rng":"ChaCha20","seed":"e0355d5c6c63ef757d1b874b0392a3deec73cadfb0a2aa7947a04db651bf9269"}"#;
@@ -651,7 +648,7 @@ mod tests {
         ]"#;
         let actions = serde_json::from_str(actions).unwrap();
 
-        let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":25,"starting_stack":1000,"action_timeout_secs":null}"#;
+        let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":25,"starting_stack":1000,"action_timeout":null}"#;
         let settings = serde_json::from_str(settings).unwrap();
 
         let seed = r#"{"rng":"ChaCha20","seed":"f05dc83bdce966e72a3a81b19ccded2e70387eb68deacf60ed8de1ee78b9ff0e"}"#;
@@ -734,7 +731,7 @@ mod tests {
         ]"#;
         let actions = serde_json::from_str(actions).unwrap();
 
-        let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout_secs":null}"#;
+        let settings = r#"{"title":"2-Player TexasHoldEm Test","max_players":2,"small_blind":100,"starting_stack":1000,"action_timeout":null}"#;
         let settings = serde_json::from_str(settings).unwrap();
 
         let seed = r#"{"rng":"ChaCha20","seed":"fd87ec4b51fcaf056ef53c0460322e1fa5261cf2801d005065c9add8ec541bb4"}"#;
index be75887cd0fc707fc43b47a13d9c4044856d787f..80c800c3046debb66fabc4fba3481c66543b0b80 100644 (file)
@@ -1,11 +1,10 @@
 use std::collections::{HashMap, HashSet};
-use std::time::SystemTime;
 
 use crate::card::{Card, Suit, FIFTY_TWO_CARD_DECK};
 use crate::rng::{Seed, WaveRng};
 use crate::seats::Seats;
 use crate::username::Username;
-use crate::util::max::IteratorMaxItems;
+use crate::util::{max::IteratorMaxItems, timestamp::Timestamp};
 
 use super::{Action, ActionError, DealerAction, Game, UserAction, ValidatedUserAction};
 
@@ -303,7 +302,7 @@ impl Game for KnockOutWhist {
         }
     }
 
-    fn next_dealer_action(&self, timestamp: SystemTime) -> DealerAction {
+    fn next_dealer_action(&self, timestamp: Timestamp) -> DealerAction {
         let mut rng = self.rng.clone();
         match self.state {
             State::NotStarted => {
@@ -416,8 +415,6 @@ impl Game for KnockOutWhist {
 mod tests {
     use super::*;
 
-    use std::time::SystemTime;
-
     fn test_game(actions: Vec<UserAction>, settings: KnockOutWhistSettings, seed: Seed) {
         let mut game = KnockOutWhist::new(0, settings, seed);
         for action in actions {
@@ -428,7 +425,7 @@ mod tests {
                     game.take_action(validated).unwrap();
                 }
                 _ => {
-                    let dealer_action = game.next_dealer_action(SystemTime::UNIX_EPOCH);
+                    let dealer_action = game.next_dealer_action(Timestamp::TEST_TIME);
                     if let DealerAction::TakeAction(ValidatedUserAction(dealer_action)) = dealer_action {
                         assert_eq!(action, dealer_action);
                         game.take_action(ValidatedUserAction(action)).unwrap();
index 1e6697c10d79441e88a397edd95e43142792f674..f4d2b0e92f5c2444dbc4fcc1ac928e389a7a5714 100644 (file)
@@ -31,7 +31,7 @@ use crate::api::ServerMessage;
 use crate::client::{ClientInterest, ConnectionState};
 use crate::dealer::Dealer;
 use crate::game::GameList;
-use crate::server::{ClientInterestSender, Server, ServerState};
+use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState};
 
 pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
     #[derive(Debug)]
@@ -116,17 +116,21 @@ pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Rec
             }
             Action::SendInterest{interest} => {
                 debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
-                if let Ok(interest) = TryFrom::try_from(interest) {
-                    for (index, Client{sender, interests}) in clients.iter_mut().enumerate() {
-                        if interests.contains(&interest) {
-                            debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
-                            if let Err(err) = sender.interest.send(interest.clone()).await {
-                                error!("handle_client_interest: Send failed: {}", err);
+                match TryFrom::try_from(interest) {
+                    Ok(interest) => {
+                        for (index, Client{sender, interests}) in clients.iter_mut().enumerate() {
+                            if interests.contains(&interest) {
+                                debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
+                                if let Err(err) = sender.interest.send(interest.clone()).await {
+                                    error!("handle_client_interest: Send failed: {}", err);
+                                }
                             }
                         }
+                    },
+                    Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {},
+                    Err(ClientInterestFromMsgError::InvalidChannelName{channel_name}) => {
+                        error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
                     }
-                } else {
-                    error!("handle_client_interest: Failed to convert interest to ClientInterest");
                 }
             }
             Action::RemoveClient{index} => {
index 9556f3cde1e2e5864d93381e5a4ac88f3e761632..3f804c7e4dbd240fc70c61eadbeb38e8a117bdd1 100644 (file)
@@ -10,6 +10,7 @@ use crate::client::ClientInterest;
 use crate::game::{GameSettings, GameSummary, ValidatedUserAction};
 use crate::rng::Seed;
 use crate::username::Username;
+use crate::util::timestamp::Timestamp;
 
 #[derive(Clone)]
 pub struct Server {
@@ -39,9 +40,10 @@ fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
     (tx, rx)
 }
 
+#[derive(Debug)]
 pub enum ClientInterestFromMsgError {
     InvalidChannelName{channel_name: String},
-    UsernameParseError(&'static str),
+    TimeoutMessageWasNotExpired,
 }
 
 impl TryFrom<Msg> for ClientInterest {
@@ -51,12 +53,27 @@ impl TryFrom<Msg> for ClientInterest {
         if channel_name == "__keyspace@0__:game:list" {
             Ok(ClientInterest::GameList)
         } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
-            username.parse().map_err(ClientInterestFromMsgError::UsernameParseError)
-                .map(|username| ClientInterest::User{username})
-        } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").and_then(|str| str.strip_suffix(":actions")).map(str::parse) {
-            Ok(ClientInterest::Game{id})
-        } else if let Some(Ok(timestamp)) = channel_name.strip_prefix("__keyspace@0__:timeout:").map(str::parse) {
-            Ok(ClientInterest::Timeout{timestamp})
+            match username.parse() {
+                Ok(username) => Ok(ClientInterest::User{username}),
+                Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}),
+            }
+        } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") {
+            if let Some(id) = key.strip_suffix(":actions") {
+                match id.parse() {
+                    Ok(id) => Ok(ClientInterest::Game{id}),
+                    Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}),
+                }
+            } else if let Some(id) = key.strip_suffix(":timeout") {
+                match msg.get_payload::<String>() {
+                    Ok(str) if str == "expired" => match id.parse() {
+                        Ok(id) => Ok(ClientInterest::Timeout{id}),
+                        Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}),
+                    },
+                    _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired),
+                }
+            } else {
+                Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
+            }
         } else {
             Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
         }
@@ -71,7 +88,7 @@ impl ToRedisArgs for ClientInterest {
             ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
             ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
             ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
-            ClientInterest::Timeout{timestamp} => out.write_arg_fmt(format!("__keyspace@0__:timeout:{}", timestamp)),
+            ClientInterest::Timeout{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)),
         }
     }
 }
@@ -129,8 +146,8 @@ fn game_actions_key(id: i64) -> String {
     format!("game:{}:actions", id)
 }
 
-fn timeout_key(timestamp: i64) -> String {
-    format!("timeout:{}", timestamp)
+fn timeout_key(id: i64) -> String {
+    format!("game:{}:timeout", id)
 }
 
 impl ServerState {
@@ -209,10 +226,14 @@ impl ServerState {
         self.take_action_script.key(key).arg(len).arg(AsJson(action)).invoke_async(&mut self.redis).await
     }
 
-    pub async fn create_timeout(&mut self, timestamp: i64) -> RedisResult<()> {
-        let key = timeout_key(timestamp);
-        debug!("Creating timeout {}", key);
-        cmd("SET").arg(key).arg("").arg("PXAT").arg(timestamp).arg("NX").query_async(&mut self.redis).await
+    pub async fn now(&mut self) -> RedisResult<Timestamp> {
+        cmd("TIME").query_async(&mut self.redis).await
+    }
+
+    pub async fn create_timeout(&mut self, id: i64, timestamp: Timestamp) -> RedisResult<()> {
+        let key = timeout_key(id);
+        debug!("Creating timeout {} at {}", key, timestamp);
+        cmd("SET").arg(key).arg(timestamp).arg("PXAT").arg(timestamp).query_async(&mut self.redis).await
     }
 
     pub async fn register_interests(&mut self, interests: HashSet<ClientInterest>) {
@@ -281,3 +302,60 @@ impl<T> ToRedisArgs for AsJson<T>
         true
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use redis::{Msg, Value};
+
+    #[test]
+    fn convert_channel_names_to_interest() {
+        let msg = Value::Bulk(vec![
+            Value::Data(b"message".to_vec()),
+            Value::Data(b"__keyspace@0__:game:list".to_vec()),
+            Value::Data(b"expire".to_vec()),
+        ]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::GameList, interest);
+
+        let msg = Value::Bulk(vec![
+            Value::Data(b"message".to_vec()),
+            Value::Data(b"__keyspace@0__:user:player1".to_vec()),
+            Value::Data(b"set".to_vec()),
+        ]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::User{username: "player1".parse().unwrap()}, interest);
+
+        let msg = Value::Bulk(vec![
+            Value::Data(b"message".to_vec()),
+            Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()),
+            Value::Data(b"rpush".to_vec()),
+        ]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::Game{id: 12345}, interest);
+
+        let msg = Value::Bulk(vec![
+            Value::Data(b"message".to_vec()),
+            Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()),
+            Value::Data(b"expired".to_vec()),
+        ]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::Timeout{id: 12345}, interest);
+    }
+
+    #[test]
+    fn ignores_timeout_key_set() {
+        let msg = Value::Bulk(vec![
+            Value::Data(b"message".to_vec()),
+            Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()),
+            Value::Data(b"set".to_vec()),
+        ]);
+        let msg = Msg::from_value(&msg).unwrap();
+        assert!(ClientInterest::try_from(msg).is_err());
+    }
+}
diff --git a/src/util/millis.rs b/src/util/millis.rs
deleted file mode 100644 (file)
index 08bf84a..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-use std::fmt;
-use std::time::{Duration, SystemTime};
-
-use serde::{Serializer, Deserializer, de::{self, Visitor}};
-
-pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
-where
-    D: Deserializer<'de>
-{
-    struct SystemTimeVisitor;
-
-    impl<'de> Visitor<'de> for SystemTimeVisitor {
-        type Value = SystemTime;
-
-        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
-            formatter.write_str("a timestamp in milliseconds since the unix epoch")
-        }
-
-        fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
-        where
-            E: de::Error
-        {
-            Ok(from_millis(value))
-        }
-
-        fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
-        where
-            E: de::Error
-        {
-            Ok(SystemTime::UNIX_EPOCH + Duration::from_millis(value))
-        }
-    }
-
-    deserializer.deserialize_i64(SystemTimeVisitor)
-}
-
-pub fn serialize<S>(timestamp: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
-where
-    S: Serializer
-{
-    serializer.serialize_i64(to_millis(timestamp))
-}
-
-pub fn to_millis(timestamp: &SystemTime) -> i64 {
-    match timestamp.duration_since(SystemTime::UNIX_EPOCH) {
-        Ok(duration) => {
-            duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64
-        }
-        Err(err) => {
-            let duration = err.duration();
-            -(duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64)
-        }
-    }
-}
-
-pub fn from_millis(millis: i64) -> SystemTime {
-    if millis >= 0 {
-        SystemTime::UNIX_EPOCH + Duration::from_millis(millis as u64)
-    } else {
-        SystemTime::UNIX_EPOCH - Duration::from_millis((-millis) as u64)
-    }
-}
index 0fe572d92b11db1e53513aff16fc65ca0811db5a..34aac28e1472ebe6901c38fe286f273dedc71c07 100644 (file)
@@ -1,2 +1,2 @@
 pub mod max;
-pub mod millis;
+pub mod timestamp;
diff --git a/src/util/timestamp.rs b/src/util/timestamp.rs
new file mode 100644 (file)
index 0000000..a2e8461
--- /dev/null
@@ -0,0 +1,69 @@
+use std::fmt::{self, Display, Formatter};
+
+use redis::{FromRedisValue, Value, RedisResult, ToRedisArgs, RedisWrite, NumericBehavior, RedisError, ErrorKind};
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct Timestamp(i64);
+
+impl Timestamp {
+    #[cfg(test)]
+    pub const TEST_TIME: Timestamp = Timestamp(0);
+
+    pub fn plus_millis(self, millis: i64) -> Self {
+        Timestamp(self.0.saturating_add(millis))
+    }
+}
+
+impl Display for Timestamp {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl FromRedisValue for Timestamp {
+    fn from_redis_value(value: &Value) -> RedisResult<Self> {
+        match value {
+            Value::Bulk(values) => match &**values {
+                [secs, micros] => {
+                    let secs = i64::from_redis_value(secs)?;
+                    let micros = i64::from_redis_value(micros)?;
+                    Ok(Timestamp(secs * 1000 + micros / 1000))
+                }
+                _ => Err(RedisError::from((ErrorKind::TypeError, "Expected exactly 2 integers, secs and micros"))),
+            }
+            _ => Err(RedisError::from((ErrorKind::TypeError, "Expected exactly 2 integers, secs and micros"))),
+        }
+    }
+}
+
+impl ToRedisArgs for Timestamp {
+    fn write_redis_args<W>(&self, out: &mut W)
+        where W: ?Sized + RedisWrite
+    {
+        self.0.write_redis_args(out)
+    }
+
+    fn describe_numeric_behavior(&self) -> NumericBehavior {
+        NumericBehavior::NumberIsInteger
+    }
+
+    fn is_single_arg(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn from_redis_time_command() {
+        let value = Value::Bulk(vec![
+            Value::Data(b"1614792013".to_vec()),
+            Value::Data(b"938726".to_vec()),
+        ]);
+        let timestamp = Timestamp::from_redis_value(&value).unwrap();
+        assert_eq!(Timestamp(1614792013938), timestamp);
+    }
+}