diff options
author | 2024-04-03 20:55:19 +0200 | |
---|---|---|
committer | 2024-04-03 20:55:19 +0200 | |
commit | c094bf9c8bad3db0d6ae9b53f9087e081a3ba63b (patch) | |
tree | d321e6f22b7bdaaa61c271d9931dcf52cd95e14b | |
parent | facada6f8d5aecb5ce0bc2a13042622f93f15807 (diff) |
♻️ (ls_client.rs): refactor base_params to Vec for consistent param handling
✨ (ls_client.rs): add conditional adapter_set param to base_params
♻️ (ls_client.rs): remove redundant adapter_set insertion
♻️ (ls_client.rs): refactor message processing loop for clarity and extensibility
🔧 (main.rs): extract MAX_CONNECTION_ATTEMPTS as constant for better configurability
✨ (main.rs): change adapter_set from "DEMO" to "QUOTE_ADAPTER" for client initialization
-rw-r--r-- | src/ls_client.rs | 224 | ||||
-rw-r--r-- | src/main.rs | 6 |
2 files changed, 135 insertions, 95 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs index 51e4786..23b9319 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -206,21 +206,20 @@ impl LightstreamerClient { ))); } - let mut base_params = HashMap::new(); + let mut base_params: Vec<(&str, &str)> = Vec::new(); // // Build the request base parameters. // + if let Some(adapter_set) = &self.adapter_set { + base_params.push(("LS_adapter_set", adapter_set)); + } base_params.extend([ ("LS_protocol", "TLCP-2.5.0"), ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), ]); - if let Some(adapter_set) = self.connection_details.get_adapter_set() { - base_params.insert("LS_adapter_set", adapter_set); - } - // // Add optional parameters // @@ -235,9 +234,6 @@ impl LightstreamerClient { params.insert("LS_password", password); } - if let Some(adapter_set) = &self.adapter_set { - params.insert("LS_adapter_set", adapter_set); - } if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() { params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string()); @@ -382,68 +378,143 @@ impl LightstreamerClient { // // Confirm the connection by sending a 'wsok' message to the server. // + + // + // Initiate communication with the server by sending a 'wsok' message. + // write_stream .send(Message::Text("wsok".into())) .await?; - if let Some(result) = read_stream.next().await { - match result? { - Message::Text(text) => { - let clean_text = clean_message(&text); - if clean_text == "wsok" { - println!("Connection confirmed by server"); - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Unexpected message received from server: {}", clean_text), - ))); - } - } - non_text_message => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Unexpected non-text message from server: {:?}", - non_text_message - ), - ))); - } - } - } // - // Session creation. + // Start reading and processing messages from the server. // + let mut request_id: usize = 0; let mut session_id: Option<String> = None; - let encoded_params = serde_urlencoded::to_string(&base_params)?; - write_stream - .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) - .await?; - if let Some(result) = read_stream.next().await { - match result? { - Message::Text(text) => { - let clean_text = clean_message(&text); - if clean_text.starts_with("conok") { - let session_info: Vec<&str> = clean_text.split(",").collect(); - session_id = session_info.get(1).map(|s| s.to_string()); - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Unexpected message received from server: {}", clean_text), - ))); + let mut subscription_id: usize = 0; + loop { + tokio::select! { + message = read_stream.next() => { + match message { + Some(Ok(Message::Text(text))) => { + let clean_text = clean_message(&text); + let message_fields: Vec<&str> = clean_text.split(",").collect(); + match *message_fields.first().unwrap_or(&"") { + // + // Connection confirmation from server. + // + "wsok" => { + println!("Connection confirmed by server"); + // + // Request session creation. + // + let mut params: Vec<(&str, &str)> = vec![]; + if let Some(adapter_set) = self.connection_details.get_adapter_set() { + params.push(("LS_adapter_set", adapter_set)); + } + params.extend(base_params.iter().cloned()); + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) + .await?; + }, + // + // Notifications from server. + // + "cons" | "servname" | "clientip" => { + println!("Received notification from server: {}", clean_text); + // Don't do anything with these notifications for now. + }, + // + // Session created successfully. + // + "conok" => { + if let Some(session_id) = message_fields.get(1).as_deref() { + println!("Session created with ID: {:?}", session_id); + // + // Subscribe to the desired items. + // + /* + let ls_item_group = self.get_subscriptions().iter().map(|subscription| { + format!( + "{},{},{}", + subscription.get_mode().to_string(), + subscription.get_items().join(","), + subscription.get_fields().join(",") + ) + }).collect::<Vec<String>>().join(";"); + */ + request_id += 1; + let ls_req_id = request_id.to_string(); + subscription_id += 1; + let ls_sub_id = subscription_id.to_string(); + let mut params: Vec<(&str, &str)> = vec![ + ("LS_session", *session_id), + ("LS_reqId", &ls_req_id), + ("LS_subId", &ls_sub_id), + ("LS_op", "add"), + ("LS_table", "1"), + ("LS_id", "1"), + ("LS_mode", "MERGE"), + ("LS_schema", "stock_name,last_price"), + ("LS_snapshot", "true"), + ]; + params.extend(base_params.iter().cloned()); + let encoded_params = serde_urlencoded::to_string(¶ms)?; + println!("Encoded params: {}", encoded_params); + write_stream + .send(Message::Text(format!("control\r\n{}\n", encoded_params))) + .await?; + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Session ID not found in 'conok' message from server", + ))); + } + }, + "probe" => { + println!("Received probe message from server: '{}'", clean_text); + }, + unexpected_message => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Unexpected message received from server: '{}'", + unexpected_message + ), + ))); + }, + } + }, + Some(Ok(non_text_message)) => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Unexpected non-text message from server: {:?}", + non_text_message + ), + ))); + }, + Some(Err(err)) => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Error reading message from server: {}", err), + ))); + }, + None => { + println!("No more messages from server"); + break; + }, } - } - non_text_message => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Unexpected non-text message from server: {:?}", - non_text_message - ), - ))); - } + }, + _ = shutdown_signal.notified() => { + println!("Received shutdown signal"); + break; + }, } } + /* // // Perform subscription. // @@ -487,40 +558,7 @@ impl LightstreamerClient { } } } - - - // Listen for messages from the server - loop { - tokio::select! { - message = read_stream.next() => { - match message { - Some(Ok(Message::Text(text))) => { - println!("Received message from server: {}", text); - }, - Some(Ok(non_text_message)) => { - println!("Received non-text message from server: {:?}", non_text_message); - }, - Some(Err(err)) => { - return Err(Box::new( - std::io::Error::new(std::io::ErrorKind::InvalidData, format!( - "Error reading message from server: {}", - err - )), - )); - }, - None => { - println!("No more messages from server"); - break; - }, - } - }, - _ = shutdown_signal.notified() => { - println!("Received shutdown signal"); - break; - }, - } - } - } + */ println!("Ending function connect() to Lightstreamer server"); diff --git a/src/main.rs b/src/main.rs index 8431333..c95f9e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use tokio::sync::{Notify, Mutex}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +const MAX_CONNECTION_ATTEMPTS: u64 = 1; + /// Sets up a signal hook for SIGINT and SIGTERM. /// /// Creates a signal hook for the specified signals and spawns a thread to handle them. @@ -79,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> { // Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads. let client = Arc::new(Mutex::new(LightstreamerClient::new( Some("http://push.lightstreamer.com/lightstreamer"), - Some("DEMO"), + Some("QUOTE_ADAPTER"), )?)); // @@ -102,7 +104,7 @@ async fn main() -> Result<(), Box<dyn Error>> { // let mut retry_interval_milis: u64 = 0; let mut retry_counter: u64 = 0; - while retry_counter < 5 { + while retry_counter < MAX_CONNECTION_ATTEMPTS { let mut client = client.lock().await; match client.connect(Arc::clone(&shutdown_signal)).await { Ok(_) => { |