aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-04-04 13:26:50 +0200
committerLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-04-04 13:26:50 +0200
commit2565f3be410c71c35e0d70bbea1c7be627f4ecd2 (patch)
treeda8960baf0ffb86f0c1aebdfddb9ef419b391cb4
parentc094bf9c8bad3db0d6ae9b53f9087e081a3ba63b (diff)
♻️ (ls_client.rs): refactor subscription logic for clarity and maintainability
♻️ (ls_client.rs): remove hardcoded base_params and use dynamic params for session creation ♻️ (ls_client.rs): remove commented-out subscription code for cleanup ✨ (ls_client.rs): handle new server messages for connection errors and subscription confirmations 🚚 (ls_client.rs): remove unnecessary println at the end of connect function ✨ (main.rs): change adapter set from "QUOTE_ADAPTER" to "DEMO" for client initialization ✨ (subscription.rs): add Default trait implementation for Snapshot enum ✨ (subscription.rs): implement ToString trait for Snapshot and SubscriptionMode enums 🐛 (subscription.rs): fix error message in set_item_group method to be more descriptive
-rw-r--r--src/ls_client.rs232
-rw-r--r--src/main.rs2
-rw-r--r--src/subscription.rs33
3 files changed, 148 insertions, 119 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs
index 23b9319..e3317f7 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -206,20 +206,6 @@ impl LightstreamerClient {
)));
}
- let mut base_params: Vec<(&str, &str)> = Vec::new();
-
- //
- // Build the request base parameters.
- //
- if let Some(adapter_set) = &self.adapter_set {
- base_params.push(("LS_adapter_set", adapter_set));
- }
-
- base_params.extend([
- ("LS_protocol", "TLCP-2.5.0"),
- ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
- ]);
-
//
// Add optional parameters
//
@@ -401,70 +387,87 @@ impl LightstreamerClient {
let message_fields: Vec<&str> = clean_text.split(",").collect();
match *message_fields.first().unwrap_or(&"") {
//
- // Connection confirmation from server.
- //
- "wsok" => {
- println!("Connection confirmed by server");
- //
- // Request session creation.
- //
- let mut params: Vec<(&str, &str)> = vec![];
- if let Some(adapter_set) = self.connection_details.get_adapter_set() {
- params.push(("LS_adapter_set", adapter_set));
- }
- params.extend(base_params.iter().cloned());
- let encoded_params = serde_urlencoded::to_string(&params)?;
- write_stream
- .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
- .await?;
- },
- //
- // Notifications from server.
+ // Errors from server.
//
- "cons" | "servname" | "clientip" => {
- println!("Received notification from server: {}", clean_text);
- // Don't do anything with these notifications for now.
+ "conerr" | "reqerr" => {
+ println!("Received connection error from server: {}", clean_text);
+ break;
},
//
// Session created successfully.
//
"conok" => {
if let Some(session_id) = message_fields.get(1).as_deref() {
+ println!("Session creation confirmed by server: '{}'", clean_text);
println!("Session created with ID: {:?}", session_id);
//
// Subscribe to the desired items.
//
- /*
- let ls_item_group = self.get_subscriptions().iter().map(|subscription| {
- format!(
- "{},{},{}",
- subscription.get_mode().to_string(),
- subscription.get_items().join(","),
- subscription.get_fields().join(",")
- )
- }).collect::<Vec<String>>().join(";");
- */
- request_id += 1;
- let ls_req_id = request_id.to_string();
- subscription_id += 1;
- let ls_sub_id = subscription_id.to_string();
- let mut params: Vec<(&str, &str)> = vec![
- ("LS_session", *session_id),
- ("LS_reqId", &ls_req_id),
- ("LS_subId", &ls_sub_id),
- ("LS_op", "add"),
- ("LS_table", "1"),
- ("LS_id", "1"),
- ("LS_mode", "MERGE"),
- ("LS_schema", "stock_name,last_price"),
- ("LS_snapshot", "true"),
- ];
- params.extend(base_params.iter().cloned());
- let encoded_params = serde_urlencoded::to_string(&params)?;
- println!("Encoded params: {}", encoded_params);
- write_stream
- .send(Message::Text(format!("control\r\n{}\n", encoded_params)))
- .await?;
+ while let Some(subscription) = self.subscriptions.get(subscription_id) {
+ //
+ // Gather all the necessary subscription parameters.
+ //
+ request_id += 1;
+ let ls_req_id = request_id.to_string();
+ subscription_id += 1;
+ let ls_sub_id = subscription_id.to_string();
+ let ls_mode = subscription.get_mode().to_string();
+ let ls_group = match subscription.get_item_group() {
+ Some(item_group) => item_group.to_string(),
+ None => match subscription.get_items() {
+ Some(items) => {
+ items.join(",")
+ },
+ None => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "No item group or items found in subscription.",
+ )));
+ },
+ },
+ };
+ let ls_schema = match subscription.get_field_schema() {
+ Some(field_schema) => field_schema.to_string(),
+ None => match subscription.get_fields() {
+ Some(fields) => {
+ fields.join(",")
+ },
+ None => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "No field schema or fields found in subscription.",
+ )));
+ },
+ },
+ };
+ let ls_data_adapter = match subscription.get_data_adapter() {
+ Some(data_adapter) => data_adapter.to_string(),
+ None => "".to_string(),
+ };
+ let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
+ //
+ // Prepare the subscription request.
+ //
+ let mut params: Vec<(&str, &str)> = vec![
+ //("LS_session", session_id),
+ ("LS_reqId", &ls_req_id),
+ ("LS_op", "add"),
+ ("LS_subId", &ls_sub_id),
+ ("LS_mode", &ls_mode),
+ ("LS_group", &ls_group),
+ ("LS_schema", &ls_schema),
+ ("LS_data_adapter", &ls_data_adapter),
+ ("LS_ack", "false"),
+ ];
+ if ls_snapshot != "" {
+ params.push(("LS_snapshot", &ls_snapshot));
+ }
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+ write_stream
+ .send(Message::Text(format!("control\r\n{}", encoded_params)))
+ .await?;
+ println!("Sent subscription request: '{}'", encoded_params);
+ }
} else {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
@@ -472,14 +475,59 @@ impl LightstreamerClient {
)));
}
},
+ //
+ // Notifications from server.
+ //
+ "cons" | "servname" | "clientip" => {
+ println!("Received notification from server: {}", clean_text);
+ // Don't do anything with these notifications for now.
+ },
"probe" => {
println!("Received probe message from server: '{}'", clean_text);
},
+ //
+ // Subscription confirmation from server.
+ //
+ "subok" => {
+ println!("Subscription confirmed by server: '{}'", clean_text);
+ },
+ //
+ // Data updates from server.
+ //
+ "u" => {
+ println!("Received data update from server: '{}'", clean_text);
+ },
+ //
+ // Connection confirmation from server.
+ //
+ "wsok" => {
+ println!("Connection confirmed by server: '{}'", clean_text);
+ //
+ // Request session creation.
+ //
+ let ls_adapter_set = match self.connection_details.get_adapter_set() {
+ Some(adapter_set) => adapter_set,
+ None => {
+ return Err(Box::new(IllegalStateException::new(
+ "No adapter set found in connection details.",
+ )));
+ },
+ };
+ let params: Vec<(&str, &str)> = vec![
+ ("LS_adapter_set", &ls_adapter_set),
+ ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
+ ("LS_protocol", "TLCP-2.5.0"),
+ ];
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+ write_stream
+ .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
+ .await?;
+ },
unexpected_message => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
- "Unexpected message received from server: '{}'",
+ "Unexpected message received from server: '{:?}'",
unexpected_message
),
)));
@@ -514,54 +562,6 @@ impl LightstreamerClient {
}
}
- /*
- //
- // Perform subscription.
- //
- if let Some(session_id) = session_id {
- let mut params = base_params.clone();
- params.extend([
- ("LS_session", session_id.as_str()),
- ("LS_op", "add"),
- ("LS_table", "1"),
- ("LS_id", "1"),
- ("LS_mode", "MERGE"),
- ("LS_schema", "stock_name,last_price"),
- ("LS_data_adapter", "QUOTE_ADAPTER"),
- ("LS_snapshot", "true"),
- ]);
- let encoded_params = serde_urlencoded::to_string(&base_params)?;
- write_stream
- .send(Message::Text(format!("control\r\n{}\n", encoded_params)))
- .await?;
- if let Some(result) = read_stream.next().await {
- match result? {
- Message::Text(text) => {
- let clean_text = clean_message(&text);
- if clean_text.starts_with("subok") {
- println!("Subscription confirmed by server");
- } else {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("Unexpected message received from server: {}", clean_text),
- )));
- }
- }
- non_text_message => {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!(
- "Unexpected non-text message from server: {:?}",
- non_text_message
- ),
- )));
- }
- }
- }
- */
-
- println!("Ending function connect() to Lightstreamer server");
-
Ok(())
}
diff --git a/src/main.rs b/src/main.rs
index c95f9e8..87fd834 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -81,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads.
let client = Arc::new(Mutex::new(LightstreamerClient::new(
Some("http://push.lightstreamer.com/lightstreamer"),
- Some("QUOTE_ADAPTER"),
+ Some("DEMO"),
)?));
//
diff --git a/src/subscription.rs b/src/subscription.rs
index be10380..e008804 100644
--- a/src/subscription.rs
+++ b/src/subscription.rs
@@ -3,14 +3,32 @@ use std::collections::HashMap;
use std::fmt::{self, Debug, Formatter};
/// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription.
-#[derive(Debug)]
+#[derive(Debug, Default)]
pub enum Snapshot {
Yes,
No,
Number(usize),
+ #[default]
None,
}
+impl Default for &Snapshot {
+ fn default() -> Self {
+ &Snapshot::None
+ }
+}
+
+impl ToString for Snapshot {
+ fn to_string(&self) -> String {
+ match self {
+ Snapshot::Yes => "true".to_string(),
+ Snapshot::No => "false".to_string(),
+ Snapshot::Number(n) => n.to_string(),
+ Snapshot::None => "none".to_string(),
+ }
+ }
+}
+
/// Enum representing the subscription mode.
#[derive(Debug, PartialEq, Eq)]
pub enum SubscriptionMode {
@@ -32,6 +50,17 @@ impl SubscriptionMode {
}
}
+impl ToString for SubscriptionMode {
+ fn to_string(&self) -> String {
+ match self {
+ SubscriptionMode::Merge => "MERGE".to_string(),
+ SubscriptionMode::Distinct => "DISTINCT".to_string(),
+ SubscriptionMode::Raw => "RAW".to_string(),
+ SubscriptionMode::Command => "COMMAND".to_string(),
+ }
+ }
+}
+
/// Struct representing a Subscription to be submitted to a Lightstreamer Server.
/// It contains subscription details and the listeners needed to process the real-time data.
pub struct Subscription {
@@ -187,7 +216,7 @@ impl Subscription {
/// - `group`: A String to be expanded into an item list by the Metadata Adapter.
pub fn set_item_group(&mut self, group: String) -> Result<(), String> {
if self.is_active {
- return Err("Subscription is active".to_string());
+ return Err("Subscription is active. This method can only be called while the Subscription instance is in its 'inactive' state.".to_string());
}
self.item_group = Some(group);
Ok(())