(connection_options.rs): Add get_send_sync method for ConnectionOptions

♻️ (ls_client.rs): Refactor message processing to handle multiple submessages
🐛 (ls_client.rs): Fix default instantiation of ConnectionOptions using default method
This commit is contained in:
Daniel López Azaña 2024-04-04 15:39:55 +02:00
parent 2565f3be41
commit 023758b3b5
2 changed files with 159 additions and 141 deletions

View File

@ -239,6 +239,13 @@ impl ConnectionOptions {
self.reverse_heartbeat_interval self.reverse_heartbeat_interval
} }
/// Inquiry method that gets if LS_send_sync is to be sent to the server.
/// If set to false, instructs the Server not to send the SYNC notifications on this connection.
/// If omitted, the default is true.
pub fn get_send_sync(&self) -> bool {
self.send_sync
}
/// Inquiry method that gets the maximum time allowed for attempts to recover the current session /// Inquiry method that gets the maximum time allowed for attempts to recover the current session
/// upon an interruption, after which a new session will be created. A 0 value also means that /// upon an interruption, after which a new session will be created. A 0 value also means that
/// any attempt to recover the current session is prevented in the first place. /// any attempt to recover the current session is prevented in the first place.

View File

@ -383,155 +383,166 @@ impl LightstreamerClient {
message = read_stream.next() => { message = read_stream.next() => {
match message { match message {
Some(Ok(Message::Text(text))) => { Some(Ok(Message::Text(text))) => {
let clean_text = clean_message(&text); // Messages could include multiple submessages separated by /r/n.
let message_fields: Vec<&str> = clean_text.split(",").collect(); // Split the message into submessages and process each one separately.
match *message_fields.first().unwrap_or(&"") { let submessages: Vec<&str> = text.split("\r\n")
// .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
// Errors from server. .collect();
// for submessage in submessages {
"conerr" | "reqerr" => { let clean_text = clean_message(&submessage);
println!("Received connection error from server: {}", clean_text); let submessage_fields: Vec<&str> = clean_text.split(",").collect();
break; match *submessage_fields.first().unwrap_or(&"") {
}, //
// // Errors from server.
// Session created successfully. //
// "conerr" | "reqerr" => {
"conok" => { println!("Received connection error from server: {}", clean_text);
if let Some(session_id) = message_fields.get(1).as_deref() { break;
println!("Session creation confirmed by server: '{}'", clean_text); },
println!("Session created with ID: {:?}", session_id); //
// // Session created successfully.
// Subscribe to the desired items. //
// "conok" => {
while let Some(subscription) = self.subscriptions.get(subscription_id) { if let Some(session_id) = submessage_fields.get(1).as_deref() {
println!("Session creation confirmed by server: '{}'", clean_text);
println!("Session created with ID: {:?}", session_id);
// //
// Gather all the necessary subscription parameters. // Subscribe to the desired items.
// //
request_id += 1; while let Some(subscription) = self.subscriptions.get(subscription_id) {
let ls_req_id = request_id.to_string(); //
subscription_id += 1; // Gather all the necessary subscription parameters.
let ls_sub_id = subscription_id.to_string(); //
let ls_mode = subscription.get_mode().to_string(); request_id += 1;
let ls_group = match subscription.get_item_group() { let ls_req_id = request_id.to_string();
Some(item_group) => item_group.to_string(), subscription_id += 1;
None => match subscription.get_items() { let ls_sub_id = subscription_id.to_string();
Some(items) => { let ls_mode = subscription.get_mode().to_string();
items.join(",") let ls_group = match subscription.get_item_group() {
Some(item_group) => item_group.to_string(),
None => match subscription.get_items() {
Some(items) => {
items.join(" ")
},
None => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"No item group or items found in subscription.",
)));
},
}, },
None => { };
return Err(Box::new(std::io::Error::new( let ls_schema = match subscription.get_field_schema() {
std::io::ErrorKind::InvalidData, Some(field_schema) => field_schema.to_string(),
"No item group or items found in subscription.", None => match subscription.get_fields() {
))); Some(fields) => {
fields.join(" ")
},
None => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"No field schema or fields found in subscription.",
)));
},
}, },
}, };
}; let ls_data_adapter = match subscription.get_data_adapter() {
let ls_schema = match subscription.get_field_schema() { Some(data_adapter) => data_adapter.to_string(),
Some(field_schema) => field_schema.to_string(), None => "".to_string(),
None => match subscription.get_fields() { };
Some(fields) => { let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
fields.join(",") //
}, // Prepare the subscription request.
None => { //
return Err(Box::new(std::io::Error::new( let mut params: Vec<(&str, &str)> = vec![
std::io::ErrorKind::InvalidData, //("LS_session", session_id),
"No field schema or fields found in subscription.", ("LS_reqId", &ls_req_id),
))); ("LS_op", "add"),
}, ("LS_subId", &ls_sub_id),
}, ("LS_mode", &ls_mode),
}; ("LS_group", &ls_group),
let ls_data_adapter = match subscription.get_data_adapter() { ("LS_schema", &ls_schema),
Some(data_adapter) => data_adapter.to_string(), ("LS_data_adapter", &ls_data_adapter),
None => "".to_string(), ("LS_ack", "false"),
}; ];
let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string(); if ls_snapshot != "" {
// params.push(("LS_snapshot", &ls_snapshot));
// Prepare the subscription request. }
// let encoded_params = serde_urlencoded::to_string(&params)?;
let mut params: Vec<(&str, &str)> = vec![ write_stream
//("LS_session", session_id), .send(Message::Text(format!("control\r\n{}", encoded_params)))
("LS_reqId", &ls_req_id), .await?;
("LS_op", "add"), println!("Sent subscription request: '{}'", encoded_params);
("LS_subId", &ls_sub_id),
("LS_mode", &ls_mode),
("LS_group", &ls_group),
("LS_schema", &ls_schema),
("LS_data_adapter", &ls_data_adapter),
("LS_ack", "false"),
];
if ls_snapshot != "" {
params.push(("LS_snapshot", &ls_snapshot));
} }
let encoded_params = serde_urlencoded::to_string(&params)?; } else {
write_stream return Err(Box::new(std::io::Error::new(
.send(Message::Text(format!("control\r\n{}", encoded_params))) std::io::ErrorKind::InvalidData,
.await?; "Session ID not found in 'conok' message from server",
println!("Sent subscription request: '{}'", encoded_params); )));
} }
} else { },
//
// Notifications from server.
//
"conf" | "cons" | "clientip" | "servname" | "prog" | "sync" => {
println!("Received notification from server: {}", clean_text);
// Don't do anything with these notifications for now.
},
"probe" => {
println!("Received probe message from server: '{}'", clean_text);
},
//
// Subscription confirmation from server.
//
"subok" => {
println!("Subscription confirmed by server: '{}'", clean_text);
},
//
// Data updates from server.
//
"u" => {
println!("Received data update from server: '{}'", clean_text);
},
//
// Connection confirmation from server.
//
"wsok" => {
println!("Connection confirmed by server: '{}'", clean_text);
//
// Request session creation.
//
let ls_adapter_set = match self.connection_details.get_adapter_set() {
Some(adapter_set) => adapter_set,
None => {
return Err(Box::new(IllegalStateException::new(
"No adapter set found in connection details.",
)));
},
};
let ls_send_sync = self.connection_options.get_send_sync().to_string();
println!("self.connection_options.get_send_sync(): {:?}", self.connection_options.get_send_sync());
let params: Vec<(&str, &str)> = vec![
("LS_adapter_set", &ls_adapter_set),
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
("LS_send_sync", &ls_send_sync),
("LS_protocol", "TLCP-2.5.0"),
];
let encoded_params = serde_urlencoded::to_string(&params)?;
write_stream
.send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
.await?;
println!("Sent create session request: '{}'", encoded_params);
},
unexpected_message => {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
"Session ID not found in 'conok' message from server", format!(
"Unexpected message received from server: '{:?}'",
unexpected_message
),
))); )));
} },
}, }
//
// Notifications from server.
//
"cons" | "servname" | "clientip" => {
println!("Received notification from server: {}", clean_text);
// Don't do anything with these notifications for now.
},
"probe" => {
println!("Received probe message from server: '{}'", clean_text);
},
//
// Subscription confirmation from server.
//
"subok" => {
println!("Subscription confirmed by server: '{}'", clean_text);
},
//
// Data updates from server.
//
"u" => {
println!("Received data update from server: '{}'", clean_text);
},
//
// Connection confirmation from server.
//
"wsok" => {
println!("Connection confirmed by server: '{}'", clean_text);
//
// Request session creation.
//
let ls_adapter_set = match self.connection_details.get_adapter_set() {
Some(adapter_set) => adapter_set,
None => {
return Err(Box::new(IllegalStateException::new(
"No adapter set found in connection details.",
)));
},
};
let params: Vec<(&str, &str)> = vec![
("LS_adapter_set", &ls_adapter_set),
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
("LS_protocol", "TLCP-2.5.0"),
];
let encoded_params = serde_urlencoded::to_string(&params)?;
write_stream
.send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
.await?;
},
unexpected_message => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Unexpected message received from server: '{:?}'",
unexpected_message
),
)));
},
} }
}, },
Some(Ok(non_text_message)) => { Some(Ok(non_text_message)) => {
@ -700,7 +711,7 @@ impl LightstreamerClient {
adapter_set: Option<&str>, adapter_set: Option<&str>,
) -> Result<LightstreamerClient, IllegalStateException> { ) -> Result<LightstreamerClient, IllegalStateException> {
let connection_details = ConnectionDetails::new(server_address, adapter_set); let connection_details = ConnectionDetails::new(server_address, adapter_set);
let connection_options = ConnectionOptions::new(); let connection_options = ConnectionOptions::default();
Ok(LightstreamerClient { Ok(LightstreamerClient {
server_address: server_address.map(|s| s.to_string()), server_address: server_address.map(|s| s.to_string()),