♻️ (ls_client.rs): refactor base_params to Vec for consistent param handling

 (ls_client.rs): add conditional adapter_set param to base_params
♻️ (ls_client.rs): remove redundant adapter_set insertion
♻️ (ls_client.rs): refactor message processing loop for clarity and extensibility
🔧 (main.rs): extract MAX_CONNECTION_ATTEMPTS as constant for better configurability
 (main.rs): change adapter_set from "DEMO" to "QUOTE_ADAPTER" for client initialization
This commit is contained in:
Daniel López Azaña 2024-04-03 20:55:19 +02:00
parent facada6f8d
commit c094bf9c8b
2 changed files with 135 additions and 95 deletions

View File

@ -206,21 +206,20 @@ impl LightstreamerClient {
))); )));
} }
let mut base_params = HashMap::new(); let mut base_params: Vec<(&str, &str)> = Vec::new();
// //
// Build the request base parameters. // Build the request base parameters.
// //
if let Some(adapter_set) = &self.adapter_set {
base_params.push(("LS_adapter_set", adapter_set));
}
base_params.extend([ base_params.extend([
("LS_protocol", "TLCP-2.5.0"), ("LS_protocol", "TLCP-2.5.0"),
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
]); ]);
if let Some(adapter_set) = self.connection_details.get_adapter_set() {
base_params.insert("LS_adapter_set", adapter_set);
}
// //
// Add optional parameters // Add optional parameters
// //
@ -235,9 +234,6 @@ impl LightstreamerClient {
params.insert("LS_password", password); params.insert("LS_password", password);
} }
if let Some(adapter_set) = &self.adapter_set {
params.insert("LS_adapter_set", adapter_set);
}
if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() { if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() {
params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string()); params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string());
@ -382,57 +378,115 @@ impl LightstreamerClient {
// //
// Confirm the connection by sending a 'wsok' message to the server. // Confirm the connection by sending a 'wsok' message to the server.
// //
//
// Initiate communication with the server by sending a 'wsok' message.
//
write_stream write_stream
.send(Message::Text("wsok".into())) .send(Message::Text("wsok".into()))
.await?; .await?;
if let Some(result) = read_stream.next().await {
match result? {
Message::Text(text) => {
let clean_text = clean_message(&text);
if clean_text == "wsok" {
println!("Connection confirmed by server");
} else {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Unexpected message received from server: {}", clean_text),
)));
}
}
non_text_message => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Unexpected non-text message from server: {:?}",
non_text_message
),
)));
}
}
}
// //
// Session creation. // Start reading and processing messages from the server.
// //
let mut request_id: usize = 0;
let mut session_id: Option<String> = None; let mut session_id: Option<String> = None;
let encoded_params = serde_urlencoded::to_string(&base_params)?; let mut subscription_id: usize = 0;
loop {
tokio::select! {
message = read_stream.next() => {
match message {
Some(Ok(Message::Text(text))) => {
let clean_text = clean_message(&text);
let message_fields: Vec<&str> = clean_text.split(",").collect();
match *message_fields.first().unwrap_or(&"") {
//
// Connection confirmation from server.
//
"wsok" => {
println!("Connection confirmed by server");
//
// Request session creation.
//
let mut params: Vec<(&str, &str)> = vec![];
if let Some(adapter_set) = self.connection_details.get_adapter_set() {
params.push(("LS_adapter_set", adapter_set));
}
params.extend(base_params.iter().cloned());
let encoded_params = serde_urlencoded::to_string(&params)?;
write_stream write_stream
.send(Message::Text(format!("create_session\r\n{}\n", encoded_params))) .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
.await?; .await?;
if let Some(result) = read_stream.next().await { },
match result? { //
Message::Text(text) => { // Notifications from server.
let clean_text = clean_message(&text); //
if clean_text.starts_with("conok") { "cons" | "servname" | "clientip" => {
let session_info: Vec<&str> = clean_text.split(",").collect(); println!("Received notification from server: {}", clean_text);
session_id = session_info.get(1).map(|s| s.to_string()); // Don't do anything with these notifications for now.
},
//
// Session created successfully.
//
"conok" => {
if let Some(session_id) = message_fields.get(1).as_deref() {
println!("Session created with ID: {:?}", session_id);
//
// Subscribe to the desired items.
//
/*
let ls_item_group = self.get_subscriptions().iter().map(|subscription| {
format!(
"{},{},{}",
subscription.get_mode().to_string(),
subscription.get_items().join(","),
subscription.get_fields().join(",")
)
}).collect::<Vec<String>>().join(";");
*/
request_id += 1;
let ls_req_id = request_id.to_string();
subscription_id += 1;
let ls_sub_id = subscription_id.to_string();
let mut params: Vec<(&str, &str)> = vec![
("LS_session", *session_id),
("LS_reqId", &ls_req_id),
("LS_subId", &ls_sub_id),
("LS_op", "add"),
("LS_table", "1"),
("LS_id", "1"),
("LS_mode", "MERGE"),
("LS_schema", "stock_name,last_price"),
("LS_snapshot", "true"),
];
params.extend(base_params.iter().cloned());
let encoded_params = serde_urlencoded::to_string(&params)?;
println!("Encoded params: {}", encoded_params);
write_stream
.send(Message::Text(format!("control\r\n{}\n", encoded_params)))
.await?;
} else { } else {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
format!("Unexpected message received from server: {}", clean_text), "Session ID not found in 'conok' message from server",
))); )));
} }
},
"probe" => {
println!("Received probe message from server: '{}'", clean_text);
},
unexpected_message => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Unexpected message received from server: '{}'",
unexpected_message
),
)));
},
} }
non_text_message => { },
Some(Ok(non_text_message)) => {
return Err(Box::new(std::io::Error::new( return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
format!( format!(
@ -440,10 +494,27 @@ impl LightstreamerClient {
non_text_message non_text_message
), ),
))); )));
},
Some(Err(err)) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Error reading message from server: {}", err),
)));
},
None => {
println!("No more messages from server");
break;
},
} }
},
_ = shutdown_signal.notified() => {
println!("Received shutdown signal");
break;
},
} }
} }
/*
// //
// Perform subscription. // Perform subscription.
// //
@ -487,40 +558,7 @@ impl LightstreamerClient {
} }
} }
} }
*/
// Listen for messages from the server
loop {
tokio::select! {
message = read_stream.next() => {
match message {
Some(Ok(Message::Text(text))) => {
println!("Received message from server: {}", text);
},
Some(Ok(non_text_message)) => {
println!("Received non-text message from server: {:?}", non_text_message);
},
Some(Err(err)) => {
return Err(Box::new(
std::io::Error::new(std::io::ErrorKind::InvalidData, format!(
"Error reading message from server: {}",
err
)),
));
},
None => {
println!("No more messages from server");
break;
},
}
},
_ = shutdown_signal.notified() => {
println!("Received shutdown signal");
break;
},
}
}
}
println!("Ending function connect() to Lightstreamer server"); println!("Ending function connect() to Lightstreamer server");

