use std::collections::HashSet;
-use futures::stream::{empty, iter, once, Stream, StreamExt};
+use futures::{channel::mpsc::Receiver, future::Either, stream::{self, empty, iter, once, Stream, StreamExt}};
+use tide::Request;
+use tide_websockets::{Message, WebSocketConnection};
use crate::api::{ClientMessage, ServerMessage};
use crate::game::{Game, GameList, UserAction};
-use crate::server::{ActionStatus, ServerState};
+use crate::server::{ActionStatus, Server, ServerState};
use crate::username::Username;
+use crate::util::dedup::DedupReadyExt;
-pub struct ConnectionState {
+struct ConnectionState {
server: ServerState,
client: ClientState,
}
#[derive(Debug, Clone)]
-pub enum ClientState {
+enum ClientState {
Connected,
LoginAuthIssued { username: Username, challenge: String },
LoggedIn { username: Username, state: LoggedInState },
}
#[derive(Debug, Clone)]
-pub enum LoggedInState {
+enum LoggedInState {
Idle,
InLobby { game_list: GameList },
InGame { game: Box<dyn Game> },
}
impl ConnectionState {
- pub fn new(server: ServerState) -> Self {
+ fn new(server: ServerState) -> Self {
Self { server, client: ClientState::Connected }
}
- pub async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream<Item = ServerMessage> {
+ async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream<Item = ServerMessage> {
match update {
ClientInterest::GameList => match &mut self.client {
ClientState::LoggedIn { state: LoggedInState::InLobby { ref mut game_list }, .. } => {
}
}
- pub fn interests(&self) -> HashSet<ClientInterest> {
+ fn interests(&self) -> HashSet<ClientInterest> {
let mut ret = HashSet::new();
if let ClientState::LoggedIn { username, ref state } = self.client {
ret.insert(ClientInterest::User { username });
ret
}
- pub async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage {
+ async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage {
let interests_before = self.interests();
let response = self.message_response(message).await;
let interests_after = self.interests();
}
}
}
+
+pub async fn new_client(request: Request<Server>, stream: WebSocketConnection) -> Result<(), tide::Error> {
+ let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
+ let mut client = ConnectionState::new(server_state);
+ let update_stream = update_stream.dedup_ready();
+ let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
+ while let Some(message) = combined.next().await {
+ let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
+ match message {
+ Either::Left(Ok(Message::Text(input))) => {
+ let response = match serde_json::from_str(&input) {
+ Ok(message) => client.apply_message(message).await,
+ Err(err) => ServerMessage::ProtocolError { reason: err.to_string() },
+ };
+ stream.send_json(&response).await?;
+ }
+ Either::Right(update) => {
+ debug!("client received update: {:?}", update);
+ let mut responses = client.retrieve_updates(update).await;
+ while let Some(response) = responses.next().await {
+ stream.send_json(&response).await?
+ }
+ }
+ Either::Left(Ok(Message::Close(_))) => {
+ info!("Websocket closed");
+ break;
+ }
+ Either::Left(Ok(Message::Binary(_))) => {
+ error!("Websocket received unexpected binary data");
+ break;
+ }
+ Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => {
+ continue;
+ }
+ Either::Left(Err(err)) => {
+ error!("Websocket error: {}", err);
+ break;
+ }
+ }
+ }
+ Ok(())
+}
use std::collections::HashSet;
+use async_std::task::spawn;
use futures::{channel::mpsc::Receiver, StreamExt};
use redis::{ErrorKind, RedisError, RedisResult};
use crate::client::ClientInterest;
-use crate::game::{DealerAction, Game, ValidatedUserAction};
-use crate::server::{ActionStatus, ServerState};
+use crate::game::{DealerAction, Game, GameList, ValidatedUserAction};
+use crate::server::{ActionStatus, Server, ServerState};
use crate::util::dedup::DedupReadyExt;
-pub struct Dealer {
+struct Dealer {
server: ServerState,
dealer: DealerState,
}
#[derive(Debug, Clone)]
-pub struct DealerState {
+struct DealerState {
game: Box<dyn Game>,
}
}
impl Dealer {
- pub async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
+ async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
let mut interests = HashSet::new();
interests.insert(ClientInterest::Game { id });
interests.insert(ClientInterest::Timeout { id });
Ok(dealer)
}
- pub async fn start(mut self, update_stream: Receiver<ClientInterest>) {
+ async fn start(mut self, update_stream: Receiver<ClientInterest>) {
let mut update_stream = update_stream.dedup_ready();
while update_stream.next().await.is_some() {
match self.retrieve_updates().await {
}
}
}
+
+pub async fn spawn_dealers(server: Server) -> Result<(), std::io::Error> {
+ let (mut server_state, mut update_stream) = server.new_state().await;
+ let mut interests = HashSet::new();
+ interests.insert(ClientInterest::GameList);
+ server_state.register_interests(interests).await;
+ let mut game_list = GameList::new("".to_string());
+ loop {
+ let games_len = game_list.games_len();
+ match server_state.game_list(games_len).await {
+ Ok(games) => {
+ for game in games {
+ info!("Starting new game {:?}", game);
+ let id = game.id();
+ game_list.push(game);
+ let (server_state, update_stream) = server.new_state().await;
+ if let Ok(dealer) = Dealer::new(server_state, id).await {
+ info!("Spawning new dealer for game {}", id);
+ spawn(dealer.start(update_stream));
+ }
+ }
+ }
+ Err(err) => {
+ error!("Failed to update game list: {}", err);
+ return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
+ }
+ }
+ if let Some(ClientInterest::GameList) = update_stream.next().await {
+ continue;
+ }
+ return Ok(());
+ }
+}
use std::convert::TryFrom;
use std::mem::swap;
-use async_std::task::spawn;
use futures::{
channel::mpsc::{channel, Receiver},
- future::{select, Either},
+ future::select,
pin_mut, select,
- stream::{self, FuturesUnordered},
+ stream::FuturesUnordered,
FutureExt, SinkExt, StreamExt,
};
use redis::{aio::PubSub, Client, Msg};
use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals;
use tide::utils::After;
-use tide::{Body, Error, Request, Response, StatusCode};
+use tide::{Body, Error, Response, StatusCode};
//use tide_rustls::TlsListener;
-use tide_websockets::{Message, WebSocket, WebSocketConnection};
+use tide_websockets::WebSocket;
mod api;
mod auth;
mod username;
mod util;
-use crate::api::ServerMessage;
-use crate::client::{ClientInterest, ConnectionState};
-use crate::dealer::Dealer;
-use crate::game::GameList;
-use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState};
-use crate::util::dedup::DedupReadyExt;
+use crate::client::{ClientInterest, new_client};
+use crate::dealer::spawn_dealers;
+use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server};
pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
#[derive(Debug)]
}
}
-async fn handle_new_games(server: Server) -> Result<(), std::io::Error> {
- let (mut server_state, mut update_stream) = server.new_state().await;
- let mut interests = HashSet::new();
- interests.insert(ClientInterest::GameList);
- server_state.register_interests(interests).await;
- let mut game_list = GameList::new("".to_string());
- loop {
- let games_len = game_list.games_len();
- match server_state.game_list(games_len).await {
- Ok(games) => {
- for game in games {
- info!("Starting new game {:?}", game);
- let id = game.id();
- game_list.push(game);
- let (server_state, update_stream) = server.new_state().await;
- if let Ok(dealer) = Dealer::new(server_state, id).await {
- info!("Spawning new dealer for game {}", id);
- spawn(dealer.start(update_stream));
- }
- }
- }
- Err(err) => {
- error!("Failed to update game list: {}", err);
- return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
- }
- }
- if let Some(ClientInterest::GameList) = update_stream.next().await {
- continue;
- }
- return Ok(());
- }
-}
-
-pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
- let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
- let mut client = ConnectionState::new(server_state);
- let update_stream = update_stream.dedup_ready();
- let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
- while let Some(message) = combined.next().await {
- let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
- match message {
- Either::Left(Ok(Message::Text(input))) => {
- let response = match serde_json::from_str(&input) {
- Ok(message) => client.apply_message(message).await,
- Err(err) => ServerMessage::ProtocolError { reason: err.to_string() },
- };
- stream.send_json(&response).await?;
- }
- Either::Right(update) => {
- debug!("client received update: {:?}", update);
- let mut responses = client.retrieve_updates(update).await;
- while let Some(response) = responses.next().await {
- stream.send_json(&response).await?
- }
- }
- Either::Left(Ok(Message::Close(_))) => {
- info!("Websocket closed");
- break;
- }
- Either::Left(Ok(Message::Binary(_))) => {
- error!("Websocket received unexpected binary data");
- break;
- }
- Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => {
- continue;
- }
- Either::Left(Err(err)) => {
- error!("Websocket error: {}", err);
- break;
- }
- }
- }
- Ok(())
-}
-
async fn handle_signals(mut signals: Signals) -> Result<(), std::io::Error> {
signals.next().await;
info!("Shutting down...");
app.at("/").serve_dir("site/")?;
app.at("/").serve_file("site/index.html")?;
- app.at("/api").get(WebSocket::new(handle_websocket_request));
+ app.at("/api").get(WebSocket::new(new_client));
app.with(After(serve_404));
let signals = Signals::new(&[SIGINT])?;
let signal_handler = handle_signals(signals);
let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx);
- let handle_new_games = handle_new_games(app.state().clone());
+ let handle_new_games = spawn_dealers(app.state().clone());
/*let listener = TlsListener::build()
.addrs("localhost:4433")