diff options
Diffstat (limited to 'src/ls_client.rs')
-rw-r--r-- | src/ls_client.rs | 65 |
1 files changed, 34 insertions, 31 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs index 64b4a4f..b65885c 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -315,7 +315,7 @@ impl LightstreamerClient { let mut request_id: usize = 0; let mut _session_id: Option<String> = None; let mut subscription_id: usize = 0; - let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new(); + let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new(); loop { tokio::select! { message = read_stream.next() => { @@ -445,7 +445,7 @@ impl LightstreamerClient { // Data updates from server. // "u" => { - println!("Received data update from server: '{}'", clean_text); + //println!("Received data update from server: '{}'", clean_text); // Parse arguments from the received message. let arguments = clean_text.split(",").collect::<Vec<&str>>(); // @@ -464,7 +464,7 @@ impl LightstreamerClient { // let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0); let item = match subscription.get_items() { - Some(items) => items.get(item_index), + Some(items) => items.get(item_index-1), None => { println!("No items found in subscription: {:?}", subscription); continue; @@ -486,13 +486,16 @@ impl LightstreamerClient { } else { // If item doesn't exist in item_updates yet, the first update // is always a snapshot. - if let Some(item_updates) = item_updates.get(subscription_index) { - if let Some(_) = item_updates.get(item_index) { + if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) { + if let Some(_) = item_updates.get(&(item_index)) { + // Item update already exists in item_updates, so it's not a snapshot. false } else { + // Item update doesn't exist in item_updates, so the first update is always a snapshot. true } } else { + // Item updates not found for subscription, so the first update is always a snapshot. true } } @@ -597,29 +600,36 @@ impl LightstreamerClient { } } - //println!("Field values: {:?}", field_map); - - let changed_fields: HashMap<String, Option<String>> = field_map.iter() - .map(|(k, v)| (k.clone(), v.clone())) + // Store only item_update's changed fields. + let changed_fields: HashMap<String, String> = field_map.iter() + .filter_map(|(k, v)| { + if let Some(v) = v { + Some((k.clone(), v.clone())) + } else { + None + } + }) .collect(); - //println!("Changed fields: {:?}", changed_fields); // // Take the proper item_update from item_updates and update it with changed fields. // If the item_update doesn't exist yet, create a new one. // - match item_updates.get_mut(subscription_index) { - Some(item_updates) => match item_updates.get_mut(item_index) { + let current_item_update: ItemUpdate; + match subscription_item_updates.get_mut(&(subscription_index)) { + Some(item_updates) => match item_updates.get_mut(&(item_index)) { Some(item_update) => { // // Iterate changed_fields and update existing item_update.fields assigning the new values. // for (field_name, new_value) in &changed_fields { if item_update.fields.contains_key(field_name) { - item_update.fields.insert((*field_name).clone(), new_value.clone()); + item_update.fields.insert((*field_name).clone(), Some(new_value.clone())); } } item_update.changed_fields = changed_fields; + item_update.is_snapshot = is_snapshot; + current_item_update = item_update.clone(); }, None => { // Create a new item_update and add it to item_updates. @@ -630,7 +640,8 @@ impl LightstreamerClient { changed_fields: changed_fields, is_snapshot: is_snapshot, }; - item_updates.push(item_update); + current_item_update = item_update.clone(); + item_updates.insert(item_index, item_update); } }, None => { @@ -642,28 +653,20 @@ impl LightstreamerClient { changed_fields: changed_fields, is_snapshot: is_snapshot, }; - item_updates.push(vec![item_update]); + current_item_update = item_update.clone(); + let mut item_updates = HashMap::new(); + item_updates.insert(item_index, item_update); + subscription_item_updates.insert(subscription_index, item_updates); } }; - //println!("Item updates: {:?}", item_updates); + // Get mutable subscription listeners directly. + let subscription_listeners = subscription.get_listeners(); - println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap()); - println!("\n\n"); - - if item_updates.len() >= 3 { - return Ok(()); - } - - /* - if let Some(item_updates) = item_updates.get_mut(subscription_index) { - if let Some(item_update) = item_updates.get_mut(item_index) { - for (field_name, field_value) in changed_fields { - item_update.set_field_value(field_name, field_value); - } - } + // Iterate subscription listeners and call on_item_update for each listener. + for listener in subscription_listeners { + listener.on_item_update(¤t_item_update); } - */ } // // Connection confirmation from server. |