move subscription functionality to pubsub module
authorGeoffrey Allott <geoffrey@allott.email>
Sat, 6 Mar 2021 15:36:25 +0000 (15:36 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Sat, 6 Mar 2021 15:36:25 +0000 (15:36 +0000)
src/client.rs
src/dealer.rs
src/main.rs
src/pubsub.rs [new file with mode: 0644]
src/server.rs

index f6523eb393bfa326bfb78e2e3e5b7d72a6beb11d..bbe6cdf1f0233fcf3aa2b40863ce879d478d4c13 100644 (file)
@@ -10,6 +10,7 @@ use tide_websockets::{Message, WebSocketConnection};
 
 use crate::api::{ClientMessage, ServerMessage};
 use crate::game::{Game, GameList, UserAction};
+use crate::pubsub::ClientInterest;
 use crate::server::{ActionStatus, Server, ServerState};
 use crate::username::Username;
 use crate::util::dedup::DedupReadyExt;
@@ -26,14 +27,6 @@ enum ClientState {
     LoggedIn { username: Username, state: LoggedInState },
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum ClientInterest {
-    GameList,
-    Game { id: i64 },
-    User { username: Username },
-    Timeout { id: i64 },
-}
-
 #[derive(Debug, Clone)]
 enum LoggedInState {
     Idle,
index 98b66dc9734e38d6d7643a838c647b7383a9c79b..e43997e14b5524aa6d611526a1889bfef18a32b4 100644 (file)
@@ -4,8 +4,8 @@ use async_std::task::spawn;
 use futures::{channel::mpsc::Receiver, StreamExt};
 use redis::{ErrorKind, RedisError, RedisResult};
 
-use crate::client::ClientInterest;
 use crate::game::{DealerAction, Game, GameList, ValidatedUserAction};
+use crate::pubsub::ClientInterest;
 use crate::server::{ActionStatus, Server, ServerState};
 use crate::util::dedup::DedupReadyExt;
 
index 68bdd75ea1f6388efd94b0a14bbb42e5c28e40de..e251ad823d979d1ab09c018e6bcf504930764966 100644 (file)
@@ -3,18 +3,8 @@ extern crate log;
 #[macro_use]
 extern crate serde_derive;
 
-use std::collections::HashSet;
-use std::convert::TryFrom;
-use std::mem::swap;
-
-use futures::{
-    channel::mpsc::{channel, Receiver},
-    future::select,
-    pin_mut, select,
-    stream::FuturesUnordered,
-    FutureExt, SinkExt, StreamExt,
-};
-use redis::{aio::PubSub, Client, Msg};
+use futures::{channel::mpsc::channel, future::select, pin_mut, FutureExt, StreamExt};
+use redis::Client;
 use signal_hook::consts::signal::*;
 use signal_hook_async_std::Signals;
 use tide::utils::After;
@@ -28,141 +18,17 @@ mod card;
 mod client;
 mod dealer;
 mod game;
+mod pubsub;
 mod rng;
 mod seats;
 mod server;
 mod username;
 mod util;
 
-use crate::client::{new_client, ClientInterest};
+use crate::client::new_client;
 use crate::dealer::spawn_dealers;
-use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server};
-
-pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
-    #[derive(Debug)]
-    struct Client {
-        sender: ClientInterestSender,
-        interests: HashSet<ClientInterest>,
-    }
-    debug!("handle_client_interest: init");
-    let mut clients: Vec<Client> = Vec::new();
-    loop {
-        enum Action {
-            AddClient { sender: ClientInterestSender },
-            RegisterInterest { index: usize, client_interests: HashSet<ClientInterest> },
-            SendInterest { interest: Msg },
-            RemoveClient { index: usize },
-            NoNewClients,
-            ConnectionClosed,
-        }
-        match {
-            let mut connection_on_message = connection.on_message();
-            let mut next_interest = connection_on_message.next().fuse();
-            let mut next_client = new_clients.next().fuse();
-            let mut next_client_interest = FuturesUnordered::new();
-            for (index, Client { ref mut sender, .. }) in clients.iter_mut().enumerate() {
-                next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest)));
-            }
-            let mut next_client_interest = next_client_interest.select_next_some();
-            let action;
-            select! {
-                interest = next_interest => {
-                    if let Some(interest) = interest {
-                        action = Action::SendInterest{interest};
-                    } else {
-                        action = Action::ConnectionClosed;
-                    }
-                }
-                sender = next_client => {
-                    if let Some(sender) = sender {
-                        action = Action::AddClient{sender};
-                    } else {
-                        action = Action::NoNewClients;
-                    }
-                }
-                client_interests = next_client_interest => {
-                    match client_interests {
-                        (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests},
-                        (index, None) => action = Action::RemoveClient{index},
-                    }
-                }
-            }
-            action
-        } {
-            Action::AddClient { sender } => {
-                debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len());
-                clients.push(Client { sender, interests: HashSet::new() });
-            }
-            Action::RegisterInterest { index, mut client_interests } => {
-                debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
-                let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
-                swap(&mut clients[index].interests, &mut client_interests);
-                let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
-                for interest in &interests_after - &interests_prior {
-                    debug!("handle_client_interest: Subscribing to {:?}", interest);
-                    if let Err(err) = connection.subscribe(interest).await {
-                        error!("handle_client_interest: Subscribe failed: {}", err);
-                    }
-                }
-                for interest in &interests_prior - &interests_after {
-                    debug!("handle_client_interest: Unsubscribing from {:?}", interest);
-                    if let Err(err) = connection.unsubscribe(interest).await {
-                        error!("handle_client_interest: Unsubscribe failed: {}", err);
-                    }
-                }
-                let client = &mut clients[index];
-                let sender = &mut client.sender;
-                for interest in &client.interests - &client_interests {
-                    debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index);
-                    if let Err(err) = sender.interest.send(interest.clone()).await {
-                        error!("handle_client_interest: Send failed: {}", err);
-                    }
-                }
-            }
-            Action::SendInterest { interest } => {
-                debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
-                match TryFrom::try_from(interest) {
-                    Ok(interest) => {
-                        for (index, Client { sender, interests }) in clients.iter_mut().enumerate() {
-                            if interests.contains(&interest) {
-                                debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
-                                if let Err(err) = sender.interest.send(interest.clone()).await {
-                                    error!("handle_client_interest: Send failed: {}", err);
-                                }
-                            }
-                        }
-                    }
-                    Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {}
-                    Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => {
-                        error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
-                    }
-                }
-            }
-            Action::RemoveClient { index } => {
-                debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
-                let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
-                clients.remove(index);
-                let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
-                for interest in &interests_prior - &interests_after {
-                    debug!("handle_client_interest: Unsubscribing from {:?}", interest);
-                    if let Err(err) = connection.unsubscribe(interest).await {
-                        error!("handle_client_interest: Unsubscribe failed: {}", err);
-                    }
-                }
-            }
-            Action::NoNewClients => {
-                debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
-                if clients.is_empty() {
-                    return Ok(());
-                }
-            }
-            Action::ConnectionClosed => {
-                debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
-                return Ok(());
-            }
-        }
-    }
-}
+use crate::pubsub::handle_client_interest_subscriptions;
+use crate::server::Server;
 
 async fn handle_signals(mut signals: Signals) -> Result<(), std::io::Error> {
     signals.next().await;
@@ -197,7 +63,7 @@ async fn main() -> Result<(), Error> {
     let signals = Signals::new(&[SIGINT])?;
     let signal_handler = handle_signals(signals);
 
-    let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx);
+    let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx);
     let handle_new_games = spawn_dealers(app.state().clone());
 
     /*let listener = TlsListener::build()
diff --git a/src/pubsub.rs b/src/pubsub.rs
new file mode 100644 (file)
index 0000000..196d614
--- /dev/null
@@ -0,0 +1,263 @@
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::mem::swap;
+
+use futures::{
+    channel::mpsc::{channel, Receiver, Sender},
+    select,
+    stream::FuturesUnordered,
+    FutureExt, SinkExt, StreamExt,
+};
+use redis::{aio::PubSub, Msg, RedisWrite, ToRedisArgs};
+
+use crate::username::Username;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum ClientInterest {
+    GameList,
+    Game { id: i64 },
+    User { username: Username },
+    Timeout { id: i64 },
+}
+
+#[derive(Debug)]
+pub struct ClientInterestSender {
+    pub register_interest: Receiver<HashSet<ClientInterest>>,
+    pub interest: Sender<ClientInterest>,
+}
+
+#[derive(Debug)]
+pub struct ClientInterestReceiver {
+    pub register_interest: Sender<HashSet<ClientInterest>>,
+    pub interest: Receiver<ClientInterest>,
+}
+
+pub fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
+    const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128;
+    const INTEREST_CHANNEL_BUFFER: usize = 1024;
+    let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER);
+    let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER);
+    let tx = ClientInterestSender { register_interest: register_interest_rx, interest: interest_tx };
+    let rx = ClientInterestReceiver { register_interest: register_interest_tx, interest: interest_rx };
+    (tx, rx)
+}
+
+#[derive(Debug)]
+struct Client {
+    sender: ClientInterestSender,
+    interests: HashSet<ClientInterest>,
+}
+
+#[derive(Debug)]
+pub enum ClientInterestFromMsgError {
+    InvalidChannelName { channel_name: String },
+    TimeoutMessageWasNotExpired,
+}
+
+impl TryFrom<Msg> for ClientInterest {
+    type Error = ClientInterestFromMsgError;
+    fn try_from(msg: Msg) -> Result<Self, Self::Error> {
+        let channel_name = msg.get_channel_name();
+        if channel_name == "__keyspace@0__:game:list" {
+            Ok(ClientInterest::GameList)
+        } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
+            match username.parse() {
+                Ok(username) => Ok(ClientInterest::User { username }),
+                Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
+            }
+        } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") {
+            if let Some(id) = key.strip_suffix(":actions") {
+                match id.parse() {
+                    Ok(id) => Ok(ClientInterest::Game { id }),
+                    Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
+                }
+            } else if let Some(id) = key.strip_suffix(":timeout") {
+                match msg.get_payload::<String>() {
+                    Ok(str) if str == "expired" => match id.parse() {
+                        Ok(id) => Ok(ClientInterest::Timeout { id }),
+                        Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
+                    },
+                    _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired),
+                }
+            } else {
+                Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
+            }
+        } else {
+            Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
+        }
+    }
+}
+
+impl ToRedisArgs for ClientInterest {
+    fn write_redis_args<W>(&self, out: &mut W)
+    where
+        W: ?Sized + RedisWrite,
+    {
+        match self {
+            ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
+            ClientInterest::Game { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
+            ClientInterest::User { username } => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
+            ClientInterest::Timeout { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)),
+        }
+    }
+}
+
+pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
+    debug!("handle_client_interest: init");
+    let mut clients: Vec<Client> = Vec::new();
+    loop {
+        enum Action {
+            AddClient { sender: ClientInterestSender },
+            RegisterInterest { index: usize, client_interests: HashSet<ClientInterest> },
+            SendInterest { interest: Msg },
+            RemoveClient { index: usize },
+            NoNewClients,
+            ConnectionClosed,
+        }
+        match {
+            let mut connection_on_message = connection.on_message();
+            let mut next_interest = connection_on_message.next().fuse();
+            let mut next_client = new_clients.next().fuse();
+            let mut next_client_interest = FuturesUnordered::new();
+            for (index, Client { ref mut sender, .. }) in clients.iter_mut().enumerate() {
+                next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest)));
+            }
+            let mut next_client_interest = next_client_interest.select_next_some();
+            let action;
+            select! {
+                interest = next_interest => {
+                    if let Some(interest) = interest {
+                        action = Action::SendInterest{interest};
+                    } else {
+                        action = Action::ConnectionClosed;
+                    }
+                }
+                sender = next_client => {
+                    if let Some(sender) = sender {
+                        action = Action::AddClient{sender};
+                    } else {
+                        action = Action::NoNewClients;
+                    }
+                }
+                client_interests = next_client_interest => {
+                    match client_interests {
+                        (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests},
+                        (index, None) => action = Action::RemoveClient{index},
+                    }
+                }
+            }
+            action
+        } {
+            Action::AddClient { sender } => {
+                debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len());
+                clients.push(Client { sender, interests: HashSet::new() });
+            }
+            Action::RegisterInterest { index, mut client_interests } => {
+                debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
+                let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+                swap(&mut clients[index].interests, &mut client_interests);
+                let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+                for interest in &interests_after - &interests_prior {
+                    debug!("handle_client_interest: Subscribing to {:?}", interest);
+                    if let Err(err) = connection.subscribe(interest).await {
+                        error!("handle_client_interest: Subscribe failed: {}", err);
+                    }
+                }
+                for interest in &interests_prior - &interests_after {
+                    debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+                    if let Err(err) = connection.unsubscribe(interest).await {
+                        error!("handle_client_interest: Unsubscribe failed: {}", err);
+                    }
+                }
+                let client = &mut clients[index];
+                let sender = &mut client.sender;
+                for interest in &client.interests - &client_interests {
+                    debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index);
+                    if let Err(err) = sender.interest.send(interest.clone()).await {
+                        error!("handle_client_interest: Send failed: {}", err);
+                    }
+                }
+            }
+            Action::SendInterest { interest } => {
+                debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
+                match TryFrom::try_from(interest) {
+                    Ok(interest) => {
+                        for (index, Client { sender, interests }) in clients.iter_mut().enumerate() {
+                            if interests.contains(&interest) {
+                                debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
+                                if let Err(err) = sender.interest.send(interest.clone()).await {
+                                    error!("handle_client_interest: Send failed: {}", err);
+                                }
+                            }
+                        }
+                    }
+                    Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {}
+                    Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => {
+                        error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
+                    }
+                }
+            }
+            Action::RemoveClient { index } => {
+                debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
+                let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+                clients.remove(index);
+                let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+                for interest in &interests_prior - &interests_after {
+                    debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+                    if let Err(err) = connection.unsubscribe(interest).await {
+                        error!("handle_client_interest: Unsubscribe failed: {}", err);
+                    }
+                }
+            }
+            Action::NoNewClients => {
+                debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
+                if clients.is_empty() {
+                    return Ok(());
+                }
+            }
+            Action::ConnectionClosed => {
+                debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
+                return Ok(());
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use redis::{Msg, Value};
+
+    #[test]
+    fn convert_channel_names_to_interest() {
+        let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:list".to_vec()), Value::Data(b"expire".to_vec())]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::GameList, interest);
+
+        let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:user:player1".to_vec()), Value::Data(b"set".to_vec())]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::User { username: "player1".parse().unwrap() }, interest);
+
+        let msg =
+            Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), Value::Data(b"rpush".to_vec())]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::Game { id: 12345 }, interest);
+
+        let msg =
+            Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"expired".to_vec())]);
+        let msg = Msg::from_value(&msg).unwrap();
+        let interest = ClientInterest::try_from(msg).unwrap();
+        assert_eq!(ClientInterest::Timeout { id: 12345 }, interest);
+    }
+
+    #[test]
+    fn ignores_timeout_key_set() {
+        let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"set".to_vec())]);
+        let msg = Msg::from_value(&msg).unwrap();
+        assert!(ClientInterest::try_from(msg).is_err());
+    }
+}
index ab076b6c2d6213bfaf4abe041d96457fca48ff38..b51f19638e7407fae7f949dd25591d7420662c7c 100644 (file)
@@ -1,16 +1,15 @@
 use std::collections::HashSet;
-use std::convert::TryFrom;
 
 use futures::{
-    channel::mpsc::{channel, Receiver, Sender},
+    channel::mpsc::{Receiver, Sender},
     SinkExt,
 };
-use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value};
+use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, FromRedisValue, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value};
 use serde::{Deserialize, Serialize};
 
 use crate::auth::Auth;
-use crate::client::ClientInterest;
 use crate::game::{GameSettings, GameSummary, ValidatedUserAction};
+use crate::pubsub::{client_interest_channel, ClientInterest, ClientInterestReceiver, ClientInterestSender};
 use crate::rng::Seed;
 use crate::username::Username;
 use crate::util::timestamp::Timestamp;
@@ -21,87 +20,11 @@ pub struct Server {
     register_update_stream: Sender<ClientInterestSender>,
 }
 
-#[derive(Debug)]
-pub struct ClientInterestSender {
-    pub register_interest: Receiver<HashSet<ClientInterest>>,
-    pub interest: Sender<ClientInterest>,
-}
-
-#[derive(Debug)]
-pub struct ClientInterestReceiver {
-    pub register_interest: Sender<HashSet<ClientInterest>>,
-    pub interest: Receiver<ClientInterest>,
-}
-
-fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
-    const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128;
-    const INTEREST_CHANNEL_BUFFER: usize = 1024;
-    let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER);
-    let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER);
-    let tx = ClientInterestSender { register_interest: register_interest_rx, interest: interest_tx };
-    let rx = ClientInterestReceiver { register_interest: register_interest_tx, interest: interest_rx };
-    (tx, rx)
-}
-
-#[derive(Debug)]
-pub enum ClientInterestFromMsgError {
-    InvalidChannelName { channel_name: String },
-    TimeoutMessageWasNotExpired,
-}
-
-impl TryFrom<Msg> for ClientInterest {
-    type Error = ClientInterestFromMsgError;
-    fn try_from(msg: Msg) -> Result<Self, Self::Error> {
-        let channel_name = msg.get_channel_name();
-        if channel_name == "__keyspace@0__:game:list" {
-            Ok(ClientInterest::GameList)
-        } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
-            match username.parse() {
-                Ok(username) => Ok(ClientInterest::User { username }),
-                Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
-            }
-        } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") {
-            if let Some(id) = key.strip_suffix(":actions") {
-                match id.parse() {
-                    Ok(id) => Ok(ClientInterest::Game { id }),
-                    Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
-                }
-            } else if let Some(id) = key.strip_suffix(":timeout") {
-                match msg.get_payload::<String>() {
-                    Ok(str) if str == "expired" => match id.parse() {
-                        Ok(id) => Ok(ClientInterest::Timeout { id }),
-                        Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
-                    },
-                    _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired),
-                }
-            } else {
-                Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
-            }
-        } else {
-            Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
-        }
-    }
-}
-
-impl ToRedisArgs for ClientInterest {
-    fn write_redis_args<W>(&self, out: &mut W)
-    where
-        W: ?Sized + RedisWrite,
-    {
-        match self {
-            ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
-            ClientInterest::Game { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
-            ClientInterest::User { username } => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
-            ClientInterest::Timeout { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)),
-        }
-    }
-}
-
 const TAKE_ACTION_LUA_SCRIPT: &str = r#"
-    local len = redis.call('llen', KEYS[1])
+    local len = redis.call('LLEN', KEYS[1])
     local expected = tonumber(ARGV[1])
     if (len == expected) then
-        return redis.call('rpush', KEYS[1], ARGV[2])
+        return redis.call('RPUSH', KEYS[1], ARGV[2])
     elseif (len > expected) then
         return 0
     else
@@ -300,42 +223,3 @@ where
         true
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use redis::{Msg, Value};
-
-    #[test]
-    fn convert_channel_names_to_interest() {
-        let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:list".to_vec()), Value::Data(b"expire".to_vec())]);
-        let msg = Msg::from_value(&msg).unwrap();
-        let interest = ClientInterest::try_from(msg).unwrap();
-        assert_eq!(ClientInterest::GameList, interest);
-
-        let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:user:player1".to_vec()), Value::Data(b"set".to_vec())]);
-        let msg = Msg::from_value(&msg).unwrap();
-        let interest = ClientInterest::try_from(msg).unwrap();
-        assert_eq!(ClientInterest::User { username: "player1".parse().unwrap() }, interest);
-
-        let msg =
-            Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), Value::Data(b"rpush".to_vec())]);
-        let msg = Msg::from_value(&msg).unwrap();
-        let interest = ClientInterest::try_from(msg).unwrap();
-        assert_eq!(ClientInterest::Game { id: 12345 }, interest);
-
-        let msg =
-            Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"expired".to_vec())]);
-        let msg = Msg::from_value(&msg).unwrap();
-        let interest = ClientInterest::try_from(msg).unwrap();
-        assert_eq!(ClientInterest::Timeout { id: 12345 }, interest);
-    }
-
-    #[test]
-    fn ignores_timeout_key_set() {
-        let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"set".to_vec())]);
-        let msg = Msg::from_value(&msg).unwrap();
-        assert!(ClientInterest::try_from(msg).is_err());
-    }
-}