From: Geoffrey Allott Date: Sat, 6 Mar 2021 15:16:43 +0000 (+0000) Subject: move methods into the relevant modules X-Git-Url: https://git.pointlesshacks.com/?a=commitdiff_plain;h=650f2940bb7a66b7ac95682aed7a18380198ffee;p=pokerwave.git move methods into the relevant modules --- diff --git a/src/client.rs b/src/client.rs index e9b38d1..1ffa0da 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,19 +1,22 @@ use std::collections::HashSet; -use futures::stream::{empty, iter, once, Stream, StreamExt}; +use futures::{channel::mpsc::Receiver, future::Either, stream::{self, empty, iter, once, Stream, StreamExt}}; +use tide::Request; +use tide_websockets::{Message, WebSocketConnection}; use crate::api::{ClientMessage, ServerMessage}; use crate::game::{Game, GameList, UserAction}; -use crate::server::{ActionStatus, ServerState}; +use crate::server::{ActionStatus, Server, ServerState}; use crate::username::Username; +use crate::util::dedup::DedupReadyExt; -pub struct ConnectionState { +struct ConnectionState { server: ServerState, client: ClientState, } #[derive(Debug, Clone)] -pub enum ClientState { +enum ClientState { Connected, LoginAuthIssued { username: Username, challenge: String }, LoggedIn { username: Username, state: LoggedInState }, @@ -28,18 +31,18 @@ pub enum ClientInterest { } #[derive(Debug, Clone)] -pub enum LoggedInState { +enum LoggedInState { Idle, InLobby { game_list: GameList }, InGame { game: Box }, } impl ConnectionState { - pub fn new(server: ServerState) -> Self { + fn new(server: ServerState) -> Self { Self { server, client: ClientState::Connected } } - pub async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream { + async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream { match update { ClientInterest::GameList => match &mut self.client { ClientState::LoggedIn { state: LoggedInState::InLobby { ref mut game_list }, .. } => { @@ -80,7 +83,7 @@ impl ConnectionState { } } - pub fn interests(&self) -> HashSet { + fn interests(&self) -> HashSet { let mut ret = HashSet::new(); if let ClientState::LoggedIn { username, ref state } = self.client { ret.insert(ClientInterest::User { username }); @@ -100,7 +103,7 @@ impl ConnectionState { ret } - pub async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage { + async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage { let interests_before = self.interests(); let response = self.message_response(message).await; let interests_after = self.interests(); @@ -231,3 +234,45 @@ impl ConnectionState { } } } + +pub async fn new_client(request: Request, stream: WebSocketConnection) -> Result<(), tide::Error> { + let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await; + let mut client = ConnectionState::new(server_state); + let update_stream = update_stream.dedup_ready(); + let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right)); + while let Some(message) = combined.next().await { + let message: Either, ClientInterest> = message; + match message { + Either::Left(Ok(Message::Text(input))) => { + let response = match serde_json::from_str(&input) { + Ok(message) => client.apply_message(message).await, + Err(err) => ServerMessage::ProtocolError { reason: err.to_string() }, + }; + stream.send_json(&response).await?; + } + Either::Right(update) => { + debug!("client received update: {:?}", update); + let mut responses = client.retrieve_updates(update).await; + while let Some(response) = responses.next().await { + stream.send_json(&response).await? + } + } + Either::Left(Ok(Message::Close(_))) => { + info!("Websocket closed"); + break; + } + Either::Left(Ok(Message::Binary(_))) => { + error!("Websocket received unexpected binary data"); + break; + } + Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => { + continue; + } + Either::Left(Err(err)) => { + error!("Websocket error: {}", err); + break; + } + } + } + Ok(()) +} diff --git a/src/dealer.rs b/src/dealer.rs index 13d6a69..98b66dc 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -1,20 +1,21 @@ use std::collections::HashSet; +use async_std::task::spawn; use futures::{channel::mpsc::Receiver, StreamExt}; use redis::{ErrorKind, RedisError, RedisResult}; use crate::client::ClientInterest; -use crate::game::{DealerAction, Game, ValidatedUserAction}; -use crate::server::{ActionStatus, ServerState}; +use crate::game::{DealerAction, Game, GameList, ValidatedUserAction}; +use crate::server::{ActionStatus, Server, ServerState}; use crate::util::dedup::DedupReadyExt; -pub struct Dealer { +struct Dealer { server: ServerState, dealer: DealerState, } #[derive(Debug, Clone)] -pub struct DealerState { +struct DealerState { game: Box, } @@ -24,7 +25,7 @@ enum Termination { } impl Dealer { - pub async fn new(mut server: ServerState, id: i64) -> RedisResult { + async fn new(mut server: ServerState, id: i64) -> RedisResult { let mut interests = HashSet::new(); interests.insert(ClientInterest::Game { id }); interests.insert(ClientInterest::Timeout { id }); @@ -37,7 +38,7 @@ impl Dealer { Ok(dealer) } - pub async fn start(mut self, update_stream: Receiver) { + async fn start(mut self, update_stream: Receiver) { let mut update_stream = update_stream.dedup_ready(); while update_stream.next().await.is_some() { match self.retrieve_updates().await { @@ -102,3 +103,36 @@ impl Dealer { } } } + +pub async fn spawn_dealers(server: Server) -> Result<(), std::io::Error> { + let (mut server_state, mut update_stream) = server.new_state().await; + let mut interests = HashSet::new(); + interests.insert(ClientInterest::GameList); + server_state.register_interests(interests).await; + let mut game_list = GameList::new("".to_string()); + loop { + let games_len = game_list.games_len(); + match server_state.game_list(games_len).await { + Ok(games) => { + for game in games { + info!("Starting new game {:?}", game); + let id = game.id(); + game_list.push(game); + let (server_state, update_stream) = server.new_state().await; + if let Ok(dealer) = Dealer::new(server_state, id).await { + info!("Spawning new dealer for game {}", id); + spawn(dealer.start(update_stream)); + } + } + } + Err(err) => { + error!("Failed to update game list: {}", err); + return Err(std::io::Error::new(std::io::ErrorKind::Other, err)); + } + } + if let Some(ClientInterest::GameList) = update_stream.next().await { + continue; + } + return Ok(()); + } +} diff --git a/src/main.rs b/src/main.rs index b1dd49a..f8cc952 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,21 +7,20 @@ use std::collections::HashSet; use std::convert::TryFrom; use std::mem::swap; -use async_std::task::spawn; use futures::{ channel::mpsc::{channel, Receiver}, - future::{select, Either}, + future::select, pin_mut, select, - stream::{self, FuturesUnordered}, + stream::FuturesUnordered, FutureExt, SinkExt, StreamExt, }; use redis::{aio::PubSub, Client, Msg}; use signal_hook::consts::signal::*; use signal_hook_async_std::Signals; use tide::utils::After; -use tide::{Body, Error, Request, Response, StatusCode}; +use tide::{Body, Error, Response, StatusCode}; //use tide_rustls::TlsListener; -use tide_websockets::{Message, WebSocket, WebSocketConnection}; +use tide_websockets::WebSocket; mod api; mod auth; @@ -35,12 +34,9 @@ mod server; mod username; mod util; -use crate::api::ServerMessage; -use crate::client::{ClientInterest, ConnectionState}; -use crate::dealer::Dealer; -use crate::game::GameList; -use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState}; -use crate::util::dedup::DedupReadyExt; +use crate::client::{ClientInterest, new_client}; +use crate::dealer::spawn_dealers; +use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server}; pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver) -> Result<(), std::io::Error> { #[derive(Debug)] @@ -168,81 +164,6 @@ pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Rec } } -async fn handle_new_games(server: Server) -> Result<(), std::io::Error> { - let (mut server_state, mut update_stream) = server.new_state().await; - let mut interests = HashSet::new(); - interests.insert(ClientInterest::GameList); - server_state.register_interests(interests).await; - let mut game_list = GameList::new("".to_string()); - loop { - let games_len = game_list.games_len(); - match server_state.game_list(games_len).await { - Ok(games) => { - for game in games { - info!("Starting new game {:?}", game); - let id = game.id(); - game_list.push(game); - let (server_state, update_stream) = server.new_state().await; - if let Ok(dealer) = Dealer::new(server_state, id).await { - info!("Spawning new dealer for game {}", id); - spawn(dealer.start(update_stream)); - } - } - } - Err(err) => { - error!("Failed to update game list: {}", err); - return Err(std::io::Error::new(std::io::ErrorKind::Other, err)); - } - } - if let Some(ClientInterest::GameList) = update_stream.next().await { - continue; - } - return Ok(()); - } -} - -pub async fn handle_websocket_request(request: Request, stream: WebSocketConnection) -> Result<(), Error> { - let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await; - let mut client = ConnectionState::new(server_state); - let update_stream = update_stream.dedup_ready(); - let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right)); - while let Some(message) = combined.next().await { - let message: Either, ClientInterest> = message; - match message { - Either::Left(Ok(Message::Text(input))) => { - let response = match serde_json::from_str(&input) { - Ok(message) => client.apply_message(message).await, - Err(err) => ServerMessage::ProtocolError { reason: err.to_string() }, - }; - stream.send_json(&response).await?; - } - Either::Right(update) => { - debug!("client received update: {:?}", update); - let mut responses = client.retrieve_updates(update).await; - while let Some(response) = responses.next().await { - stream.send_json(&response).await? - } - } - Either::Left(Ok(Message::Close(_))) => { - info!("Websocket closed"); - break; - } - Either::Left(Ok(Message::Binary(_))) => { - error!("Websocket received unexpected binary data"); - break; - } - Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => { - continue; - } - Either::Left(Err(err)) => { - error!("Websocket error: {}", err); - break; - } - } - } - Ok(()) -} - async fn handle_signals(mut signals: Signals) -> Result<(), std::io::Error> { signals.next().await; info!("Shutting down..."); @@ -270,14 +191,14 @@ async fn main() -> Result<(), Error> { app.at("/").serve_dir("site/")?; app.at("/").serve_file("site/index.html")?; - app.at("/api").get(WebSocket::new(handle_websocket_request)); + app.at("/api").get(WebSocket::new(new_client)); app.with(After(serve_404)); let signals = Signals::new(&[SIGINT])?; let signal_handler = handle_signals(signals); let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx); - let handle_new_games = handle_new_games(app.state().clone()); + let handle_new_games = spawn_dealers(app.state().clone()); /*let listener = TlsListener::build() .addrs("localhost:4433")