From: Geoffrey Allott Date: Sat, 6 Mar 2021 15:36:25 +0000 (+0000) Subject: move subscription functionality to pubsub module X-Git-Url: https://git.pointlesshacks.com/?a=commitdiff_plain;h=8920e1b7776a1cafd3e56df65fc597a29a0287f0;p=pokerwave.git move subscription functionality to pubsub module --- diff --git a/src/client.rs b/src/client.rs index f6523eb..bbe6cdf 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,6 +10,7 @@ use tide_websockets::{Message, WebSocketConnection}; use crate::api::{ClientMessage, ServerMessage}; use crate::game::{Game, GameList, UserAction}; +use crate::pubsub::ClientInterest; use crate::server::{ActionStatus, Server, ServerState}; use crate::username::Username; use crate::util::dedup::DedupReadyExt; @@ -26,14 +27,6 @@ enum ClientState { LoggedIn { username: Username, state: LoggedInState }, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum ClientInterest { - GameList, - Game { id: i64 }, - User { username: Username }, - Timeout { id: i64 }, -} - #[derive(Debug, Clone)] enum LoggedInState { Idle, diff --git a/src/dealer.rs b/src/dealer.rs index 98b66dc..e43997e 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -4,8 +4,8 @@ use async_std::task::spawn; use futures::{channel::mpsc::Receiver, StreamExt}; use redis::{ErrorKind, RedisError, RedisResult}; -use crate::client::ClientInterest; use crate::game::{DealerAction, Game, GameList, ValidatedUserAction}; +use crate::pubsub::ClientInterest; use crate::server::{ActionStatus, Server, ServerState}; use crate::util::dedup::DedupReadyExt; diff --git a/src/main.rs b/src/main.rs index 68bdd75..e251ad8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,18 +3,8 @@ extern crate log; #[macro_use] extern crate serde_derive; -use std::collections::HashSet; -use std::convert::TryFrom; -use std::mem::swap; - -use futures::{ - channel::mpsc::{channel, Receiver}, - future::select, - pin_mut, select, - stream::FuturesUnordered, - FutureExt, SinkExt, StreamExt, -}; -use redis::{aio::PubSub, Client, Msg}; +use futures::{channel::mpsc::channel, future::select, pin_mut, FutureExt, StreamExt}; +use redis::Client; use signal_hook::consts::signal::*; use signal_hook_async_std::Signals; use tide::utils::After; @@ -28,141 +18,17 @@ mod card; mod client; mod dealer; mod game; +mod pubsub; mod rng; mod seats; mod server; mod username; mod util; -use crate::client::{new_client, ClientInterest}; +use crate::client::new_client; use crate::dealer::spawn_dealers; -use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server}; - -pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver) -> Result<(), std::io::Error> { - #[derive(Debug)] - struct Client { - sender: ClientInterestSender, - interests: HashSet, - } - debug!("handle_client_interest: init"); - let mut clients: Vec = Vec::new(); - loop { - enum Action { - AddClient { sender: ClientInterestSender }, - RegisterInterest { index: usize, client_interests: HashSet }, - SendInterest { interest: Msg }, - RemoveClient { index: usize }, - NoNewClients, - ConnectionClosed, - } - match { - let mut connection_on_message = connection.on_message(); - let mut next_interest = connection_on_message.next().fuse(); - let mut next_client = new_clients.next().fuse(); - let mut next_client_interest = FuturesUnordered::new(); - for (index, Client { ref mut sender, .. }) in clients.iter_mut().enumerate() { - next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest))); - } - let mut next_client_interest = next_client_interest.select_next_some(); - let action; - select! { - interest = next_interest => { - if let Some(interest) = interest { - action = Action::SendInterest{interest}; - } else { - action = Action::ConnectionClosed; - } - } - sender = next_client => { - if let Some(sender) = sender { - action = Action::AddClient{sender}; - } else { - action = Action::NoNewClients; - } - } - client_interests = next_client_interest => { - match client_interests { - (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests}, - (index, None) => action = Action::RemoveClient{index}, - } - } - } - action - } { - Action::AddClient { sender } => { - debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len()); - clients.push(Client { sender, interests: HashSet::new() }); - } - Action::RegisterInterest { index, mut client_interests } => { - debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index); - let interests_prior: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); - swap(&mut clients[index].interests, &mut client_interests); - let interests_after: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); - for interest in &interests_after - &interests_prior { - debug!("handle_client_interest: Subscribing to {:?}", interest); - if let Err(err) = connection.subscribe(interest).await { - error!("handle_client_interest: Subscribe failed: {}", err); - } - } - for interest in &interests_prior - &interests_after { - debug!("handle_client_interest: Unsubscribing from {:?}", interest); - if let Err(err) = connection.unsubscribe(interest).await { - error!("handle_client_interest: Unsubscribe failed: {}", err); - } - } - let client = &mut clients[index]; - let sender = &mut client.sender; - for interest in &client.interests - &client_interests { - debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index); - if let Err(err) = sender.interest.send(interest.clone()).await { - error!("handle_client_interest: Send failed: {}", err); - } - } - } - Action::SendInterest { interest } => { - debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest); - match TryFrom::try_from(interest) { - Ok(interest) => { - for (index, Client { sender, interests }) in clients.iter_mut().enumerate() { - if interests.contains(&interest) { - debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index); - if let Err(err) = sender.interest.send(interest.clone()).await { - error!("handle_client_interest: Send failed: {}", err); - } - } - } - } - Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {} - Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => { - error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name); - } - } - } - Action::RemoveClient { index } => { - debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index); - let interests_prior: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); - clients.remove(index); - let interests_after: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); - for interest in &interests_prior - &interests_after { - debug!("handle_client_interest: Unsubscribing from {:?}", interest); - if let Err(err) = connection.unsubscribe(interest).await { - error!("handle_client_interest: Unsubscribe failed: {}", err); - } - } - } - Action::NoNewClients => { - debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients); - if clients.is_empty() { - return Ok(()); - } - } - Action::ConnectionClosed => { - debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients); - return Ok(()); - } - } - } -} +use crate::pubsub::handle_client_interest_subscriptions; +use crate::server::Server; async fn handle_signals(mut signals: Signals) -> Result<(), std::io::Error> { signals.next().await; @@ -197,7 +63,7 @@ async fn main() -> Result<(), Error> { let signals = Signals::new(&[SIGINT])?; let signal_handler = handle_signals(signals); - let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx); + let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx); let handle_new_games = spawn_dealers(app.state().clone()); /*let listener = TlsListener::build() diff --git a/src/pubsub.rs b/src/pubsub.rs new file mode 100644 index 0000000..196d614 --- /dev/null +++ b/src/pubsub.rs @@ -0,0 +1,263 @@ +use std::collections::HashSet; +use std::convert::TryFrom; +use std::mem::swap; + +use futures::{ + channel::mpsc::{channel, Receiver, Sender}, + select, + stream::FuturesUnordered, + FutureExt, SinkExt, StreamExt, +}; +use redis::{aio::PubSub, Msg, RedisWrite, ToRedisArgs}; + +use crate::username::Username; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ClientInterest { + GameList, + Game { id: i64 }, + User { username: Username }, + Timeout { id: i64 }, +} + +#[derive(Debug)] +pub struct ClientInterestSender { + pub register_interest: Receiver>, + pub interest: Sender, +} + +#[derive(Debug)] +pub struct ClientInterestReceiver { + pub register_interest: Sender>, + pub interest: Receiver, +} + +pub fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) { + const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128; + const INTEREST_CHANNEL_BUFFER: usize = 1024; + let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER); + let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER); + let tx = ClientInterestSender { register_interest: register_interest_rx, interest: interest_tx }; + let rx = ClientInterestReceiver { register_interest: register_interest_tx, interest: interest_rx }; + (tx, rx) +} + +#[derive(Debug)] +struct Client { + sender: ClientInterestSender, + interests: HashSet, +} + +#[derive(Debug)] +pub enum ClientInterestFromMsgError { + InvalidChannelName { channel_name: String }, + TimeoutMessageWasNotExpired, +} + +impl TryFrom for ClientInterest { + type Error = ClientInterestFromMsgError; + fn try_from(msg: Msg) -> Result { + let channel_name = msg.get_channel_name(); + if channel_name == "__keyspace@0__:game:list" { + Ok(ClientInterest::GameList) + } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") { + match username.parse() { + Ok(username) => Ok(ClientInterest::User { username }), + Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }), + } + } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") { + if let Some(id) = key.strip_suffix(":actions") { + match id.parse() { + Ok(id) => Ok(ClientInterest::Game { id }), + Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }), + } + } else if let Some(id) = key.strip_suffix(":timeout") { + match msg.get_payload::() { + Ok(str) if str == "expired" => match id.parse() { + Ok(id) => Ok(ClientInterest::Timeout { id }), + Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }), + }, + _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired), + } + } else { + Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }) + } + } else { + Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }) + } + } +} + +impl ToRedisArgs for ClientInterest { + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + match self { + ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"), + ClientInterest::Game { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)), + ClientInterest::User { username } => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)), + ClientInterest::Timeout { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)), + } + } +} + +pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver) -> Result<(), std::io::Error> { + debug!("handle_client_interest: init"); + let mut clients: Vec = Vec::new(); + loop { + enum Action { + AddClient { sender: ClientInterestSender }, + RegisterInterest { index: usize, client_interests: HashSet }, + SendInterest { interest: Msg }, + RemoveClient { index: usize }, + NoNewClients, + ConnectionClosed, + } + match { + let mut connection_on_message = connection.on_message(); + let mut next_interest = connection_on_message.next().fuse(); + let mut next_client = new_clients.next().fuse(); + let mut next_client_interest = FuturesUnordered::new(); + for (index, Client { ref mut sender, .. }) in clients.iter_mut().enumerate() { + next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest))); + } + let mut next_client_interest = next_client_interest.select_next_some(); + let action; + select! { + interest = next_interest => { + if let Some(interest) = interest { + action = Action::SendInterest{interest}; + } else { + action = Action::ConnectionClosed; + } + } + sender = next_client => { + if let Some(sender) = sender { + action = Action::AddClient{sender}; + } else { + action = Action::NoNewClients; + } + } + client_interests = next_client_interest => { + match client_interests { + (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests}, + (index, None) => action = Action::RemoveClient{index}, + } + } + } + action + } { + Action::AddClient { sender } => { + debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len()); + clients.push(Client { sender, interests: HashSet::new() }); + } + Action::RegisterInterest { index, mut client_interests } => { + debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index); + let interests_prior: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); + swap(&mut clients[index].interests, &mut client_interests); + let interests_after: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); + for interest in &interests_after - &interests_prior { + debug!("handle_client_interest: Subscribing to {:?}", interest); + if let Err(err) = connection.subscribe(interest).await { + error!("handle_client_interest: Subscribe failed: {}", err); + } + } + for interest in &interests_prior - &interests_after { + debug!("handle_client_interest: Unsubscribing from {:?}", interest); + if let Err(err) = connection.unsubscribe(interest).await { + error!("handle_client_interest: Unsubscribe failed: {}", err); + } + } + let client = &mut clients[index]; + let sender = &mut client.sender; + for interest in &client.interests - &client_interests { + debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index); + if let Err(err) = sender.interest.send(interest.clone()).await { + error!("handle_client_interest: Send failed: {}", err); + } + } + } + Action::SendInterest { interest } => { + debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest); + match TryFrom::try_from(interest) { + Ok(interest) => { + for (index, Client { sender, interests }) in clients.iter_mut().enumerate() { + if interests.contains(&interest) { + debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index); + if let Err(err) = sender.interest.send(interest.clone()).await { + error!("handle_client_interest: Send failed: {}", err); + } + } + } + } + Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {} + Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => { + error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name); + } + } + } + Action::RemoveClient { index } => { + debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index); + let interests_prior: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); + clients.remove(index); + let interests_after: HashSet = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect(); + for interest in &interests_prior - &interests_after { + debug!("handle_client_interest: Unsubscribing from {:?}", interest); + if let Err(err) = connection.unsubscribe(interest).await { + error!("handle_client_interest: Unsubscribe failed: {}", err); + } + } + } + Action::NoNewClients => { + debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients); + if clients.is_empty() { + return Ok(()); + } + } + Action::ConnectionClosed => { + debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients); + return Ok(()); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use redis::{Msg, Value}; + + #[test] + fn convert_channel_names_to_interest() { + let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:list".to_vec()), Value::Data(b"expire".to_vec())]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::GameList, interest); + + let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:user:player1".to_vec()), Value::Data(b"set".to_vec())]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::User { username: "player1".parse().unwrap() }, interest); + + let msg = + Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), Value::Data(b"rpush".to_vec())]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::Game { id: 12345 }, interest); + + let msg = + Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"expired".to_vec())]); + let msg = Msg::from_value(&msg).unwrap(); + let interest = ClientInterest::try_from(msg).unwrap(); + assert_eq!(ClientInterest::Timeout { id: 12345 }, interest); + } + + #[test] + fn ignores_timeout_key_set() { + let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"set".to_vec())]); + let msg = Msg::from_value(&msg).unwrap(); + assert!(ClientInterest::try_from(msg).is_err()); + } +} diff --git a/src/server.rs b/src/server.rs index ab076b6..b51f196 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,16 +1,15 @@ use std::collections::HashSet; -use std::convert::TryFrom; use futures::{ - channel::mpsc::{channel, Receiver, Sender}, + channel::mpsc::{Receiver, Sender}, SinkExt, }; -use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value}; +use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, FromRedisValue, RedisError, RedisResult, RedisWrite, Script, ToRedisArgs, Value}; use serde::{Deserialize, Serialize}; use crate::auth::Auth; -use crate::client::ClientInterest; use crate::game::{GameSettings, GameSummary, ValidatedUserAction}; +use crate::pubsub::{client_interest_channel, ClientInterest, ClientInterestReceiver, ClientInterestSender}; use crate::rng::Seed; use crate::username::Username; use crate::util::timestamp::Timestamp; @@ -21,87 +20,11 @@ pub struct Server { register_update_stream: Sender, } -#[derive(Debug)] -pub struct ClientInterestSender { - pub register_interest: Receiver>, - pub interest: Sender, -} - -#[derive(Debug)] -pub struct ClientInterestReceiver { - pub register_interest: Sender>, - pub interest: Receiver, -} - -fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) { - const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128; - const INTEREST_CHANNEL_BUFFER: usize = 1024; - let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER); - let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER); - let tx = ClientInterestSender { register_interest: register_interest_rx, interest: interest_tx }; - let rx = ClientInterestReceiver { register_interest: register_interest_tx, interest: interest_rx }; - (tx, rx) -} - -#[derive(Debug)] -pub enum ClientInterestFromMsgError { - InvalidChannelName { channel_name: String }, - TimeoutMessageWasNotExpired, -} - -impl TryFrom for ClientInterest { - type Error = ClientInterestFromMsgError; - fn try_from(msg: Msg) -> Result { - let channel_name = msg.get_channel_name(); - if channel_name == "__keyspace@0__:game:list" { - Ok(ClientInterest::GameList) - } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") { - match username.parse() { - Ok(username) => Ok(ClientInterest::User { username }), - Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }), - } - } else if let Some(key) = channel_name.strip_prefix("__keyspace@0__:game:") { - if let Some(id) = key.strip_suffix(":actions") { - match id.parse() { - Ok(id) => Ok(ClientInterest::Game { id }), - Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }), - } - } else if let Some(id) = key.strip_suffix(":timeout") { - match msg.get_payload::() { - Ok(str) if str == "expired" => match id.parse() { - Ok(id) => Ok(ClientInterest::Timeout { id }), - Err(_) => Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }), - }, - _ => Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired), - } - } else { - Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }) - } - } else { - Err(ClientInterestFromMsgError::InvalidChannelName { channel_name: channel_name.to_string() }) - } - } -} - -impl ToRedisArgs for ClientInterest { - fn write_redis_args(&self, out: &mut W) - where - W: ?Sized + RedisWrite, - { - match self { - ClientInterest::GameList => out.write_arg(b"__keyspace@0__:game:list"), - ClientInterest::Game { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:actions", id)), - ClientInterest::User { username } => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)), - ClientInterest::Timeout { id } => out.write_arg_fmt(format!("__keyspace@0__:game:{}:timeout", id)), - } - } -} - const TAKE_ACTION_LUA_SCRIPT: &str = r#" - local len = redis.call('llen', KEYS[1]) + local len = redis.call('LLEN', KEYS[1]) local expected = tonumber(ARGV[1]) if (len == expected) then - return redis.call('rpush', KEYS[1], ARGV[2]) + return redis.call('RPUSH', KEYS[1], ARGV[2]) elseif (len > expected) then return 0 else @@ -300,42 +223,3 @@ where true } } - -#[cfg(test)] -mod tests { - use super::*; - - use redis::{Msg, Value}; - - #[test] - fn convert_channel_names_to_interest() { - let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:list".to_vec()), Value::Data(b"expire".to_vec())]); - let msg = Msg::from_value(&msg).unwrap(); - let interest = ClientInterest::try_from(msg).unwrap(); - assert_eq!(ClientInterest::GameList, interest); - - let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:user:player1".to_vec()), Value::Data(b"set".to_vec())]); - let msg = Msg::from_value(&msg).unwrap(); - let interest = ClientInterest::try_from(msg).unwrap(); - assert_eq!(ClientInterest::User { username: "player1".parse().unwrap() }, interest); - - let msg = - Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:actions".to_vec()), Value::Data(b"rpush".to_vec())]); - let msg = Msg::from_value(&msg).unwrap(); - let interest = ClientInterest::try_from(msg).unwrap(); - assert_eq!(ClientInterest::Game { id: 12345 }, interest); - - let msg = - Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"expired".to_vec())]); - let msg = Msg::from_value(&msg).unwrap(); - let interest = ClientInterest::try_from(msg).unwrap(); - assert_eq!(ClientInterest::Timeout { id: 12345 }, interest); - } - - #[test] - fn ignores_timeout_key_set() { - let msg = Value::Bulk(vec![Value::Data(b"message".to_vec()), Value::Data(b"__keyspace@0__:game:12345:timeout".to_vec()), Value::Data(b"set".to_vec())]); - let msg = Msg::from_value(&msg).unwrap(); - assert!(ClientInterest::try_from(msg).is_err()); - } -}