diff options
author | 2024-03-30 20:59:54 +0100 | |
---|---|---|
committer | 2024-03-30 20:59:58 +0100 | |
commit | 7af7a7626a8e83fe3f9c3b0d2ad7d2b32da41d45 (patch) | |
tree | 319b9976f6274f42e748d1b501229bb219584f66 /src/main.rs | |
parent | c6745a22e79f0556a19b0d44a181fb9d8ed78f90 (diff) |
WARNING: unstable commit.
🔧 Update .gitignore to exclude .vscode directory
✨ Add futures-util and url dependencies to Cargo.toml
♻️ Refactor error handling into separate error module in Rust project
💡 Add get_password method documentation in connection_details.rs
♻️ Replace String with Transport enum for forced_transport in connection_options.rs
✨ Implement WebSocket connection logic in ls_client.rs with async support
✨ Add ClientStatus, ConnectionType, and DisconnectionType enums to manage client states in ls_client.rs
✨ (main.rs): add Transport enum to LightstreamerClient imports for WebSocket support
♻️ (main.rs): refactor signal handling to use SharedState struct for clean shutdown
✨ (main.rs): implement AtomicBool for graceful disconnect handling
📝 (main.rs): update comments to reflect new signal handling logic
✨ (main.rs): set forced transport to WebSocket streaming in Lightstreamer client options
✨ (util.rs): create new util module with clean_message function for message sanitization
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 213 |
1 files changed, 23 insertions, 190 deletions
diff --git a/src/main.rs b/src/main.rs index 1c4b05a..87f9da4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use hyper::client; use lightstreamer_client::item_update::ItemUpdate; -use lightstreamer_client::ls_client::LightstreamerClient; +use lightstreamer_client::ls_client::{LightstreamerClient, Transport}; use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode}; use lightstreamer_client::subscription_listener::SubscriptionListener; @@ -11,10 +11,16 @@ use serde_urlencoded; use signal_hook::low_level::signal_name; use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals}; use std::error::Error; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::Mutex; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +struct SharedState { + client: Arc<Mutex<LightstreamerClient>>, + should_disconnect: Arc<AtomicBool>, +} + /// Sets up a signal hook for SIGINT and SIGTERM. /// /// Creates a signal hook for the specified signals and spawns a thread to handle them. @@ -29,7 +35,7 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; /// /// The function panics if it fails to create the signal iterator. /// -async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) { +async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) { // Create a signal set of signals to be handled and a signal iterator to monitor them. let signals = &[SIGINT, SIGTERM]; let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator"); @@ -41,198 +47,18 @@ async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) { // // Clean up and prepare to exit... // ... - let mut client = client.lock().await; - client.disconnect(); - - // Exit with 0 code to indicate orderly shutdown. - std::process::exit(0); - } - }); -} - -async fn establish_persistent_http_connection( - session_id_shared: Arc<Mutex<String>>, -) -> Result<(), reqwest::Error> { - let client = Client::new(); - let params = [ - ("LS_adapter_set", "DEMO"), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), - ]; - let request_url = - "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0"; - - let response = client.post(request_url).form(¶ms).send().await?; - - if response.status().is_success() { - let mut stream = response.bytes_stream(); - - while let Some(item) = stream.next().await { - match item { - Ok(bytes) => { - let response_text = String::from_utf8(bytes.to_vec()) - .expect("Failed to convert bytes to string"); - if let Some(start) = response_text.find("CONOK,") { - if let Some(end) = response_text.find(",50000,5000,*\r\n") { - let session_id = &response_text[start + 6..end]; - println!("Session ID: {}", session_id); - let mut session_id_lock = session_id_shared.lock().await; - *session_id_lock = session_id.to_string(); - } - } else { - println!("New message: {}", response_text); - } - } - Err(e) => println!("Error while receiving: {:?}", e), - } - } - } else { - println!("Response was not successful: {}", response.status()); - } - - Ok(()) -} - -/* -// Establish a persistent WebSocket connection and handle the session creation -async fn establish_persistent_ws_connection( - session_id_shared: Arc<Mutex<String>>, -) -> Result<(), Box<dyn Error>> { - let ws_url = "wss://push.lightstreamer.com/lightstreamer"; - - let (ws_stream, _) = tokio_tungstenite::connect_async_with_config( - tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned())) - ).await.expect("Failed to connect"); - - let (mut write, mut read) = ws_stream.split(); - - // Session creation parameters - let params = [ - ("LS_op2", "create_session"), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"), - ("LS_adapter_set", "DEMO"), - ]; - - let encoded_params = serde_urlencoded::to_string(¶ms)?; - - // Send the create session message - write - .send(Message::Text(format!("{}\n", encoded_params))) - .await?; - - // Listen for messages from the server - while let Some(message) = read.next().await { - match message? { - Message::Text(text) => { - if text.starts_with("CONOK") { - let session_info: Vec<&str> = text.split(",").collect(); - let session_id = session_info.get(1).unwrap_or(&"").to_string(); - *session_id_shared.lock().await = session_id.clone(); - println!("Session established with ID: {}", session_id); - subscribe_to_channel_ws(session_id, write).await?; - break; // Exit after successful subscription - } - } - _ => {} - } - } - - Ok(()) -} -*/ - -async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> { - let client = Client::new(); - //let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt"; - //let params = [("LS_session", &session_id)]; - let subscribe_url = - "http://push.lightstreamer.com/lightstreamer/control.txt?LS_protocol=TLCP-2.0.0"; - let params = [ - ("LS_session", &session_id), - ("LS_op", &"add".to_string()), - ("LS_subId", &"1".to_string()), - ("LS_data_adapter", &"CHAT_ROOM".to_string()), - ("LS_group", &"chat_room".to_string()), - ("LS_schema", &"timestamp message".to_string()), - ("LS_mode", &"DISTINCT".to_string()), - ("LS_reqId", &"1".to_string()), - ]; - - let response = client.post(subscribe_url).form(¶ms).send().await?; - - if response.status().is_success() { - println!("Subscription successful!"); - } else { - println!("Subscription failed: {}", response.status()); - } - - Ok(()) -} - -// Function to subscribe to a channel using WebSocket -async fn subscribe_to_channel_ws( - session_id: String, - mut write: futures::stream::SplitSink< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, - >, - tokio_tungstenite::tungstenite::protocol::Message, - >, -) -> Result<(), Box<dyn Error>> { - // Example subscription to ITEM1 in MERGE mode from the DEMO adapter set - let sub_params = [ - ("LS_table", "1"), - ("LS_op2", "add"), - ("LS_session", &session_id), - ("LS_id", "item1"), - ("LS_schema", "stock_name last_price"), - ("LS_mode", "MERGE"), - ]; - - let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?; - - // Send the subscription message - write.send(Message::Text(encoded_sub_params)).await?; - - println!("Subscribed to channel with session ID: {}", session_id); - - Ok(()) -} - -/* -#[tokio::main] -async fn main() -> Result<(), Box<dyn Error>> { - - let session_id_shared = Arc::new(Mutex::new(String::new())); - let session_id_shared_clone = session_id_shared.clone(); - - let task1 = tokio::spawn(async move { - establish_persistent_http_connection(session_id_shared_clone).await.unwrap(); - }); - - println!("Established connection to Lightstreamer server"); - let task2 = tokio::spawn(async move { - let mut session_established = false; - loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let session_id; { - session_id = session_id_shared.lock().await.clone(); + let shared_state = shared_state.lock().await; + shared_state.should_disconnect.store(true, Ordering::Relaxed); + let mut client = shared_state.client.lock().await; + client.disconnect(); } - if !session_established && !session_id.is_empty() { - println!("Accessed Session ID from another thread: {}", session_id); - session_established = true; - subscribe_to_channel(session_id).await.unwrap(); - } + // Exit with 0 code to indicate orderly shutdown. + std::process::exit(0); } }); - - task1.await?; - task2.await?; - - Ok(()) } -*/ pub struct MySubscriptionListener {} @@ -277,10 +103,17 @@ async fn main() -> Result<(), Box<dyn Error>> { { let mut client = client.lock().await; client.subscribe(my_subscription); + client.connection_options.set_forced_transport(Some(Transport::WsStreaming)); } + let should_disconnect = Arc::new(AtomicBool::new(false)); + let shared_state = Arc::new(Mutex::new(SharedState { + client: client.clone(), + should_disconnect: should_disconnect.clone(), + })); + // Spawn a new thread to handle SIGINT and SIGTERM process signals. - setup_signal_hook(client.clone()).await; + setup_signal_hook(shared_state).await; // // Infinite loop that will indefinitely retry failed connections unless @@ -290,7 +123,7 @@ async fn main() -> Result<(), Box<dyn Error>> { let mut retry_counter: u64 = 0; loop { let mut client = client.lock().await; - match client.connect() { + match client.connect().await { Ok(_) => {} Err(e) => { println!("Failed to connect: {:?}", e); |