move methods into the relevant modules
authorGeoffrey Allott <geoffrey@allott.email>
Sat, 6 Mar 2021 15:16:43 +0000 (15:16 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Sat, 6 Mar 2021 15:16:43 +0000 (15:16 +0000)
src/client.rs
src/dealer.rs
src/main.rs

index e9b38d1ad7cab2ae0fc407b2c5d3c8a2be79e129..1ffa0da647cb112e9fbf582467c6c77c43dfe3b1 100644 (file)
@@ -1,19 +1,22 @@
 use std::collections::HashSet;
 
-use futures::stream::{empty, iter, once, Stream, StreamExt};
+use futures::{channel::mpsc::Receiver, future::Either, stream::{self, empty, iter, once, Stream, StreamExt}};
+use tide::Request;
+use tide_websockets::{Message, WebSocketConnection};
 
 use crate::api::{ClientMessage, ServerMessage};
 use crate::game::{Game, GameList, UserAction};
-use crate::server::{ActionStatus, ServerState};
+use crate::server::{ActionStatus, Server, ServerState};
 use crate::username::Username;
+use crate::util::dedup::DedupReadyExt;
 
-pub struct ConnectionState {
+struct ConnectionState {
     server: ServerState,
     client: ClientState,
 }
 
 #[derive(Debug, Clone)]
-pub enum ClientState {
+enum ClientState {
     Connected,
     LoginAuthIssued { username: Username, challenge: String },
     LoggedIn { username: Username, state: LoggedInState },
@@ -28,18 +31,18 @@ pub enum ClientInterest {
 }
 
 #[derive(Debug, Clone)]
-pub enum LoggedInState {
+enum LoggedInState {
     Idle,
     InLobby { game_list: GameList },
     InGame { game: Box<dyn Game> },
 }
 
 impl ConnectionState {
-    pub fn new(server: ServerState) -> Self {
+    fn new(server: ServerState) -> Self {
         Self { server, client: ClientState::Connected }
     }
 
-    pub async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream<Item = ServerMessage> {
+    async fn retrieve_updates(&mut self, update: ClientInterest) -> impl Stream<Item = ServerMessage> {
         match update {
             ClientInterest::GameList => match &mut self.client {
                 ClientState::LoggedIn { state: LoggedInState::InLobby { ref mut game_list }, .. } => {
@@ -80,7 +83,7 @@ impl ConnectionState {
         }
     }
 
-    pub fn interests(&self) -> HashSet<ClientInterest> {
+    fn interests(&self) -> HashSet<ClientInterest> {
         let mut ret = HashSet::new();
         if let ClientState::LoggedIn { username, ref state } = self.client {
             ret.insert(ClientInterest::User { username });
@@ -100,7 +103,7 @@ impl ConnectionState {
         ret
     }
 
-    pub async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage {
+    async fn apply_message(&mut self, message: ClientMessage) -> ServerMessage {
         let interests_before = self.interests();
         let response = self.message_response(message).await;
         let interests_after = self.interests();
@@ -231,3 +234,45 @@ impl ConnectionState {
         }
     }
 }
+
+pub async fn new_client(request: Request<Server>, stream: WebSocketConnection) -> Result<(), tide::Error> {
+    let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
+    let mut client = ConnectionState::new(server_state);
+    let update_stream = update_stream.dedup_ready();
+    let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
+    while let Some(message) = combined.next().await {
+        let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
+        match message {
+            Either::Left(Ok(Message::Text(input))) => {
+                let response = match serde_json::from_str(&input) {
+                    Ok(message) => client.apply_message(message).await,
+                    Err(err) => ServerMessage::ProtocolError { reason: err.to_string() },
+                };
+                stream.send_json(&response).await?;
+            }
+            Either::Right(update) => {
+                debug!("client received update: {:?}", update);
+                let mut responses = client.retrieve_updates(update).await;
+                while let Some(response) = responses.next().await {
+                    stream.send_json(&response).await?
+                }
+            }
+            Either::Left(Ok(Message::Close(_))) => {
+                info!("Websocket closed");
+                break;
+            }
+            Either::Left(Ok(Message::Binary(_))) => {
+                error!("Websocket received unexpected binary data");
+                break;
+            }
+            Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => {
+                continue;
+            }
+            Either::Left(Err(err)) => {
+                error!("Websocket error: {}", err);
+                break;
+            }
+        }
+    }
+    Ok(())
+}
index 13d6a69a6e954509c714f3e7375ea7910df3bd9d..98b66dc9734e38d6d7643a838c647b7383a9c79b 100644 (file)
@@ -1,20 +1,21 @@
 use std::collections::HashSet;
 
+use async_std::task::spawn;
 use futures::{channel::mpsc::Receiver, StreamExt};
 use redis::{ErrorKind, RedisError, RedisResult};
 
 use crate::client::ClientInterest;
-use crate::game::{DealerAction, Game, ValidatedUserAction};
-use crate::server::{ActionStatus, ServerState};
+use crate::game::{DealerAction, Game, GameList, ValidatedUserAction};
+use crate::server::{ActionStatus, Server, ServerState};
 use crate::util::dedup::DedupReadyExt;
 
-pub struct Dealer {
+struct Dealer {
     server: ServerState,
     dealer: DealerState,
 }
 
 #[derive(Debug, Clone)]
-pub struct DealerState {
+struct DealerState {
     game: Box<dyn Game>,
 }
 
@@ -24,7 +25,7 @@ enum Termination {
 }
 
 impl Dealer {
-    pub async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
+    async fn new(mut server: ServerState, id: i64) -> RedisResult<Self> {
         let mut interests = HashSet::new();
         interests.insert(ClientInterest::Game { id });
         interests.insert(ClientInterest::Timeout { id });
@@ -37,7 +38,7 @@ impl Dealer {
         Ok(dealer)
     }
 
-    pub async fn start(mut self, update_stream: Receiver<ClientInterest>) {
+    async fn start(mut self, update_stream: Receiver<ClientInterest>) {
         let mut update_stream = update_stream.dedup_ready();
         while update_stream.next().await.is_some() {
             match self.retrieve_updates().await {
@@ -102,3 +103,36 @@ impl Dealer {
         }
     }
 }
+
+pub async fn spawn_dealers(server: Server) -> Result<(), std::io::Error> {
+    let (mut server_state, mut update_stream) = server.new_state().await;
+    let mut interests = HashSet::new();
+    interests.insert(ClientInterest::GameList);
+    server_state.register_interests(interests).await;
+    let mut game_list = GameList::new("".to_string());
+    loop {
+        let games_len = game_list.games_len();
+        match server_state.game_list(games_len).await {
+            Ok(games) => {
+                for game in games {
+                    info!("Starting new game {:?}", game);
+                    let id = game.id();
+                    game_list.push(game);
+                    let (server_state, update_stream) = server.new_state().await;
+                    if let Ok(dealer) = Dealer::new(server_state, id).await {
+                        info!("Spawning new dealer for game {}", id);
+                        spawn(dealer.start(update_stream));
+                    }
+                }
+            }
+            Err(err) => {
+                error!("Failed to update game list: {}", err);
+                return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
+            }
+        }
+        if let Some(ClientInterest::GameList) = update_stream.next().await {
+            continue;
+        }
+        return Ok(());
+    }
+}
index b1dd49a7ef82d53fda796ed990deb82ffd0e0ec8..f8cc952811889b418bf1b0b4de8ee603bca4b38a 100644 (file)
@@ -7,21 +7,20 @@ use std::collections::HashSet;
 use std::convert::TryFrom;
 use std::mem::swap;
 
-use async_std::task::spawn;
 use futures::{
     channel::mpsc::{channel, Receiver},
-    future::{select, Either},
+    future::select,
     pin_mut, select,
-    stream::{self, FuturesUnordered},
+    stream::FuturesUnordered,
     FutureExt, SinkExt, StreamExt,
 };
 use redis::{aio::PubSub, Client, Msg};
 use signal_hook::consts::signal::*;
 use signal_hook_async_std::Signals;
 use tide::utils::After;
-use tide::{Body, Error, Request, Response, StatusCode};
+use tide::{Body, Error, Response, StatusCode};
 //use tide_rustls::TlsListener;
-use tide_websockets::{Message, WebSocket, WebSocketConnection};
+use tide_websockets::WebSocket;
 
 mod api;
 mod auth;
@@ -35,12 +34,9 @@ mod server;
 mod username;
 mod util;
 
-use crate::api::ServerMessage;
-use crate::client::{ClientInterest, ConnectionState};
-use crate::dealer::Dealer;
-use crate::game::GameList;
-use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState};
-use crate::util::dedup::DedupReadyExt;
+use crate::client::{ClientInterest, new_client};
+use crate::dealer::spawn_dealers;
+use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server};
 
 pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
     #[derive(Debug)]
@@ -168,81 +164,6 @@ pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Rec
     }
 }
 
-async fn handle_new_games(server: Server) -> Result<(), std::io::Error> {
-    let (mut server_state, mut update_stream) = server.new_state().await;
-    let mut interests = HashSet::new();
-    interests.insert(ClientInterest::GameList);
-    server_state.register_interests(interests).await;
-    let mut game_list = GameList::new("".to_string());
-    loop {
-        let games_len = game_list.games_len();
-        match server_state.game_list(games_len).await {
-            Ok(games) => {
-                for game in games {
-                    info!("Starting new game {:?}", game);
-                    let id = game.id();
-                    game_list.push(game);
-                    let (server_state, update_stream) = server.new_state().await;
-                    if let Ok(dealer) = Dealer::new(server_state, id).await {
-                        info!("Spawning new dealer for game {}", id);
-                        spawn(dealer.start(update_stream));
-                    }
-                }
-            }
-            Err(err) => {
-                error!("Failed to update game list: {}", err);
-                return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
-            }
-        }
-        if let Some(ClientInterest::GameList) = update_stream.next().await {
-            continue;
-        }
-        return Ok(());
-    }
-}
-
-pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
-    let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
-    let mut client = ConnectionState::new(server_state);
-    let update_stream = update_stream.dedup_ready();
-    let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
-    while let Some(message) = combined.next().await {
-        let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
-        match message {
-            Either::Left(Ok(Message::Text(input))) => {
-                let response = match serde_json::from_str(&input) {
-                    Ok(message) => client.apply_message(message).await,
-                    Err(err) => ServerMessage::ProtocolError { reason: err.to_string() },
-                };
-                stream.send_json(&response).await?;
-            }
-            Either::Right(update) => {
-                debug!("client received update: {:?}", update);
-                let mut responses = client.retrieve_updates(update).await;
-                while let Some(response) = responses.next().await {
-                    stream.send_json(&response).await?
-                }
-            }
-            Either::Left(Ok(Message::Close(_))) => {
-                info!("Websocket closed");
-                break;
-            }
-            Either::Left(Ok(Message::Binary(_))) => {
-                error!("Websocket received unexpected binary data");
-                break;
-            }
-            Either::Left(Ok(Message::Ping(_))) | Either::Left(Ok(Message::Pong(_))) => {
-                continue;
-            }
-            Either::Left(Err(err)) => {
-                error!("Websocket error: {}", err);
-                break;
-            }
-        }
-    }
-    Ok(())
-}
-
 async fn handle_signals(mut signals: Signals) -> Result<(), std::io::Error> {
     signals.next().await;
     info!("Shutting down...");
@@ -270,14 +191,14 @@ async fn main() -> Result<(), Error> {
 
     app.at("/").serve_dir("site/")?;
     app.at("/").serve_file("site/index.html")?;
-    app.at("/api").get(WebSocket::new(handle_websocket_request));
+    app.at("/api").get(WebSocket::new(new_client));
     app.with(After(serve_404));
 
     let signals = Signals::new(&[SIGINT])?;
     let signal_handler = handle_signals(signals);
 
     let handle_client_interest = handle_client_interest(pubsub, register_update_stream_rx);
-    let handle_new_games = handle_new_games(app.state().clone());
+    let handle_new_games = spawn_dealers(app.state().clone());
 
     /*let listener = TlsListener::build()
         .addrs("localhost:4433")