View File

@ -16,6 +16,8 @@ use std::sync::Arc;
use tokio::sync::{Notify, Mutex}; use tokio::sync::{Notify, Mutex};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
const MAX_CONNECTION_ATTEMPTS: u64 = 1;
/// Sets up a signal hook for SIGINT and SIGTERM. /// Sets up a signal hook for SIGINT and SIGTERM.
/// ///
/// Creates a signal hook for the specified signals and spawns a thread to handle them. /// Creates a signal hook for the specified signals and spawns a thread to handle them.
@ -79,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads. // Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads.
let client = Arc::new(Mutex::new(LightstreamerClient::new( let client = Arc::new(Mutex::new(LightstreamerClient::new(
Some("http://push.lightstreamer.com/lightstreamer"), Some("http://push.lightstreamer.com/lightstreamer"),
Some("DEMO"), Some("QUOTE_ADAPTER"),
)?)); )?));
// //
@ -102,7 +104,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// //
let mut retry_interval_milis: u64 = 0; let mut retry_interval_milis: u64 = 0;
let mut retry_counter: u64 = 0; let mut retry_counter: u64 = 0;
while retry_counter < 5 { while retry_counter < MAX_CONNECTION_ATTEMPTS {
let mut client = client.lock().await; let mut client = client.lock().await;
match client.connect(Arc::clone(&shutdown_signal)).await { match client.connect(Arc::clone(&shutdown_signal)).await {
Ok(_) => { Ok(_) => {