attempt reconnection when redis server connection is dropped
authorGeoffrey Allott <geoffrey@allott.email>
Sun, 9 Jan 2022 19:10:27 +0000 (19:10 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Sun, 9 Jan 2022 19:10:27 +0000 (19:10 +0000)
pokerwave.toml
src/config.rs
src/dealer.rs
src/main.rs
src/pubsub.rs

index 9e62b9566fc5beb30ce119b98789b49ab823a4bb..7aa806cb84ae4d3acc05da591fdc388dfb1e386f 100644 (file)
@@ -4,6 +4,7 @@
 
 [redis]
 #addr = "redis://localhost"
+#connect_timeout_seconds = 10
 
 [server]
 #bind = ["localhost:8080"]
index df917f201e0af0fffdd29cccc319c96463e29cab..ce44b72ddd90af5322ffd6ef8ac09f20264868e4 100644 (file)
@@ -36,15 +36,24 @@ fn default_redis_addr() -> String {
     "redis://localhost".to_string()
 }
 
+fn default_redis_connect_timeout_seconds() -> u64 {
+    10
+}
+
 #[derive(Clone, Debug, Deserialize)]
 pub struct RedisConfig {
     #[serde(default = "default_redis_addr")]
     pub addr: String,
+    #[serde(default = "default_redis_connect_timeout_seconds")]
+    pub connect_timeout_seconds: u64,
 }
 
 impl Default for RedisConfig {
     fn default() -> Self {
-        Self { addr: default_redis_addr() }
+        Self {
+            addr: default_redis_addr(),
+            connect_timeout_seconds: default_redis_connect_timeout_seconds(),
+        }
     }
 }
 
index 1c1d326946f69ecb5458dec49390f1191af3ceda..1dad8027f11f56192056871d06631ae3b669d8a2 100644 (file)
@@ -149,7 +149,7 @@ impl Dealer {
     }
 }
 
-pub async fn spawn_dealers(server: Server, partition: Partition) -> Result<(), std::io::Error> {
+pub async fn spawn_dealers(server: Server, partition: Partition) -> Result<(), RedisError> {
     let (mut server_state, mut update_stream) = server.new_state().await;
     let mut interests = HashSet::new();
     interests.insert(ClientInterest::GameList);
@@ -176,7 +176,7 @@ pub async fn spawn_dealers(server: Server, partition: Partition) -> Result<(), s
             }
             Err(err) => {
                 error!("Failed to update game list: {}", err);
-                return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
+                return Err(err);
             }
         }
         if let Some(ClientInterest::GameList) = update_stream.next().await {
index cc36400aac5d1f05fb72b22ab2c4a07bfb110654..079608ea60349bf2e5bfc045431914ed364b0faa 100644 (file)
@@ -3,6 +3,7 @@ extern crate log;
 #[macro_use]
 extern crate serde_derive;
 
+use async_std::task::sleep;
 use std::env::set_current_dir;
 use std::fs::read_to_string;
 use std::future::Future;
@@ -12,7 +13,7 @@ use std::time::Duration;
 
 use clap::{app_from_crate, crate_authors, crate_description, crate_name, crate_version, AppSettings, Arg, SubCommand};
 use futures::{channel::mpsc::channel, future::pending, select, FutureExt, StreamExt};
-use redis::Client;
+use redis::{Client, RedisError};
 use signal_hook::consts::signal::*;
 use signal_hook_async_std::Signals;
 use tide::{
@@ -44,7 +45,7 @@ mod util;
 use crate::client::new_client;
 use crate::config::Config;
 use crate::dealer::{spawn_dealers, Partition};
-use crate::pubsub::handle_client_interest_subscriptions;
+use crate::pubsub::{HandleClientInterestSubscriptionsError, handle_client_interest_subscriptions};
 use crate::server::Server;
 
 async fn handle_signals(mut signals: Signals) {
@@ -124,54 +125,94 @@ async fn main() -> Result<(), tide::Error> {
         }
         _ => {}
     }
+    
+    let mut first_attempt = true;
+
+    loop {
+        if !first_attempt {
+            let signals = Signals::new(&[SIGINT])?;
+            let signal_handler = handle_signals(signals);
+            info!("Attemping redis reconnect in {} seconds...", config.redis.connect_timeout_seconds);
+            let sleep = sleep(Duration::from_secs(config.redis.connect_timeout_seconds));
+            select! {
+                _ = sleep.fuse() => {},
+                _ = signal_handler.fuse() => break,
+            }
+        } else {
+            first_attempt = false;
+        }
 
-    let signals = Signals::new(&[SIGINT])?;
-    let signal_handler = handle_signals(signals);
+        let signals = Signals::new(&[SIGINT])?;
+        let signal_handler = handle_signals(signals);
 
-    const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128;
-    let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER);
+        const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128;
+        let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER);
 
-    let client = Client::open(config.redis.addr)?;
-    let connection = client.get_multiplexed_async_std_connection().await?;
-    let pubsub = client.get_async_std_connection().await?.into_pubsub();
-    let server = Server::new(connection, register_update_stream_tx);
-    let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx);
+        info!("Connecting to redis server...");
+        let client = Client::open(&*config.redis.addr)?;
+        let connection = match client.get_multiplexed_async_std_connection().await {
+            Ok(connection) => connection,
+            Err(err) => {
+                error!("Redis connection error: {}", err);
+                continue;
+            }
+        };
+        let pubsub = match client.get_async_std_connection().await {
+            Ok(connection) => connection.into_pubsub(),
+            Err(err) => {
+                error!("Redis connection error: {}", err);
+                continue;
+            }
+        };
 
