#[macro_use]
extern crate serde_derive;
+use async_std::task::sleep;
use std::env::set_current_dir;
use std::fs::read_to_string;
use std::future::Future;
use clap::{app_from_crate, crate_authors, crate_description, crate_name, crate_version, AppSettings, Arg, SubCommand};
use futures::{channel::mpsc::channel, future::pending, select, FutureExt, StreamExt};
-use redis::Client;
+use redis::{Client, RedisError};
use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals;
use tide::{
use crate::client::new_client;
use crate::config::Config;
use crate::dealer::{spawn_dealers, Partition};
-use crate::pubsub::handle_client_interest_subscriptions;
+use crate::pubsub::{HandleClientInterestSubscriptionsError, handle_client_interest_subscriptions};
use crate::server::Server;
async fn handle_signals(mut signals: Signals) {
}
_ => {}
}
+
+ let mut first_attempt = true;
+
+ loop {
+ if !first_attempt {
+ let signals = Signals::new(&[SIGINT])?;
+ let signal_handler = handle_signals(signals);
+ info!("Attemping redis reconnect in {} seconds...", config.redis.connect_timeout_seconds);
+ let sleep = sleep(Duration::from_secs(config.redis.connect_timeout_seconds));
+ select! {
+ _ = sleep.fuse() => {},
+ _ = signal_handler.fuse() => break,
+ }
+ } else {
+ first_attempt = false;
+ }
- let signals = Signals::new(&[SIGINT])?;
- let signal_handler = handle_signals(signals);
+ let signals = Signals::new(&[SIGINT])?;
+ let signal_handler = handle_signals(signals);
- const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128;
- let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER);
+ const REGISTER_UPDATE_STREAM_CHANNEL_BUFFER: usize = 128;
+ let (register_update_stream_tx, register_update_stream_rx) = channel(REGISTER_UPDATE_STREAM_CHANNEL_BUFFER);
- let client = Client::open(config.redis.addr)?;
- let connection = client.get_multiplexed_async_std_connection().await?;
- let pubsub = client.get_async_std_connection().await?.into_pubsub();
- let server = Server::new(connection, register_update_stream_tx);
- let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx);
+ info!("Connecting to redis server...");
+ let client = Client::open(&*config.redis.addr)?;
+ let connection = match client.get_multiplexed_async_std_connection().await {
+ Ok(connection) => connection,
+ Err(err) => {
+ error!("Redis connection error: {}", err);
+ continue;
+ }
+ };
+ let pubsub = match client.get_async_std_connection().await {
+ Ok(connection) => connection.into_pubsub(),
+ Err(err) => {
+ error!("Redis connection error: {}", err);
+ continue;
+ }
+ };
- let dealer: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>> =
- if run_dealer { Box::pin(spawn_dealers(server.clone(), partition)) } else { Box::pin(pending()) };
+ let server = Server::new(connection, register_update_stream_tx);
+ let handle_client_interest = handle_client_interest_subscriptions(pubsub, register_update_stream_rx);
- let server: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>> = if run_server {
- let mut app = tide::with_state(server);
+ let dealer: Pin<Box<dyn Future<Output = Result<(), RedisError>>>> =
+ if run_dealer { Box::pin(spawn_dealers(server.clone(), partition)) } else { Box::pin(pending()) };
- if config.server.cache {
- app.at("/").with(After(append_cache_control)).serve_dir(&config.server.site)?;
- } else {
- app.at("/").serve_dir(&config.server.site)?;
- }
- app.at("/").with(After(append_cache_control)).serve_file(config.server.site.join("index.html"))?;
- app.at("/api").get(WebSocket::new(new_client));
- app.with(After(serve_404));
-
- let mut listener = ConcurrentListener::new();
- for addrs in config.server.bind {
- if let (Some(cert), Some(key)) = (&config.server.cert, &config.server.key) {
- listener.add(TlsListener::build().addrs(addrs).cert(&cert).key(&key))?;
+ let server: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>> = if run_server {
+ let mut app = tide::with_state(server);
+
+ if config.server.cache {
+ app.at("/").with(After(append_cache_control)).serve_dir(&config.server.site)?;
} else {
- listener.add(addrs.to_listener()?)?;
+ app.at("/").serve_dir(&config.server.site)?;
+ }
+ app.at("/").with(After(append_cache_control)).serve_file(config.server.site.join("index.html"))?;
+ app.at("/api").get(WebSocket::new(new_client));
+ app.with(After(serve_404));
+
+ let mut listener = ConcurrentListener::new();
+ for addrs in &config.server.bind {
+ if let (Some(cert), Some(key)) = (&config.server.cert, &config.server.key) {
+ listener.add(TlsListener::build().addrs(addrs).cert(&cert).key(&key))?;
+ } else {
+ listener.add(addrs.to_listener()?)?;
+ }
}
- }
-
- Box::pin(app.listen(listener))
- } else {
- Box::pin(pending())
- };
- select! {
- _ = signal_handler.fuse() => {},
- _ = handle_client_interest.fuse() => {},
- server = server.fuse() => server?,
- dealer = dealer.fuse() => dealer?,
- };
+ Box::pin(app.listen(listener))
+ } else {
+ Box::pin(pending())
+ };
+
+ select! {
+ _ = signal_handler.fuse() => break,
+ error = handle_client_interest.fuse() => match error {
+ HandleClientInterestSubscriptionsError::ClientInterestSenderHungUp => break,
+ HandleClientInterestSubscriptionsError::RedisConnectionDropped => continue,
+ },
+ server = server.fuse() => server?,
+ dealer = dealer.fuse() => match dealer {
+ Ok(()) => break,
+ Err(error) if error.is_connection_dropped() => {
+ continue
+ }
+ Err(error) => return Err(From::from(error)),
+ }
+ };
+ }
info!("Pokerwave shut down gracefully.");