aboutsummaryrefslogtreecommitdiff
path: root/src/ls_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ls_client.rs')
-rw-r--r--src/ls_client.rs65
1 files changed, 34 insertions, 31 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs
index 64b4a4f..b65885c 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -315,7 +315,7 @@ impl LightstreamerClient {
let mut request_id: usize = 0;
let mut _session_id: Option<String> = None;
let mut subscription_id: usize = 0;
- let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new();
+ let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new();
loop {
tokio::select! {
message = read_stream.next() => {
@@ -445,7 +445,7 @@ impl LightstreamerClient {
// Data updates from server.
//
"u" => {
- println!("Received data update from server: '{}'", clean_text);
+ //println!("Received data update from server: '{}'", clean_text);
// Parse arguments from the received message.
let arguments = clean_text.split(",").collect::<Vec<&str>>();
//
@@ -464,7 +464,7 @@ impl LightstreamerClient {
//
let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
let item = match subscription.get_items() {
- Some(items) => items.get(item_index),
+ Some(items) => items.get(item_index-1),
None => {
println!("No items found in subscription: {:?}", subscription);
continue;
@@ -486,13 +486,16 @@ impl LightstreamerClient {
} else {
// If item doesn't exist in item_updates yet, the first update
// is always a snapshot.
- if let Some(item_updates) = item_updates.get(subscription_index) {
- if let Some(_) = item_updates.get(item_index) {
+ if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
+ if let Some(_) = item_updates.get(&(item_index)) {
+ // Item update already exists in item_updates, so it's not a snapshot.
false
} else {
+ // Item update doesn't exist in item_updates, so the first update is always a snapshot.
true
}
} else {
+ // Item updates not found for subscription, so the first update is always a snapshot.
true
}
}
@@ -597,29 +600,36 @@ impl LightstreamerClient {
}
}
- //println!("Field values: {:?}", field_map);
-
- let changed_fields: HashMap<String, Option<String>> = field_map.iter()
- .map(|(k, v)| (k.clone(), v.clone()))
+ // Store only item_update's changed fields.
+ let changed_fields: HashMap<String, String> = field_map.iter()
+ .filter_map(|(k, v)| {
+ if let Some(v) = v {
+ Some((k.clone(), v.clone()))
+ } else {
+ None
+ }
+ })
.collect();
- //println!("Changed fields: {:?}", changed_fields);
//
// Take the proper item_update from item_updates and update it with changed fields.
// If the item_update doesn't exist yet, create a new one.
//
- match item_updates.get_mut(subscription_index) {
- Some(item_updates) => match item_updates.get_mut(item_index) {
+ let current_item_update: ItemUpdate;
+ match subscription_item_updates.get_mut(&(subscription_index)) {
+ Some(item_updates) => match item_updates.get_mut(&(item_index)) {
Some(item_update) => {
//
// Iterate changed_fields and update existing item_update.fields assigning the new values.
//
for (field_name, new_value) in &changed_fields {
if item_update.fields.contains_key(field_name) {
- item_update.fields.insert((*field_name).clone(), new_value.clone());
+ item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
}
}
item_update.changed_fields = changed_fields;
+ item_update.is_snapshot = is_snapshot;
+ current_item_update = item_update.clone();
},
None => {
// Create a new item_update and add it to item_updates.
@@ -630,7 +640,8 @@ impl LightstreamerClient {
changed_fields: changed_fields,
is_snapshot: is_snapshot,
};
- item_updates.push(item_update);
+ current_item_update = item_update.clone();
+ item_updates.insert(item_index, item_update);
}
},
None => {
@@ -642,28 +653,20 @@ impl LightstreamerClient {
changed_fields: changed_fields,
is_snapshot: is_snapshot,
};
- item_updates.push(vec![item_update]);
+ current_item_update = item_update.clone();
+ let mut item_updates = HashMap::new();
+ item_updates.insert(item_index, item_update);
+ subscription_item_updates.insert(subscription_index, item_updates);
}
};
- //println!("Item updates: {:?}", item_updates);
+ // Get mutable subscription listeners directly.
+ let subscription_listeners = subscription.get_listeners();
- println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap());
- println!("\n\n");
-
- if item_updates.len() >= 3 {
- return Ok(());
- }
-
- /*
- if let Some(item_updates) = item_updates.get_mut(subscription_index) {
- if let Some(item_update) = item_updates.get_mut(item_index) {
- for (field_name, field_value) in changed_fields {
- item_update.set_field_value(field_name, field_value);
- }
- }
+ // Iterate subscription listeners and call on_item_update for each listener.
+ for listener in subscription_listeners {
+ listener.on_item_update(&current_item_update);
}
- */
}
//
// Connection confirmation from server.