GameList,
Game { id: i64 },
User { username: Username },
+ Timeout { timestamp: i64 },
}
#[derive(Debug, Clone)]
_ => empty().boxed(),
}
ClientInterest::User{..} => empty().boxed(), // TODO
+ ClientInterest::Timeout{..} => empty().boxed(), // TODO
}
}
use crate::client::ClientInterest;
use crate::game::{Game, DealerAction, ValidatedUserAction};
use crate::server::{ActionStatus, ServerState};
+use crate::util::millis::to_millis;
pub struct Dealer {
server: ServerState,
game: Box<dyn Game>,
}
+enum Termination {
+ Continue,
+ Break,
+}
+
impl Dealer {
pub async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
let mut interests = HashSet::new();
pub async fn start(mut self, mut update_stream: Receiver<ClientInterest>) {
while let Some(_) = update_stream.next().await {
match self.retrieve_updates().await {
- Ok(()) => continue,
+ Ok(Termination::Continue) => continue,
+ Ok(Termination::Break) => break,
Err(err) => {
error!("Could not retrieve updates: {}", err);
break;
}
}
- pub async fn retrieve_updates(&mut self) -> RedisResult<()> {
+ async fn retrieve_updates(&mut self) -> RedisResult<Termination> {
let id = self.dealer.game.id();
+ info!("Dealer is retrieving updates for game {}", id);
'retrieve_updates: loop {
let from = self.dealer.game.actions_len();
let actions = self.server.game_state(id, from).await?;
+ info!("Got {} actions for game {}", actions.len(), id);
for action in actions {
debug!("Taking action: {:?}", action);
if let Err(err) = self.dealer.game.take_action(action) {
Ok(ActionStatus::Interrupted) => continue 'retrieve_updates,
Err(err) => return Err(err),
},
- DealerAction::WaitUntil(_) => todo!(),
- DealerAction::WaitForPlayer => return Ok(()),
- DealerAction::Leave => return Ok(()), // TODO
+ DealerAction::WaitUntil(timestamp) => {
+ let timestamp = to_millis(×tamp);
+ self.server.create_timeout(timestamp).await?;
+ let mut interests = HashSet::new();
+ interests.insert(ClientInterest::Game{id});
+ interests.insert(ClientInterest::Timeout{timestamp});
+ self.server.register_interests(interests).await;
+ return Ok(Termination::Continue);
+ }
+ DealerAction::WaitForPlayer => return Ok(Termination::Continue),
+ DealerAction::Leave => return Ok(Termination::Break),
}
}
}
}
}
}
+ } else {
+ error!("handle_client_interest: Failed to convert interest to ClientInterest");
}
}
Action::RemoveClient{index} => {
use std::convert::TryFrom;
use futures::{channel::mpsc::{Receiver, Sender, channel}, SinkExt};
-use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value, aio::MultiplexedConnection};
+use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value, aio::MultiplexedConnection, cmd};
use serde::{Serialize, Deserialize};
use crate::auth::Auth;
.map(|username| ClientInterest::User{username})
} else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").and_then(|str| str.strip_suffix(":actions")).map(str::parse) {
Ok(ClientInterest::Game{id})
+ } else if let Some(Ok(timestamp)) = channel_name.strip_prefix("__keyspace@0__:timeout:").map(str::parse) {
+ Ok(ClientInterest::Timeout{timestamp})
} else {
Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
}
ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
+ ClientInterest::Timeout{timestamp} => out.write_arg_fmt(format!("__keyspace@0__:timeout:{}", timestamp)),
}
}
}
format!("game:{}:actions", id)
}
+fn timeout_key(timestamp: i64) -> String {
+ format!("timeout:{}", timestamp)
+}
+
impl ServerState {
pub async fn create_user(&mut self, username: Username, auth: Auth, nickname: &str) -> RedisResult<()> {
let key = user_key(username);
self.take_action_script.key(key).arg(len).arg(AsJson(action)).invoke_async(&mut self.redis).await
}
+ pub async fn create_timeout(&mut self, timestamp: i64) -> RedisResult<()> {
+ let key = timeout_key(timestamp);
+ debug!("Creating timeout {}", key);
+ cmd("SET").arg(key).arg("").arg("PXAT").arg(timestamp).arg("NX").query_async(&mut self.redis).await
+ }
+
pub async fn register_interests(&mut self, interests: HashSet<ClientInterest>) {
if let Err(err) = self.register_interest.send(interests).await {
error!("register_interests: could not register interest: {}", err);
where
E: de::Error
{
- if value >= 0 {
- Ok(SystemTime::UNIX_EPOCH + Duration::from_millis(value as u64))
- } else {
- Ok(SystemTime::UNIX_EPOCH - Duration::from_millis((-value) as u64))
- }
+ Ok(from_millis(value))
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
S: Serializer
{
+ serializer.serialize_i64(to_millis(timestamp))
+}
+
+pub fn to_millis(timestamp: &SystemTime) -> i64 {
match timestamp.duration_since(SystemTime::UNIX_EPOCH) {
Ok(duration) => {
- let millis = duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64;
- serializer.serialize_i64(millis)
+ duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64
}
Err(err) => {
let duration = err.duration();
- let millis = duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64;
- serializer.serialize_i64(-millis)
+ -(duration.as_secs() as i64 * 1000 + duration.subsec_millis() as i64)
}
}
}
+
+pub fn from_millis(millis: i64) -> SystemTime {
+ if millis >= 0 {
+ SystemTime::UNIX_EPOCH + Duration::from_millis(millis as u64)
+ } else {
+ SystemTime::UNIX_EPOCH - Duration::from_millis((-millis) as u64)
+ }
+}