✨ (Cargo.toml): add signal-hook dependency for signal handling
♻️ (client_listener.rs): make ClientListener trait Send to allow cross-thread usage ✨ (main.rs): implement signal handling for graceful shutdown ♻️ (main.rs): refactor client creation to use Arc<Mutex> for shared state ♻️ (main.rs): add retry logic for persistent connection attempts ♻️ (subscription_listener.rs): make SubscriptionListener trait Send to allow cross-thread usage
This commit is contained in:
parent
88c8a8d7c4
commit
c6745a22e7
@ -18,5 +18,6 @@ reqwest = { version = "0", features = ["json", "stream"] }
|
|||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = { version = "1" }
|
serde_json = { version = "1" }
|
||||||
serde_urlencoded = "0"
|
serde_urlencoded = "0"
|
||||||
|
signal-hook = "0"
|
||||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
tokio-tungstenite = { version = "0", features = ["native-tls"] }
|
tokio-tungstenite = { version = "0", features = ["native-tls"] }
|
||||||
|
@ -8,7 +8,7 @@ use std::fmt::Debug;
|
|||||||
/// has changed. On the other hand, all the notifications for a single `LightstreamerClient`,
|
/// has changed. On the other hand, all the notifications for a single `LightstreamerClient`,
|
||||||
/// including notifications to `ClientListener`, `SubscriptionListener` and `ClientMessageListener`
|
/// including notifications to `ClientListener`, `SubscriptionListener` and `ClientMessageListener`
|
||||||
/// will be dispatched by the same thread.
|
/// will be dispatched by the same thread.
|
||||||
pub trait ClientListener: Debug {
|
pub trait ClientListener: Debug + Send {
|
||||||
/// Event handler that receives a notification when the `ClientListener` instance is removed
|
/// Event handler that receives a notification when the `ClientListener` instance is removed
|
||||||
/// from a `LightstreamerClient` through `LightstreamerClient.removeListener()`. This is the
|
/// from a `LightstreamerClient` through `LightstreamerClient.removeListener()`. This is the
|
||||||
/// last event to be fired on the listener.
|
/// last event to be fired on the listener.
|
||||||
|
81
src/main.rs
81
src/main.rs
@ -1,3 +1,4 @@
|
|||||||
|
use hyper::client;
|
||||||
use lightstreamer_client::item_update::ItemUpdate;
|
use lightstreamer_client::item_update::ItemUpdate;
|
||||||
use lightstreamer_client::ls_client::LightstreamerClient;
|
use lightstreamer_client::ls_client::LightstreamerClient;
|
||||||
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
|
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
|
||||||
@ -7,11 +8,48 @@ use futures::stream::StreamExt;
|
|||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde_urlencoded;
|
use serde_urlencoded;
|
||||||
|
use signal_hook::low_level::signal_name;
|
||||||
|
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||||
|
|
||||||
|
/// Sets up a signal hook for SIGINT and SIGTERM.
|
||||||
|
///
|
||||||
|
/// Creates a signal hook for the specified signals and spawns a thread to handle them.
|
||||||
|
/// When a signal is received, it logs the signal name and performs cleanup before exiting with 0 code
|
||||||
|
/// to indicate orderly shutdown.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `full_path` - The full path to the application configuration file.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// The function panics if it fails to create the signal iterator.
|
||||||
|
///
|
||||||
|
async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
|
||||||
|
// Create a signal set of signals to be handled and a signal iterator to monitor them.
|
||||||
|
let signals = &[SIGINT, SIGTERM];
|
||||||
|
let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator");
|
||||||
|
|
||||||
|
// Create a new thread to handle signals sent to the process
|
||||||
|
tokio::spawn(async move {
|
||||||
|
for signal in signals_iterator.forever() {
|
||||||
|
println!("Received signal: {}", signal_name(signal).unwrap());
|
||||||
|
//
|
||||||
|
// Clean up and prepare to exit...
|
||||||
|
// ...
|
||||||
|
let mut client = client.lock().await;
|
||||||
|
client.disconnect();
|
||||||
|
|
||||||
|
// Exit with 0 code to indicate orderly shutdown.
|
||||||
|
std::process::exit(0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async fn establish_persistent_http_connection(
|
async fn establish_persistent_http_connection(
|
||||||
session_id_shared: Arc<Mutex<String>>,
|
session_id_shared: Arc<Mutex<String>>,
|
||||||
) -> Result<(), reqwest::Error> {
|
) -> Result<(), reqwest::Error> {
|
||||||
@ -210,6 +248,9 @@ impl SubscriptionListener for MySubscriptionListener {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn Error>> {
|
async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
//
|
||||||
|
// Create a new subscription instance.
|
||||||
|
//
|
||||||
let mut my_subscription = Subscription::new(
|
let mut my_subscription = Subscription::new(
|
||||||
SubscriptionMode::Merge,
|
SubscriptionMode::Merge,
|
||||||
Some(vec![
|
Some(vec![
|
||||||
@ -224,15 +265,43 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
my_subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
|
my_subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
|
||||||
my_subscription.add_listener(Box::new(MySubscriptionListener {}));
|
my_subscription.add_listener(Box::new(MySubscriptionListener {}));
|
||||||
|
|
||||||
let mut client = LightstreamerClient::new(
|
// Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads.
|
||||||
|
let client = Arc::new(Mutex::new(LightstreamerClient::new(
|
||||||
Some("http://push.lightstreamer.com/lightstreamer"),
|
Some("http://push.lightstreamer.com/lightstreamer"),
|
||||||
Some("DEMO"),
|
Some("DEMO"),
|
||||||
)?;
|
)?));
|
||||||
|
|
||||||
client.subscribe(my_subscription);
|
//
|
||||||
println!("Client: {:?}", client);
|
// Add the subscription to the client.
|
||||||
|
//
|
||||||
|
{
|
||||||
|
let mut client = client.lock().await;
|
||||||
|
client.subscribe(my_subscription);
|
||||||
|
}
|
||||||
|
|
||||||
client.connect();
|
// Spawn a new thread to handle SIGINT and SIGTERM process signals.
|
||||||
|
setup_signal_hook(client.clone()).await;
|
||||||
|
|
||||||
Ok(())
|
//
|
||||||
|
// Infinite loop that will indefinitely retry failed connections unless
|
||||||
|
// a SIGTERM or SIGINT signal is received.
|
||||||
|
//
|
||||||
|
let mut retry_interval_milis: u64 = 0;
|
||||||
|
let mut retry_counter: u64 = 0;
|
||||||
|
loop {
|
||||||
|
let mut client = client.lock().await;
|
||||||
|
match client.connect() {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Failed to connect: {:?}", e);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(retry_interval_milis)).await;
|
||||||
|
retry_interval_milis = (retry_interval_milis + (200 * retry_counter)).min(5000);
|
||||||
|
retry_counter += 1;
|
||||||
|
println!(
|
||||||
|
"Retrying connection in {} seconds...",
|
||||||
|
format!("{:.2}", retry_interval_milis as f64 / 1000.0)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ use crate::item_update::ItemUpdate;
|
|||||||
/// has changed. On the other hand, all the notifications for a single LightstreamerClient,
|
/// has changed. On the other hand, all the notifications for a single LightstreamerClient,
|
||||||
/// including notifications to ClientListener, SubscriptionListener and ClientMessageListener
|
/// including notifications to ClientListener, SubscriptionListener and ClientMessageListener
|
||||||
/// will be dispatched by the same thread.
|
/// will be dispatched by the same thread.
|
||||||
pub trait SubscriptionListener {
|
pub trait SubscriptionListener: Send {
|
||||||
/// Event handler that is called by Lightstreamer each time a request to clear the snapshot
|
/// Event handler that is called by Lightstreamer each time a request to clear the snapshot
|
||||||
/// pertaining to an item in the Subscription has been received from the Server.
|
/// pertaining to an item in the Subscription has been received from the Server.
|
||||||
/// More precisely, this kind of request can occur in two cases:
|
/// More precisely, this kind of request can occur in two cases:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user