initial timeout implementation
authorGeoffrey Allott <geoffrey@allott.email>
Wed, 3 Mar 2021 00:49:35 +0000 (00:49 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Wed, 3 Mar 2021 00:49:35 +0000 (00:49 +0000)
src/client.rs
src/dealer.rs
src/main.rs
src/server.rs
src/util/millis.rs

index bb6cca373c032a3448f01795e1db8b6e5934725a..23a8a4483b84f0a251af123bf68b2b38c0952950 100644 (file)
@@ -28,6 +28,7 @@ pub enum ClientInterest {
     GameList,
     Game { id: i64 },
     User { username: Username },
+    Timeout { timestamp: i64 },
 }
 
 #[derive(Debug, Clone)]
@@ -82,6 +83,7 @@ impl ConnectionState {
                 _ => empty().boxed(),
             }
             ClientInterest::User{..} => empty().boxed(), // TODO
+            ClientInterest::Timeout{..} => empty().boxed(), // TODO
         }
     }
 
index 55c9bc6bb4862764c169b16442c9a972d30f8ed9..c9cb93ab66e515a47a6038c6b2141dfdfcb49081 100644 (file)
@@ -8,6 +8,7 @@ use redis::{ErrorKind, RedisError, RedisResult};
 use crate::client::ClientInterest;
 use crate::game::{Game, DealerAction, ValidatedUserAction};
 use crate::server::{ActionStatus, ServerState};
+use crate::util::millis::to_millis;
 
 pub struct Dealer {
     server: ServerState,
@@ -19,6 +20,11 @@ pub struct DealerState {
     game: Box<dyn Game>,
 }
 
+enum Termination {
+    Continue,
+    Break,
+}
+
 impl Dealer {
     pub async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
         let mut interests = HashSet::new();
@@ -35,7 +41,8 @@ impl Dealer {
     pub async fn start(mut self, mut update_stream: Receiver<ClientInterest>) {
         while let Some(_) = update_stream.next().await {
             match self.retrieve_updates().await {
-                Ok(()) => continue,
+                Ok(Termination::Continue) => continue,
+                Ok(Termination::Break) => break,
                 Err(err) => {
                     error!("Could not retrieve updates: {}", err);
                     break;
@@ -44,11 +51,13 @@ impl Dealer {
         }
     }
 
-    pub async fn retrieve_updates(&mut self) -> RedisResult<()> {
+    async fn retrieve_updates(&mut self) -> RedisResult<Termination> {
         let id = self.dealer.game.id();
+        info!("Dealer is retrieving updates for game {}", id);
         'retrieve_updates: loop {
             let from = self.dealer.game.actions_len();
             let actions = self.server.game_state(id, from).await?;
+            info!("Got {} actions for game {}", actions.len(), id);
             for action in actions {
                 debug!("Taking action: {:?}", action);
                 if let Err(err) = self.dealer.game.take_action(action) {
@@ -67,9 +76,17 @@ impl Dealer {
                         Ok(ActionStatus::Interrupted) => continue 'retrieve_updates,
                         Err(err) => return Err(err),
                     },
-                    DealerAction::WaitUntil(_) => todo!(),
-                    DealerAction::WaitForPlayer => return Ok(()),
-                    DealerAction::Leave => return Ok(()), // TODO
+                    DealerAction::WaitUntil(timestamp) => {
+                        let timestamp = to_millis(&timestamp);
+                        self.server.create_timeout(timestamp).await?;
+                        let mut interests = HashSet::new();
+                        interests.insert(ClientInterest::Game{id});
+                        interests.insert(ClientInterest::Timeout{timestamp});
+                        self.server.register_interests(interests).await;
+                        return Ok(Termination::Continue);
+                    }
+                    DealerAction::WaitForPlayer => return Ok(Termination::Continue),
+                    DealerAction::Leave => return Ok(Termination::Break),
                 }
             }
         }
index 1cd378b0536246c710d5fc6543bfebaf4e90b661..1e6697c10d79441e88a397edd95e43142792f674 100644 (file)
@@ -125,6 +125,8 @@ pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Rec
                             }
                         }
                     }
+                } else {
+                    error!("handle_client_interest: Failed to convert interest to ClientInterest");
                 }
             }
             Action::RemoveClient{index} => {
index 12a4b36c74ea50c3b5cb3324b512063d6b99eac3..9556f3cde1e2e5864d93381e5a4ac88f3e761632 100644 (file)
@@ -2,7 +2,7 @@ use std::collections::HashSet;
 use std::convert::TryFrom;
 
 use futures::{channel::mpsc::{Receiver, Sender, channel}, SinkExt};
-use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value, aio::MultiplexedConnection};
+use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value, aio::MultiplexedConnection, cmd};
 use serde::{Serialize, Deserialize};
 
 use crate::auth::Auth;
@@ -55,6 +55,8 @@ impl TryFrom<Msg> for ClientInterest {
                 .map(|username| ClientInterest::User{username})
         } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").and_then(|str| str.strip_suffix(":actions")).map(str::parse) {
             Ok(ClientInterest::Game{id})
+        } else if let Some(Ok(timestamp)) = channel_name.strip_prefix("__keyspace@0__:timeout:").map(str::parse) {
+            Ok(ClientInterest::Timeout{timestamp})
         } else {
             Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
         }
