aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-04-03 20:55:19 +0200
committerLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-04-03 20:55:19 +0200
commitc094bf9c8bad3db0d6ae9b53f9087e081a3ba63b (patch)
treed321e6f22b7bdaaa61c271d9931dcf52cd95e14b
parentfacada6f8d5aecb5ce0bc2a13042622f93f15807 (diff)
♻️ (ls_client.rs): refactor base_params to Vec for consistent param handling
✨ (ls_client.rs): add conditional adapter_set param to base_params ♻️ (ls_client.rs): remove redundant adapter_set insertion ♻️ (ls_client.rs): refactor message processing loop for clarity and extensibility 🔧 (main.rs): extract MAX_CONNECTION_ATTEMPTS as constant for better configurability ✨ (main.rs): change adapter_set from "DEMO" to "QUOTE_ADAPTER" for client initialization
-rw-r--r--src/ls_client.rs224
-rw-r--r--src/main.rs6
2 files changed, 135 insertions, 95 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs
index 51e4786..23b9319 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -206,21 +206,20 @@ impl LightstreamerClient {
)));
}
- let mut base_params = HashMap::new();
+ let mut base_params: Vec<(&str, &str)> = Vec::new();
//
// Build the request base parameters.
//
+ if let Some(adapter_set) = &self.adapter_set {
+ base_params.push(("LS_adapter_set", adapter_set));
+ }
base_params.extend([
("LS_protocol", "TLCP-2.5.0"),
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
]);
- if let Some(adapter_set) = self.connection_details.get_adapter_set() {
- base_params.insert("LS_adapter_set", adapter_set);
- }
-
//
// Add optional parameters
//
@@ -235,9 +234,6 @@ impl LightstreamerClient {
params.insert("LS_password", password);
}
- if let Some(adapter_set) = &self.adapter_set {
- params.insert("LS_adapter_set", adapter_set);
- }
if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() {
params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string());
@@ -382,68 +378,143 @@ impl LightstreamerClient {
//
// Confirm the connection by sending a 'wsok' message to the server.
//
+
+ //
+ // Initiate communication with the server by sending a 'wsok' message.
+ //
write_stream
.send(Message::Text("wsok".into()))
.await?;
- if let Some(result) = read_stream.next().await {
- match result? {
- Message::Text(text) => {
- let clean_text = clean_message(&text);
- if clean_text == "wsok" {
- println!("Connection confirmed by server");
- } else {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("Unexpected message received from server: {}", clean_text),
- )));
- }
- }
- non_text_message => {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!(
- "Unexpected non-text message from server: {:?}",
- non_text_message
- ),
- )));
- }
- }
- }
//
- // Session creation.
+ // Start reading and processing messages from the server.
//
+ let mut request_id: usize = 0;
let mut session_id: Option<String> = None;
- let encoded_params = serde_urlencoded::to_string(&base_params)?;
- write_stream
- .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
- .await?;
- if let Some(result) = read_stream.next().await {
- match result? {
- Message::Text(text) => {
- let clean_text = clean_message(&text);
- if clean_text.starts_with("conok") {
- let session_info: Vec<&str> = clean_text.split(",").collect();
- session_id = session_info.get(1).map(|s| s.to_string());
- } else {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("Unexpected message received from server: {}", clean_text),
- )));
+ let mut subscription_id: usize = 0;
+ loop {
+ tokio::select! {
+ message = read_stream.next() => {
+ match message {
+ Some(Ok(Message::Text(text))) => {
+ let clean_text = clean_message(&text);
+ let message_fields: Vec<&str> = clean_text.split(",").collect();
+ match *message_fields.first().unwrap_or(&"") {
+ //
+ // Connection confirmation from server.
+ //
+ "wsok" => {
+ println!("Connection confirmed by server");
+ //
+ // Request session creation.
+ //
+ let mut params: Vec<(&str, &str)> = vec![];
+ if let Some(adapter_set) = self.connection_details.get_adapter_set() {
+ params.push(("LS_adapter_set", adapter_set));
+ }
+ params.extend(base_params.iter().cloned());
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+ write_stream
+ .send(Message::Text(format!("create_session\r\n{}\n", encoded_params)))
+ .await?;
+ },
+ //
+ // Notifications from server.
+ //
+ "cons" | "servname" | "clientip" => {
+ println!("Received notification from server: {}", clean_text);
+ // Don't do anything with these notifications for now.
+ },
+ //
+ // Session created successfully.
+ //
+ "conok" => {
+ if let Some(session_id) = message_fields.get(1).as_deref() {
+ println!("Session created with ID: {:?}", session_id);
+ //
+ // Subscribe to the desired items.
+ //
+ /*
+ let ls_item_group = self.get_subscriptions().iter().map(|subscription| {
+ format!(
+ "{},{},{}",
+ subscription.get_mode().to_string(),
+ subscription.get_items().join(","),
+ subscription.get_fields().join(",")
+ )
+ }).collect::<Vec<String>>().join(";");
+ */
+ request_id += 1;
+ let ls_req_id = request_id.to_string();
+ subscription_id += 1;
+ let ls_sub_id = subscription_id.to_string();
+ let mut params: Vec<(&str, &str)> = vec![
+ ("LS_session", *session_id),
+ ("LS_reqId", &ls_req_id),
+ ("LS_subId", &ls_sub_id),
+ ("LS_op", "add"),
+ ("LS_table", "1"),
+ ("LS_id", "1"),
+ ("LS_mode", "MERGE"),
+ ("LS_schema", "stock_name,last_price"),
+ ("LS_snapshot", "true"),
+ ];
+ params.extend(base_params.iter().cloned());
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+ println!("Encoded params: {}", encoded_params);
+ write_stream
+ .send(Message::Text(format!("control\r\n{}\n", encoded_params)))
+ .await?;
+ } else {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "Session ID not found in 'conok' message from server",
+ )));
+ }
+ },
+ "probe" => {
+ println!("Received probe message from server: '{}'", clean_text);
+ },
+ unexpected_message => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!(
+ "Unexpected message received from server: '{}'",
+ unexpected_message
+ ),
+ )));
+ },
+ }
+ },
+ Some(Ok(non_text_message)) => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!(
+ "Unexpected non-text message from server: {:?}",
+ non_text_message
+ ),
+ )));
+ },
+ Some(Err(err)) => {
+ return Err(Box::new(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!("Error reading message from server: {}", err),
+ )));
+ },
+ None => {
+ println!("No more messages from server");
+ break;
+ },
}
- }
- non_text_message => {
- return Err(Box::new(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!(
- "Unexpected non-text message from server: {:?}",
- non_text_message
- ),
- )));
- }
+ },
+ _ = shutdown_signal.notified() => {
+ println!("Received shutdown signal");
+ break;
+ },
}
}
+ /*
//
// Perform subscription.
//
@@ -487,40 +558,7 @@ impl LightstreamerClient {
}
}
}
-
-
- // Listen for messages from the server
- loop {
- tokio::select! {
- message = read_stream.next() => {
- match message {
- Some(Ok(Message::Text(text))) => {
- println!("Received message from server: {}", text);
- },
- Some(Ok(non_text_message)) => {
- println!("Received non-text message from server: {:?}", non_text_message);
- },
- Some(Err(err)) => {
- return Err(Box::new(
- std::io::Error::new(std::io::ErrorKind::InvalidData, format!(
- "Error reading message from server: {}",
- err
- )),
- ));
- },
- None => {
- println!("No more messages from server");
- break;
- },
- }
- },
- _ = shutdown_signal.notified() => {
- println!("Received shutdown signal");
- break;
- },
- }
- }
- }
+ */
println!("Ending function connect() to Lightstreamer server");
diff --git a/src/main.rs b/src/main.rs
index 8431333..c95f9e8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,6 +16,8 @@ use std::sync::Arc;
use tokio::sync::{Notify, Mutex};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
+const MAX_CONNECTION_ATTEMPTS: u64 = 1;
+
/// Sets up a signal hook for SIGINT and SIGTERM.
///
/// Creates a signal hook for the specified signals and spawns a thread to handle them.
@@ -79,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a new Lightstreamer client instance and wrap it in an Arc<Mutex<>> so it can be shared across threads.
let client = Arc::new(Mutex::new(LightstreamerClient::new(
Some("http://push.lightstreamer.com/lightstreamer"),
- Some("DEMO"),
+ Some("QUOTE_ADAPTER"),
)?));
//
@@ -102,7 +104,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
//
let mut retry_interval_milis: u64 = 0;
let mut retry_counter: u64 = 0;
- while retry_counter < 5 {
+ while retry_counter < MAX_CONNECTION_ATTEMPTS {
let mut client = client.lock().await;
match client.connect(Arc::clone(&shutdown_signal)).await {
Ok(_) => {