minimal chat server working
authorGeoffrey Allott <geoffrey@allott.email>
Fri, 5 Feb 2021 00:47:27 +0000 (00:47 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Fri, 5 Feb 2021 00:47:27 +0000 (00:47 +0000)
src/api.rs
src/auth.rs
src/client.rs
src/game.rs
src/gamestate.rs
src/main.rs
src/server.rs

index 17cb91d9aaaaa60ec83a767462779159e78a7a7c..8daac46b4902b61dffbd3b2ec01811047f757ac5 100644 (file)
@@ -2,7 +2,7 @@ use crate::auth::Auth;
 use crate::game::{Action, Game, GameSettings, GameSummary, UserAction};
 
 #[derive(Debug, Clone, Deserialize)]
-enum Scope {
+pub enum Scope {
     Global,
     Game { id: u32 },
     Hand { id: u32 },
index c25f8d2f71bd472ba52ab7a40be7e84f94e77d9e..fd3049af0dc7d62f742c256f6d2f5bd2007faf53 100644 (file)
@@ -6,7 +6,7 @@ pub enum Auth {
 }
 
 impl Auth {
-    pub fn verify(&self, challenge: &str, signature: &str) -> bool {
+    pub fn verify(&self, _challenge: &str, signature: &str) -> bool {
         match self {
             Auth::NoLogin => false,
             Auth::Plain{password} => signature == password,
index f2006a1fe536402a5606ddfbf55072334a51d69b..2bb2e399d69c8f5aa771be78e53505716c369be3 100644 (file)
@@ -1,4 +1,4 @@
-use std::sync::{Arc, RwLock};
+use std::collections::HashSet;
 
 use futures::stream::{Stream, StreamExt, empty, iter, once};
 
@@ -21,10 +21,11 @@ pub enum ClientState {
     },
 }
 
-pub enum ClientUpdate {
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum ClientInterest {
     GameList,
-    Game,
-    Users,
+    Game { id: u32 },
+    User { username: String },
 }
 
 #[derive(Debug, Clone)]
@@ -41,11 +42,11 @@ impl ConnectionState {
         }
     }
 
-    pub async fn retrieve_updates(&mut self, update: ClientUpdate) -> impl Stream<Item=ServerMessage> {
+    pub async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream<Item=ServerMessage> {
         match update {
-            ClientUpdate::GameList => empty().boxed(), // TODO
-            ClientUpdate::Game => match &mut self.client {
-                ClientState::LoggedIn{ref username, state: LoggedInState::InGame{ref mut game}} => {
+            ClientInterest::GameList => empty().boxed(), // TODO
+            ClientInterest::Game{id} => match &mut self.client {
+                ClientState::LoggedIn{ref username, state: LoggedInState::InGame{ref mut game}} if game.id() == id => {
                     let from = game.actions_len();
                     match self.server.update_game_state(game).await {
                         Ok(()) => iter(game.update_view_for(username, from)).map(|action| ServerMessage::NewAction{action}).boxed(),
@@ -54,10 +55,27 @@ impl ConnectionState {
                 }
                 _ => empty().boxed(),
             }
-            ClientUpdate::Users => empty().boxed(), // TODO
+            ClientInterest::User{..} => empty().boxed(), // TODO
         }
     }
 
+    pub fn interests(&self) -> HashSet<ClientInterest> {
+        let mut ret = HashSet::new();
+        if let ClientState::LoggedIn{ref username, ref state} = &self.client {
+            let username = username.to_string();
+            ret.insert(ClientInterest::User{username});
+            ret.insert(ClientInterest::GameList);
+            if let LoggedInState::InGame{ref game} = state {
+                ret.insert(ClientInterest::Game{id: game.id()});
+                for username in game.players() {
+                    let username = username.to_string();
+                    ret.insert(ClientInterest::User{username});
+                }
+            }
+        }
+        ret
+    }
+
     pub async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage {
         match (&mut self.client, message) {
             (_, ClientMessage::CreateUser{username, auth, nickname}) => {
@@ -74,19 +92,20 @@ impl ConnectionState {
             (ClientState::LoginAuthIssued{username, challenge}, ClientMessage::LoginAuthResponse{signature}) => {
                 if self.server.verify(&username, &challenge, &signature).await {
                     self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InLobby};
+                    self.server.register_interests(self.interests()).await;
                     ServerMessage::LoginSuccess
                 } else {
                     self.client = ClientState::Connected;
                     ServerMessage::LoginFailure{reason: "Invalid username or password".to_string()}
                 }
             }
-            (ClientState::LoggedIn{username, ..}, ClientMessage::GetGameList{tags}) => {
+            (ClientState::LoggedIn{..}, ClientMessage::GetGameList{..}) => {
                 match self.server.get_game_list().await {
                     Ok(games) => ServerMessage::GameList{games},
                     Err(err) => ServerMessage::GameListFailure{reason: err.to_string()},
                 }
             }
-            (ClientState::LoggedIn{username, ..}, ClientMessage::CreateGame{settings}) => {
+            (ClientState::LoggedIn{..}, ClientMessage::CreateGame{settings}) => {
                 match self.server.create_game(settings).await {
                     Ok(id) => ServerMessage::CreateGameSuccess{id},
                     Err(err) => ServerMessage::CreateGameFailure{reason: err.to_string()},
@@ -97,6 +116,7 @@ impl ConnectionState {
                     Ok(game) => {
                         let game_view = game.view_for(&username);
                         self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InGame{game}};
+                        self.server.register_interests(self.interests()).await;
                         ServerMessage::JoinGameSuccess{game: game_view}
                     }
                     Err(err) => ServerMessage::JoinGameFailure{reason: err.to_string()},
@@ -115,6 +135,7 @@ impl ConnectionState {
                             Err(err) => return ServerMessage::TakeActionFailure{reason: err.to_string()},
                         }
                         Ok(ActionStatus::Interrupted) => {
+                            debug!("Action {:?} was interrupted - updating game state", action);
                             if let Err(err) = self.server.update_game_state(game).await {
                                 return ServerMessage::TakeActionFailure{reason: err.to_string()};
                             }
@@ -125,10 +146,12 @@ impl ConnectionState {
             }
             (ClientState::LoggedIn{username, state: LoggedInState::InGame{..}}, ClientMessage::LeaveGame) => {
                 self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InLobby};
+                self.server.register_interests(self.interests()).await;
                 ServerMessage::LeaveGameSuccess
             }
-            (ClientState::LoggedIn{username, ..}, ClientMessage::Logout) => {
+            (ClientState::LoggedIn{..}, ClientMessage::Logout) => {
                 self.client = ClientState::Connected;
+                self.server.register_interests(self.interests()).await;
                 ServerMessage::LogoutSuccess
             }
             (ClientState::LoggedIn{username, ..}, ClientMessage::ChangeAuth{auth}) => {
index ce72da5f78d519c31d683010f137a0c0e2f7b1a7..b4d8f073512a5af395cc4945d4761255917a228c 100644 (file)
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::fmt::Debug;
 
 use crate::card::Card;
@@ -109,6 +110,10 @@ impl Game {
         self.state.actions_len()
     }
 
+    pub fn players(&self) -> HashSet<&str> {
+        self.state.players()
+    }
+
     pub fn view_for(&self, username: &str) -> Game {
         Game {
             summary: self.summary.clone(),
@@ -127,7 +132,7 @@ impl Game {
                 Action::Join{seat: 0, chips: 0} if self.state.user_has_joined(username) => Err(ActionError::AlreadyJoined),
                 Action::Join{seat: 0, chips: 0} if self.state.num_players() + 1 > max_users => Err(ActionError::NoSeatAvailable),
                 Action::Join{seat: 0, chips: 0} => Ok(()),
-                Action::Message{ref message} if self.state.user_has_joined(username) => Ok(()),
+                Action::Message{..} if self.state.user_has_joined(username) => Ok(()),
                 Action::Message{..} => Err(ActionError::NotAuthorised),
                 Action::Leave if self.state.user_has_joined(username) => Ok(()),
                 Action::Leave => Err(ActionError::NotAuthorised),
index c1d135069c5d4387fa70b9857c01594fc90d022d..b76761193b0b7c227e3e85aa0487ca4002babd40 100644 (file)
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
 use crate::game::{Action, UserAction};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -38,6 +40,18 @@ impl GameState {
         logged_in
     }
 
+    pub fn players(&self) -> HashSet<&str> {
+        let mut players = HashSet::new();
+        for action in &self.actions {
+            match action.action {
+                Action::Join{..} => players.insert(&*action.username),
+                Action::Leave => players.remove(&*action.username),
+                _ => continue,
+            };
+        }
+        players
+    }
+
     pub fn num_players(&self) -> u32 {
         let mut num_players = 0;
         for action in &self.actions {
index 25386beac888816363272c9db04c56dc526f2081..4fe04b5b012be16cc759028a273471c8a8ffa1c7 100644 (file)
@@ -1,10 +1,12 @@
 #[macro_use] extern crate log;
 #[macro_use] extern crate serde_derive;
 
-use std::pin::Pin;
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::mem::swap;
 
-use async_std::prelude::*;
-use futures::{future::select, future::Either, pin_mut, Stream, StreamExt, stream};
+use futures::{channel::mpsc::{channel, Receiver}, future::{Either, select}, select, FutureExt, SinkExt, StreamExt, pin_mut, stream::{self, FuturesUnordered, pending}};
+use redis::{aio::PubSub, Client, Msg};
 use signal_hook::consts::signal::*;
 use signal_hook_async_std::Signals;
 use tide::{Body, Request, Error, Response, StatusCode};
@@ -21,16 +23,127 @@ mod gamestate;
 mod server;
 
 use crate::api::ServerMessage;
-use crate::server::ServerState;
-use crate::client::{ClientUpdate, ConnectionState};
+use crate::server::{ClientInterestSender, Server, ServerState};
+use crate::client::{ClientInterest, ConnectionState};
+
+pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
+    #[derive(Debug)]
+    struct Client {
+        sender: ClientInterestSender,
+        interests: HashSet<ClientInterest>,
+    }
+    debug!("handle_client_interest: init");
+    let mut clients: Vec<Client> = Vec::new();
+    loop {
+        enum Action {
+            AddClient { sender: ClientInterestSender },
+            RegisterInterest { index: usize, client_interests: HashSet<ClientInterest> },
+            SendInterest { interest: Msg },
+            RemoveClient { index: usize },
+            NoNewClients,
+            ConnectionClosed,
+        }
+        match {
+            let mut connection_on_message = connection.on_message();
+            let mut next_interest = connection_on_message.next().fuse();
+            let mut next_client = new_clients.next().fuse();
+            let mut next_client_interest = FuturesUnordered::new();
+            for (index, Client{ref mut sender, ..}) in clients.iter_mut().enumerate() {
+                next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest)));
+            }
+            let mut next_client_interest = next_client_interest.select_next_some();
+            let mut action = Action::ConnectionClosed;
+            select! {
+                interest = next_interest => {
+                    if let Some(interest) = interest {
+                        action = Action::SendInterest{interest};
+                    } else {
+                        action = Action::ConnectionClosed;
+                    }
+                }
+                sender = next_client => {
+                    if let Some(sender) = sender {
+                        action = Action::AddClient{sender};
+                    } else {
+                        action = Action::NoNewClients;
+                    }
+                }
+                client_interests = next_client_interest => {
+                    match client_interests {
+                        (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests},
+                        (index, None) => action = Action::RemoveClient{index},
+                    }
+                }
+            }
+            action
+        } {
+            Action::AddClient{sender} => {
+                debug!("handle_client_interest: Action::AddClient {{ sender: {:?} }}", sender);
+                clients.push(Client{sender, interests: HashSet::new()});
+            }
+            Action::RegisterInterest{index, mut client_interests} => {
+                debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
+                let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+                swap(&mut clients[index].interests, &mut client_interests);
+                let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+                for interest in &interests_after - &interests_prior {
+                    debug!("handle_client_interest: Subscribing to {:?}", interest);
+                    if let Err(err) = connection.subscribe(interest).await {
+                        error!("handle_client_interest: Subscribe failed: {}", err);
+                    }
+                }
+                for interest in &interests_prior - &interests_after {
+                    debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+                    if let Err(err) = connection.unsubscribe(interest).await {
+                        error!("handle_client_interest: Unsubscribe failed: {}", err);
+                    }
+                }
+            }
+            Action::SendInterest{interest} => {
+                debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
+                if let Ok(interest) = TryFrom::try_from(interest) {
+                    for Client{sender, interests} in &mut clients {
+                        if interests.contains(&interest) {
+                            debug!("handle_client_interest: Sending {:?} to {:?}", interest, sender);
+                            if let Err(err) = sender.interest.send(interest.clone()).await {
+                                error!("handle_client_interest: Send failed: {}", err);
+                            }
+                        }
+                    }
+                }
+            }
+            Action::RemoveClient{index} => {
+                debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
+                let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+                clients.remove(index);
+                let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+                for interest in &interests_prior - &interests_after {
+                    debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+                    if let Err(err) = connection.unsubscribe(interest).await {
+                        error!("handle_client_interest: Unsubscribe failed: {}", err);
+                    }
+                }
+            }
+            Action::NoNewClients => {
+                debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
+                if clients.is_empty() {
+                    return Ok(());
+                }
+            }
+            Action::ConnectionClosed => {
+                debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
+                return Ok(());
+            }
+        }
+    }
+}
 
-pub async fn handle_websocket_request(request: Request<ServerState>, mut stream: WebSocketConnection) -> Result<(), Error> {
-    let mut client = ConnectionState::new(request.state().clone());
-    let update_stream: Pin<Box<dyn Stream<Item=ClientUpdate> + Send>>;
-    update_stream = stream::empty().boxed(); // TODO
+pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
+    let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
+    let mut client = ConnectionState::new(server_state);
     let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
     while let Some(message) = combined.next().await {
-        let message: Either<Result<Message, tide_websockets::Error>, ClientUpdate> = message;
+        let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
         match message {
             Either::Left(Ok(Message::Text(input))) => {
                 let response = match serde_json::from_str(&input) {
@@ -40,15 +153,23 @@ pub async fn handle_websocket_request(request: Request<ServerState>, mut stream:
                 stream.send_json(&response).await?;
             }
             Either::Right(update) => {
+                debug!("client received update: {:?}", update);
                 let mut responses = client.retrieve_updates(update).await;
                 while let Some(response) = responses.next().await {
                     stream.send_json(&response).await?
                 }
             }
-            Either::Left(Ok(_)) => {
-                error!("Websocket received non-text");
+            Either::Left(Ok(Message::Close(_))) => {
+                info!("Websocket closed");
+                break;
+            }
+            Either::Left(Ok(Message::Binary(_))) => {
+                error!("Websocket received unexpected binary data");
                 break;
             }
+            Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => {
+                continue;
+            }
             Either::Left(Err(err)) => {
                 error!("Websocket error: {}", err);
                 break;
@@ -79,7 +200,13 @@ async fn serve_404(response: Response) -> Result<Response, Error> {
 async fn main() -> Result<(), Error> {
     env_logger::init();
 
-    let mut app = tide::with_state(ServerState::new("redis://localhost/").await?);
+    const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128;
+    let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER);
+
+    let client = Client::open("redis://localhost/")?;
+    let connection = client.get_multiplexed_async_std_connection().await?;
+    let pubsub = client.get_async_std_connection().await?.into_pubsub();
+    let mut app = tide::with_state(Server::new(connection, register_update_stream_tx));
 
     app.at("/").serve_dir("site/")?;
     app.at("/").serve_file("site/index.html")?;
@@ -89,15 +216,17 @@ async fn main() -> Result<(), Error> {
     let signals = Signals::new(&[SIGINT])?;
     let signal_handler = handle_signals(signals);
 
+    let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx);
+
     let listener = TlsListener::build()
         .addrs("localhost:4433")
         .cert("cert/cert.pem")
         .key("cert/key.pem");
 
     let app = app.listen(listener);
-    pin_mut!(signal_handler, app);
+    pin_mut!(app, handle_client_interest, signal_handler);
 
-    select(app, signal_handler).await.factor_first().0?;
+    select(select(app, handle_client_interest).map(|f| f.factor_first().0), signal_handler).await.factor_first().0?;
 
     info!("Pokerwave shut down gracefully.");
 
index 54d5f032f23e14f8b1bfd9dcaf122d3d5abd9214..155c0c07e28b496f699a514f263a494e401cd187 100644 (file)
@@ -1,14 +1,94 @@
-use futures::Stream;
-use redis::{AsyncCommands, Client, ErrorKind, FromRedisValue, IntoConnectionInfo, Msg, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value, aio::MultiplexedConnection};
+use std::collections::HashSet;
+use std::convert::TryFrom;
+
+use futures::{channel::mpsc::{Receiver, Sender, channel}, SinkExt};
+use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value, aio::MultiplexedConnection};
 use serde::{Serialize, Deserialize};
 
 use crate::auth::Auth;
+use crate::client::ClientInterest;
 use crate::game::{Game, GameSettings, GameSummary, UserAction};
 
 #[derive(Clone)]
+pub struct Server {
+    redis: MultiplexedConnection,
+    register_update_stream: Sender<ClientInterestSender>,
+}
+
+#[derive(Debug)]
+pub struct ClientInterestSender {
+    pub register_interest: Receiver<HashSet<ClientInterest>>,
+    pub interest: Sender<ClientInterest>,
+}
+
+#[derive(Debug)]
+pub struct ClientInterestReceiver {
+    pub register_interest: Sender<HashSet<ClientInterest>>,
+    pub interest: Receiver<ClientInterest>,
+}
+
+fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
+    const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128;
+    const INTEREST_CHANNEL_BUFFER: usize = 1024;
+    let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER);
+    let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER);
+    let tx = ClientInterestSender{register_interest: register_interest_rx, interest: interest_tx};
+    let rx = ClientInterestReceiver{register_interest: register_interest_tx, interest: interest_rx};
+    (tx, rx)
+}
+
+pub enum ClientInterestFromMsgError {
+    InvalidChannelName{channel_name: String},
+}
+
+impl TryFrom<Msg> for ClientInterest {
+    type Error = ClientInterestFromMsgError;
+    fn try_from(msg: Msg) -> Result<Self, Self::Error> {
+        let channel_name = msg.get_channel_name();
+        if channel_name == "__keyspace@0__:games" {
+            Ok(ClientInterest::GameList)
+        } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
+            Ok(ClientInterest::User{username: username.to_string()})
+        } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").map(str::parse) {
+            Ok(ClientInterest::Game{id})
+        } else {
+            Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
+        }
+    }
+}
+
+impl ToRedisArgs for ClientInterest {
+    fn write_redis_args<W>(&self, out: &mut W)
+        where W: ?Sized + RedisWrite
+    {
+        match self {
+            ClientInterest::GameList => out.write_arg(b"__keyspace@0__:games"),
+            ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}", id)),
+            ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
+        }
+    }
+}
+
+impl Server {
+    pub fn new(redis: MultiplexedConnection, register_update_stream: Sender<ClientInterestSender>) -> Self {
+        Self {
+            redis,
+            register_update_stream
+        }
+    }
+
+    pub async fn new_state(&self) -> (ServerState, Receiver<ClientInterest>) {
+        let (client_interest_sender, ClientInterestReceiver{register_interest, interest}) = client_interest_channel();
+        if let Err(err) = self.register_update_stream.clone().send(client_interest_sender).await {
+            error!("new_state: Send failed: {}", err);
+        }
+        (ServerState{redis: self.redis.clone(), register_interest}, interest)
+    }
+}
+
 pub struct ServerState {
-    client: Client,
     redis: MultiplexedConnection,
+    register_interest: Sender<HashSet<ClientInterest>>,
 }
 
 fn user_key(username: &str) -> String {
@@ -33,18 +113,12 @@ const TAKE_ACTION_LUA_SCRIPT: &'static str = r#"
 "#;
 
 impl ServerState {
-    pub async fn new<T: IntoConnectionInfo>(params: T) -> RedisResult<Self> {
-        let client = Client::open(params)?;
-        let redis = client.get_multiplexed_async_std_connection().await?;
-        Ok(Self{client, redis})
-    }
-
     pub async fn create_user(&mut self, username: &str, auth: Auth, nickname: &str) -> RedisResult<()> {
         let key = user_key(username);
         if self.redis.hset_nx::<_, _, _, i32>(&key, "auth", AsJson(auth)).await? == 0 {
             return Err(RedisError::from((ErrorKind::ResponseError, "User already exists")));
         }
-        self.redis.hset(&key, "nickname", username).await
+        self.redis.hset(&key, "nickname", nickname).await
     }
 
     pub async fn set_user_auth(&mut self, username: &str, auth: Auth) -> RedisResult<()> {
@@ -93,7 +167,7 @@ impl ServerState {
             let actions: Vec<AsJson<UserAction>> = self.redis.lrange(&key, i, i + GAME_ACTION_BLOCK_SIZE).await?;
             if actions.is_empty() { break; }
             for AsJson(action) in actions {
-                game.take_action(action).map_err(|err| RedisError::from((ErrorKind::ResponseError, "Invalid action")))?;
+                game.take_action(action).map_err(|err| RedisError::from((ErrorKind::ResponseError, "Invalid action", err.to_string())))?;
             }
         }
         Ok(())
@@ -111,6 +185,12 @@ impl ServerState {
         debug!("redis: executing: EVAL {:?} 1 {:?} {} {:?}", TAKE_ACTION_LUA_SCRIPT, key, len, serde_json::to_string(&action).unwrap());
         redis::cmd("EVAL").arg(TAKE_ACTION_LUA_SCRIPT).arg(1).arg(key).arg(len).arg(AsJson(action)).query_async(&mut self.redis).await
     }
+
+    pub async fn register_interests(&mut self, interests: HashSet<ClientInterest>) {
+        if let Err(err) = self.register_interest.send(interests).await {
+            error!("register_interests: could not register interest: {}", err);
+        }
+    }
 }
 
 pub enum ActionStatus {