From 7aa8811e8349407907bf8d5cc92ef154e2d851d7 Mon Sep 17 00:00:00 2001 From: Geoffrey Allott Date: Sun, 9 Jan 2022 19:10:27 +0000 Subject: [PATCH] attempt reconnection when redis server connection is dropped --- pokerwave.toml | 1 + src/config.rs | 11 ++++- src/dealer.rs | 4 +- src/main.rs | 123 ++++++++++++++++++++++++++++++++----------------- src/pubsub.rs | 13 ++++-- 5 files changed, 105 insertions(+), 47 deletions(-) diff --git a/pokerwave.toml b/pokerwave.toml index 9e62b95..7aa806c 100644 --- a/pokerwave.toml +++ b/pokerwave.toml @@ -4,6 +4,7 @@ [redis] #addr = "redis://localhost" +#connect_timeout_seconds = 10 [server] #bind = ["localhost:8080"] diff --git a/src/config.rs b/src/config.rs index df917f2..ce44b72 100644 --- a/src/config.rs +++ b/src/config.rs @@ -36,15 +36,24 @@ fn default_redis_addr() -> String { "redis://localhost".to_string() } +fn default_redis_connect_timeout_seconds() -> u64 { + 10 +} + #[derive(Clone, Debug, Deserialize)] pub struct RedisConfig { #[serde(default = "default_redis_addr")] pub addr: String, + #[serde(default = "default_redis_connect_timeout_seconds")] + pub connect_timeout_seconds: u64, } impl Default for RedisConfig { fn default() -> Self { - Self { addr: default_redis_addr() } + Self { + addr: default_redis_addr(), + connect_timeout_seconds: default_redis_connect_timeout_seconds(), + } } } diff --git a/src/dealer.rs b/src/dealer.rs index 1c1d326..1dad802 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -149,7 +149,7 @@ impl Dealer { } } -pub async fn spawn_dealers(server: Server, partition: Partition) -> Result<(), std::io::Error> { +pub async fn spawn_dealers(server: Server, partition: Partition) -> Result<(), RedisError> { let (mut server_state, mut update_stream) = server.new_state().await; let mut interests = HashSet::new(); interests.insert(ClientInterest::GameList); @@ -176,7 +176,7 @@ pub async fn spawn_dealers(server: Server, partition: Partition) -> Result<(), s } Err(err) => { error!("Failed to update game list: {}", err); - return Err(std::io::Error::new(std::io::ErrorKind::Other, err)); + return Err(err); } } if let Some(ClientInterest::GameList) = update_stream.next().await { diff --git a/src/main.rs b/src/main.rs index cc36400..079608e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ extern crate log; #[macro_use] extern crate serde_derive; +use async_std::task::sleep; use std::env::set_current_dir; use std::fs::read_to_string; use std::future::Future; @@ -12,7 +13,7 @@ use std::time::Duration; use clap::{app_from_crate, crate_authors, crate_description, crate_name, crate_version, AppSettings, Arg, SubCommand}; use futures::{channel::mpsc::channel, future::pending, select, FutureExt, StreamExt}; -use redis::Client; +use redis::{Client, RedisError}; use signal_hook::consts::signal::*; use signal_hook_async_std::Signals; use tide::{ @@ -44,7 +45,7 @@ mod util; use crate::client::new_client; use crate::config::Config; use crate::dealer::{spawn_dealers, Partition}; -use crate::pubsub::handle_client_interest_subscriptions; +use crate::pubsub::{HandleClientInterestSubscriptionsError, handle_client_interest_subscriptions}; use crate::server::Server; async fn handle_signals(mut signals: Signals) { @@ -124,54 +125,94 @@ async fn main() -> Result<(), tide::Error> { } _ => {} } + + let mut first_attempt = true; + + loop { + if !first_attempt { + let signals = Signals::new(&[SIGINT])?; + let signal_handler = handle_signals(signals); + info!("Attemping redis reconnect in {} seconds...", config.redis.connect_timeout_seconds); + let sleep = sleep(Duration::from_secs(config.redis.connect_timeout_seconds)); + select! { + _ = sleep.fuse() => {}, + _ = signal_handler.fuse() => break, + } + } else { + first_attempt = false; + } - let signals = Signals::new(&[SIGINT])?; - let signal_handler = handle_signals(signals); + let signals = Signals::new(&[SIGINT])?; + let signal_handler = handle_signals(signals); - const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128; - let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER); + const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128; + let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER); - let client = Client::open(config.redis.addr)?; - let connection = client.get_multiplexed_async_std_connection().await?; - let pubsub = client.get_async_std_connection().await?.into_pubsub(); - let server = Server::new(connection, register_update_stream_tx); - let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx); + info!("Connecting to redis server..."); + let client = Client::open(&*config.redis.addr)?; + let connection = match client.get_multiplexed_async_std_connection().await { + Ok(connection) => connection, + Err(err) => { + error!("Redis connection error: {}", err); + continue; + } + }; + let pubsub = match client.get_async_std_connection().await { + Ok(connection) => connection.into_pubsub(), + Err(err) => { + error!("Redis connection error: {}", err); + continue; + } + }; - let dealer: Pin>>> = - if run_dealer { Box::pin(spawn_dealers(server.clone(), partition)) } else { Box::pin(pending()) }; + let server = Server::new(connection, register_update_stream_tx); + let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx); - let server: Pin>>> = if run_server { - let mut app = tide::with_state(server); + let dealer: Pin>>> = + if run_dealer { Box::pin(spawn_dealers(server.clone(), partition)) } else { Box::pin(pending()) }; - if config.server.cache { - app.at("/").with(After(append_cache_control)).serve_dir(&config.server.site)?; - } else { - app.at("/").serve_dir(&config.server.site)?; - } - app.at("/").with(After(append_cache_control)).serve_file(config.server.site.join("index.html"))?; - app.at("/api").get(WebSocket::new(new_client)); - app.with(After(serve_404)); - - let mut listener = ConcurrentListener::new(); - for addrs in config.server.bind { - if let (Some(cert), Some(key)) = (&config.server.cert, &config.server.key) { - listener.add(TlsListener::build().addrs(addrs).cert(&cert).key(&key))?; + let server: Pin>>> = if run_server { + let mut app = tide::with_state(server); + + if config.server.cache { + app.at("/").with(After(append_cache_control)).serve_dir(&config.server.site)?; } else { - listener.add(addrs.to_listener()?)?; + app.at("/").serve_dir(&config.server.site)?; + } + app.at("/").with(After(append_cache_control)).serve_file(config.server.site.join("index.html"))?; + app.at("/api").get(WebSocket::new(new_client)); + app.with(After(serve_404)); + + let mut listener = ConcurrentListener::new(); + for addrs in &config.server.bind { + if let (Some(cert), Some(key)) = (&config.server.cert, &config.server.key) { + listener.add(TlsListener::build().addrs(addrs).cert(&cert).key(&key))?; + } else { + listener.add(addrs.to_listener()?)?; + } } - } - - Box::pin(app.listen(listener)) - } else { - Box::pin(pending()) - }; - select! { - _ = signal_handler.fuse() => {}, - _ = handle_client_interest.fuse() => {}, - server = server.fuse() => server?, - dealer = dealer.fuse() => dealer?, - }; + Box::pin(app.listen(listener)) + } else { + Box::pin(pending()) + }; + + select! { + _ = signal_handler.fuse() => break, + error = handle_client_interest.fuse() => match error { + HandleClientInterestSubscriptionsError::ClientInterestSenderHungUp => break, + HandleClientInterestSubscriptionsError::RedisConnectionDropped => continue, + }, + server = server.fuse() => server?, + dealer = dealer.fuse() => match dealer { + Ok(()) => break, + Err(error) if error.is_connection_dropped() => { + continue + } + Err(error) => return Err(From::from(error)), + } + }; + } info!("Pokerwave shut down gracefully."); diff --git a/src/pubsub.rs b/src/pubsub.rs index 2b17d5b..0e0f358 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -102,7 +102,12 @@ impl ToRedisArgs for ClientInterest { } } -pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver) { +pub enum HandleClientInterestSubscriptionsError { + ClientInterestSenderHungUp, + RedisConnectionDropped, +} + +pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver) -> HandleClientInterestSubscriptionsError { debug!("init"); let mut clients: Vec = Vec::new(); loop { @@ -211,13 +216,15 @@ pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut ne } Action::NoNewClients => { debug!("Action::NoNewClients - current clients: {:?}", clients); + error!("Sender hung up"); if clients.is_empty() { - return; + return HandleClientInterestSubscriptionsError::ClientInterestSenderHungUp; } } Action::ConnectionClosed => { debug!("Action::ConnectionClosed - current clients: {:?}", clients); - return; + error!("Redis connection dropped"); + return HandleClientInterestSubscriptionsError::RedisConnectionDropped; } } } -- 2.34.1