diff options
Diffstat (limited to 'src/ls_client.rs')
-rw-r--r-- | src/ls_client.rs | 226 |
1 files changed, 223 insertions, 3 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs index a463216..64b4a4f 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -3,11 +3,13 @@ use crate::client_message_listener::ClientMessageListener; use crate::connection_details::ConnectionDetails; use crate::connection_options::ConnectionOptions; use crate::error::IllegalStateException; -use crate::subscription::Subscription; +use crate::item_update::ItemUpdate; +use crate::subscription::{Snapshot, Subscription, SubscriptionMode}; use crate::util::*; use cookie::Cookie; use futures_util::{SinkExt, StreamExt}; +use std::collections::HashMap; use std::error::Error; use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; @@ -126,7 +128,7 @@ impl LightstreamerClient { // Constants for WebSocket connection. // pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w=="; - pub const SEC_WEBSOCKET_PROTOCOL: &'static str ="TLCP-2.4.0.lightstreamer.com"; + pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com"; pub const SEC_WEBSOCKET_VERSION: &'static str = "13"; pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket"; @@ -313,6 +315,7 @@ impl LightstreamerClient { let mut request_id: usize = 0; let mut _session_id: Option<String> = None; let mut subscription_id: usize = 0; + let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new(); loop { tokio::select! { message = read_stream.next() => { @@ -443,8 +446,225 @@ impl LightstreamerClient { // "u" => { println!("Received data update from server: '{}'", clean_text); + // Parse arguments from the received message. + let arguments = clean_text.split(",").collect::<Vec<&str>>(); + // + // Extract the subscription from the first argument. + // + let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0); + let subscription = match self.get_subscriptions().get(subscription_index-1) { + Some(subscription) => subscription, + None => { + println!("Subscription not found for index: {}", subscription_index); + continue; + } + }; + // + // Extract the item from the second argument. + // + let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0); + let item = match subscription.get_items() { + Some(items) => items.get(item_index), + None => { + println!("No items found in subscription: {:?}", subscription); + continue; + } + }; + // + // Determine if the update is a snapshot or real-time update based on the subscription parameters. + // + let is_snapshot = match subscription.get_requested_snapshot() { + Some(ls_snapshot) => { + match ls_snapshot { + Snapshot::No => false, + Snapshot::Yes => { + match subscription.get_mode() { + SubscriptionMode::Merge => { + if arguments.len() == 4 && arguments[3] == "$" { + // EOS notification received + true + } else { + // If item doesn't exist in item_updates yet, the first update + // is always a snapshot. + if let Some(item_updates) = item_updates.get(subscription_index) { + if let Some(_) = item_updates.get(item_index) { + false + } else { + true + } + } else { + true + } + } + }, + SubscriptionMode::Distinct | SubscriptionMode::Command => { + if !subscription.is_subscribed() { + true + } else { + false + } + }, + _ => false, + } + }, + _ => false, + } + }, + None => false, + }; - }, + // Extract the field values from the third argument. + let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect(); + + // + // Get fields from subscription and create a HashMap of field names and values. + // + let subscription_fields = subscription.get_fields(); + let mut field_map: HashMap<String, Option<String>> = subscription_fields + .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect()) + .unwrap_or_default(); + + let mut field_index = 0; + for value in field_values { + match value { + "" => { + // An empty value means the field is unchanged compared to the previous update of the same field. + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), None); + } + field_index += 1; + } + "#" | "$" => { + // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty. + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), Some("".to_string())); + } + field_index += 1; + } + value if value.starts_with('^') => { + let command = value.chars().nth(1).unwrap_or(' '); + match command { + '0'..='9' => { + let count = value[1..].parse().unwrap_or(0); + for i in 0..count { + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) { + field_map.insert(field_name.to_string(), None); + } + } + field_index += count; + } + 'P' | 'T' => { + let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string()); + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) { + let new_value = match command { + 'P' => { + // Apply JSON Patch + let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null); + let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null); + let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default(); + let _ = json_patch::patch(&mut prev_json, &patch_operations); + prev_json.to_string() + } + 'T' => { + // Apply TLCP-diff + //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string()) + unimplemented!("Implement TLCP-diff"); + } + _ => unreachable!(), + }; + field_map.insert(field_name.to_string(), Some(new_value.to_string())); + } + } + field_index += 1; + } + _ => { + let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string()); + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), Some(decoded_value)); + } + field_index += 1; + } + } + } + _ => { + let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string()); + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), Some(decoded_value)); + } + field_index += 1; + } + } + } + + //println!("Field values: {:?}", field_map); + + let changed_fields: HashMap<String, Option<String>> = field_map.iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + //println!("Changed fields: {:?}", changed_fields); + + // + // Take the proper item_update from item_updates and update it with changed fields. + // If the item_update doesn't exist yet, create a new one. + // + match item_updates.get_mut(subscription_index) { + Some(item_updates) => match item_updates.get_mut(item_index) { + Some(item_update) => { + // + // Iterate changed_fields and update existing item_update.fields assigning the new values. + // + for (field_name, new_value) in &changed_fields { + if item_update.fields.contains_key(field_name) { + item_update.fields.insert((*field_name).clone(), new_value.clone()); + } + } + item_update.changed_fields = changed_fields; + }, + None => { + // Create a new item_update and add it to item_updates. + let item_update = ItemUpdate { + item_name: item.cloned(), + item_pos: item_index, + fields: field_map, + changed_fields: changed_fields, + is_snapshot: is_snapshot, + }; + item_updates.push(item_update); + } + }, + None => { + // Create a new item_update and add it to item_updates. + let item_update = ItemUpdate { + item_name: item.cloned(), + item_pos: item_index, + fields: field_map, + changed_fields: changed_fields, + is_snapshot: is_snapshot, + }; + item_updates.push(vec![item_update]); + } + }; + + //println!("Item updates: {:?}", item_updates); + + println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap()); + println!("\n\n"); + + if item_updates.len() >= 3 { + return Ok(()); + } + + /* + if let Some(item_updates) = item_updates.get_mut(subscription_index) { + if let Some(item_update) = item_updates.get_mut(item_index) { + for (field_name, field_value) in changed_fields { + item_update.set_field_value(field_name, field_value); + } + } + } + */ + } // // Connection confirmation from server. // |