From 023758b3b5892faf0f82fff6013bcf21b1198756 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20L=C3=B3pez=20Aza=C3=B1a?= <daniloaz@gmail.com>
Date: Thu, 4 Apr 2024 15:39:55 +0200
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20(connection=5Foptions.rs):=20Add=20?=
 =?UTF-8?q?get=5Fsend=5Fsync=20method=20for=20ConnectionOptions=20?=
 =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(ls=5Fclient.rs):=20Refactor=20message=20p?=
 =?UTF-8?q?rocessing=20to=20handle=20multiple=20submessages=20=F0=9F=90=9B?=
 =?UTF-8?q?=20(ls=5Fclient.rs):=20Fix=20default=20instantiation=20of=20Con?=
 =?UTF-8?q?nectionOptions=20using=20default=20method?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/connection_options.rs |   7 +
 src/ls_client.rs          | 293 ++++++++++++++++++++------------------
 2 files changed, 159 insertions(+), 141 deletions(-)

diff --git a/src/connection_options.rs b/src/connection_options.rs
index e3218df..e9b3ab0 100644
--- a/src/connection_options.rs
+++ b/src/connection_options.rs
@@ -239,6 +239,13 @@ impl ConnectionOptions {
         self.reverse_heartbeat_interval
     }
 
+    /// Inquiry method that gets if LS_send_sync is to be sent to the server.
+    /// If set to false, instructs the Server not to send the SYNC notifications on this connection.
+    /// If omitted, the default is true.
+    pub fn get_send_sync(&self) -> bool {
+        self.send_sync
+    }
+
     /// Inquiry method that gets the maximum time allowed for attempts to recover the current session
     /// upon an interruption, after which a new session will be created. A 0 value also means that
     /// any attempt to recover the current session is prevented in the first place.
diff --git a/src/ls_client.rs b/src/ls_client.rs
index e3317f7..63cd04d 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -383,155 +383,166 @@ impl LightstreamerClient {
                 message = read_stream.next() => {
                     match message {
                         Some(Ok(Message::Text(text))) => {
-                            let clean_text = clean_message(&text);
-                            let message_fields: Vec<&str> = clean_text.split(",").collect();
-                            match *message_fields.first().unwrap_or(&"") {
-                                //
-                                // Errors from server.
-                                //
-                                "conerr" | "reqerr" => {
-                                    println!("Received connection error from server: {}", clean_text);
-                                    break;
-                                },
-                                //
-                                // Session created successfully.
-                                //
-                                "conok" => {
-                                    if let Some(session_id) = message_fields.get(1).as_deref() {
-                                        println!("Session creation confirmed by server: '{}'", clean_text);
-                                        println!("Session created with ID: {:?}", session_id);
-                                        //
-                                        // Subscribe to the desired items.
-                                        //
-                                        while let Some(subscription) = self.subscriptions.get(subscription_id) {
+                            // Messages could include multiple submessages separated by /r/n.
+                            // Split the message into submessages and process each one separately.
+                            let submessages: Vec<&str> = text.split("\r\n")
+                                .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
+                                .collect();
+                            for submessage in submessages {
+                                let clean_text = clean_message(&submessage);
+                                let submessage_fields: Vec<&str> = clean_text.split(",").collect();
+                                match *submessage_fields.first().unwrap_or(&"") {
+                                    //
+                                    // Errors from server.
+                                    //
+                                    "conerr" | "reqerr" => {
+                                        println!("Received connection error from server: {}", clean_text);
+                                        break;
+                                    },
+                                    //
+                                    // Session created successfully.
+                                    //
+                                    "conok" => {
+                                        if let Some(session_id) = submessage_fields.get(1).as_deref() {
+                                            println!("Session creation confirmed by server: '{}'", clean_text);
+                                            println!("Session created with ID: {:?}", session_id);
                                             //
-                                            // Gather all the necessary subscription parameters.
+                                            // Subscribe to the desired items.
                                             //
-                                            request_id += 1;
-                                            let ls_req_id = request_id.to_string();
-                                            subscription_id += 1;
-                                            let ls_sub_id = subscription_id.to_string();
-                                            let ls_mode = subscription.get_mode().to_string();
-                                            let ls_group = match subscription.get_item_group() {
-                                                Some(item_group) => item_group.to_string(),
-                                                None => match subscription.get_items() {
-                                                    Some(items) => {
-                                                        items.join(",")
+                                            while let Some(subscription) = self.subscriptions.get(subscription_id) {
+                                                //
+                                                // Gather all the necessary subscription parameters.
+                                                //
+                                                request_id += 1;
+                                                let ls_req_id = request_id.to_string();
+                                                subscription_id += 1;
+                                                let ls_sub_id = subscription_id.to_string();
+                                                let ls_mode = subscription.get_mode().to_string();
+                                                let ls_group = match subscription.get_item_group() {
+                                                    Some(item_group) => item_group.to_string(),
+                                                    None => match subscription.get_items() {
+                                                        Some(items) => {
+                                                            items.join(" ")
+                                                        },
+                                                        None => {
+                                                            return Err(Box::new(std::io::Error::new(
+                                                                std::io::ErrorKind::InvalidData,
+                                                                "No item group or items found in subscription.",
+                                                            )));
+                                                        },
                                                     },
-                                                    None => {
-                                                        return Err(Box::new(std::io::Error::new(
-                                                            std::io::ErrorKind::InvalidData,
-                                                            "No item group or items found in subscription.",
-                                                        )));
+                                                };
+                                                let ls_schema = match subscription.get_field_schema() {
+                                                    Some(field_schema) => field_schema.to_string(),
+                                                    None => match subscription.get_fields() {
+                                                        Some(fields) => {
+                                                            fields.join(" ")
+                                                        },
+                                                        None => {
+                                                            return Err(Box::new(std::io::Error::new(
+                                                                std::io::ErrorKind::InvalidData,
+                                                                "No field schema or fields found in subscription.",
+                                                            )));
+                                                        },
                                                     },
-                                                },
-                                            };
-                                            let ls_schema = match subscription.get_field_schema() {
-                                                Some(field_schema) => field_schema.to_string(),
-                                                None => match subscription.get_fields() {
-                                                    Some(fields) => {
-                                                        fields.join(",")
-                                                    },
-                                                    None => {
-                                                        return Err(Box::new(std::io::Error::new(
-                                                            std::io::ErrorKind::InvalidData,
-                                                            "No field schema or fields found in subscription.",
-                                                        )));
-                                                    },
-                                                },
-                                            };
-                                            let ls_data_adapter = match subscription.get_data_adapter() {
-                                                Some(data_adapter) => data_adapter.to_string(),
-                                                None => "".to_string(),
-                                            };
-                                            let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
-                                            //
-                                            // Prepare the subscription request.
-                                            //
-                                            let mut params: Vec<(&str, &str)> = vec![
-                                                //("LS_session", session_id),
-                                                ("LS_reqId", &ls_req_id),
-                                                ("LS_op", "add"),
-                                                ("LS_subId", &ls_sub_id),
-                                                ("LS_mode", &ls_mode),
-                                                ("LS_group", &ls_group),
-                                                ("LS_schema", &ls_schema),
-                                                ("LS_data_adapter", &ls_data_adapter),
-                                                ("LS_ack", "false"),
-                                            ];
-                                            if ls_snapshot != "" {
-                                                params.push(("LS_snapshot", &ls_snapshot));
+                                                };
+                                                let ls_data_adapter = match subscription.get_data_adapter() {
+                                                    Some(data_adapter) => data_adapter.to_string(),
+                                                    None => "".to_string(),
+                                                };
+                                                let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
+                                                //
+                                                // Prepare the subscription request.
+                                                //
+                                                let mut params: Vec<(&str, &str)> = vec![
+                                                    //("LS_session", session_id),
+                                                    ("LS_reqId", &ls_req_id),
+                                                    ("LS_op", "add"),
+                                                    ("LS_subId", &ls_sub_id),
+                                                    ("LS_mode", &ls_mode),
+                                                    ("LS_group", &ls_group),
+                                                    ("LS_schema", &ls_schema),
+                                                    ("LS_data_adapter", &ls_data_adapter),
+                                                    ("LS_ack", "false"),
+                                                ];
+                                                if ls_snapshot != "" {
+                                                    params.push(("LS_snapshot", &ls_snapshot));
+                                                }
+                                                let encoded_params = serde_urlencoded::to_string(&params)?;
+                                                write_stream
+                                                    .send(Message::Text(format!("control\r\n{}", encoded_params)))
+                                                    .await?;
+                                                println!("Sent subscription request: '{}'", encoded_params);
                                             }
-                                            let encoded_params = serde_urlencoded::to_string(&params)?;
-                                            write_stream
-                                                .send(Message::Text(format!("control\r\n{}", encoded_params)))
-                                                .await?;
-                                            println!("Sent subscription request: '{}'", encoded_params);
+                                        } else {
+                                            return Err(Box::new(std::io::Error::new(
+                                                std::io::ErrorKind::InvalidData,
+                                                "Session ID not found in 'conok' message from server",
+                                            )));
                                         }
-                                    } else {
+                                    },
+                                    //
+                                    // Notifications from server.
+                                    //
+                                    "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" => {
+                                        println!("Received notification from server: {}", clean_text);
+                                        // Don't do anything with these notifications for now.
+                                    },
+                                    "probe" => {
+                                        println!("Received probe message from server: '{}'", clean_text);
+                                    },
+                                    //
+                                    // Subscription confirmation from server.
+                                    //
+                                    "subok" => {
+                                        println!("Subscription confirmed by server: '{}'", clean_text);
+                                    },
+                                    //
+                                    // Data updates from server.
+                                    //
+                                    "u" => {
+                                        println!("Received data update from server: '{}'", clean_text);
+                                    },
+                                    //
+                                    // Connection confirmation from server.
+                                    //
+                                    "wsok" => {
+                                        println!("Connection confirmed by server: '{}'", clean_text);
+                                        //
+                                        // Request session creation.
+                                        //
+                                        let ls_adapter_set = match self.connection_details.get_adapter_set() {
+                                            Some(adapter_set) => adapter_set,
+                                            None => {
+                                                return Err(Box::new(IllegalStateException::new(
+                                                    "No adapter set found in connection details.",
+                                                )));
+                                            },
+                                        };
+                                        let ls_send_sync = self.connection_options.get_send_sync().to_string();
+                                        println!("self.connection_options.get_send_sync(): {:?}", self.connection_options.get_send_sync());
+                                        let params: Vec<(&str, &str)> = vec![
+                                            ("LS_adapter_set", &ls_adapter_set),
+                                            ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
+                                            ("LS_send_sync", &ls_send_sync),
+                                            ("LS_protocol", "TLCP-2.5.0"),
+                                        ];
+                                        let encoded_params = serde_urlencoded::to_string(&params)?;
+                                        write_stream
+                                            .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
+                                            .await?;
+                                        println!("Sent create session request: '{}'", encoded_params);
+                                    },
+                                    unexpected_message => {
                                         return Err(Box::new(std::io::Error::new(
                                             std::io::ErrorKind::InvalidData,
-                                            "Session ID not found in 'conok' message from server",
+                                            format!(
+                                                "Unexpected message received from server: '{:?}'",
+                                                unexpected_message
+                                            ),
                                         )));
-                                    }
-                                },
-                                //
-                                // Notifications from server.
-                                //
-                                "cons" | "servname" | "clientip" => {
-                                    println!("Received notification from server: {}", clean_text);
-                                    // Don't do anything with these notifications for now.
-                                },
-                                "probe" => {
-                                    println!("Received probe message from server: '{}'", clean_text);
-                                },
-                                //
-                                // Subscription confirmation from server.
-                                //
-                                "subok" => {
-                                    println!("Subscription confirmed by server: '{}'", clean_text);
-                                },
-                                //
-                                // Data updates from server.
-                                //
-                                "u" => {
-                                    println!("Received data update from server: '{}'", clean_text);
-                                },
-                                //
-                                // Connection confirmation from server.
-                                //
-                                "wsok" => {
-                                    println!("Connection confirmed by server: '{}'", clean_text);
-                                    //
-                                    // Request session creation.
-                                    //
-                                    let ls_adapter_set = match self.connection_details.get_adapter_set() {
-                                        Some(adapter_set) => adapter_set,
-                                        None => {
-                                            return Err(Box::new(IllegalStateException::new(
-                                                "No adapter set found in connection details.",
-                                            )));
-                                        },
-                                    };
-                                    let params: Vec<(&str, &str)> = vec![
-                                        ("LS_adapter_set", &ls_adapter_set),
-                                        ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
-                                        ("LS_protocol", "TLCP-2.5.0"),
-                                    ];
-                                    let encoded_params = serde_urlencoded::to_string(&params)?;
-                                    write_stream
-                                        .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
-                                        .await?;
-                                },
-                                unexpected_message => {
-                                    return Err(Box::new(std::io::Error::new(
-                                        std::io::ErrorKind::InvalidData,
-                                        format!(
-                                            "Unexpected message received from server: '{:?}'",
-                                            unexpected_message
-                                        ),
-                                    )));
-                                },
+                                    },
+                                }
                             }
                         },
                         Some(Ok(non_text_message)) => {
@@ -700,7 +711,7 @@ impl LightstreamerClient {
         adapter_set: Option<&str>,
     ) -> Result<LightstreamerClient, IllegalStateException> {
         let connection_details = ConnectionDetails::new(server_address, adapter_set);
-        let connection_options = ConnectionOptions::new();
+        let connection_options = ConnectionOptions::default();
 
         Ok(LightstreamerClient {
             server_address: server_address.map(|s| s.to_string()),