diff options
authorLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-04-04 15:39:55 +0200
committerLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-04-04 15:39:55 +0200
commit023758b3b5892faf0f82fff6013bcf21b1198756 (patch)
parent2565f3be410c71c35e0d70bbea1c7be627f4ecd2 (diff)
✨ (connection_options.rs): Add get_send_sync method for ConnectionOptions
♻️ (ls_client.rs): Refactor message processing to handle multiple submessages 🐛 (ls_client.rs): Fix default instantiation of ConnectionOptions using default method
2 files changed, 159 insertions, 141 deletions
diff --git a/src/connection_options.rs b/src/connection_options.rs
index e3218df..e9b3ab0 100644
--- a/src/connection_options.rs
+++ b/src/connection_options.rs
@@ -239,6 +239,13 @@ impl ConnectionOptions {
+ /// Inquiry method that gets if LS_send_sync is to be sent to the server.
+ /// If set to false, instructs the Server not to send the SYNC notifications on this connection.
+ /// If omitted, the default is true.
+ pub fn get_send_sync(&self) -> bool {
+ self.send_sync
+ }
/// Inquiry method that gets the maximum time allowed for attempts to recover the current session
/// upon an interruption, after which a new session will be created. A 0 value also means that
/// any attempt to recover the current session is prevented in the first place.
diff --git a/src/ls_client.rs b/src/ls_client.rs
index e3317f7..63cd04d 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -383,155 +383,166 @@ impl LightstreamerClient {
message = read_stream.next() => {
match message {
Some(Ok(Message::Text(text))) => {
- let clean_text = clean_message(&text);
- let message_fields: Vec<&str> = clean_text.split(",").collect();
- match *message_fields.first().unwrap_or(&"") {
- //
- // Errors from server.
- //
- "conerr" | "reqerr" => {
- println!("Received connection error from server: {}", clean_text);
- break;
- },
- //
- // Session created successfully.
- //
- "conok" => {
- if let Some(session_id) = message_fields.get(1).as_deref() {
- println!("Session creation confirmed by server: '{}'", clean_text);
- println!("Session created with ID: {:?}", session_id);
- //
- // Subscribe to the desired items.
- //
- while let Some(subscription) = self.subscriptions.get(subscription_id) {
+ // Messages could include multiple submessages separated by /r/n.
+ // Split the message into submessages and process each one separately.
+ let submessages: Vec<&str> = text.split("\r\n")
+ .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
+ .collect();
+ for submessage in submessages {
+ let clean_text = clean_message(&submessage);
+ let submessage_fields: Vec<&str> = clean_text.split(",").collect();
+ match *submessage_fields.first().unwrap_or(&"") {
+ //
+ // Errors from server.
+ //
+ "conerr" | "reqerr" => {
+ println!("Received connection error from server: {}", clean_text);
+ break;
+ },
+ //
+ // Session created successfully.
+ //
+ "conok" => {
+ if let Some(session_id) = submessage_fields.get(1).as_deref() {
+ println!("Session creation confirmed by server: '{}'", clean_text);
+ println!("Session created with ID: {:?}", session_id);
- // Gather all the necessary subscription parameters.
+ // Subscribe to the desired items.
- request_id += 1;
- let ls_req_id = request_id.to_string();
- subscription_id += 1;
- let ls_sub_id = subscription_id.to_string();
- let ls_mode = subscription.get_mode().to_string();
- let ls_group = match subscription.get_item_group() {
- Some(item_group) => item_group.to_string(),
- None => match subscription.get_items() {
- Some(items) => {
- items.join(",")
+ while let Some(subscription) = self.subscriptions.get(subscription_id) {
+ //
+ // Gather all the necessary subscription parameters.
+ //
+ request_id += 1;
+ let ls_req_id = request_id.to_string();
+ subscription_id += 1;
+ let ls_sub_id = subscription_id.to_string();
+ let ls_mode = subscription.get_mode().to_string();
+ let ls_group = match subscription.get_item_group() {
+ Some(item_group) => item_group.to_string(),
+ None => match subscription.get_items() {
+ Some(items) => {
+ items.join(" ")
+ },
+ None => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "No item group or items found in subscription.",
+ )));
+ },
- None => {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- "No item group or items found in subscription.",
- )));
+ };
+ let ls_schema = match subscription.get_field_schema() {
+ Some(field_schema) => field_schema.to_string(),
+ None => match subscription.get_fields() {
+ Some(fields) => {
+ fields.join(" ")
+ },
+ None => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "No field schema or fields found in subscription.",
+ )));
+ },
- },
- };
- let ls_schema = match subscription.get_field_schema() {
- Some(field_schema) => field_schema.to_string(),
- None => match subscription.get_fields() {
- Some(fields) => {
- fields.join(",")
- },
- None => {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- "No field schema or fields found in subscription.",
- )));
- },
- },
- };
- let ls_data_adapter = match subscription.get_data_adapter() {
- Some(data_adapter) => data_adapter.to_string(),
- None => "".to_string(),
- };
- let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
- //
- // Prepare the subscription request.
- //
- let mut params: Vec<(&str, &str)> = vec![
- //("LS_session", session_id),
- ("LS_reqId", &ls_req_id),
- ("LS_op", "add"),
- ("LS_subId", &ls_sub_id),
- ("LS_mode", &ls_mode),
- ("LS_group", &ls_group),
- ("LS_schema", &ls_schema),
- ("LS_data_adapter", &ls_data_adapter),
- ("LS_ack", "false"),
- ];
- if ls_snapshot != "" {
- params.push(("LS_snapshot", &ls_snapshot));
+ };
+ let ls_data_adapter = match subscription.get_data_adapter() {
+ Some(data_adapter) => data_adapter.to_string(),
+ None => "".to_string(),
+ };
+ let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
+ //
+ // Prepare the subscription request.
+ //
+ let mut params: Vec<(&str, &str)> = vec![
+ //("LS_session", session_id),
+ ("LS_reqId", &ls_req_id),
+ ("LS_op", "add"),
+ ("LS_subId", &ls_sub_id),
+ ("LS_mode", &ls_mode),
+ ("LS_group", &ls_group),
+ ("LS_schema", &ls_schema),
+ ("LS_data_adapter", &ls_data_adapter),
+ ("LS_ack", "false"),
+ ];
+ if ls_snapshot != "" {
+ params.push(("LS_snapshot", &ls_snapshot));
+ }
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+ write_stream
+ .send(Message::Text(format!("control\r\n{}", encoded_params)))
+ .await?;
+ println!("Sent subscription request: '{}'", encoded_params);
- let encoded_params = serde_urlencoded::to_string(&params)?;
- write_stream
- .send(Message::Text(format!("control\r\n{}", encoded_params)))
- .await?;
- println!("Sent subscription request: '{}'", encoded_params);
+ } else {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "Session ID not found in 'conok' message from server",
+ )));
- } else {
+ },
+ //
+ // Notifications from server.
+ //
+ "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" => {
+ println!("Received notification from server: {}", clean_text);
+ // Don't do anything with these notifications for now.
+ },
+ "probe" => {
+ println!("Received probe message from server: '{}'", clean_text);
+ },
+ //
+ // Subscription confirmation from server.
+ //
+ "subok" => {
+ println!("Subscription confirmed by server: '{}'", clean_text);
+ },
+ //
+ // Data updates from server.
+ //
+ "u" => {
+ println!("Received data update from server: '{}'", clean_text);
+ },
+ //
+ // Connection confirmation from server.
+ //
+ "wsok" => {
+ println!("Connection confirmed by server: '{}'", clean_text);
+ //
+ // Request session creation.
+ //
+ let ls_adapter_set = match self.connection_details.get_adapter_set() {
+ Some(adapter_set) => adapter_set,
+ None => {
+ return Err(Box::new(IllegalStateException::new(
+ "No adapter set found in connection details.",
+ )));
+ },
+ };
+ let ls_send_sync = self.connection_options.get_send_sync().to_string();
+ println!("self.connection_options.get_send_sync(): {:?}", self.connection_options.get_send_sync());
+ let params: Vec<(&str, &str)> = vec![
+ ("LS_adapter_set", &ls_adapter_set),
+ ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
+ ("LS_send_sync", &ls_send_sync),
+ ("LS_protocol", "TLCP-2.5.0"),
+ ];
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+ write_stream
+ .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
+ .await?;
+ println!("Sent create session request: '{}'", encoded_params);
+ },
+ unexpected_message => {
return Err(Box::new(std::io::Error::new(
- "Session ID not found in 'conok' message from server",
+ format!(
+ "Unexpected message received from server: '{:?}'",
+ unexpected_message
+ ),
- }
- },
- //
- // Notifications from server.
- //
- "cons" | "servname" | "clientip" => {
- println!("Received notification from server: {}", clean_text);
- // Don't do anything with these notifications for now.
- },
- "probe" => {
- println!("Received probe message from server: '{}'", clean_text);
- },
- //
- // Subscription confirmation from server.
- //
- "subok" => {
- println!("Subscription confirmed by server: '{}'", clean_text);
- },
- //
- // Data updates from server.
- //
- "u" => {
- println!("Received data update from server: '{}'", clean_text);
- },
- //
- // Connection confirmation from server.
- //
- "wsok" => {
- println!("Connection confirmed by server: '{}'", clean_text);
- //
- // Request session creation.
- //
- let ls_adapter_set = match self.connection_details.get_adapter_set() {
- Some(adapter_set) => adapter_set,
- None => {
- return Err(Box::new(IllegalStateException::new(
- "No adapter set found in connection details.",
- )));
- },
- };
- let params: Vec<(&str, &str)> = vec![
- ("LS_adapter_set", &ls_adapter_set),
- ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
- ("LS_protocol", "TLCP-2.5.0"),
- ];
- let encoded_params = serde_urlencoded::to_string(&params)?;
- write_stream
- .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
- .await?;
- },
- unexpected_message => {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!(
- "Unexpected message received from server: '{:?}'",
- unexpected_message
- ),
- )));
- },
+ },
+ }
Some(Ok(non_text_message)) => {
@@ -700,7 +711,7 @@ impl LightstreamerClient {
adapter_set: Option<&str>,
) -> Result<LightstreamerClient, IllegalStateException> {
let connection_details = ConnectionDetails::new(server_address, adapter_set);
- let connection_options = ConnectionOptions::new();
+ let connection_options = ConnectionOptions::default();
Ok(LightstreamerClient {
server_address: server_address.map(|s| s.to_string()),