use crate::game::{Action, Game, GameSettings, GameSummary, UserAction};
#[derive(Debug, Clone, Deserialize)]
-enum Scope {
+pub enum Scope {
Global,
Game { id: u32 },
Hand { id: u32 },
}
impl Auth {
- pub fn verify(&self, challenge: &str, signature: &str) -> bool {
+ pub fn verify(&self, _challenge: &str, signature: &str) -> bool {
match self {
Auth::NoLogin => false,
Auth::Plain{password} => signature == password,
-use std::sync::{Arc, RwLock};
+use std::collections::HashSet;
use futures::stream::{Stream, StreamExt, empty, iter, once};
},
}
-pub enum ClientUpdate {
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum ClientInterest {
GameList,
- Game,
- Users,
+ Game { id: u32 },
+ User { username: String },
}
#[derive(Debug, Clone)]
}
}
- pub async fn retrieve_updates(&mut self, update: ClientUpdate) -> impl Stream<Item=ServerMessage> {
+ pub async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream<Item=ServerMessage> {
match update {
- ClientUpdate::GameList => empty().boxed(), // TODO
- ClientUpdate::Game => match &mut self.client {
- ClientState::LoggedIn{ref username, state: LoggedInState::InGame{ref mut game}} => {
+ ClientInterest::GameList => empty().boxed(), // TODO
+ ClientInterest::Game{id} => match &mut self.client {
+ ClientState::LoggedIn{ref username, state: LoggedInState::InGame{ref mut game}} if game.id() == id => {
let from = game.actions_len();
match self.server.update_game_state(game).await {
Ok(()) => iter(game.update_view_for(username, from)).map(|action| ServerMessage::NewAction{action}).boxed(),
}
_ => empty().boxed(),
}
- ClientUpdate::Users => empty().boxed(), // TODO
+ ClientInterest::User{..} => empty().boxed(), // TODO
}
}
+ pub fn interests(&self) -> HashSet<ClientInterest> {
+ let mut ret = HashSet::new();
+ if let ClientState::LoggedIn{ref username, ref state} = &self.client {
+ let username = username.to_string();
+ ret.insert(ClientInterest::User{username});
+ ret.insert(ClientInterest::GameList);
+ if let LoggedInState::InGame{ref game} = state {
+ ret.insert(ClientInterest::Game{id: game.id()});
+ for username in game.players() {
+ let username = username.to_string();
+ ret.insert(ClientInterest::User{username});
+ }
+ }
+ }
+ ret
+ }
+
pub async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage {
match (&mut self.client, message) {
(_, ClientMessage::CreateUser{username, auth, nickname}) => {
(ClientState::LoginAuthIssued{username, challenge}, ClientMessage::LoginAuthResponse{signature}) => {
if self.server.verify(&username, &challenge, &signature).await {
self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InLobby};
+ self.server.register_interests(self.interests()).await;
ServerMessage::LoginSuccess
} else {
self.client = ClientState::Connected;
ServerMessage::LoginFailure{reason: "Invalid username or password".to_string()}
}
}
- (ClientState::LoggedIn{username, ..}, ClientMessage::GetGameList{tags}) => {
+ (ClientState::LoggedIn{..}, ClientMessage::GetGameList{..}) => {
match self.server.get_game_list().await {
Ok(games) => ServerMessage::GameList{games},
Err(err) => ServerMessage::GameListFailure{reason: err.to_string()},
}
}
- (ClientState::LoggedIn{username, ..}, ClientMessage::CreateGame{settings}) => {
+ (ClientState::LoggedIn{..}, ClientMessage::CreateGame{settings}) => {
match self.server.create_game(settings).await {
Ok(id) => ServerMessage::CreateGameSuccess{id},
Err(err) => ServerMessage::CreateGameFailure{reason: err.to_string()},
Ok(game) => {
let game_view = game.view_for(&username);
self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InGame{game}};
+ self.server.register_interests(self.interests()).await;
ServerMessage::JoinGameSuccess{game: game_view}
}
Err(err) => ServerMessage::JoinGameFailure{reason: err.to_string()},
Err(err) => return ServerMessage::TakeActionFailure{reason: err.to_string()},
}
Ok(ActionStatus::Interrupted) => {
+ debug!("Action {:?} was interrupted - updating game state", action);
if let Err(err) = self.server.update_game_state(game).await {
return ServerMessage::TakeActionFailure{reason: err.to_string()};
}
}
(ClientState::LoggedIn{username, state: LoggedInState::InGame{..}}, ClientMessage::LeaveGame) => {
self.client = ClientState::LoggedIn{username: username.clone(), state: LoggedInState::InLobby};
+ self.server.register_interests(self.interests()).await;
ServerMessage::LeaveGameSuccess
}
- (ClientState::LoggedIn{username, ..}, ClientMessage::Logout) => {
+ (ClientState::LoggedIn{..}, ClientMessage::Logout) => {
self.client = ClientState::Connected;
+ self.server.register_interests(self.interests()).await;
ServerMessage::LogoutSuccess
}
(ClientState::LoggedIn{username, ..}, ClientMessage::ChangeAuth{auth}) => {
+use std::collections::HashSet;
use std::fmt::Debug;
use crate::card::Card;
self.state.actions_len()
}
+ pub fn players(&self) -> HashSet<&str> {
+ self.state.players()
+ }
+
pub fn view_for(&self, username: &str) -> Game {
Game {
summary: self.summary.clone(),
Action::Join{seat: 0, chips: 0} if self.state.user_has_joined(username) => Err(ActionError::AlreadyJoined),
Action::Join{seat: 0, chips: 0} if self.state.num_players() + 1 > max_users => Err(ActionError::NoSeatAvailable),
Action::Join{seat: 0, chips: 0} => Ok(()),
- Action::Message{ref message} if self.state.user_has_joined(username) => Ok(()),
+ Action::Message{..} if self.state.user_has_joined(username) => Ok(()),
Action::Message{..} => Err(ActionError::NotAuthorised),
Action::Leave if self.state.user_has_joined(username) => Ok(()),
Action::Leave => Err(ActionError::NotAuthorised),
+use std::collections::HashSet;
+
use crate::game::{Action, UserAction};
#[derive(Debug, Clone, Serialize, Deserialize)]
logged_in
}
+ pub fn players(&self) -> HashSet<&str> {
+ let mut players = HashSet::new();
+ for action in &self.actions {
+ match action.action {
+ Action::Join{..} => players.insert(&*action.username),
+ Action::Leave => players.remove(&*action.username),
+ _ => continue,
+ };
+ }
+ players
+ }
+
pub fn num_players(&self) -> u32 {
let mut num_players = 0;
for action in &self.actions {
#[macro_use] extern crate log;
#[macro_use] extern crate serde_derive;
-use std::pin::Pin;
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::mem::swap;
-use async_std::prelude::*;
-use futures::{future::select, future::Either, pin_mut, Stream, StreamExt, stream};
+use futures::{channel::mpsc::{channel, Receiver}, future::{Either, select}, select, FutureExt, SinkExt, StreamExt, pin_mut, stream::{self, FuturesUnordered, pending}};
+use redis::{aio::PubSub, Client, Msg};
use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals;
use tide::{Body, Request, Error, Response, StatusCode};
mod server;
use crate::api::ServerMessage;
-use crate::server::ServerState;
-use crate::client::{ClientUpdate, ConnectionState};
+use crate::server::{ClientInterestSender, Server, ServerState};
+use crate::client::{ClientInterest, ConnectionState};
+
+pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
+ #[derive(Debug)]
+ struct Client {
+ sender: ClientInterestSender,
+ interests: HashSet<ClientInterest>,
+ }
+ debug!("handle_client_interest: init");
+ let mut clients: Vec<Client> = Vec::new();
+ loop {
+ enum Action {
+ AddClient { sender: ClientInterestSender },
+ RegisterInterest { index: usize, client_interests: HashSet<ClientInterest> },
+ SendInterest { interest: Msg },
+ RemoveClient { index: usize },
+ NoNewClients,
+ ConnectionClosed,
+ }
+ match {
+ let mut connection_on_message = connection.on_message();
+ let mut next_interest = connection_on_message.next().fuse();
+ let mut next_client = new_clients.next().fuse();
+ let mut next_client_interest = FuturesUnordered::new();
+ for (index, Client{ref mut sender, ..}) in clients.iter_mut().enumerate() {
+ next_client_interest.push(sender.register_interest.next().map(move |interest| (index, interest)));
+ }
+ let mut next_client_interest = next_client_interest.select_next_some();
+ let mut action = Action::ConnectionClosed;
+ select! {
+ interest = next_interest => {
+ if let Some(interest) = interest {
+ action = Action::SendInterest{interest};
+ } else {
+ action = Action::ConnectionClosed;
+ }
+ }
+ sender = next_client => {
+ if let Some(sender) = sender {
+ action = Action::AddClient{sender};
+ } else {
+ action = Action::NoNewClients;
+ }
+ }
+ client_interests = next_client_interest => {
+ match client_interests {
+ (index, Some(client_interests)) => action = Action::RegisterInterest{index, client_interests},
+ (index, None) => action = Action::RemoveClient{index},
+ }
+ }
+ }
+ action
+ } {
+ Action::AddClient{sender} => {
+ debug!("handle_client_interest: Action::AddClient {{ sender: {:?} }}", sender);
+ clients.push(Client{sender, interests: HashSet::new()});
+ }
+ Action::RegisterInterest{index, mut client_interests} => {
+ debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
+ let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+ swap(&mut clients[index].interests, &mut client_interests);
+ let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+ for interest in &interests_after - &interests_prior {
+ debug!("handle_client_interest: Subscribing to {:?}", interest);
+ if let Err(err) = connection.subscribe(interest).await {
+ error!("handle_client_interest: Subscribe failed: {}", err);
+ }
+ }
+ for interest in &interests_prior - &interests_after {
+ debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+ if let Err(err) = connection.unsubscribe(interest).await {
+ error!("handle_client_interest: Unsubscribe failed: {}", err);
+ }
+ }
+ }
+ Action::SendInterest{interest} => {
+ debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
+ if let Ok(interest) = TryFrom::try_from(interest) {
+ for Client{sender, interests} in &mut clients {
+ if interests.contains(&interest) {
+ debug!("handle_client_interest: Sending {:?} to {:?}", interest, sender);
+ if let Err(err) = sender.interest.send(interest.clone()).await {
+ error!("handle_client_interest: Send failed: {}", err);
+ }
+ }
+ }
+ }
+ }
+ Action::RemoveClient{index} => {
+ debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
+ let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+ clients.remove(index);
+ let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client{ref interests, ..}| interests.clone()).flatten().collect();
+ for interest in &interests_prior - &interests_after {
+ debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+ if let Err(err) = connection.unsubscribe(interest).await {
+ error!("handle_client_interest: Unsubscribe failed: {}", err);
+ }
+ }
+ }
+ Action::NoNewClients => {
+ debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
+ if clients.is_empty() {
+ return Ok(());
+ }
+ }
+ Action::ConnectionClosed => {
+ debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
+ return Ok(());
+ }
+ }
+ }
+}
-pub async fn handle_websocket_request(request: Request<ServerState>, mut stream: WebSocketConnection) -> Result<(), Error> {
- let mut client = ConnectionState::new(request.state().clone());
- let update_stream: Pin<Box<dyn Stream<Item=ClientUpdate> + Send>>;
- update_stream = stream::empty().boxed(); // TODO
+pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
+ let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
+ let mut client = ConnectionState::new(server_state);
let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
while let Some(message) = combined.next().await {
- let message: Either<Result<Message, tide_websockets::Error>, ClientUpdate> = message;
+ let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
match message {
Either::Left(Ok(Message::Text(input))) => {
let response = match serde_json::from_str(&input) {
stream.send_json(&response).await?;
}
Either::Right(update) => {
+ debug!("client received update: {:?}", update);
let mut responses = client.retrieve_updates(update).await;
while let Some(response) = responses.next().await {
stream.send_json(&response).await?
}
}
- Either::Left(Ok(_)) => {
- error!("Websocket received non-text");
+ Either::Left(Ok(Message::Close(_))) => {
+ info!("Websocket closed");
+ break;
+ }
+ Either::Left(Ok(Message::Binary(_))) => {
+ error!("Websocket received unexpected binary data");
break;
}
+ Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => {
+ continue;
+ }
Either::Left(Err(err)) => {
error!("Websocket error: {}", err);
break;
async fn main() -> Result<(), Error> {
env_logger::init();
- let mut app = tide::with_state(ServerState::new("redis://localhost/").await?);
+ const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128;
+ let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER);
+
+ let client = Client::open("redis://localhost/")?;
+ let connection = client.get_multiplexed_async_std_connection().await?;
+ let pubsub = client.get_async_std_connection().await?.into_pubsub();
+ let mut app = tide::with_state(Server::new(connection, register_update_stream_tx));
app.at("/").serve_dir("site/")?;
app.at("/").serve_file("site/index.html")?;
let signals = Signals::new(&[SIGINT])?;
let signal_handler = handle_signals(signals);
+ let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx);
+
let listener = TlsListener::build()
.addrs("localhost:4433")
.cert("cert/cert.pem")
.key("cert/key.pem");
let app = app.listen(listener);
- pin_mut!(signal_handler, app);
+ pin_mut!(app, handle_client_interest, signal_handler);
- select(app, signal_handler).await.factor_first().0?;
+ select(select(app, handle_client_interest).map(|f| f.factor_first().0), signal_handler).await.factor_first().0?;
info!("Pokerwave shut down gracefully.");
-use futures::Stream;
-use redis::{AsyncCommands, Client, ErrorKind, FromRedisValue, IntoConnectionInfo, Msg, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value, aio::MultiplexedConnection};
+use std::collections::HashSet;
+use std::convert::TryFrom;
+
+use futures::{channel::mpsc::{Receiver, Sender, channel}, SinkExt};
+use redis::{AsyncCommands, ErrorKind, FromRedisValue, Msg, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value, aio::MultiplexedConnection};
use serde::{Serialize, Deserialize};
use crate::auth::Auth;
+use crate::client::ClientInterest;
use crate::game::{Game, GameSettings, GameSummary, UserAction};
#[derive(Clone)]
+pub struct Server {
+ redis: MultiplexedConnection,
+ register_update_stream: Sender<ClientInterestSender>,
+}
+
+#[derive(Debug)]
+pub struct ClientInterestSender {
+ pub register_interest: Receiver<HashSet<ClientInterest>>,
+ pub interest: Sender<ClientInterest>,
+}
+
+#[derive(Debug)]
+pub struct ClientInterestReceiver {
+ pub register_interest: Sender<HashSet<ClientInterest>>,
+ pub interest: Receiver<ClientInterest>,
+}
+
+fn client_interest_channel() -> (ClientInterestSender, ClientInterestReceiver) {
+ const REGISTER_INTEREST_CHANNEL_BUFFER: usize = 128;
+ const INTEREST_CHANNEL_BUFFER: usize = 1024;
+ let (register_interest_tx, register_interest_rx) = channel(REGISTER_INTEREST_CHANNEL_BUFFER);
+ let (interest_tx, interest_rx) = channel(INTEREST_CHANNEL_BUFFER);
+ let tx = ClientInterestSender{register_interest: register_interest_rx, interest: interest_tx};
+ let rx = ClientInterestReceiver{register_interest: register_interest_tx, interest: interest_rx};
+ (tx, rx)
+}
+
+pub enum ClientInterestFromMsgError {
+ InvalidChannelName{channel_name: String},
+}
+
+impl TryFrom<Msg> for ClientInterest {
+ type Error = ClientInterestFromMsgError;
+ fn try_from(msg: Msg) -> Result<Self, Self::Error> {
+ let channel_name = msg.get_channel_name();
+ if channel_name == "__keyspace@0__:games" {
+ Ok(ClientInterest::GameList)
+ } else if let Some(username) = channel_name.strip_prefix("__keyspace@0__:user:") {
+ Ok(ClientInterest::User{username: username.to_string()})
+ } else if let Some(Ok(id)) = channel_name.strip_prefix("__keyspace@0__:game:").map(str::parse) {
+ Ok(ClientInterest::Game{id})
+ } else {
+ Err(ClientInterestFromMsgError::InvalidChannelName{channel_name: channel_name.to_string()})
+ }
+ }
+}
+
+impl ToRedisArgs for ClientInterest {
+ fn write_redis_args<W>(&self, out: &mut W)
+ where W: ?Sized + RedisWrite
+ {
+ match self {
+ ClientInterest::GameList => out.write_arg(b"__keyspace@0__:games"),
+ ClientInterest::Game{id} => out.write_arg_fmt(format!("__keyspace@0__:game:{}", id)),
+ ClientInterest::User{username} => out.write_arg_fmt(format!("__keyspace@0__:user:{}", username)),
+ }
+ }
+}
+
+impl Server {
+ pub fn new(redis: MultiplexedConnection, register_update_stream: Sender<ClientInterestSender>) -> Self {
+ Self {
+ redis,
+ register_update_stream
+ }
+ }
+
+ pub async fn new_state(&self) -> (ServerState, Receiver<ClientInterest>) {
+ let (client_interest_sender, ClientInterestReceiver{register_interest, interest}) = client_interest_channel();
+ if let Err(err) = self.register_update_stream.clone().send(client_interest_sender).await {
+ error!("new_state: Send failed: {}", err);
+ }
+ (ServerState{redis: self.redis.clone(), register_interest}, interest)
+ }
+}
+
pub struct ServerState {
- client: Client,
redis: MultiplexedConnection,
+ register_interest: Sender<HashSet<ClientInterest>>,
}
fn user_key(username: &str) -> String {
"#;
impl ServerState {
- pub async fn new<T: IntoConnectionInfo>(params: T) -> RedisResult<Self> {
- let client = Client::open(params)?;
- let redis = client.get_multiplexed_async_std_connection().await?;
- Ok(Self{client, redis})
- }
-
pub async fn create_user(&mut self, username: &str, auth: Auth, nickname: &str) -> RedisResult<()> {
let key = user_key(username);
if self.redis.hset_nx::<_, _, _, i32>(&key, "auth", AsJson(auth)).await? == 0 {
return Err(RedisError::from((ErrorKind::ResponseError, "User already exists")));
}
- self.redis.hset(&key, "nickname", username).await
+ self.redis.hset(&key, "nickname", nickname).await
}
pub async fn set_user_auth(&mut self, username: &str, auth: Auth) -> RedisResult<()> {
let actions: Vec<AsJson<UserAction>> = self.redis.lrange(&key, i, i + GAME_ACTION_BLOCK_SIZE).await?;
if actions.is_empty() { break; }
for AsJson(action) in actions {
- game.take_action(action).map_err(|err| RedisError::from((ErrorKind::ResponseError, "Invalid action")))?;
+ game.take_action(action).map_err(|err| RedisError::from((ErrorKind::ResponseError, "Invalid action", err.to_string())))?;
}
}
Ok(())
debug!("redis: executing: EVAL {:?} 1 {:?} {} {:?}", TAKE_ACTION_LUA_SCRIPT, key, len, serde_json::to_string(&action).unwrap());
redis::cmd("EVAL").arg(TAKE_ACTION_LUA_SCRIPT).arg(1).arg(key).arg(len).arg(AsJson(action)).query_async(&mut self.redis).await
}
+
+ pub async fn register_interests(&mut self, interests: HashSet<ClientInterest>) {
+ if let Err(err) = self.register_interest.send(interests).await {
+ error!("register_interests: could not register interest: {}", err);
+ }
+ }
}
pub enum ActionStatus {