aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-03-31 21:39:46 +0200
committerLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-03-31 21:39:46 +0200
commitfacada6f8d5aecb5ce0bc2a13042622f93f15807 (patch)
treef872c48279d636df7bbea6edb56de4d5130c8fe5 /src/main.rs
parent7af7a7626a8e83fe3f9c3b0d2ad7d2b32da41d45 (diff)
♻️ (error.rs): remove unnecessary error conversions for IllegalArgumentException and IllegalStateException
♻️ (ls_client.rs): refactor connect method to accept shutdown signal and return generic error ✨ (ls_client.rs): add support for graceful shutdown using Notify ✨ (ls_client.rs): implement session creation and subscription logic in connect method ♻️ (main.rs): replace SharedState with Notify for handling shutdown signal ✨ (main.rs): add retry logic with a maximum of 5 retries for the client connection in main function ✨ (main.rs): ensure graceful client disconnect and orderly shutdown of the application
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs51
1 files changed, 22 insertions, 29 deletions
diff --git a/src/main.rs b/src/main.rs
index 87f9da4..8431333 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -13,14 +13,9 @@ use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
-use tokio::sync::Mutex;
+use tokio::sync::{Notify, Mutex};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
-struct SharedState {
- client: Arc<Mutex<LightstreamerClient>>,
- should_disconnect: Arc<AtomicBool>,
-}
-
/// Sets up a signal hook for SIGINT and SIGTERM.
///
/// Creates a signal hook for the specified signals and spawns a thread to handle them.
@@ -35,7 +30,7 @@ struct SharedState {
///
/// The function panics if it fails to create the signal iterator.
///
-async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) {
+async fn setup_signal_hook(shutdown_signal: Arc<Notify>) {
// Create a signal set of signals to be handled and a signal iterator to monitor them.
let signals = &[SIGINT, SIGTERM];
let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator");
@@ -44,18 +39,8 @@ async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) {
tokio::spawn(async move {
for signal in signals_iterator.forever() {
println!("Received signal: {}", signal_name(signal).unwrap());
- //
- // Clean up and prepare to exit...
- // ...
- {
- let shared_state = shared_state.lock().await;
- shared_state.should_disconnect.store(true, Ordering::Relaxed);
- let mut client = shared_state.client.lock().await;
- client.disconnect();
- }
-
- // Exit with 0 code to indicate orderly shutdown.
- std::process::exit(0);
+ let _ = shutdown_signal.notify_one();
+ break;
}
});
}
@@ -106,14 +91,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
}
- let should_disconnect = Arc::new(AtomicBool::new(false));
- let shared_state = Arc::new(Mutex::new(SharedState {
- client: client.clone(),
- should_disconnect: should_disconnect.clone(),
- }));
-
+ // Create a new Notify instance to send a shutdown signal to the signal handler thread.
+ let shutdown_signal = Arc::new(tokio::sync::Notify::new());
// Spawn a new thread to handle SIGINT and SIGTERM process signals.
- setup_signal_hook(shared_state).await;
+ setup_signal_hook(Arc::clone(&shutdown_signal)).await;
//
// Infinite loop that will indefinitely retry failed connections unless
@@ -121,10 +102,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
//
let mut retry_interval_milis: u64 = 0;
let mut retry_counter: u64 = 0;
- loop {
+ while retry_counter < 5 {
let mut client = client.lock().await;
- match client.connect().await {
- Ok(_) => {}
+ match client.connect(Arc::clone(&shutdown_signal)).await {
+ Ok(_) => {
+ client.disconnect().await;
+ break;
+ }
Err(e) => {
println!("Failed to connect: {:?}", e);
tokio::time::sleep(std::time::Duration::from_millis(retry_interval_milis)).await;
@@ -137,4 +121,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
}
+
+ if retry_counter == 5 {
+ println!("Failed to connect after {} retries. Exiting...", retry_counter);
+ } else {
+ println!("Exiting orderly from Lightstreamer client...");
+ }
+
+ // Exit using std::process::exit() to avoid waiting for existing tokio tasks to complete.
+ std::process::exit(0);
}