From c6745a22e79f0556a19b0d44a181fb9d8ed78f90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20L=C3=B3pez=20Aza=C3=B1a?= <daniloaz@gmail.com> Date: Sat, 30 Mar 2024 13:40:59 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20(Cargo.toml):=20add=20signal-hook?= =?UTF-8?q?=20dependency=20for=20signal=20handling=20=E2=99=BB=EF=B8=8F=20?= =?UTF-8?q?(client=5Flistener.rs):=20make=20ClientListener=20trait=20Send?= =?UTF-8?q?=20to=20allow=20cross-thread=20usage=20=E2=9C=A8=20(main.rs):?= =?UTF-8?q?=20implement=20signal=20handling=20for=20graceful=20shutdown=20?= =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(main.rs):=20refactor=20client=20creation?= =?UTF-8?q?=20to=20use=20Arc<Mutex>=20for=20shared=20state=20=E2=99=BB?= =?UTF-8?q?=EF=B8=8F=20(main.rs):=20add=20retry=20logic=20for=20persistent?= =?UTF-8?q?=20connection=20attempts=20=E2=99=BB=EF=B8=8F=20(subscription?= =?UTF-8?q?=5Flistener.rs):=20make=20SubscriptionListener=20trait=20Send?= =?UTF-8?q?=20to=20allow=20cross-thread=20usage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 1 + src/client_listener.rs | 2 +- src/main.rs | 81 +++++++++++++++++++++++++++++++++--- src/subscription_listener.rs | 2 +- 4 files changed, 78 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a4354c3..1f3faaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,5 +18,6 @@ reqwest = { version = "0", features = ["json", "stream"] } serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } serde_urlencoded = "0" +signal-hook = "0" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-tungstenite = { version = "0", features = ["native-tls"] } diff --git a/src/client_listener.rs b/src/client_listener.rs index 9d3d6d5..d4e9300 100644 --- a/src/client_listener.rs +++ b/src/client_listener.rs @@ -8,7 +8,7 @@ use std::fmt::Debug; /// has changed. On the other hand, all the notifications for a single `LightstreamerClient`, /// including notifications to `ClientListener`, `SubscriptionListener` and `ClientMessageListener` /// will be dispatched by the same thread. -pub trait ClientListener: Debug { +pub trait ClientListener: Debug + Send { /// Event handler that receives a notification when the `ClientListener` instance is removed /// from a `LightstreamerClient` through `LightstreamerClient.removeListener()`. This is the /// last event to be fired on the listener. diff --git a/src/main.rs b/src/main.rs index c120c1f..1c4b05a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use hyper::client; use lightstreamer_client::item_update::ItemUpdate; use lightstreamer_client::ls_client::LightstreamerClient; use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode}; @@ -7,11 +8,48 @@ use futures::stream::StreamExt; use futures::SinkExt; use reqwest::Client; use serde_urlencoded; +use signal_hook::low_level::signal_name; +use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals}; use std::error::Error; use std::sync::Arc; use tokio::sync::Mutex; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +/// Sets up a signal hook for SIGINT and SIGTERM. +/// +/// Creates a signal hook for the specified signals and spawns a thread to handle them. +/// When a signal is received, it logs the signal name and performs cleanup before exiting with 0 code +/// to indicate orderly shutdown. +/// +/// # Arguments +/// +/// * `full_path` - The full path to the application configuration file. +/// +/// # Panics +/// +/// The function panics if it fails to create the signal iterator. +/// +async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) { + // Create a signal set of signals to be handled and a signal iterator to monitor them. + let signals = &[SIGINT, SIGTERM]; + let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator"); + + // Create a new thread to handle signals sent to the process + tokio::spawn(async move { + for signal in signals_iterator.forever() { + println!("Received signal: {}", signal_name(signal).unwrap()); + // + // Clean up and prepare to exit... + // ... + let mut client = client.lock().await; + client.disconnect(); + + // Exit with 0 code to indicate orderly shutdown. + std::process::exit(0); + } + }); +} + async fn establish_persistent_http_connection( session_id_shared: Arc<Mutex<String>>, ) -> Result<(), reqwest::Error> { @@ -210,6 +248,9 @@ impl SubscriptionListener for MySubscriptionListener { #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { + // + // Create a new subscription instance. + // let mut my_subscription = Subscription::new( SubscriptionMode::Merge, Some(vec![ @@ -224,15 +265,43 @@ async fn main() -> Result<(), Box<dyn Error>> { my_subscription.set_requested_snapshot(Some(Snapshot::Yes))?; my_subscription.add_listener(Box::new(MySubscriptionListener {})); - let mut client = LightstreamerClient::new( + // Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads. + let client = Arc::new(Mutex::new(LightstreamerClient::new( Some("http://push.lightstreamer.com/lightstreamer"), Some("DEMO"), - )?; + )?)); - client.subscribe(my_subscription); - println!("Client: {:?}", client); + // + // Add the subscription to the client. + // + { + let mut client = client.lock().await; + client.subscribe(my_subscription); + } - client.connect(); + // Spawn a new thread to handle SIGINT and SIGTERM process signals. + setup_signal_hook(client.clone()).await; - Ok(()) + // + // Infinite loop that will indefinitely retry failed connections unless + // a SIGTERM or SIGINT signal is received. + // + let mut retry_interval_milis: u64 = 0; + let mut retry_counter: u64 = 0; + loop { + let mut client = client.lock().await; + match client.connect() { + Ok(_) => {} + Err(e) => { + println!("Failed to connect: {:?}", e); + tokio::time::sleep(std::time::Duration::from_millis(retry_interval_milis)).await; + retry_interval_milis = (retry_interval_milis + (200 * retry_counter)).min(5000); + retry_counter += 1; + println!( + "Retrying connection in {} seconds...", + format!("{:.2}", retry_interval_milis as f64 / 1000.0) + ); + } + } + } } diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs index aaadfd9..7aa1bfc 100644 --- a/src/subscription_listener.rs +++ b/src/subscription_listener.rs @@ -8,7 +8,7 @@ use crate::item_update::ItemUpdate; /// has changed. On the other hand, all the notifications for a single LightstreamerClient, /// including notifications to ClientListener, SubscriptionListener and ClientMessageListener /// will be dispatched by the same thread. -pub trait SubscriptionListener { +pub trait SubscriptionListener: Send { /// Event handler that is called by Lightstreamer each time a request to clear the snapshot /// pertaining to an item in the Subscription has been received from the Server. /// More precisely, this kind of request can occur in two cases: