From f7bc9424cec1da1fbbe787c537a47a9b370f08b7 Mon Sep 17 00:00:00 2001 From: Geoffrey Allott Date: Wed, 3 Mar 2021 00:49:35 +0000 Subject: [PATCH] initial timeout implementation --- src/client.rs | 2 ++ src/dealer.rs | 27 ++++++++++++++++++++++----- src/main.rs | 2 ++ src/server.rs | 15 ++++++++++++++- src/util/millis.rs | 24 +++++++++++++++--------- 5 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/client.rs b/src/client.rs index bb6cca3..23a8a44 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,6 +28,7 @@ pub enum ClientInterest { GameList, Game { id: i64 }, User { username: Username }, + Timeout { timestamp: i64 }, } #[derive(Debug, Clone)] @@ -82,6 +83,7 @@ impl ConnectionState { _ => empty().boxed(), } ClientInterest::User{..} => empty().boxed(), // TODO + ClientInterest::Timeout{..} => empty().boxed(), // TODO } } diff --git a/src/dealer.rs b/src/dealer.rs index 55c9bc6..c9cb93a 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -8,6 +8,7 @@ use redis::{ErrorKind, RedisError, RedisResult}; use crate::client::ClientInterest; use crate::game::{Game, DealerAction, ValidatedUserAction}; use crate::server::{ActionStatus, ServerState}; +use crate::util::millis::to_millis; pub struct Dealer { server: ServerState, @@ -19,6 +20,11 @@ pub struct DealerState { game: Box, } +enum Termination { + Continue, + Break, +} + impl Dealer { pub async fn new(mut server: ServerState, id: i64) -> RedisResult { let mut interests = HashSet::new(); @@ -35,7 +41,8 @@ impl Dealer { pub async fn start(mut self, mut update_stream: Receiver) { while let Some(_) = update_stream.next().await { match self.retrieve_updates().await { - Ok(()) => continue, + Ok(Termination::Continue) => continue, + Ok(Termination::Break) => break, Err(err) => { error!("Could not retrieve updates: {}", err); break; @@ -44,11 +51,13 @@ impl Dealer { } } - pub async fn retrieve_updates(&mut self) -> RedisResult<()> { + async fn retrieve_updates(&mut self) -> RedisResult { let id = self.dealer.game.id(); + info!("Dealer is retrieving updates for game {}", id); 'retrieve_updates: loop { let from = self.dealer.game.actions_len(); let actions = self.server.game_state(id, from).await?; + info!("Got {} actions for game {}", actions.len(), id); for action in actions { debug!("Taking action: {:?}", action); if let Err(err) = self.dealer.game.take_action(action) { @@ -67,9 +76,17 @@ impl Dealer { Ok(ActionStatus::Interrupted) => continue 'retrieve_updates, Err(err) => return Err(err), }, - DealerAction::WaitUntil(_) => todo!(), - DealerAction::WaitForPlayer => return Ok(()), - DealerAction::Leave => return Ok(()), // TODO + DealerAction::WaitUntil(timestamp) => { + let timestamp = to_millis(×tamp); + self.server.create_timeout(timestamp).await?; + let mut interests = HashSet::new(); + interests.insert(ClientInterest::Game{id}); + interests.insert(ClientInterest::Timeout{timestamp}); + self.server.register_interests(interests).await; + return Ok(Termination::Continue); + } + DealerAction::WaitForPlayer => return Ok(Termination::Continue), + DealerAction::Leave => return Ok(Termination::Break), } } } diff --git a/src/main.rs b/src/main.rs index 1cd378b..1e6697c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -125,6 +125,8 @@ pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Rec } } } + } else { + error!("handle_client_interest: Failed to convert interest to ClientInterest"); } } Action::RemoveClient{index} => { diff --git a/src/server.rs b/src/server.rs index 12a4b36..9556f3c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use std::convert::TryFrom; use futures::{channel::mpsc::{Receiver, Sender, channel}, SinkExt}; -use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value, aio::MultiplexedConnection}; +use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value, aio::MultiplexedConnection, cmd}; use serde::{Serialize, Deserialize}; use crate::auth::Auth; @@ -55,6 +55,8 @@ impl TryFrom for ClientInterest { .map(|username| ClientInterest::User{username}) } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").and_then(|str| str.strip_suffix(":actions")).map(str::parse) { Ok(ClientInterest::Game{id}) + } else if let Some(Ok(timestamp)) = channel_name.strip_prefix("__keyspace@0__:timeout:").map(str::parse) { + Ok(ClientInterest::Timeout{timestamp}) } else { Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()}) } @@ -69,6 +71,7 @@ impl ToRedisArgs for ClientInterest { ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"), ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)), ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)), + ClientInterest::Timeout{timestamp} => out.write_arg_fmt(format!("__keyspace@0__:timeout:{}", timestamp)), } } } @@ -126,6 +129,10 @@ fn game_actions_key(id: i64) -> String { format!("game:{}:actions", id) } +fn timeout_key(timestamp: i64) -> String { + format!("timeout:{}", timestamp) +} + impl ServerState { pub async fn create_user(&mut self, username: Username, auth: Auth, nickname: &str) -> RedisResult<()> { let key = user_key(username); @@ -202,6 +209,12 @@ impl ServerState { self.take_action_script.key(key).arg(len).arg(AsJson(action)).invoke_async(&mut self.redis).await } + pub async fn create_timeout(&mut self, timestamp: i64) -> RedisResult<()> { + let key = timeout_key(timestamp); + debug!("Creating timeout {}", key); + cmd("SET").arg(key).arg("").arg("PXAT").arg(timestamp).arg("NX").query_async(&mut self.redis).await + } + pub async fn register_interests(&mut self, interests: HashSet) { if let Err(err) = self.register_interest.send(interests).await { error!("register_interests: could not register interest: {}", err); diff --git a/src/util/millis.rs b/src/util/millis.rs index 64f3603..08bf84a 100644 --- a/src/util/millis.rs +++ b/src/util/millis.rs @@ -20,11 +20,7 @@ where where E: de::Error { - if value >= 0 { - Ok(SystemTime::UNIX_EPOCH + Duration::from_millis(value as u64)) - } else { - Ok(SystemTime::UNIX_EPOCH - Duration::from_millis((-value) as u64)) - } + Ok(from_millis(value)) } fn visit_u64(self, value: u64) -> Result @@ -42,15 +38,25 @@ pub fn serialize(timestamp: &SystemTime, serializer: S) -> Result i64 { match timestamp.duration_since(SystemTime::UNIX_EPOCH) { Ok(duration) => { - let millis = duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64; - serializer.serialize_i64(millis) + duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64 } Err(err) => { let duration = err.duration(); - let millis = duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64; - serializer.serialize_i64(-millis) + -(duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64) } } } + +pub fn from_millis(millis: i64) -> SystemTime { + if millis >= 0 { + SystemTime::UNIX_EPOCH + Duration::from_millis(millis as u64) + } else { + SystemTime::UNIX_EPOCH - Duration::from_millis((-millis) as u64) + } +} -- 2.34.1