use crate::api::{ClientMessage, ServerMessage};
use crate::game::{Game, GameList, UserAction};
+use crate::pubsub::ClientInterest;
use crate::server::{ActionStatus, Server, ServerState};
use crate::username::Username;
use crate::util::dedup::DedupReadyExt;
LoggedIn { username: Username, state: LoggedInState },
}
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum ClientInterest {
- GameList,
- Game { id: i64 },
- User { username: Username },
- Timeout { id: i64 },
-}
-
#[derive(Debug, Clone)]
enum LoggedInState {
Idle,
use futures::{channel::mpsc::Receiver, StreamExt};
use redis::{ErrorKind, RedisError, RedisResult};
-use crate::client::ClientInterest;
use crate::game::{DealerAction, Game, GameList, ValidatedUserAction};
+use crate::pubsub::ClientInterest;
use crate::server::{ActionStatus, Server, ServerState};
use crate::util::dedup::DedupReadyExt;
#[macro_use]
extern crate serde_derive;
-use std::collections::HashSet;
-use std::convert::TryFrom;
-use std::mem::swap;
-
-use futures::{
- channel::mpsc::{channel, Receiver},
- future::select,
- pin_mut, select,
- stream::FuturesUnordered,
- FutureExt, SinkExt, StreamExt,
-};
-use redis::{aio::PubSub, Client, Msg};
+use futures::{channel::mpsc::channel, future::select, pin_mut, FutureExt, StreamExt};
+use redis::Client;
use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals;
use tide::utils::After;
mod client;
mod dealer;
mod game;
+mod pubsub;
mod rng;
mod seats;
mod server;
mod username;
mod util;
-use crate::client::{new_client, ClientInterest};
+use crate::client::new_client;
use crate::dealer::spawn_dealers;
-use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server};
-
-pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
- #[derive(Debug)]
- struct Client {
- sender: ClientInterestSender,
- interests: HashSet<ClientInterest>,
- }
- debug!("handle_client_interest: init");
- let mut clients: Vec<Client> = Vec::new();
- loop {
- enum Action {
- AddClient { sender: ClientInterestSender },
- RegisterInterest { index: usize, client_interests: HashSet<ClientInterest> },
- SendInterest { interest: Msg },
- RemoveClient { index: usize },
- NoNewClients,
- ConnectionClosed,
- }
- match {
- let mut connection_on_message = connection.on_message();
- let mut next_interest = connection_on_message.next().fuse();
- let mut next_client = new_clients.next().fuse();
- let mut next_client_interest = FuturesUnordered::new();
- for (index, Client { ref mut sender, .. }) in clients.iter_mut().enumerate() {
- next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest)));
- }
- let mut next_client_interest = next_client_interest.select_next_some();
- let action;
- select! {
- interest = next_interest => {
- if let Some(interest) = interest {
- action = Action::SendInterest{interest};
- } else {
- action = Action::ConnectionClosed;
- }
- }
- sender = next_client => {
- if let Some(sender) = sender {
- action = Action::AddClient{sender};
- } else {
- action = Action::NoNewClients;
- }
- }
- client_interests = next_client_interest => {
- match client_interests {
- (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests},
- (index, None) => action = Action::RemoveClient{index},
- }
- }
- }
- action
- } {
- Action::AddClient { sender } => {
- debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len());
- clients.push(Client { sender, interests: HashSet::new() });
- }
- Action::RegisterInterest { index, mut client_interests } => {
- debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
- let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
- swap(&mut clients[index].interests, &mut client_interests);
- let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
- for interest in &interests_after - &interests_prior {
- debug!("handle_client_interest: Subscribing to {:?}", interest);
- if let Err(err) = connection.subscribe(interest).await {
- error!("handle_client_interest: Subscribe failed: {}", err);
- }
- }
- for interest in &interests_prior - &interests_after {
- debug!("handle_client_interest: Unsubscribing from {:?}", interest);
- if let Err(err) = connection.unsubscribe(interest).await {
- error!("handle_client_interest: Unsubscribe failed: {}", err);
- }
- }
- let client = &mut clients[index];
- let sender = &mut client.sender;
- for interest in &client.interests - &client_interests {
- debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index);
- if let Err(err) = sender.interest.send(interest.clone()).await {
- error!("handle_client_interest: Send failed: {}", err);
- }
- }
- }
- Action::SendInterest { interest } => {
- debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
- match TryFrom::try_from(interest) {
- Ok(interest) => {
- for (index, Client { sender, interests }) in clients.iter_mut().enumerate() {
- if interests.contains(&interest) {
- debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
- if let Err(err) = sender.interest.send(interest.clone()).await {
- error!("handle_client_interest: Send failed: {}", err);
- }
- }
- }
- }
- Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {}
- Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => {
- error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
- }
- }
- }
- Action::RemoveClient { index } => {
- debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
- let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
- clients.remove(index);
- let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
- for interest in &interests_prior - &interests_after {
- debug!("handle_client_interest: Unsubscribing from {:?}", interest);
- if let Err(err) = connection.unsubscribe(interest).await {
- error!("handle_client_interest: Unsubscribe failed: {}", err);
- }
- }
- }
- Action::NoNewClients => {
- debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
- if clients.is_empty() {
- return Ok(());
- }
- }
- Action::ConnectionClosed => {
- debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
- return Ok(());
- }
- }
- }
-}
+use crate::pubsub::handle_client_interest_subscriptions;
+use crate::server::Server;
async fn handle_signals(mut signals: Signals) -> Result<(), std::io::Error> {
signals.next().await;
let signals = Signals::new(&[SIGINT])?;
let signal_handler = handle_signals(signals);
- let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx);
+ let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx);
let handle_new_games = spawn_dealers(app.state().clone());
/*let listener = TlsListener::build()
--- /dev/null
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::mem::swap;
+
+use futures::{
+ channel::mpsc::{channel, Receiver, Sender},
+ select,
+ stream::FuturesUnordered,
+ FutureExt, SinkExt, StreamExt,
+};
+use redis::{aio::PubSub, Msg, RedisWrite, ToRedisArgs};
+
+use crate::username::Username;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum ClientInterest {
+ GameList,
+ Game { id: i64 },
+ User { username: Username },
+ Timeout { id: i64 },
+}
+
+#[derive(Debug)]
+pub struct ClientInterestSender {
+ pub register_interest: Receiver<HashSet<ClientInterest>>,
+ pub interest: Sender<ClientInterest>,
+}
+
+#[derive(Debug)]
+pub struct ClientInterestReceiver {
+ pub register_interest: Sender<HashSet<ClientInterest>>,
+ pub interest: Receiver<ClientInterest>,
+}
+
+pub fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
+ const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128;
+ const INTEREST_CHANNEL_BUFFER: usize = 1024;
+ let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER);
+ let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER);
+ let tx = ClientInterestSender { register_interest: register_interest_rx, interest: interest_tx };
+ let rx = ClientInterestReceiver { register_interest: register_interest_tx, interest: interest_rx };
+ (tx, rx)
+}
+
+#[derive(Debug)]
+struct Client {
+ sender: ClientInterestSender,
+ interests: HashSet<ClientInterest>,
+}
+
+#[derive(Debug)]
+pub enum ClientInterestFromMsgError {
+ InvalidChannelName { channel_name: String },
+ TimeoutMessageWasNotExpired,
+}
+
+impl TryFrom<Msg> for ClientInterest {
+ type Error = ClientInterestFromMsgError;
+ fn try_from(msg: Msg) -> Result<Self, Self::Error> {
+ let channel_name = msg.get_channel_name();
+ if channel_name == "__keyspace@0__:game:list" {
+ Ok(ClientInterest::GameList)
+ } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
+ match username.parse() {
+ Ok(username) => Ok(ClientInterest::User { username }),
+ Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
+ }
+ } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") {
+ if let Some(id) = key.strip_suffix(":actions") {
+ match id.parse() {
+ Ok(id) => Ok(ClientInterest::Game { id }),
+ Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
+ }
+ } else if let Some(id) = key.strip_suffix(":timeout") {
+ match msg.get_payload::<String>() {
+ Ok(str) if str == "expired" => match id.parse() {
+ Ok(id) => Ok(ClientInterest::Timeout { id }),
+ Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
+ },
+ _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired),
+ }
+ } else {
+ Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
+ }
+ } else {
+ Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
+ }
+ }
+}
+
+impl ToRedisArgs for ClientInterest {
+ fn write_redis_args<W>(&self, out: &mut W)
+ where
+ W: ?Sized + RedisWrite,
+ {
+ match self {
+ ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
+ ClientInterest::Game { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
+ ClientInterest::User { username } => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
+ ClientInterest::Timeout { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)),
+ }
+ }
+}
+
+pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
+ debug!("handle_client_interest: init");
+ let mut clients: Vec<Client> = Vec::new();
+ loop {
+ enum Action {
+ AddClient { sender: ClientInterestSender },
+ RegisterInterest { index: usize, client_interests: HashSet<ClientInterest> },
+ SendInterest { interest: Msg },
+ RemoveClient { index: usize },
+ NoNewClients,
+ ConnectionClosed,
+ }
+ match {
+ let mut connection_on_message = connection.on_message();
+ let mut next_interest = connection_on_message.next().fuse();
+ let mut next_client = new_clients.next().fuse();
+ let mut next_client_interest = FuturesUnordered::new();
+ for (index, Client { ref mut sender, .. }) in clients.iter_mut().enumerate() {
+ next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest)));
+ }
+ let mut next_client_interest = next_client_interest.select_next_some();
+ let action;
+ select! {
+ interest = next_interest => {
+ if let Some(interest) = interest {
+ action = Action::SendInterest{interest};
+ } else {
+ action = Action::ConnectionClosed;
+ }
+ }
+ sender = next_client => {
+ if let Some(sender) = sender {
+ action = Action::AddClient{sender};
+ } else {
+ action = Action::NoNewClients;
+ }
+ }
+ client_interests = next_client_interest => {
+ match client_interests {
+ (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests},
+ (index, None) => action = Action::RemoveClient{index},
+ }
+ }
+ }
+ action
+ } {
+ Action::AddClient { sender } => {
+ debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len());
+ clients.push(Client { sender, interests: HashSet::new() });
+ }
+ Action::RegisterInterest { index, mut client_interests } => {
+ debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
+ let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+ swap(&mut clients[index].interests, &mut client_interests);
+ let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+ for interest in &interests_after - &interests_prior {
+ debug!("handle_client_interest: Subscribing to {:?}", interest);
+ if let Err(err) = connection.subscribe(interest).await {
+ error!("handle_client_interest: Subscribe failed: {}", err);
+ }
+ }
+ for interest in &interests_prior - &interests_after {
+ debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+ if let Err(err) = connection.unsubscribe(interest).await {
+ error!("handle_client_interest: Unsubscribe failed: {}", err);
+ }
+ }
+ let client = &mut clients[index];
+ let sender = &mut client.sender;
+ for interest in &client.interests - &client_interests {
+ debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index);
+ if let Err(err) = sender.interest.send(interest.clone()).await {
+ error!("handle_client_interest: Send failed: {}", err);
+ }
+ }
+ }
+ Action::SendInterest { interest } => {
+ debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
+ match TryFrom::try_from(interest) {
+ Ok(interest) => {
+ for (index, Client { sender, interests }) in clients.iter_mut().enumerate() {
+ if interests.contains(&interest) {
+ debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
+ if let Err(err) = sender.interest.send(interest.clone()).await {
+ error!("handle_client_interest: Send failed: {}", err);
+ }
+ }
+ }
+ }
+ Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {}
+ Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => {
+ error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
+ }
+ }
+ }
+ Action::RemoveClient { index } => {
+ debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
+ let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+ clients.remove(index);
+ let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
+ for interest in &interests_prior - &interests_after {
+ debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+ if let Err(err) = connection.unsubscribe(interest).await {
+ error!("handle_client_interest: Unsubscribe failed: {}", err);
+ }
+ }
+ }
+ Action::NoNewClients => {
+ debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
+ if clients.is_empty() {
+ return Ok(());
+ }
+ }
+ Action::ConnectionClosed => {
+ debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
+ return Ok(());
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use redis::{Msg, Value};
+
+ #[test]
+ fn convert_channel_names_to_interest() {
+ let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:list".to_vec()), Value::Data(b"expire".to_vec())]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::GameList, interest);
+
+ let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:user:player1".to_vec()), Value::Data(b"set".to_vec())]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::User { username: "player1".parse().unwrap() }, interest);
+
+ let msg =
+ Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), Value::Data(b"rpush".to_vec())]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::Game { id: 12345 }, interest);
+
+ let msg =
+ Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"expired".to_vec())]);
+ let msg = Msg::from_value(&msg).unwrap();
+ let interest = ClientInterest::try_from(msg).unwrap();
+ assert_eq!(ClientInterest::Timeout { id: 12345 }, interest);
+ }
+
+ #[test]
+ fn ignores_timeout_key_set() {
+ let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"set".to_vec())]);
+ let msg = Msg::from_value(&msg).unwrap();
+ assert!(ClientInterest::try_from(msg).is_err());
+ }
+}
use std::collections::HashSet;
-use std::convert::TryFrom;
use futures::{
- channel::mpsc::{channel, Receiver, Sender},
+ channel::mpsc::{Receiver, Sender},
SinkExt,
};
-use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value};
+use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, FromRedisValue, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value};
use serde::{Deserialize, Serialize};
use crate::auth::Auth;
-use crate::client::ClientInterest;
use crate::game::{GameSettings, GameSummary, ValidatedUserAction};
+use crate::pubsub::{client_interest_channel, ClientInterest, ClientInterestReceiver, ClientInterestSender};
use crate::rng::Seed;
use crate::username::Username;
use crate::util::timestamp::Timestamp;
register_update_stream: Sender<ClientInterestSender>,
}
-#[derive(Debug)]
-pub struct ClientInterestSender {
- pub register_interest: Receiver<HashSet<ClientInterest>>,
- pub interest: Sender<ClientInterest>,
-}
-
-#[derive(Debug)]
-pub struct ClientInterestReceiver {
- pub register_interest: Sender<HashSet<ClientInterest>>,
- pub interest: Receiver<ClientInterest>,
-}
-
-fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
- const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128;
- const INTEREST_CHANNEL_BUFFER: usize = 1024;
- let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER);
- let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER);
- let tx = ClientInterestSender { register_interest: register_interest_rx, interest: interest_tx };
- let rx = ClientInterestReceiver { register_interest: register_interest_tx, interest: interest_rx };
- (tx, rx)
-}
-
-#[derive(Debug)]
-pub enum ClientInterestFromMsgError {
- InvalidChannelName { channel_name: String },
- TimeoutMessageWasNotExpired,
-}
-
-impl TryFrom<Msg> for ClientInterest {
- type Error = ClientInterestFromMsgError;
- fn try_from(msg: Msg) -> Result<Self, Self::Error> {
- let channel_name = msg.get_channel_name();
- if channel_name == "__keyspace@0__:game:list" {
- Ok(ClientInterest::GameList)
- } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
- match username.parse() {
- Ok(username) => Ok(ClientInterest::User { username }),
- Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
- }
- } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") {
- if let Some(id) = key.strip_suffix(":actions") {
- match id.parse() {
- Ok(id) => Ok(ClientInterest::Game { id }),
- Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
- }
- } else if let Some(id) = key.strip_suffix(":timeout") {
- match msg.get_payload::<String>() {
- Ok(str) if str == "expired" => match id.parse() {
- Ok(id) => Ok(ClientInterest::Timeout { id }),
- Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }),
- },
- _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired),
- }
- } else {
- Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
- }
- } else {
- Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() })
- }
- }
-}
-
-impl ToRedisArgs for ClientInterest {
- fn write_redis_args<W>(&self, out: &mut W)
- where
- W: ?Sized + RedisWrite,
- {
- match self {
- ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"),
- ClientInterest::Game { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)),
- ClientInterest::User { username } => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
- ClientInterest::Timeout { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)),
- }
- }
-}
-
const TAKE_ACTION_LUA_SCRIPT: &str = r#"
- local len = redis.call('llen', KEYS[1])
+ local len = redis.call('LLEN', KEYS[1])
local expected = tonumber(ARGV[1])
if (len == expected) then
- return redis.call('rpush', KEYS[1], ARGV[2])
+ return redis.call('RPUSH', KEYS[1], ARGV[2])
elseif (len > expected) then
return 0
else
true
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use redis::{Msg, Value};
-
- #[test]
- fn convert_channel_names_to_interest() {
- let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:list".to_vec()), Value::Data(b"expire".to_vec())]);
- let msg = Msg::from_value(&msg).unwrap();
- let interest = ClientInterest::try_from(msg).unwrap();
- assert_eq!(ClientInterest::GameList, interest);
-
- let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:user:player1".to_vec()), Value::Data(b"set".to_vec())]);
- let msg = Msg::from_value(&msg).unwrap();
- let interest = ClientInterest::try_from(msg).unwrap();
- assert_eq!(ClientInterest::User { username: "player1".parse().unwrap() }, interest);
-
- let msg =
- Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), Value::Data(b"rpush".to_vec())]);
- let msg = Msg::from_value(&msg).unwrap();
- let interest = ClientInterest::try_from(msg).unwrap();
- assert_eq!(ClientInterest::Game { id: 12345 }, interest);
-
- let msg =
- Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"expired".to_vec())]);
- let msg = Msg::from_value(&msg).unwrap();
- let interest = ClientInterest::try_from(msg).unwrap();
- assert_eq!(ClientInterest::Timeout { id: 12345 }, interest);
- }
-
- #[test]
- fn ignores_timeout_key_set() {
- let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"set".to_vec())]);
- let msg = Msg::from_value(&msg).unwrap();
- assert!(ClientInterest::try_from(msg).is_err());
- }
-}