♻️ (ls_client.rs): refactor subscription logic for clarity and maintainability

♻️ (ls_client.rs): remove hardcoded base_params and use dynamic params for session creation
♻️ (ls_client.rs): remove commented-out subscription code for cleanup
 (ls_client.rs): handle new server messages for connection errors and subscription confirmations
🚚 (ls_client.rs): remove unnecessary println at the end of connect function
 (main.rs): change adapter set from "QUOTE_ADAPTER" to "DEMO" for client initialization
 (subscription.rs): add Default trait implementation for Snapshot enum
 (subscription.rs): implement ToString trait for Snapshot and SubscriptionMode enums
🐛 (subscription.rs): fix error message in set_item_group method to be more descriptive
This commit is contained in:
Daniel López Azaña 2024-04-04 13:26:50 +02:00
parent c094bf9c8b
commit 2565f3be41
3 changed files with 156 additions and 127 deletions

View File

@ -206,20 +206,6 @@ impl LightstreamerClient {
))); )));
} }
let mut base_params: Vec<(&str, &str)> = Vec::new();
//
// Build the request base parameters.
//
if let Some(adapter_set) = &self.adapter_set {
base_params.push(("LS_adapter_set", adapter_set));
}
base_params.extend([
("LS_protocol", "TLCP-2.5.0"),
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
]);
// //
// Add optional parameters // Add optional parameters
// //
@ -401,22 +387,93 @@ impl LightstreamerClient {
let message_fields: Vec<&str> = clean_text.split(",").collect(); let message_fields: Vec<&str> = clean_text.split(",").collect();
match *message_fields.first().unwrap_or(&"") { match *message_fields.first().unwrap_or(&"") {
// //
// Connection confirmation from server. // Errors from server.
// //
"wsok" => { "conerr" | "reqerr" => {
println!("Connection confirmed by server"); println!("Received connection error from server: {}", clean_text);
break;
},
// //
// Request session creation. // Session created successfully.
// //
let mut params: Vec<(&str, &str)> = vec![]; "conok" => {
if let Some(adapter_set) = self.connection_details.get_adapter_set() { if let Some(session_id) = message_fields.get(1).as_deref() {
params.push(("LS_adapter_set", adapter_set)); println!("Session creation confirmed by server: '{}'", clean_text);
println!("Session created with ID: {:?}", session_id);
//
// Subscribe to the desired items.
//
while let Some(subscription) = self.subscriptions.get(subscription_id) {
//
// Gather all the necessary subscription parameters.
//
request_id += 1;
let ls_req_id = request_id.to_string();
subscription_id += 1;
let ls_sub_id = subscription_id.to_string();
let ls_mode = subscription.get_mode().to_string();
let ls_group = match subscription.get_item_group() {
Some(item_group) => item_group.to_string(),
None => match subscription.get_items() {
Some(items) => {
items.join(",")
},
None => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"No item group or items found in subscription.",
)));
},
},
};
let ls_schema = match subscription.get_field_schema() {
Some(field_schema) => field_schema.to_string(),
None => match subscription.get_fields() {
Some(fields) => {
fields.join(",")
},
None => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"No field schema or fields found in subscription.",
)));
},
},
};
let ls_data_adapter = match subscription.get_data_adapter() {
Some(data_adapter) => data_adapter.to_string(),
None => "".to_string(),
};
let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
//
// Prepare the subscription request.
//
let mut params: Vec<(&str, &str)> = vec![
//("LS_session", session_id),
("LS_reqId", &ls_req_id),
("LS_op", "add"),
("LS_subId", &ls_sub_id),
("LS_mode", &ls_mode),
("LS_group", &ls_group),
("LS_schema", &ls_schema),
("LS_data_adapter", &ls_data_adapter),
("LS_ack", "false"),
];
if ls_snapshot != "" {
params.push(("LS_snapshot", &ls_snapshot));
} }
params.extend(base_params.iter().cloned());
let encoded_params = serde_urlencoded::to_string(&params)?; let encoded_params = serde_urlencoded::to_string(&params)?;
write_stream write_stream
.send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) .send(Message::Text(format!("control\r\n{}", encoded_params)))
.await?; .await?;
println!("Sent subscription request: '{}'", encoded_params);
}
} else {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Session ID not found in 'conok' message from server",
)));
}
}, },
// //
// Notifications from server. // Notifications from server.
@ -425,61 +482,52 @@ impl LightstreamerClient {
println!("Received notification from server: {}", clean_text); println!("Received notification from server: {}", clean_text);
// Don't do anything with these notifications for now. // Don't do anything with these notifications for now.
}, },
//
// Session created successfully.
//
"conok" => {
if let Some(session_id) = message_fields.get(1).as_deref() {
println!("Session created with ID: {:?}", session_id);
//
// Subscribe to the desired items.
//
/*
let ls_item_group = self.get_subscriptions().iter().map(|subscription| {
format!(
"{},{},{}",
subscription.get_mode().to_string(),
subscription.get_items().join(","),
subscription.get_fields().join(",")
)
}).collect::<Vec<String>>().join(";");
*/
request_id += 1;
let ls_req_id = request_id.to_string();
subscription_id += 1;
let ls_sub_id = subscription_id.to_string();
let mut params: Vec<(&str, &str)> = vec![
("LS_session", *session_id),
("LS_reqId", &ls_req_id),
("LS_subId", &ls_sub_id),
("LS_op", "add"),
("LS_table", "1"),
("LS_id", "1"),
("LS_mode", "MERGE"),
("LS_schema", "stock_name,last_price"),
("LS_snapshot", "true"),
];
params.extend(base_params.iter().cloned());
let encoded_params = serde_urlencoded::to_string(&params)?;
println!("Encoded params: {}", encoded_params);
write_stream
.send(Message::Text(format!("control\r\n{}\n", encoded_params)))
.await?;
} else {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Session ID not found in 'conok' message from server",
)));
}
},
"probe" => { "probe" => {
println!("Received probe message from server: '{}'", clean_text); println!("Received probe message from server: '{}'", clean_text);
}, },
//
// Subscription confirmation from server.
//
"subok" => {
println!("Subscription confirmed by server: '{}'", clean_text);
},
//
// Data updates from server.
//
"u" => {
println!("Received data update from server: '{}'", clean_text);
},
//
// Connection confirmation from server.
//
"wsok" => {
println!("Connection confirmed by server: '{}'", clean_text);
//
// Request session creation.
//
let ls_adapter_set = match self.connection_details.get_adapter_set() {
Some(adapter_set) => adapter_set,
None => {
return Err(Box::new(IllegalStateException::new(
"No adapter set found in connection details.",
)));
},
};
let params: Vec<(&str, &str)> = vec![
("LS_adapter_set", &ls_adapter_set),
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
("LS_protocol", "TLCP-2.5.0"),
];
let encoded_params = serde_urlencoded::to_string(&params)?;
write_stream
.send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
.await?;
},
unexpected_message => { unexpected_message => {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
format!( format!(
"Unexpected message received from server: '{}'", "Unexpected message received from server: '{:?}'",
unexpected_message unexpected_message
), ),
))); )));
@ -514,54 +562,6 @@ impl LightstreamerClient {
} }
} }
/*
//
// Perform subscription.
//
if let Some(session_id) = session_id {
let mut params = base_params.clone();
params.extend([
("LS_session", session_id.as_str()),
("LS_op", "add"),
("LS_table", "1"),
("LS_id", "1"),
("LS_mode", "MERGE"),
("LS_schema", "stock_name,last_price"),
("LS_data_adapter", "QUOTE_ADAPTER"),
("LS_snapshot", "true"),
]);
let encoded_params = serde_urlencoded::to_string(&base_params)?;
write_stream
.send(Message::Text(format!("control\r\n{}\n", encoded_params)))
.await?;
if let Some(result) = read_stream.next().await {
match result? {
Message::Text(text) => {
let clean_text = clean_message(&text);
if clean_text.starts_with("subok") {
println!("Subscription confirmed by server");
} else {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Unexpected message received from server: {}", clean_text),
)));
}
}
non_text_message => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Unexpected non-text message from server: {:?}",
non_text_message
),
)));
}
}
}
*/
println!("Ending function connect() to Lightstreamer server");
Ok(()) Ok(())
} }

