From 2565f3be410c71c35e0d70bbea1c7be627f4ecd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20L=C3=B3pez=20Aza=C3=B1a?= Date: Thu, 4 Apr 2024 13:26:50 +0200 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(ls=5Fclient.rs):=20refact?= =?UTF-8?q?or=20subscription=20logic=20for=20clarity=20and=20maintainabili?= =?UTF-8?q?ty=20=E2=99=BB=EF=B8=8F=20(ls=5Fclient.rs):=20remove=20hardcode?= =?UTF-8?q?d=20base=5Fparams=20and=20use=20dynamic=20params=20for=20sessio?= =?UTF-8?q?n=20creation=20=E2=99=BB=EF=B8=8F=20(ls=5Fclient.rs):=20remove?= =?UTF-8?q?=20commented-out=20subscription=20code=20for=20cleanup=20?= =?UTF-8?q?=E2=9C=A8=20(ls=5Fclient.rs):=20handle=20new=20server=20message?= =?UTF-8?q?s=20for=20connection=20errors=20and=20subscription=20confirmati?= =?UTF-8?q?ons=20=F0=9F=9A=9A=20(ls=5Fclient.rs):=20remove=20unnecessary?= =?UTF-8?q?=20println=20at=20the=20end=20of=20connect=20function=20?= =?UTF-8?q?=E2=9C=A8=20(main.rs):=20change=20adapter=20set=20from=20"QUOTE?= =?UTF-8?q?=5FADAPTER"=20to=20"DEMO"=20for=20client=20initialization=20?= =?UTF-8?q?=E2=9C=A8=20(subscription.rs):=20add=20Default=20trait=20implem?= =?UTF-8?q?entation=20for=20Snapshot=20enum=20=E2=9C=A8=20(subscription.rs?= =?UTF-8?q?):=20implement=20ToString=20trait=20for=20Snapshot=20and=20Subs?= =?UTF-8?q?criptionMode=20enums=20=F0=9F=90=9B=20(subscription.rs):=20fix?= =?UTF-8?q?=20error=20message=20in=20set=5Fitem=5Fgroup=20method=20to=20be?= =?UTF-8?q?=20more=20descriptive?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ls_client.rs | 248 ++++++++++++++++++++++---------------------- src/main.rs | 2 +- src/subscription.rs | 33 +++++- 3 files changed, 156 insertions(+), 127 deletions(-) diff --git a/src/ls_client.rs b/src/ls_client.rs index 23b9319..e3317f7 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -206,20 +206,6 @@ impl LightstreamerClient { ))); } - let mut base_params: Vec<(&str, &str)> = Vec::new(); - - // - // Build the request base parameters. - // - if let Some(adapter_set) = &self.adapter_set { - base_params.push(("LS_adapter_set", adapter_set)); - } - - base_params.extend([ - ("LS_protocol", "TLCP-2.5.0"), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), - ]); - // // Add optional parameters // @@ -401,22 +387,93 @@ impl LightstreamerClient { let message_fields: Vec<&str> = clean_text.split(",").collect(); match *message_fields.first().unwrap_or(&"") { // - // Connection confirmation from server. + // Errors from server. // - "wsok" => { - println!("Connection confirmed by server"); - // - // Request session creation. - // - let mut params: Vec<(&str, &str)> = vec![]; - if let Some(adapter_set) = self.connection_details.get_adapter_set() { - params.push(("LS_adapter_set", adapter_set)); + "conerr" | "reqerr" => { + println!("Received connection error from server: {}", clean_text); + break; + }, + // + // Session created successfully. + // + "conok" => { + if let Some(session_id) = message_fields.get(1).as_deref() { + println!("Session creation confirmed by server: '{}'", clean_text); + println!("Session created with ID: {:?}", session_id); + // + // Subscribe to the desired items. + // + while let Some(subscription) = self.subscriptions.get(subscription_id) { + // + // Gather all the necessary subscription parameters. + // + request_id += 1; + let ls_req_id = request_id.to_string(); + subscription_id += 1; + let ls_sub_id = subscription_id.to_string(); + let ls_mode = subscription.get_mode().to_string(); + let ls_group = match subscription.get_item_group() { + Some(item_group) => item_group.to_string(), + None => match subscription.get_items() { + Some(items) => { + items.join(",") + }, + None => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No item group or items found in subscription.", + ))); + }, + }, + }; + let ls_schema = match subscription.get_field_schema() { + Some(field_schema) => field_schema.to_string(), + None => match subscription.get_fields() { + Some(fields) => { + fields.join(",") + }, + None => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No field schema or fields found in subscription.", + ))); + }, + }, + }; + let ls_data_adapter = match subscription.get_data_adapter() { + Some(data_adapter) => data_adapter.to_string(), + None => "".to_string(), + }; + let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string(); + // + // Prepare the subscription request. + // + let mut params: Vec<(&str, &str)> = vec![ + //("LS_session", session_id), + ("LS_reqId", &ls_req_id), + ("LS_op", "add"), + ("LS_subId", &ls_sub_id), + ("LS_mode", &ls_mode), + ("LS_group", &ls_group), + ("LS_schema", &ls_schema), + ("LS_data_adapter", &ls_data_adapter), + ("LS_ack", "false"), + ]; + if ls_snapshot != "" { + params.push(("LS_snapshot", &ls_snapshot)); + } + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("control\r\n{}", encoded_params))) + .await?; + println!("Sent subscription request: '{}'", encoded_params); + } + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Session ID not found in 'conok' message from server", + ))); } - params.extend(base_params.iter().cloned()); - let encoded_params = serde_urlencoded::to_string(¶ms)?; - write_stream - .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) - .await?; }, // // Notifications from server. @@ -425,61 +482,52 @@ impl LightstreamerClient { println!("Received notification from server: {}", clean_text); // Don't do anything with these notifications for now. }, - // - // Session created successfully. - // - "conok" => { - if let Some(session_id) = message_fields.get(1).as_deref() { - println!("Session created with ID: {:?}", session_id); - // - // Subscribe to the desired items. - // - /* - let ls_item_group = self.get_subscriptions().iter().map(|subscription| { - format!( - "{},{},{}", - subscription.get_mode().to_string(), - subscription.get_items().join(","), - subscription.get_fields().join(",") - ) - }).collect::>().join(";"); - */ - request_id += 1; - let ls_req_id = request_id.to_string(); - subscription_id += 1; - let ls_sub_id = subscription_id.to_string(); - let mut params: Vec<(&str, &str)> = vec![ - ("LS_session", *session_id), - ("LS_reqId", &ls_req_id), - ("LS_subId", &ls_sub_id), - ("LS_op", "add"), - ("LS_table", "1"), - ("LS_id", "1"), - ("LS_mode", "MERGE"), - ("LS_schema", "stock_name,last_price"), - ("LS_snapshot", "true"), - ]; - params.extend(base_params.iter().cloned()); - let encoded_params = serde_urlencoded::to_string(¶ms)?; - println!("Encoded params: {}", encoded_params); - write_stream - .send(Message::Text(format!("control\r\n{}\n", encoded_params))) - .await?; - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Session ID not found in 'conok' message from server", - ))); - } - }, "probe" => { println!("Received probe message from server: '{}'", clean_text); }, + // + // Subscription confirmation from server. + // + "subok" => { + println!("Subscription confirmed by server: '{}'", clean_text); + }, + // + // Data updates from server. + // + "u" => { + println!("Received data update from server: '{}'", clean_text); + }, + // + // Connection confirmation from server. + // + "wsok" => { + println!("Connection confirmed by server: '{}'", clean_text); + // + // Request session creation. + // + let ls_adapter_set = match self.connection_details.get_adapter_set() { + Some(adapter_set) => adapter_set, + None => { + return Err(Box::new(IllegalStateException::new( + "No adapter set found in connection details.", + ))); + }, + }; + let params: Vec<(&str, &str)> = vec![ + ("LS_adapter_set", &ls_adapter_set), + ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), + ("LS_protocol", "TLCP-2.5.0"), + ]; + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) + .await?; + }, unexpected_message => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, format!( - "Unexpected message received from server: '{}'", + "Unexpected message received from server: '{:?}'", unexpected_message ), ))); @@ -514,54 +562,6 @@ impl LightstreamerClient { } } - /* - // - // Perform subscription. - // - if let Some(session_id) = session_id { - let mut params = base_params.clone(); - params.extend([ - ("LS_session", session_id.as_str()), - ("LS_op", "add"), - ("LS_table", "1"), - ("LS_id", "1"), - ("LS_mode", "MERGE"), - ("LS_schema", "stock_name,last_price"), - ("LS_data_adapter", "QUOTE_ADAPTER"), - ("LS_snapshot", "true"), - ]); - let encoded_params = serde_urlencoded::to_string(&base_params)?; - write_stream - .send(Message::Text(format!("control\r\n{}\n", encoded_params))) - .await?; - if let Some(result) = read_stream.next().await { - match result? { - Message::Text(text) => { - let clean_text = clean_message(&text); - if clean_text.starts_with("subok") { - println!("Subscription confirmed by server"); - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Unexpected message received from server: {}", clean_text), - ))); - } - } - non_text_message => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Unexpected non-text message from server: {:?}", - non_text_message - ), - ))); - } - } - } - */ - - println!("Ending function connect() to Lightstreamer server"); - Ok(()) } diff --git a/src/main.rs b/src/main.rs index c95f9e8..87fd834 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,7 +81,7 @@ async fn main() -> Result<(), Box> { // Create a new Lightstreamer client instance and wrap it in an Arc> so it can be shared across threads. let client = Arc::new(Mutex::new(LightstreamerClient::new( Some("http://push.lightstreamer.com/lightstreamer"), - Some("QUOTE_ADAPTER"), + Some("DEMO"), )?)); // diff --git a/src/subscription.rs b/src/subscription.rs index be10380..e008804 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -3,14 +3,32 @@ use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; /// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum Snapshot { Yes, No, Number(usize), + #[default] None, } +impl Default for &Snapshot { + fn default() -> Self { + &Snapshot::None + } +} + +impl ToString for Snapshot { + fn to_string(&self) -> String { + match self { + Snapshot::Yes => "true".to_string(), + Snapshot::No => "false".to_string(), + Snapshot::Number(n) => n.to_string(), + Snapshot::None => "none".to_string(), + } + } +} + /// Enum representing the subscription mode. #[derive(Debug, PartialEq, Eq)] pub enum SubscriptionMode { @@ -32,6 +50,17 @@ impl SubscriptionMode { } } +impl ToString for SubscriptionMode { + fn to_string(&self) -> String { + match self { + SubscriptionMode::Merge => "MERGE".to_string(), + SubscriptionMode::Distinct => "DISTINCT".to_string(), + SubscriptionMode::Raw => "RAW".to_string(), + SubscriptionMode::Command => "COMMAND".to_string(), + } + } +} + /// Struct representing a Subscription to be submitted to a Lightstreamer Server. /// It contains subscription details and the listeners needed to process the real-time data. pub struct Subscription { @@ -187,7 +216,7 @@ impl Subscription { /// - `group`: A String to be expanded into an item list by the Metadata Adapter. pub fn set_item_group(&mut self, group: String) -> Result<(), String> { if self.is_active { - return Err("Subscription is active".to_string()); + return Err("Subscription is active. This method can only be called while the Subscription instance is in its 'inactive' state.".to_string()); } self.item_group = Some(group); Ok(())