diff options
author | 2024-04-04 13:26:50 +0200 | |
---|---|---|
committer | 2024-04-04 13:26:50 +0200 | |
commit | 2565f3be410c71c35e0d70bbea1c7be627f4ecd2 (patch) | |
tree | da8960baf0ffb86f0c1aebdfddb9ef419b391cb4 | |
parent | c094bf9c8bad3db0d6ae9b53f9087e081a3ba63b (diff) |
♻️ (ls_client.rs): refactor subscription logic for clarity and maintainability
♻️ (ls_client.rs): remove hardcoded base_params and use dynamic params for session creation
♻️ (ls_client.rs): remove commented-out subscription code for cleanup
✨ (ls_client.rs): handle new server messages for connection errors and subscription confirmations
🚚 (ls_client.rs): remove unnecessary println at the end of connect function
✨ (main.rs): change adapter set from "QUOTE_ADAPTER" to "DEMO" for client initialization
✨ (subscription.rs): add Default trait implementation for Snapshot enum
✨ (subscription.rs): implement ToString trait for Snapshot and SubscriptionMode enums
🐛 (subscription.rs): fix error message in set_item_group method to be more descriptive
-rw-r--r-- | src/ls_client.rs | 232 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/subscription.rs | 33 |
3 files changed, 148 insertions, 119 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs index 23b9319..e3317f7 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -206,20 +206,6 @@ impl LightstreamerClient { ))); } - let mut base_params: Vec<(&str, &str)> = Vec::new(); - - // - // Build the request base parameters. - // - if let Some(adapter_set) = &self.adapter_set { - base_params.push(("LS_adapter_set", adapter_set)); - } - - base_params.extend([ - ("LS_protocol", "TLCP-2.5.0"), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), - ]); - // // Add optional parameters // @@ -401,70 +387,87 @@ impl LightstreamerClient { let message_fields: Vec<&str> = clean_text.split(",").collect(); match *message_fields.first().unwrap_or(&"") { // - // Connection confirmation from server. - // - "wsok" => { - println!("Connection confirmed by server"); - // - // Request session creation. - // - let mut params: Vec<(&str, &str)> = vec![]; - if let Some(adapter_set) = self.connection_details.get_adapter_set() { - params.push(("LS_adapter_set", adapter_set)); - } - params.extend(base_params.iter().cloned()); - let encoded_params = serde_urlencoded::to_string(¶ms)?; - write_stream - .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) - .await?; - }, - // - // Notifications from server. + // Errors from server. // - "cons" | "servname" | "clientip" => { - println!("Received notification from server: {}", clean_text); - // Don't do anything with these notifications for now. + "conerr" | "reqerr" => { + println!("Received connection error from server: {}", clean_text); + break; }, // // Session created successfully. // "conok" => { if let Some(session_id) = message_fields.get(1).as_deref() { + println!("Session creation confirmed by server: '{}'", clean_text); println!("Session created with ID: {:?}", session_id); // // Subscribe to the desired items. // - /* - let ls_item_group = self.get_subscriptions().iter().map(|subscription| { - format!( - "{},{},{}", - subscription.get_mode().to_string(), - subscription.get_items().join(","), - subscription.get_fields().join(",") - ) - }).collect::<Vec<String>>().join(";"); - */ - request_id += 1; - let ls_req_id = request_id.to_string(); - subscription_id += 1; - let ls_sub_id = subscription_id.to_string(); - let mut params: Vec<(&str, &str)> = vec![ - ("LS_session", *session_id), - ("LS_reqId", &ls_req_id), - ("LS_subId", &ls_sub_id), - ("LS_op", "add"), - ("LS_table", "1"), - ("LS_id", "1"), - ("LS_mode", "MERGE"), - ("LS_schema", "stock_name,last_price"), - ("LS_snapshot", "true"), - ]; - params.extend(base_params.iter().cloned()); - let encoded_params = serde_urlencoded::to_string(¶ms)?; - println!("Encoded params: {}", encoded_params); - write_stream - .send(Message::Text(format!("control\r\n{}\n", encoded_params))) - .await?; + while let Some(subscription) = self.subscriptions.get(subscription_id) { + // + // Gather all the necessary subscription parameters. + // + request_id += 1; + let ls_req_id = request_id.to_string(); + subscription_id += 1; + let ls_sub_id = subscription_id.to_string(); + let ls_mode = subscription.get_mode().to_string(); + let ls_group = match subscription.get_item_group() { + Some(item_group) => item_group.to_string(), + None => match subscription.get_items() { + Some(items) => { + items.join(",") + }, + None => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No item group or items found in subscription.", + ))); + }, + }, + }; + let ls_schema = match subscription.get_field_schema() { + Some(field_schema) => field_schema.to_string(), + None => match subscription.get_fields() { + Some(fields) => { + fields.join(",") + }, + None => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No field schema or fields found in subscription.", + ))); + }, + }, + }; + let ls_data_adapter = match subscription.get_data_adapter() { + Some(data_adapter) => data_adapter.to_string(), + None => "".to_string(), + }; + let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string(); + // + // Prepare the subscription request. + // + let mut params: Vec<(&str, &str)> = vec![ + //("LS_session", session_id), + ("LS_reqId", &ls_req_id), + ("LS_op", "add"), + ("LS_subId", &ls_sub_id), + ("LS_mode", &ls_mode), + ("LS_group", &ls_group), + ("LS_schema", &ls_schema), + ("LS_data_adapter", &ls_data_adapter), + ("LS_ack", "false"), + ]; + if ls_snapshot != "" { + params.push(("LS_snapshot", &ls_snapshot)); + } + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("control\r\n{}", encoded_params))) + .await?; + println!("Sent subscription request: '{}'", encoded_params); + } } else { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -472,14 +475,59 @@ impl LightstreamerClient { ))); } }, + // + // Notifications from server. + // + "cons" | "servname" | "clientip" => { + println!("Received notification from server: {}", clean_text); + // Don't do anything with these notifications for now. + }, "probe" => { println!("Received probe message from server: '{}'", clean_text); }, + // + // Subscription confirmation from server. + // + "subok" => { + println!("Subscription confirmed by server: '{}'", clean_text); + }, + // + // Data updates from server. + // + "u" => { + println!("Received data update from server: '{}'", clean_text); + }, + // + // Connection confirmation from server. + // + "wsok" => { + println!("Connection confirmed by server: '{}'", clean_text); + // + // Request session creation. + // + let ls_adapter_set = match self.connection_details.get_adapter_set() { + Some(adapter_set) => adapter_set, + None => { + return Err(Box::new(IllegalStateException::new( + "No adapter set found in connection details.", + ))); + }, + }; + let params: Vec<(&str, &str)> = vec![ + ("LS_adapter_set", &ls_adapter_set), + ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), + ("LS_protocol", "TLCP-2.5.0"), + ]; + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) + .await?; + }, unexpected_message => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, format!( - "Unexpected message received from server: '{}'", + "Unexpected message received from server: '{:?}'", unexpected_message ), ))); @@ -514,54 +562,6 @@ impl LightstreamerClient { } } - /* - // - // Perform subscription. - // - if let Some(session_id) = session_id { - let mut params = base_params.clone(); - params.extend([ - ("LS_session", session_id.as_str()), - ("LS_op", "add"), - ("LS_table", "1"), - ("LS_id", "1"), - ("LS_mode", "MERGE"), - ("LS_schema", "stock_name,last_price"), - ("LS_data_adapter", "QUOTE_ADAPTER"), - ("LS_snapshot", "true"), - ]); - let encoded_params = serde_urlencoded::to_string(&base_params)?; - write_stream - .send(Message::Text(format!("control\r\n{}\n", encoded_params))) - .await?; - if let Some(result) = read_stream.next().await { - match result? { - Message::Text(text) => { - let clean_text = clean_message(&text); - if clean_text.starts_with("subok") { - println!("Subscription confirmed by server"); - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Unexpected message received from server: {}", clean_text), - ))); - } - } - non_text_message => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Unexpected non-text message from server: {:?}", - non_text_message - ), - ))); - } - } - } - */ - - println!("Ending function connect() to Lightstreamer server"); - Ok(()) } diff --git a/src/main.rs b/src/main.rs index c95f9e8..87fd834 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> { // Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads. let client = Arc::new(Mutex::new(LightstreamerClient::new( Some("http://push.lightstreamer.com/lightstreamer"), - Some("QUOTE_ADAPTER"), + Some("DEMO"), )?)); // diff --git a/src/subscription.rs b/src/subscription.rs index be10380..e008804 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -3,14 +3,32 @@ use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; /// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum Snapshot { Yes, No, Number(usize), + #[default] None, } +impl Default for &Snapshot { + fn default() -> Self { + &Snapshot::None + } +} + +impl ToString for Snapshot { + fn to_string(&self) -> String { + match self { + Snapshot::Yes => "true".to_string(), + Snapshot::No => "false".to_string(), + Snapshot::Number(n) => n.to_string(), + Snapshot::None => "none".to_string(), + } + } +} + /// Enum representing the subscription mode. #[derive(Debug, PartialEq, Eq)] pub enum SubscriptionMode { @@ -32,6 +50,17 @@ impl SubscriptionMode { } } +impl ToString for SubscriptionMode { + fn to_string(&self) -> String { + match self { + SubscriptionMode::Merge => "MERGE".to_string(), + SubscriptionMode::Distinct => "DISTINCT".to_string(), + SubscriptionMode::Raw => "RAW".to_string(), + SubscriptionMode::Command => "COMMAND".to_string(), + } + } +} + /// Struct representing a Subscription to be submitted to a Lightstreamer Server. /// It contains subscription details and the listeners needed to process the real-time data. pub struct Subscription { @@ -187,7 +216,7 @@ impl Subscription { /// - `group`: A String to be expanded into an item list by the Metadata Adapter. pub fn set_item_group(&mut self, group: String) -> Result<(), String> { if self.is_active { - return Err("Subscription is active".to_string()); + return Err("Subscription is active. This method can only be called while the Subscription instance is in its 'inactive' state.".to_string()); } self.item_group = Some(group); Ok(()) |