@@ -69,6 +71,7 @@ impl ToRedisArgs for ClientInterest {
             ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
             ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
             ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
+            ClientInterest::Timeout{timestamp} => out.write_arg_fmt(format!("__keyspace@0__:timeout:{}", timestamp)),
         }
     }
 }
@@ -126,6 +129,10 @@ fn game_actions_key(id: i64) -> String {
     format!("game:{}:actions", id)
 }
 
+fn timeout_key(timestamp: i64) -> String {
+    format!("timeout:{}", timestamp)
+}
+
 impl ServerState {
     pub async fn create_user(&mut self, username: Username, auth: Auth, nickname: &str) -> RedisResult<()> {
         let key = user_key(username);
@@ -202,6 +209,12 @@ impl ServerState {
         self.take_action_script.key(key).arg(len).arg(AsJson(action)).invoke_async(&mut self.redis).await
     }
 
+    pub async fn create_timeout(&mut self, timestamp: i64) -> RedisResult<()> {
+        let key = timeout_key(timestamp);
+        debug!("Creating timeout {}", key);
+        cmd("SET").arg(key).arg("").arg("PXAT").arg(timestamp).arg("NX").query_async(&mut self.redis).await
+    }
+
     pub async fn register_interests(&mut self, interests: HashSet<ClientInterest>) {
         if let Err(err) = self.register_interest.send(interests).await {
             error!("register_interests: could not register interest: {}", err);
index 64f3603a31020864c625369c87bc267c705ff7b8..08bf84a0a1ac0033f3106488dd69f3fc81a7417b 100644 (file)
@@ -20,11 +20,7 @@ where
         where
             E: de::Error
         {
-            if value >= 0 {
-                Ok(SystemTime::UNIX_EPOCH + Duration::from_millis(value as u64))
-            } else {
-                Ok(SystemTime::UNIX_EPOCH - Duration::from_millis((-value) as u64))
-            }
+            Ok(from_millis(value))
         }
 
         fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
@@ -42,15 +38,25 @@ pub fn serialize<S>(timestamp: &SystemTime, serializer: S) -> Result<S::Ok, S::E
 where
     S: Serializer
 {
+    serializer.serialize_i64(to_millis(timestamp))
+}
+
+pub fn to_millis(timestamp: &SystemTime) -> i64 {
     match timestamp.duration_since(SystemTime::UNIX_EPOCH) {
         Ok(duration) => {
-            let millis = duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64;
-            serializer.serialize_i64(millis)
+            duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64
         }
         Err(err) => {
             let duration = err.duration();
-            let millis = duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64;
-            serializer.serialize_i64(-millis)
+            -(duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64)
         }
     }
 }
+
+pub fn from_millis(millis: i64) -> SystemTime {
+    if millis >= 0 {
+        SystemTime::UNIX_EPOCH + Duration::from_millis(millis as u64)
+    } else {
+        SystemTime::UNIX_EPOCH - Duration::from_millis((-millis) as u64)
+    }
+}