diff options
author | 2024-04-13 12:21:12 +0200 | |
---|---|---|
committer | 2024-04-13 12:21:12 +0200 | |
commit | 5af8a69942aeed85b5cd2bd84cb5f33169834690 (patch) | |
tree | d3910ffc83037fd931d665112b076070f01b9295 | |
parent | 82a9d2f0708c7f3754d5c40a60fa291bb320301b (diff) |
✨ (Cargo.toml): Bump package version to 0.1.8 for new changes
♻️ (item_update.rs): Refactor ItemUpdate struct and related methods to store only non-null changed fields
♻️ (ls_client.rs): Refactor data update handling to store updates in a HashMap and call on_item_update for each listener
🐛 (ls_client.rs): Fix item index off-by-one error in data update handling
🐛 (main.rs): Update on_item_update implementation to handle new ItemUpdate structure
♻️ (subscription_listener.rs): Refactor on_item_update method to take a reference to ItemUpdate
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/item_update.rs | 6 | ||||
-rw-r--r-- | src/ls_client.rs | 65 | ||||
-rw-r--r-- | src/main.rs | 31 | ||||
-rw-r--r-- | src/subscription_listener.rs | 2 |
5 files changed, 66 insertions, 40 deletions
@@ -1,6 +1,6 @@ [package] name = "lightstreamer-client" -version = "0.1.7" +version = "0.1.8" edition = "2021" authors = ["Daniel López Azaña <daniloaz@gmail.com>"] description = "A Rust client for Lightstreamer, designed to facilitate real-time communication with Lightstreamer servers." diff --git a/src/item_update.rs b/src/item_update.rs index 6c21882..7c43d93 100644 --- a/src/item_update.rs +++ b/src/item_update.rs @@ -28,7 +28,7 @@ pub struct ItemUpdate { pub item_name: Option<String>, pub item_pos: usize, pub fields: HashMap<String, Option<String>>, - pub changed_fields: HashMap<String, Option<String>>, + pub changed_fields: HashMap<String, String>, pub is_snapshot: bool, } @@ -46,7 +46,7 @@ impl ItemUpdate { /// /// # Returns /// A map containing the values for each field changed with the last server update. - pub fn get_changed_fields(&self) -> HashMap<String, Option<String>> { + pub fn get_changed_fields(&self) -> HashMap<String, String> { self.changed_fields.clone() } @@ -60,7 +60,7 @@ impl ItemUpdate { /// /// # Returns /// A map containing the values for each field changed with the last server update. - pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Option<String>> { + pub fn get_changed_fields_by_position(&self) -> HashMap<usize, String> { self.changed_fields .iter() .map(|(name, value)| (self.get_field_position(name), value.clone())) diff --git a/src/ls_client.rs b/src/ls_client.rs index 64b4a4f..b65885c 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -315,7 +315,7 @@ impl LightstreamerClient { let mut request_id: usize = 0; let mut _session_id: Option<String> = None; let mut subscription_id: usize = 0; - let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new(); + let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new(); loop { tokio::select! { message = read_stream.next() => { @@ -445,7 +445,7 @@ impl LightstreamerClient { // Data updates from server. // "u" => { - println!("Received data update from server: '{}'", clean_text); + //println!("Received data update from server: '{}'", clean_text); // Parse arguments from the received message. let arguments = clean_text.split(",").collect::<Vec<&str>>(); // @@ -464,7 +464,7 @@ impl LightstreamerClient { // let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0); let item = match subscription.get_items() { - Some(items) => items.get(item_index), + Some(items) => items.get(item_index-1), None => { println!("No items found in subscription: {:?}", subscription); continue; @@ -486,13 +486,16 @@ impl LightstreamerClient { } else { // If item doesn't exist in item_updates yet, the first update // is always a snapshot. - if let Some(item_updates) = item_updates.get(subscription_index) { - if let Some(_) = item_updates.get(item_index) { + if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) { + if let Some(_) = item_updates.get(&(item_index)) { + // Item update already exists in item_updates, so it's not a snapshot. false } else { + // Item update doesn't exist in item_updates, so the first update is always a snapshot. true } } else { + // Item updates not found for subscription, so the first update is always a snapshot. true } } @@ -597,29 +600,36 @@ impl LightstreamerClient { } } - //println!("Field values: {:?}", field_map); - - let changed_fields: HashMap<String, Option<String>> = field_map.iter() - .map(|(k, v)| (k.clone(), v.clone())) + // Store only item_update's changed fields. + let changed_fields: HashMap<String, String> = field_map.iter() + .filter_map(|(k, v)| { + if let Some(v) = v { + Some((k.clone(), v.clone())) + } else { + None + } + }) .collect(); - //println!("Changed fields: {:?}", changed_fields); // // Take the proper item_update from item_updates and update it with changed fields. // If the item_update doesn't exist yet, create a new one. // - match item_updates.get_mut(subscription_index) { - Some(item_updates) => match item_updates.get_mut(item_index) { + let current_item_update: ItemUpdate; + match subscription_item_updates.get_mut(&(subscription_index)) { + Some(item_updates) => match item_updates.get_mut(&(item_index)) { Some(item_update) => { // // Iterate changed_fields and update existing item_update.fields assigning the new values. // for (field_name, new_value) in &changed_fields { if item_update.fields.contains_key(field_name) { - item_update.fields.insert((*field_name).clone(), new_value.clone()); + item_update.fields.insert((*field_name).clone(), Some(new_value.clone())); } } item_update.changed_fields = changed_fields; + item_update.is_snapshot = is_snapshot; + current_item_update = item_update.clone(); }, None => { // Create a new item_update and add it to item_updates. @@ -630,7 +640,8 @@ impl LightstreamerClient { changed_fields: changed_fields, is_snapshot: is_snapshot, }; - item_updates.push(item_update); + current_item_update = item_update.clone(); + item_updates.insert(item_index, item_update); } }, None => { @@ -642,28 +653,20 @@ impl LightstreamerClient { changed_fields: changed_fields, is_snapshot: is_snapshot, }; - item_updates.push(vec![item_update]); + current_item_update = item_update.clone(); + let mut item_updates = HashMap::new(); + item_updates.insert(item_index, item_update); + subscription_item_updates.insert(subscription_index, item_updates); } }; - //println!("Item updates: {:?}", item_updates); + // Get mutable subscription listeners directly. + let subscription_listeners = subscription.get_listeners(); - println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap()); - println!("\n\n"); - - if item_updates.len() >= 3 { - return Ok(()); - } - - /* - if let Some(item_updates) = item_updates.get_mut(subscription_index) { - if let Some(item_update) = item_updates.get_mut(item_index) { - for (field_name, field_value) in changed_fields { - item_update.set_field_value(field_name, field_value); - } - } + // Iterate subscription listeners and call on_item_update for each listener. + for listener in subscription_listeners { + listener.on_item_update(¤t_item_update); } - */ } // // Connection confirmation from server. diff --git a/src/main.rs b/src/main.rs index 4959652..c284605 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,11 +43,34 @@ async fn setup_signal_hook(shutdown_signal: Arc<Notify>) { pub struct MySubscriptionListener {} impl SubscriptionListener for MySubscriptionListener { - fn on_item_update(&mut self, update: ItemUpdate) { + fn on_item_update(&self, update: &ItemUpdate) { println!( - "UPDATE {} {}", - update.get_value("stock_name").unwrap(), - update.get_value("last_price").unwrap() + "UPDATE for item '{}' => '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}", + update.item_name.as_ref().unwrap_or(&"N/A".to_string()), + "stock_name", + update.get_value("stock_name").unwrap_or(&"N/A".to_string()), + "last_price", + update.get_value("last_price").unwrap_or(&"N/A".to_string()), + "time", + update.get_value("time").unwrap_or(&"N/A".to_string()), + "pct_change", + update.get_value("pct_change").unwrap_or(&"N/A".to_string()), + "bid_quantity", + update.get_value("bid_quantity").unwrap_or(&"N/A".to_string()), + "bid", + update.get_value("bid").unwrap_or(&"N/A".to_string()), + "ask", + update.get_value("ask").unwrap_or(&"N/A".to_string()), + "ask_quantity", + update.get_value("ask_quantity").unwrap_or(&"N/A".to_string()), + "min", + update.get_value("min").unwrap_or(&"N/A".to_string()), + "max", + update.get_value("max").unwrap_or(&"N/A".to_string()), + "ref_price", + update.get_value("ref_price").unwrap_or(&"N/A".to_string()), + "open_price", + update.get_value("open_price").unwrap_or(&"N/A".to_string()), ); } } diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs index f37c703..72958fe 100644 --- a/src/subscription_listener.rs +++ b/src/subscription_listener.rs @@ -156,7 +156,7 @@ pub trait SubscriptionListener: Send { /// - `update`: a value object containing the updated values for all the fields, together with /// meta-information about the update itself and some helper methods that can be used to /// iterate through all or new values. - fn on_item_update(&mut self, _update: ItemUpdate) { + fn on_item_update(&self, _update: &ItemUpdate) { // Default implementation does nothing. unimplemented!("Implement on_item_update method for SubscriptionListener."); } |