-    let dealer: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>> =
-        if run_dealer { Box::pin(spawn_dealers(server.clone(), partition)) } else { Box::pin(pending()) };
+        let server = Server::new(connection, register_update_stream_tx);
+        let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx);
 
-    let server: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>> = if run_server {
-        let mut app = tide::with_state(server);
+        let dealer: Pin<Box<dyn Future<Output = Result<(), RedisError>>>> =
+            if run_dealer { Box::pin(spawn_dealers(server.clone(), partition)) } else { Box::pin(pending()) };
 
-        if config.server.cache {
-            app.at("/").with(After(append_cache_control)).serve_dir(&config.server.site)?;
-        } else {
-            app.at("/").serve_dir(&config.server.site)?;
-        }
-        app.at("/").with(After(append_cache_control)).serve_file(config.server.site.join("index.html"))?;
-        app.at("/api").get(WebSocket::new(new_client));
-        app.with(After(serve_404));
-
-        let mut listener = ConcurrentListener::new();
-        for addrs in config.server.bind {
-            if let (Some(cert), Some(key)) = (&config.server.cert, &config.server.key) {
-                listener.add(TlsListener::build().addrs(addrs).cert(&cert).key(&key))?;
+        let server: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>> = if run_server {
+            let mut app = tide::with_state(server);
+
+            if config.server.cache {
+                app.at("/").with(After(append_cache_control)).serve_dir(&config.server.site)?;
             } else {
-                listener.add(addrs.to_listener()?)?;
+                app.at("/").serve_dir(&config.server.site)?;
+            }
+            app.at("/").with(After(append_cache_control)).serve_file(config.server.site.join("index.html"))?;
+            app.at("/api").get(WebSocket::new(new_client));
+            app.with(After(serve_404));
+
+            let mut listener = ConcurrentListener::new();
+            for addrs in &config.server.bind {
+                if let (Some(cert), Some(key)) = (&config.server.cert, &config.server.key) {
+                    listener.add(TlsListener::build().addrs(addrs).cert(&cert).key(&key))?;
+                } else {
+                    listener.add(addrs.to_listener()?)?;
+                }
             }
-        }
-
-        Box::pin(app.listen(listener))
-    } else {
-        Box::pin(pending())
-    };
 
-    select! {
-        _ = signal_handler.fuse() => {},
-        _ = handle_client_interest.fuse() => {},
-        server = server.fuse() => server?,
-        dealer = dealer.fuse() => dealer?,
-    };
+            Box::pin(app.listen(listener))
+        } else {
+            Box::pin(pending())
+        };
+
+        select! {
+            _ = signal_handler.fuse() => break,
+            error = handle_client_interest.fuse() => match error {
+                HandleClientInterestSubscriptionsError::ClientInterestSenderHungUp => break,
+                HandleClientInterestSubscriptionsError::RedisConnectionDropped => continue,
+            },
+            server = server.fuse() => server?,
+            dealer = dealer.fuse() => match dealer {
+                Ok(()) => break,
+                Err(error) if error.is_connection_dropped() => {
+                    continue
+                }
+                Err(error) => return Err(From::from(error)),
+            }
+        };
+    }
 
     info!("Pokerwave shut down gracefully.");
 
index 2b17d5bcdd1ced52cfd3533efb1331cd4d65df5d..0e0f358f93bc7688a974c576061424845babe460 100644 (file)
@@ -102,7 +102,12 @@ impl ToRedisArgs for ClientInterest {
     }
 }
 
-pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) {
+pub enum HandleClientInterestSubscriptionsError {
+    ClientInterestSenderHungUp,
+    RedisConnectionDropped,
+}
+
+pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> HandleClientInterestSubscriptionsError {
     debug!("init");
     let mut clients: Vec<Client> = Vec::new();
     loop {
@@ -211,13 +216,15 @@ pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut ne
             }
             Action::NoNewClients => {
                 debug!("Action::NoNewClients - current clients: {:?}", clients);
+                error!("Sender<ClientInterest> hung up");
                 if clients.is_empty() {
-                    return;
+                    return HandleClientInterestSubscriptionsError::ClientInterestSenderHungUp;
                 }
             }
             Action::ConnectionClosed => {
                 debug!("Action::ConnectionClosed - current clients: {:?}", clients);
-                return;
+                error!("Redis connection dropped");
+                return HandleClientInterestSubscriptionsError::RedisConnectionDropped;
             }
         }
     }