(Cargo.toml): Bump package version to 0.1.8 for new changes

♻️ (item_update.rs): Refactor ItemUpdate struct and related methods to store only non-null changed fields
♻️ (ls_client.rs): Refactor data update handling to store updates in a HashMap and call on_item_update for each listener
🐛 (ls_client.rs): Fix item index off-by-one error in data update handling
🐛 (main.rs): Update on_item_update implementation to handle new ItemUpdate structure
♻️ (subscription_listener.rs): Refactor on_item_update method to take a reference to ItemUpdate
This commit is contained in:
daniloaz 2024-04-13 12:21:12 +02:00
parent 82a9d2f070
commit 5af8a69942
5 changed files with 66 additions and 40 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "lightstreamer-client"
version = "0.1.7"
version = "0.1.8"
edition = "2021"
authors = ["Daniel López Azaña <daniloaz@gmail.com>"]
description = "A Rust client for Lightstreamer, designed to facilitate real-time communication with Lightstreamer servers."

View File

@ -28,7 +28,7 @@ pub struct ItemUpdate {
pub item_name: Option<String>,
pub item_pos: usize,
pub fields: HashMap<String, Option<String>>,
pub changed_fields: HashMap<String, Option<String>>,
pub changed_fields: HashMap<String, String>,
pub is_snapshot: bool,
}
@ -46,7 +46,7 @@ impl ItemUpdate {
///
/// # Returns
/// A map containing the values for each field changed with the last server update.
pub fn get_changed_fields(&self) -> HashMap<String, Option<String>> {
pub fn get_changed_fields(&self) -> HashMap<String, String> {
self.changed_fields.clone()
}
@ -60,7 +60,7 @@ impl ItemUpdate {
///
/// # Returns
/// A map containing the values for each field changed with the last server update.
pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Option<String>> {
pub fn get_changed_fields_by_position(&self) -> HashMap<usize, String> {
self.changed_fields
.iter()
.map(|(name, value)| (self.get_field_position(name), value.clone()))

View File

@ -315,7 +315,7 @@ impl LightstreamerClient {
let mut request_id: usize = 0;
let mut _session_id: Option<String> = None;
let mut subscription_id: usize = 0;
let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new();
let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new();
loop {
tokio::select! {
message = read_stream.next() => {
@ -445,7 +445,7 @@ impl LightstreamerClient {
// Data updates from server.
//
"u" => {
println!("Received data update from server: '{}'", clean_text);
//println!("Received data update from server: '{}'", clean_text);
// Parse arguments from the received message.
let arguments = clean_text.split(",").collect::<Vec<&str>>();
//
@ -464,7 +464,7 @@ impl LightstreamerClient {
//
let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
let item = match subscription.get_items() {
Some(items) => items.get(item_index),
Some(items) => items.get(item_index-1),
None => {
println!("No items found in subscription: {:?}", subscription);
continue;
@ -486,13 +486,16 @@ impl LightstreamerClient {
} else {
// If item doesn't exist in item_updates yet, the first update
// is always a snapshot.
if let Some(item_updates) = item_updates.get(subscription_index) {
if let Some(_) = item_updates.get(item_index) {
if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
if let Some(_) = item_updates.get(&(item_index)) {
// Item update already exists in item_updates, so it's not a snapshot.
false
} else {
// Item update doesn't exist in item_updates, so the first update is always a snapshot.
true
}
} else {
// Item updates not found for subscription, so the first update is always a snapshot.
true
}
}
@ -597,29 +600,36 @@ impl LightstreamerClient {
}
}
//println!("Field values: {:?}", field_map);
let changed_fields: HashMap<String, Option<String>> = field_map.iter()
.map(|(k, v)| (k.clone(), v.clone()))
// Store only item_update's changed fields.
let changed_fields: HashMap<String, String> = field_map.iter()
.filter_map(|(k, v)| {
if let Some(v) = v {
Some((k.clone(), v.clone()))
} else {
None
}
})
.collect();
//println!("Changed fields: {:?}", changed_fields);
//
// Take the proper item_update from item_updates and update it with changed fields.
// If the item_update doesn't exist yet, create a new one.
//
match item_updates.get_mut(subscription_index) {
Some(item_updates) => match item_updates.get_mut(item_index) {
let current_item_update: ItemUpdate;
match subscription_item_updates.get_mut(&(subscription_index)) {
Some(item_updates) => match item_updates.get_mut(&(item_index)) {
Some(item_update) => {
//
// Iterate changed_fields and update existing item_update.fields assigning the new values.
//
for (field_name, new_value) in &changed_fields {
if item_update.fields.contains_key(field_name) {
item_update.fields.insert((*field_name).clone(), new_value.clone());
item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
}
}
item_update.changed_fields = changed_fields;
item_update.is_snapshot = is_snapshot;
current_item_update = item_update.clone();
},
None => {
// Create a new item_update and add it to item_updates.
@ -630,7 +640,8 @@ impl LightstreamerClient {
changed_fields: changed_fields,
is_snapshot: is_snapshot,
};
item_updates.push(item_update);
current_item_update = item_update.clone();
item_updates.insert(item_index, item_update);
}
},
None => {
@ -642,28 +653,20 @@ impl LightstreamerClient {
changed_fields: changed_fields,
is_snapshot: is_snapshot,
};
item_updates.push(vec![item_update]);
current_item_update = item_update.clone();
let mut item_updates = HashMap::new();
item_updates.insert(item_index, item_update);
subscription_item_updates.insert(subscription_index, item_updates);
}
};
//println!("Item updates: {:?}", item_updates);
// Get mutable subscription listeners directly.
let subscription_listeners = subscription.get_listeners();
println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap());
println!("\n\n");
if item_updates.len() >= 3 {
return Ok(());
// Iterate subscription listeners and call on_item_update for each listener.
for listener in subscription_listeners {
listener.on_item_update(&current_item_update);
}
/*
if let Some(item_updates) = item_updates.get_mut(subscription_index) {
if let Some(item_update) = item_updates.get_mut(item_index) {
for (field_name, field_value) in changed_fields {
item_update.set_field_value(field_name, field_value);
}
}
}
*/
}
//
// Connection confirmation from server.

View File

@ -43,11 +43,34 @@ async fn setup_signal_hook(shutdown_signal: Arc<Notify>) {
pub struct MySubscriptionListener {}
impl SubscriptionListener for MySubscriptionListener {
fn on_item_update(&mut self, update: ItemUpdate) {
fn on_item_update(&self, update: &ItemUpdate) {
println!(
"UPDATE {} {}",
update.get_value("stock_name").unwrap(),
update.get_value("last_price").unwrap()
"UPDATE for item '{}' => '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}",
update.item_name.as_ref().unwrap_or(&"N/A".to_string()),
"stock_name",
update.get_value("stock_name").unwrap_or(&"N/A".to_string()),
"last_price",
update.get_value("last_price").unwrap_or(&"N/A".to_string()),
"time",
update.get_value("time").unwrap_or(&"N/A".to_string()),
"pct_change",
update.get_value("pct_change").unwrap_or(&"N/A".to_string()),
"bid_quantity",
update.get_value("bid_quantity").unwrap_or(&"N/A".to_string()),
"bid",
update.get_value("bid").unwrap_or(&"N/A".to_string()),
"ask",
update.get_value("ask").unwrap_or(&"N/A".to_string()),
"ask_quantity",
update.get_value("ask_quantity").unwrap_or(&"N/A".to_string()),
"min",
update.get_value("min").unwrap_or(&"N/A".to_string()),
"max",
update.get_value("max").unwrap_or(&"N/A".to_string()),
"ref_price",
update.get_value("ref_price").unwrap_or(&"N/A".to_string()),
"open_price",
update.get_value("open_price").unwrap_or(&"N/A".to_string()),
);
}
}

View File

@ -156,7 +156,7 @@ pub trait SubscriptionListener: Send {
/// - `update`: a value object containing the updated values for all the fields, together with
/// meta-information about the update itself and some helper methods that can be used to
/// iterate through all or new values.
fn on_item_update(&mut self, _update: ItemUpdate) {
fn on_item_update(&self, _update: &ItemUpdate) {
// Default implementation does nothing.
unimplemented!("Implement on_item_update method for SubscriptionListener.");
}