aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client_listener.rs2
-rw-r--r--src/main.rs85
-rw-r--r--src/subscription_listener.rs2
3 files changed, 79 insertions, 10 deletions
diff --git a/src/client_listener.rs b/src/client_listener.rs
index 9d3d6d5..d4e9300 100644
--- a/src/client_listener.rs
+++ b/src/client_listener.rs
@@ -8,7 +8,7 @@ use std::fmt::Debug;
/// has changed. On the other hand, all the notifications for a single `LightstreamerClient`,
/// including notifications to `ClientListener`, `SubscriptionListener` and `ClientMessageListener`
/// will be dispatched by the same thread.
-pub trait ClientListener: Debug {
+pub trait ClientListener: Debug + Send {
/// Event handler that receives a notification when the `ClientListener` instance is removed
/// from a `LightstreamerClient` through `LightstreamerClient.removeListener()`. This is the
/// last event to be fired on the listener.
diff --git a/src/main.rs b/src/main.rs
index c120c1f..1c4b05a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
+use hyper::client;
use lightstreamer_client::item_update::ItemUpdate;
use lightstreamer_client::ls_client::LightstreamerClient;
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
@@ -7,11 +8,48 @@ use futures::stream::StreamExt;
use futures::SinkExt;
use reqwest::Client;
use serde_urlencoded;
+use signal_hook::low_level::signal_name;
+use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
+/// Sets up a signal hook for SIGINT and SIGTERM.
+///
+/// Creates a signal hook for the specified signals and spawns a thread to handle them.
+/// When a signal is received, it logs the signal name and performs cleanup before exiting with 0 code
+/// to indicate orderly shutdown.
+///
+/// # Arguments
+///
+/// * `full_path` - The full path to the application configuration file.
+///
+/// # Panics
+///
+/// The function panics if it fails to create the signal iterator.
+///
+async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
+ // Create a signal set of signals to be handled and a signal iterator to monitor them.
+ let signals = &[SIGINT, SIGTERM];
+ let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator");
+
+ // Create a new thread to handle signals sent to the process
+ tokio::spawn(async move {
+ for signal in signals_iterator.forever() {
+ println!("Received signal: {}", signal_name(signal).unwrap());
+ //
+ // Clean up and prepare to exit...
+ // ...
+ let mut client = client.lock().await;
+ client.disconnect();
+
+ // Exit with 0 code to indicate orderly shutdown.
+ std::process::exit(0);
+ }
+ });
+}
+
async fn establish_persistent_http_connection(
session_id_shared: Arc<Mutex<String>>,
) -> Result<(), reqwest::Error> {
@@ -210,6 +248,9 @@ impl SubscriptionListener for MySubscriptionListener {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
+ //
+ // Create a new subscription instance.
+ //
let mut my_subscription = Subscription::new(
SubscriptionMode::Merge,
Some(vec![
@@ -224,15 +265,43 @@ async fn main() -> Result<(), Box<dyn Error>> {
my_subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
my_subscription.add_listener(Box::new(MySubscriptionListener {}));
- let mut client = LightstreamerClient::new(
+ // Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads.
+ let client = Arc::new(Mutex::new(LightstreamerClient::new(
Some("http://push.lightstreamer.com/lightstreamer"),
Some("DEMO"),
- )?;
-
- client.subscribe(my_subscription);
- println!("Client: {:?}", client);
-
- client.connect();
+ )?));
+
+ //
+ // Add the subscription to the client.
+ //
+ {
+ let mut client = client.lock().await;
+ client.subscribe(my_subscription);
+ }
- Ok(())
+ // Spawn a new thread to handle SIGINT and SIGTERM process signals.
+ setup_signal_hook(client.clone()).await;
+
+ //
+ // Infinite loop that will indefinitely retry failed connections unless
+ // a SIGTERM or SIGINT signal is received.
+ //
+ let mut retry_interval_milis: u64 = 0;
+ let mut retry_counter: u64 = 0;
+ loop {
+ let mut client = client.lock().await;
+ match client.connect() {
+ Ok(_) => {}
+ Err(e) => {
+ println!("Failed to connect: {:?}", e);
+ tokio::time::sleep(std::time::Duration::from_millis(retry_interval_milis)).await;
+ retry_interval_milis = (retry_interval_milis + (200 * retry_counter)).min(5000);
+ retry_counter += 1;
+ println!(
+ "Retrying connection in {} seconds...",
+ format!("{:.2}", retry_interval_milis as f64 / 1000.0)
+ );
+ }
+ }
+ }
}
diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs
index aaadfd9..7aa1bfc 100644
--- a/src/subscription_listener.rs
+++ b/src/subscription_listener.rs
@@ -8,7 +8,7 @@ use crate::item_update::ItemUpdate;
/// has changed. On the other hand, all the notifications for a single LightstreamerClient,
/// including notifications to ClientListener, SubscriptionListener and ClientMessageListener
/// will be dispatched by the same thread.
-pub trait SubscriptionListener {
+pub trait SubscriptionListener: Send {
/// Event handler that is called by Lightstreamer each time a request to clear the snapshot
/// pertaining to an item in the Subscription has been received from the Server.
/// More precisely, this kind of request can occur in two cases: