diff options
author | 2024-04-04 15:39:55 +0200 | |
---|---|---|
committer | 2024-04-04 15:39:55 +0200 | |
commit | 023758b3b5892faf0f82fff6013bcf21b1198756 (patch) | |
tree | 255edbd8d8727a854477c0fcdbd5d002f982b318 | |
parent | 2565f3be410c71c35e0d70bbea1c7be627f4ecd2 (diff) |
✨ (connection_options.rs): Add get_send_sync method for ConnectionOptions
♻️ (ls_client.rs): Refactor message processing to handle multiple submessages
🐛 (ls_client.rs): Fix default instantiation of ConnectionOptions using default method
-rw-r--r-- | src/connection_options.rs | 7 | ||||
-rw-r--r-- | src/ls_client.rs | 293 |
2 files changed, 159 insertions, 141 deletions
diff --git a/src/connection_options.rs b/src/connection_options.rs index e3218df..e9b3ab0 100644 --- a/src/connection_options.rs +++ b/src/connection_options.rs @@ -239,6 +239,13 @@ impl ConnectionOptions { self.reverse_heartbeat_interval } + /// Inquiry method that gets if LS_send_sync is to be sent to the server. + /// If set to false, instructs the Server not to send the SYNC notifications on this connection. + /// If omitted, the default is true. + pub fn get_send_sync(&self) -> bool { + self.send_sync + } + /// Inquiry method that gets the maximum time allowed for attempts to recover the current session /// upon an interruption, after which a new session will be created. A 0 value also means that /// any attempt to recover the current session is prevented in the first place. diff --git a/src/ls_client.rs b/src/ls_client.rs index e3317f7..63cd04d 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -383,155 +383,166 @@ impl LightstreamerClient { message = read_stream.next() => { match message { Some(Ok(Message::Text(text))) => { - let clean_text = clean_message(&text); - let message_fields: Vec<&str> = clean_text.split(",").collect(); - match *message_fields.first().unwrap_or(&"") { - // - // Errors from server. - // - "conerr" | "reqerr" => { - println!("Received connection error from server: {}", clean_text); - break; - }, - // - // Session created successfully. - // - "conok" => { - if let Some(session_id) = message_fields.get(1).as_deref() { - println!("Session creation confirmed by server: '{}'", clean_text); - println!("Session created with ID: {:?}", session_id); - // - // Subscribe to the desired items. - // - while let Some(subscription) = self.subscriptions.get(subscription_id) { + // Messages could include multiple submessages separated by /r/n. + // Split the message into submessages and process each one separately. + let submessages: Vec<&str> = text.split("\r\n") + .filter(|&line| !line.trim().is_empty()) // Filter out empty lines. + .collect(); + for submessage in submessages { + let clean_text = clean_message(&submessage); + let submessage_fields: Vec<&str> = clean_text.split(",").collect(); + match *submessage_fields.first().unwrap_or(&"") { + // + // Errors from server. + // + "conerr" | "reqerr" => { + println!("Received connection error from server: {}", clean_text); + break; + }, + // + // Session created successfully. + // + "conok" => { + if let Some(session_id) = submessage_fields.get(1).as_deref() { + println!("Session creation confirmed by server: '{}'", clean_text); + println!("Session created with ID: {:?}", session_id); // - // Gather all the necessary subscription parameters. + // Subscribe to the desired items. // - request_id += 1; - let ls_req_id = request_id.to_string(); - subscription_id += 1; - let ls_sub_id = subscription_id.to_string(); - let ls_mode = subscription.get_mode().to_string(); - let ls_group = match subscription.get_item_group() { - Some(item_group) => item_group.to_string(), - None => match subscription.get_items() { - Some(items) => { - items.join(",") + while let Some(subscription) = self.subscriptions.get(subscription_id) { + // + // Gather all the necessary subscription parameters. + // + request_id += 1; + let ls_req_id = request_id.to_string(); + subscription_id += 1; + let ls_sub_id = subscription_id.to_string(); + let ls_mode = subscription.get_mode().to_string(); + let ls_group = match subscription.get_item_group() { + Some(item_group) => item_group.to_string(), + None => match subscription.get_items() { + Some(items) => { + items.join(" ") + }, + None => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No item group or items found in subscription.", + ))); + }, }, - None => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "No item group or items found in subscription.", - ))); + }; + let ls_schema = match subscription.get_field_schema() { + Some(field_schema) => field_schema.to_string(), + None => match subscription.get_fields() { + Some(fields) => { + fields.join(" ") + }, + None => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No field schema or fields found in subscription.", + ))); + }, }, - }, - }; - let ls_schema = match subscription.get_field_schema() { - Some(field_schema) => field_schema.to_string(), - None => match subscription.get_fields() { - Some(fields) => { - fields.join(",") - }, - None => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "No field schema or fields found in subscription.", - ))); - }, - }, - }; - let ls_data_adapter = match subscription.get_data_adapter() { - Some(data_adapter) => data_adapter.to_string(), - None => "".to_string(), - }; - let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string(); - // - // Prepare the subscription request. - // - let mut params: Vec<(&str, &str)> = vec![ - //("LS_session", session_id), - ("LS_reqId", &ls_req_id), - ("LS_op", "add"), - ("LS_subId", &ls_sub_id), - ("LS_mode", &ls_mode), - ("LS_group", &ls_group), - ("LS_schema", &ls_schema), - ("LS_data_adapter", &ls_data_adapter), - ("LS_ack", "false"), - ]; - if ls_snapshot != "" { - params.push(("LS_snapshot", &ls_snapshot)); + }; + let ls_data_adapter = match subscription.get_data_adapter() { + Some(data_adapter) => data_adapter.to_string(), + None => "".to_string(), + }; + let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string(); + // + // Prepare the subscription request. + // + let mut params: Vec<(&str, &str)> = vec![ + //("LS_session", session_id), + ("LS_reqId", &ls_req_id), + ("LS_op", "add"), + ("LS_subId", &ls_sub_id), + ("LS_mode", &ls_mode), + ("LS_group", &ls_group), + ("LS_schema", &ls_schema), + ("LS_data_adapter", &ls_data_adapter), + ("LS_ack", "false"), + ]; + if ls_snapshot != "" { + params.push(("LS_snapshot", &ls_snapshot)); + } + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("control\r\n{}", encoded_params))) + .await?; + println!("Sent subscription request: '{}'", encoded_params); } - let encoded_params = serde_urlencoded::to_string(¶ms)?; - write_stream - .send(Message::Text(format!("control\r\n{}", encoded_params))) - .await?; - println!("Sent subscription request: '{}'", encoded_params); + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Session ID not found in 'conok' message from server", + ))); } - } else { + }, + // + // Notifications from server. + // + "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" => { + println!("Received notification from server: {}", clean_text); + // Don't do anything with these notifications for now. + }, + "probe" => { + println!("Received probe message from server: '{}'", clean_text); + }, + // + // Subscription confirmation from server. + // + "subok" => { + println!("Subscription confirmed by server: '{}'", clean_text); + }, + // + // Data updates from server. + // + "u" => { + println!("Received data update from server: '{}'", clean_text); + }, + // + // Connection confirmation from server. + // + "wsok" => { + println!("Connection confirmed by server: '{}'", clean_text); + // + // Request session creation. + // + let ls_adapter_set = match self.connection_details.get_adapter_set() { + Some(adapter_set) => adapter_set, + None => { + return Err(Box::new(IllegalStateException::new( + "No adapter set found in connection details.", + ))); + }, + }; + let ls_send_sync = self.connection_options.get_send_sync().to_string(); + println!("self.connection_options.get_send_sync(): {:?}", self.connection_options.get_send_sync()); + let params: Vec<(&str, &str)> = vec![ + ("LS_adapter_set", &ls_adapter_set), + ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), + ("LS_send_sync", &ls_send_sync), + ("LS_protocol", "TLCP-2.5.0"), + ]; + let encoded_params = serde_urlencoded::to_string(¶ms)?; + write_stream + .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) + .await?; + println!("Sent create session request: '{}'", encoded_params); + }, + unexpected_message => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, - "Session ID not found in 'conok' message from server", + format!( + "Unexpected message received from server: '{:?}'", + unexpected_message + ), ))); - } - }, - // - // Notifications from server. - // - "cons" | "servname" | "clientip" => { - println!("Received notification from server: {}", clean_text); - // Don't do anything with these notifications for now. - }, - "probe" => { - println!("Received probe message from server: '{}'", clean_text); - }, - // - // Subscription confirmation from server. - // - "subok" => { - println!("Subscription confirmed by server: '{}'", clean_text); - }, - // - // Data updates from server. - // - "u" => { - println!("Received data update from server: '{}'", clean_text); - }, - // - // Connection confirmation from server. - // - "wsok" => { - println!("Connection confirmed by server: '{}'", clean_text); - // - // Request session creation. - // - let ls_adapter_set = match self.connection_details.get_adapter_set() { - Some(adapter_set) => adapter_set, - None => { - return Err(Box::new(IllegalStateException::new( - "No adapter set found in connection details.", - ))); - }, - }; - let params: Vec<(&str, &str)> = vec![ - ("LS_adapter_set", &ls_adapter_set), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), - ("LS_protocol", "TLCP-2.5.0"), - ]; - let encoded_params = serde_urlencoded::to_string(¶ms)?; - write_stream - .send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) - .await?; - }, - unexpected_message => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Unexpected message received from server: '{:?}'", - unexpected_message - ), - ))); - }, + }, + } } }, Some(Ok(non_text_message)) => { @@ -700,7 +711,7 @@ impl LightstreamerClient { adapter_set: Option<&str>, ) -> Result<LightstreamerClient, IllegalStateException> { let connection_details = ConnectionDetails::new(server_address, adapter_set); - let connection_options = ConnectionOptions::new(); + let connection_options = ConnectionOptions::default(); Ok(LightstreamerClient { server_address: server_address.map(|s| s.to_string()), |