View File

@ -81,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads. // Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads.
let client = Arc::new(Mutex::new(LightstreamerClient::new( let client = Arc::new(Mutex::new(LightstreamerClient::new(
Some("http://push.lightstreamer.com/lightstreamer"), Some("http://push.lightstreamer.com/lightstreamer"),
Some("QUOTE_ADAPTER"), Some("DEMO"),
)?)); )?));
// //

View File

@ -3,14 +3,32 @@ use std::collections::HashMap;
use std::fmt::{self, Debug, Formatter}; use std::fmt::{self, Debug, Formatter};
/// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription. /// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription.
#[derive(Debug)] #[derive(Debug, Default)]
pub enum Snapshot { pub enum Snapshot {
Yes, Yes,
No, No,
Number(usize), Number(usize),
#[default]
None, None,
} }
impl Default for &Snapshot {
fn default() -> Self {
&Snapshot::None
}
}
impl ToString for Snapshot {
fn to_string(&self) -> String {
match self {
Snapshot::Yes => "true".to_string(),
Snapshot::No => "false".to_string(),
Snapshot::Number(n) => n.to_string(),
Snapshot::None => "none".to_string(),
}
}
}
/// Enum representing the subscription mode. /// Enum representing the subscription mode.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum SubscriptionMode { pub enum SubscriptionMode {
@ -32,6 +50,17 @@ impl SubscriptionMode {
} }
} }
impl ToString for SubscriptionMode {
fn to_string(&self) -> String {
match self {
SubscriptionMode::Merge => "MERGE".to_string(),
SubscriptionMode::Distinct => "DISTINCT".to_string(),
SubscriptionMode::Raw => "RAW".to_string(),
SubscriptionMode::Command => "COMMAND".to_string(),
}
}
}
/// Struct representing a Subscription to be submitted to a Lightstreamer Server. /// Struct representing a Subscription to be submitted to a Lightstreamer Server.
/// It contains subscription details and the listeners needed to process the real-time data. /// It contains subscription details and the listeners needed to process the real-time data.
pub struct Subscription { pub struct Subscription {
@ -187,7 +216,7 @@ impl Subscription {
/// - `group`: A String to be expanded into an item list by the Metadata Adapter. /// - `group`: A String to be expanded into an item list by the Metadata Adapter.
pub fn set_item_group(&mut self, group: String) -> Result<(), String> { pub fn set_item_group(&mut self, group: String) -> Result<(), String> {
if self.is_active { if self.is_active {
return Err("Subscription is active".to_string()); return Err("Subscription is active. This method can only be called while the Subscription instance is in its 'inactive' state.".to_string());
} }
self.item_group = Some(group); self.item_group = Some(group);
Ok(()) Ok(())