aboutsummaryrefslogtreecommitdiff
path: root/src/ls_client.rs
diff options
context:
space:
mode:
authorLibravatar daniloaz <daniloaz@gmail.com>2024-04-12 20:38:43 +0200
committerLibravatar daniloaz <daniloaz@gmail.com>2024-04-12 20:38:43 +0200
commit82a9d2f0708c7f3754d5c40a60fa291bb320301b (patch)
treea36179f1eb02b7691f28ea544ccf25df646c9021 /src/ls_client.rs
parent7d7f380e3075be51198f0ad457cc766f0641d984 (diff)
Implemented part of the item update logic.
Diffstat (limited to 'src/ls_client.rs')
-rw-r--r--src/ls_client.rs226
1 files changed, 223 insertions, 3 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs
index a463216..64b4a4f 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -3,11 +3,13 @@ use crate::client_message_listener::ClientMessageListener;
use crate::connection_details::ConnectionDetails;
use crate::connection_options::ConnectionOptions;
use crate::error::IllegalStateException;
-use crate::subscription::Subscription;
+use crate::item_update::ItemUpdate;
+use crate::subscription::{Snapshot, Subscription, SubscriptionMode};
use crate::util::*;
use cookie::Cookie;
use futures_util::{SinkExt, StreamExt};
+use std::collections::HashMap;
use std::error::Error;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
@@ -126,7 +128,7 @@ impl LightstreamerClient {
// Constants for WebSocket connection.
//
pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
- pub const SEC_WEBSOCKET_PROTOCOL: &'static str ="TLCP-2.4.0.lightstreamer.com";
+ pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
@@ -313,6 +315,7 @@ impl LightstreamerClient {
let mut request_id: usize = 0;
let mut _session_id: Option<String> = None;
let mut subscription_id: usize = 0;
+ let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new();
loop {
tokio::select! {
message = read_stream.next() => {
@@ -443,8 +446,225 @@ impl LightstreamerClient {
//
"u" => {
println!("Received data update from server: '{}'", clean_text);
+ // Parse arguments from the received message.
+ let arguments = clean_text.split(",").collect::<Vec<&str>>();
+ //
+ // Extract the subscription from the first argument.
+ //
+ let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
+ let subscription = match self.get_subscriptions().get(subscription_index-1) {
+ Some(subscription) => subscription,
+ None => {
+ println!("Subscription not found for index: {}", subscription_index);
+ continue;
+ }
+ };
+ //
+ // Extract the item from the second argument.
+ //
+ let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
+ let item = match subscription.get_items() {
+ Some(items) => items.get(item_index),
+ None => {
+ println!("No items found in subscription: {:?}", subscription);
+ continue;
+ }
+ };
+ //
+ // Determine if the update is a snapshot or real-time update based on the subscription parameters.
+ //
+ let is_snapshot = match subscription.get_requested_snapshot() {
+ Some(ls_snapshot) => {
+ match ls_snapshot {
+ Snapshot::No => false,
+ Snapshot::Yes => {
+ match subscription.get_mode() {
+ SubscriptionMode::Merge => {
+ if arguments.len() == 4 && arguments[3] == "$" {
+ // EOS notification received
+ true
+ } else {
+ // If item doesn't exist in item_updates yet, the first update
+ // is always a snapshot.
+ if let Some(item_updates) = item_updates.get(subscription_index) {
+ if let Some(_) = item_updates.get(item_index) {
+ false
+ } else {
+ true
+ }
+ } else {
+ true
+ }
+ }
+ },
+ SubscriptionMode::Distinct | SubscriptionMode::Command => {
+ if !subscription.is_subscribed() {
+ true
+ } else {
+ false
+ }
+ },
+ _ => false,
+ }
+ },
+ _ => false,
+ }
+ },
+ None => false,
+ };
- },
+ // Extract the field values from the third argument.
+ let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
+
+ //
+ // Get fields from subscription and create a HashMap of field names and values.
+ //
+ let subscription_fields = subscription.get_fields();
+ let mut field_map: HashMap<String, Option<String>> = subscription_fields
+ .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
+ .unwrap_or_default();
+
+ let mut field_index = 0;
+ for value in field_values {
+ match value {
+ "" => {
+ // An empty value means the field is unchanged compared to the previous update of the same field.
+ if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
+ field_map.insert(field_name.to_string(), None);
+ }
+ field_index += 1;
+ }
+ "#" | "$" => {
+ // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
+ if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
+ field_map.insert(field_name.to_string(), Some("".to_string()));
+ }
+ field_index += 1;
+ }
+ value if value.starts_with('^') => {
+ let command = value.chars().nth(1).unwrap_or(' ');
+ match command {
+ '0'..='9' => {
+ let count = value[1..].parse().unwrap_or(0);
+ for i in 0..count {
+ if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
+ field_map.insert(field_name.to_string(), None);
+ }
+ }
+ field_index += count;
+ }
+ 'P' | 'T' => {
+ let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
+ if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
+ if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
+ let new_value = match command {
+ 'P' => {
+ // Apply JSON Patch
+ let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
+ let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
+ let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
+ let _ = json_patch::patch(&mut prev_json, &patch_operations);
+ prev_json.to_string()
+ }
+ 'T' => {
+ // Apply TLCP-diff
+ //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
+ unimplemented!("Implement TLCP-diff");
+ }
+ _ => unreachable!(),
+ };
+ field_map.insert(field_name.to_string(), Some(new_value.to_string()));
+ }
+ }
+ field_index += 1;
+ }
+ _ => {
+ let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
+ if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
+ field_map.insert(field_name.to_string(), Some(decoded_value));
+ }
+ field_index += 1;
+ }
+ }
+ }
+ _ => {
+ let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
+ if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
+ field_map.insert(field_name.to_string(), Some(decoded_value));
+ }
+ field_index += 1;
+ }
+ }
+ }
+
+ //println!("Field values: {:?}", field_map);
+
+ let changed_fields: HashMap<String, Option<String>> = field_map.iter()
+ .map(|(k, v)| (k.clone(), v.clone()))
+ .collect();
+ //println!("Changed fields: {:?}", changed_fields);
+
+ //
+ // Take the proper item_update from item_updates and update it with changed fields.
+ // If the item_update doesn't exist yet, create a new one.
+ //
+ match item_updates.get_mut(subscription_index) {
+ Some(item_updates) => match item_updates.get_mut(item_index) {
+ Some(item_update) => {
+ //
+ // Iterate changed_fields and update existing item_update.fields assigning the new values.
+ //
+ for (field_name, new_value) in &changed_fields {
+ if item_update.fields.contains_key(field_name) {
+ item_update.fields.insert((*field_name).clone(), new_value.clone());
+ }
+ }
+ item_update.changed_fields = changed_fields;
+ },
+ None => {
+ // Create a new item_update and add it to item_updates.
+ let item_update = ItemUpdate {
+ item_name: item.cloned(),
+ item_pos: item_index,
+ fields: field_map,
+ changed_fields: changed_fields,
+ is_snapshot: is_snapshot,
+ };
+ item_updates.push(item_update);
+ }
+ },
+ None => {
+ // Create a new item_update and add it to item_updates.
+ let item_update = ItemUpdate {
+ item_name: item.cloned(),
+ item_pos: item_index,
+ fields: field_map,
+ changed_fields: changed_fields,
+ is_snapshot: is_snapshot,
+ };
+ item_updates.push(vec![item_update]);
+ }
+ };
+
+ //println!("Item updates: {:?}", item_updates);
+
+ println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap());
+ println!("\n\n");
+
+ if item_updates.len() >= 3 {
+ return Ok(());
+ }
+
+ /*
+ if let Some(item_updates) = item_updates.get_mut(subscription_index) {
+ if let Some(item_update) = item_updates.get_mut(item_index) {
+ for (field_name, field_value) in changed_fields {
+ item_update.set_field_value(field_name, field_value);
+ }
+ }
+ }
+ */
+ }
//
// Connection confirmation from server.
//