Implemented part of the item update logic.
This commit is contained in:
parent
7d7f380e30
commit
82a9d2f070
@ -14,6 +14,7 @@ documentation = "https://github.com/daniloaz/lightstreamer-client#readme"
|
||||
cookie = { version = "0", features = ["percent-encode"]}
|
||||
futures = "0"
|
||||
futures-util = "0"
|
||||
json-patch = "1"
|
||||
reqwest = { version = "0", features = ["json", "stream"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = { version = "1" }
|
||||
|
@ -1,428 +1,241 @@
|
||||
use std::collections::HashMap;
|
||||
use serde_json::Value;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
/// Contains all the information related to an update of the field values for an item.
|
||||
/// It reports all the new values of the fields.
|
||||
///
|
||||
/// COMMAND Subscription:
|
||||
/// If the involved Subscription is a COMMAND Subscription, then the values for the current update
|
||||
/// are meant as relative to the same key.
|
||||
///
|
||||
/// Moreover, if the involved Subscription has a two-level behavior enabled, then each update may
|
||||
/// be associated with either a first-level or a second-level item. In this case, the reported fields
|
||||
/// are always the union of the first-level and second-level fields, and each single update can only
|
||||
/// change either the first-level or the second-level fields (but for the "command" field, which is
|
||||
/// first-level and is always set to "UPDATE" upon a second-level update). When the two-level behavior
|
||||
/// is enabled, in all methods where a field name has to be supplied, the following convention should
|
||||
/// be followed:
|
||||
/// Moreover, if the involved Subscription has a two-level behavior enabled, then each update may be
|
||||
/// associated with either a first-level or a second-level item. In this case, the reported fields are
|
||||
/// always the union of the first-level and second-level fields and each single update can only change
|
||||
/// either the first-level or the second-level fields (but for the "command" field, which is first-level
|
||||
/// and is always set to "UPDATE" upon a second-level update); note that the second-level field values
|
||||
/// are always None until the first second-level update occurs). When the two-level behavior is enabled,
|
||||
/// in all methods where a field name has to be supplied, the following convention should be followed:
|
||||
///
|
||||
/// - The field name can always be used, both for the first-level and the second-level fields.
|
||||
/// In case of name conflict, the first-level field is meant.
|
||||
/// - The field position can always be used; however, the field positions for the second-level fields
|
||||
/// start at the highest position of the first-level field list + 1. If a field schema had been
|
||||
/// specified for either first-level or second-level Subscriptions, then client-side knowledge of
|
||||
/// the first-level schema length would be required.
|
||||
/// - The field name can always be used, both for the first-level and the second-level fields. In case of
|
||||
/// name conflict, the first-level field is meant.
|
||||
/// - The field position can always be used; however, the field positions for the second-level fields start
|
||||
/// at the highest position of the first-level field list + 1. If a field schema had been specified for
|
||||
/// either first-level or second-level Subscriptions, then client-side knowledge of the first-level schema
|
||||
/// length would be required.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ItemUpdate {
|
||||
changed_fields: HashMap<String, Value>,
|
||||
fields: HashMap<String, Value>,
|
||||
item_name: Option<String>,
|
||||
item_pos: usize,
|
||||
is_snapshot: bool,
|
||||
prev_values: HashMap<String, Value>,
|
||||
pub item_name: Option<String>,
|
||||
pub item_pos: usize,
|
||||
pub fields: HashMap<String, Option<String>>,
|
||||
pub changed_fields: HashMap<String, Option<String>>,
|
||||
pub is_snapshot: bool,
|
||||
}
|
||||
|
||||
impl ItemUpdate {
|
||||
/// Returns a map containing the values for each field changed with the last server update.
|
||||
/// The related field name is used as key for the values in the map.
|
||||
/// The related field name is used as key for the values in the map. Note that if the Subscription
|
||||
/// mode of the involved Subscription is COMMAND, then changed fields are meant as relative to the
|
||||
/// previous update for the same key. On such tables if a DELETE command is received, all the fields,
|
||||
/// excluding the key field, will be present as changed, with None value. All of this is also true on
|
||||
/// tables that have the two-level behavior enabled, but in case of DELETE commands second-level fields
|
||||
/// will not be iterated.
|
||||
///
|
||||
/// Note that if the Subscription mode of the involved Subscription is COMMAND, then changed
|
||||
/// fields are meant as relative to the previous update for the same key. On such tables if a
|
||||
/// DELETE command is received, all the fields, excluding the key field, will be present as
|
||||
/// changed, with None value. All of this is also true on tables that have the two-level behavior
|
||||
/// enabled, but in case of DELETE commands second-level fields will not be iterated.
|
||||
/// # Raises
|
||||
/// - `IllegalStateException` – if the Subscription was initialized using a field schema.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an `IllegalStateException` if the Subscription was initialized using a field schema.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::json;
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// item_name: Some("item1".to_string()),
|
||||
/// item_pos: 1,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// let changed_fields = item_update.get_changed_fields();
|
||||
/// assert_eq!(changed_fields.len(), 2);
|
||||
/// assert_eq!(changed_fields.get("foo"), Some(&json!(42)));
|
||||
/// assert_eq!(changed_fields.get("bar"), Some(&json!("baz")));
|
||||
/// ```
|
||||
pub fn get_changed_fields(&self) -> &HashMap<String, Value> {
|
||||
&self.changed_fields
|
||||
/// # Returns
|
||||
/// A map containing the values for each field changed with the last server update.
|
||||
pub fn get_changed_fields(&self) -> HashMap<String, Option<String>> {
|
||||
self.changed_fields.clone()
|
||||
}
|
||||
|
||||
/// Returns a map containing the values for each field changed with the last server update.
|
||||
/// The 1-based field position within the field schema or field list is used as key for the
|
||||
/// values in the map.
|
||||
/// The 1-based field position within the field schema or field list is used as key for the values in
|
||||
/// the map. Note that if the Subscription mode of the involved Subscription is COMMAND, then changed
|
||||
/// fields are meant as relative to the previous update for the same key. On such tables if a DELETE
|
||||
/// command is received, all the fields, excluding the key field, will be present as changed, with None
|
||||
/// value. All of this is also true on tables that have the two-level behavior enabled, but in case of
|
||||
/// DELETE commands second-level fields will not be iterated.
|
||||
///
|
||||
/// Note that if the Subscription mode of the involved Subscription is COMMAND, then changed
|
||||
/// fields are meant as relative to the previous update for the same key. On such tables if a
|
||||
/// DELETE command is received, all the fields, excluding the key field, will be present as
|
||||
/// changed, with None value. All of this is also true on tables that have the two-level behavior
|
||||
/// enabled, but in case of DELETE commands second-level fields will not be iterated.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::json;
|
||||
/// use std::collections::HashMap;
|
||||
/// let mut changed_fields_by_pos = HashMap::new();
|
||||
/// changed_fields_by_pos.insert(1, json!(42));
|
||||
/// changed_fields_by_pos.insert(2, json!("baz"));
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: changed_fields_by_pos,
|
||||
/// fields: HashMap::new(),
|
||||
/// item_name: None,
|
||||
/// item_pos: 0,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// let changed_fields = item_update.get_changed_fields_by_position();
|
||||
/// assert_eq!(changed_fields.len(), 2);
|
||||
/// assert_eq!(changed_fields.get(&1), Some(&json!(42)));
|
||||
/// assert_eq!(changed_fields.get(&2), Some(&json!("baz")));
|
||||
/// ```
|
||||
pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Value> {
|
||||
// Convert the changed_fields HashMap to a HashMap with usize keys
|
||||
let changed_fields_by_pos: HashMap<usize, Value> = self
|
||||
.changed_fields
|
||||
/// # Returns
|
||||
/// A map containing the values for each field changed with the last server update.
|
||||
pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Option<String>> {
|
||||
self.changed_fields
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, (_k, v))| (i + 1, v.clone()))
|
||||
.collect();
|
||||
changed_fields_by_pos
|
||||
.map(|(name, value)| (self.get_field_position(name), value.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns a map containing the values for each field in the Subscription.
|
||||
/// The related field name is used as key for the values in the map.
|
||||
///
|
||||
/// # Errors
|
||||
/// # Raises
|
||||
/// - `IllegalStateException` – if the Subscription was initialized using a field schema.
|
||||
///
|
||||
/// Returns an `IllegalStateException` if the Subscription was initialized using a field schema.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::json;
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: HashMap::new(),
|
||||
/// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// item_name: Some("item1".to_string()),
|
||||
/// item_pos: 1,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// let fields = item_update.get_fields();
|
||||
/// assert_eq!(fields.len(), 2);
|
||||
/// assert_eq!(fields.get("foo"), Some(&json!(42)));
|
||||
/// assert_eq!(fields.get("bar"), Some(&json!("baz")));
|
||||
/// ```
|
||||
pub fn get_fields(&self) -> &HashMap<String, Value> {
|
||||
&self.fields
|
||||
/// # Returns
|
||||
/// A map containing the values for each field in the Subscription.
|
||||
pub fn get_fields(&self) -> HashMap<String, Option<String>> {
|
||||
self.fields.clone()
|
||||
}
|
||||
|
||||
/// Returns a map containing the values for each field in the Subscription.
|
||||
/// The 1-based field position within the field schema or field list is used as key for the
|
||||
/// values in the map.
|
||||
/// The 1-based field position within the field schema or field list is used as key for the values in the map.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::json;
|
||||
/// use std::collections::HashMap;
|
||||
/// let mut fields_by_pos = HashMap::new();
|
||||
/// fields_by_pos.insert(1, json!(42));
|
||||
/// fields_by_pos.insert(2, json!("baz"));
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: HashMap::new(),
|
||||
/// fields: fields_by_pos,
|
||||
/// item_name: None,
|
||||
/// item_pos: 0,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// let fields = item_update.get_fields_by_position();
|
||||
/// assert_eq!(fields.len(), 2);
|
||||
/// assert_eq!(fields.get(&1), Some(&json!(42)));
|
||||
/// assert_eq!(fields.get(&2), Some(&json!("baz")));
|
||||
/// ```
|
||||
pub fn get_fields_by_position(&self) -> &HashMap<String, Value> {
|
||||
&self.fields
|
||||
/// # Returns
|
||||
/// A map containing the values for each field in the Subscription.
|
||||
pub fn get_fields_by_position(&self) -> HashMap<usize, Option<String>> {
|
||||
self.fields
|
||||
.iter()
|
||||
.map(|(name, value)| (self.get_field_position(name), value.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Inquiry method that retrieves the name of the item to which this update pertains.
|
||||
///
|
||||
/// The name will be `None` if the related Subscription was initialized using an "Item Group".
|
||||
/// The name will be None if the related Subscription was initialized using an "Item Group".
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: HashMap::new(),
|
||||
/// fields: HashMap::new(),
|
||||
/// item_name: Some("item1".to_string()),
|
||||
/// item_pos: 1,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// assert_eq!(item_update.get_item_name(), Some("item1".to_string()));
|
||||
/// ```
|
||||
pub fn get_item_name(&self) -> Option<&String> {
|
||||
self.item_name.as_ref()
|
||||
/// # Returns
|
||||
/// The name of the item to which this update pertains.
|
||||
pub fn get_item_name(&self) -> Option<&str> {
|
||||
self.item_name.as_deref()
|
||||
}
|
||||
|
||||
/// Inquiry method that retrieves the position in the "Item List" or "Item Group" of the item
|
||||
/// to which this update pertains.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: HashMap::new(),
|
||||
/// fields: HashMap::new(),
|
||||
/// item_name: None,
|
||||
/// item_pos: 5,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// assert_eq!(item_update.get_item_pos(), 5);
|
||||
/// ```
|
||||
/// # Returns
|
||||
/// The 1-based position of the item to which this update pertains.
|
||||
pub fn get_item_pos(&self) -> usize {
|
||||
self.item_pos
|
||||
}
|
||||
|
||||
/// Inquiry method that gets the value for a specified field, as received from the Server with
|
||||
/// the current or previous update.
|
||||
/// Inquiry method that gets the value for a specified field, as received from the Server with the
|
||||
/// current or previous update.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription.
|
||||
/// # Raises
|
||||
/// - `IllegalArgumentException` – if the specified field is not part of the Subscription.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `field_name_or_pos`: The field name or the 1-based position of the field within the "Field
|
||||
/// List" or "Field Schema".
|
||||
/// - `field_name_or_pos` – The field name or the 1-based position of the field within the "Field List" or "Field Schema".
|
||||
///
|
||||
/// # Returns
|
||||
/// The value of the specified field; it can be None in the following cases:
|
||||
///
|
||||
/// The value of the specified field; it can be `None` in the following cases:
|
||||
/// - A `None` value has been received from the Server, as `None` is a possible value for a field.
|
||||
/// - No value has been received for the field yet.
|
||||
/// - The item is subscribed to with the COMMAND mode and a DELETE command is received (only the
|
||||
/// fields used to carry key and command information are valued).
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::json;
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// item_name: Some("item1".to_string()),
|
||||
/// item_pos: 1,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// assert_eq!(item_update.get_value("foo"), Some(json!(42)));
|
||||
/// assert_eq!(item_update.get_value("bar"), Some(json!("baz")));
|
||||
/// assert_eq!(item_update.get_value(1), Some(json!(42)));
|
||||
/// assert_eq!(item_update.get_value(2), Some(json!("baz")));
|
||||
/// ```
|
||||
pub fn get_value(&self, field_name_or_pos: &str) -> Option<&Value> {
|
||||
self.fields.get(field_name_or_pos)
|
||||
/// - a None value has been received from the Server, as None is a possible value for a field;
|
||||
/// - no value has been received for the field yet;
|
||||
/// - the item is subscribed to with the COMMAND mode and a DELETE command is received (only the fields
|
||||
/// used to carry key and command information are valued).
|
||||
pub fn get_value(&self, field_name_or_pos: &str) -> Option<&str> {
|
||||
match field_name_or_pos.parse::<usize>() {
|
||||
Ok(pos) => self
|
||||
.fields
|
||||
.iter()
|
||||
.find(|(name, _)| self.get_field_position(name) == pos)
|
||||
.and_then(|(_, value)| value.as_deref()),
|
||||
Err(_) => self.fields.get(field_name_or_pos).and_then(|v| v.as_deref()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Inquiry method that gets the difference between the new value and the previous one as a
|
||||
/// JSON Patch structure, provided that the Server has used the JSON Patch format to send this
|
||||
/// difference, as part of the "delta delivery" mechanism. This, in turn, requires that:
|
||||
/// Inquiry method that gets the difference between the new value and the previous one as a JSON Patch structure,
|
||||
/// provided that the Server has used the JSON Patch format to send this difference, as part of the "delta delivery"
|
||||
/// mechanism. This, in turn, requires that:
|
||||
///
|
||||
/// - The Data Adapter has explicitly indicated JSON Patch as the privileged type of compression
|
||||
/// for this field.
|
||||
/// - Both the previous and new value are suitable for the JSON Patch computation (i.e. they are
|
||||
/// valid JSON representations).
|
||||
/// - The item was subscribed to in MERGE or DISTINCT mode (note that, in case of two-level
|
||||
/// behavior, this holds for all fields related with second-level items, as these items are in
|
||||
/// MERGE mode).
|
||||
/// - Sending the JSON Patch difference has been evaluated by the Server as more efficient than
|
||||
/// sending the full new value.
|
||||
/// - the Data Adapter has explicitly indicated JSON Patch as the privileged type of compression for this field;
|
||||
/// - both the previous and new value are suitable for the JSON Patch computation (i.e. they are valid JSON representations);
|
||||
/// - the item was subscribed to in MERGE or DISTINCT mode (note that, in case of two-level behavior, this holds for all
|
||||
/// fields related with second-level items, as these items are in MERGE mode);
|
||||
/// - sending the JSON Patch difference has been evaluated by the Server as more efficient than sending the full new value.
|
||||
///
|
||||
/// Note that the last condition can be enforced by leveraging the Server's `<jsonpatch_min_length>`
|
||||
/// configuration flag, so that the availability of the JSON Patch form would only depend on the
|
||||
/// Client and the Data Adapter.
|
||||
/// Note that the last condition can be enforced by leveraging the Server's <jsonpatch_min_length> configuration flag,
|
||||
/// so that the availability of the JSON Patch form would only depend on the Client and the Data Adapter.
|
||||
///
|
||||
/// When the above conditions are not met, the method just returns `None`; in this case, the
|
||||
/// new value can only be determined through `ItemUpdate::get_value()`. For instance, this will
|
||||
/// always be needed to get the first value received.
|
||||
/// When the above conditions are not met, the method just returns None; in this case, the new value can only be determined
|
||||
/// through `ItemUpdate.get_value()`. For instance, this will always be needed to get the first value received.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription.
|
||||
/// # Raises
|
||||
/// - `IllegalArgumentException` – if the specified field is not part of the Subscription.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `field_name_or_pos`: The field name or the 1-based position of the field within the "Field
|
||||
/// List" or "Field Schema".
|
||||
/// - `field_name_or_pos` – The field name or the 1-based position of the field within the "Field List" or "Field Schema".
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A JSON Patch structure representing the difference between the new value and the previous one,
|
||||
/// or `None` if the difference in JSON Patch format is not available for any reason.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::{json, Value};
|
||||
/// let mut item_update = ItemUpdate {
|
||||
/// changed_fields: vec![("foo".to_string(), json!(42))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// fields: vec![("foo".to_string(), json!(42))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// item_name: Some("item1".to_string()),
|
||||
/// item_pos: 1,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: vec![("foo".to_string(), json!(41))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// };
|
||||
///
|
||||
/// // Assuming the Server sends a JSON Patch for the "foo" field
|
||||
/// let json_patch: Value = json!([
|
||||
/// { "op": "replace", "path": "/foo", "value": 42 }
|
||||
/// ]);
|
||||
/// item_update.changed_fields.insert("foo".to_string(), json_patch.clone());
|
||||
///
|
||||
/// assert_eq!(
|
||||
/// item_update.get_value_as_json_patch_if_available("foo"),
|
||||
/// Some(&json_patch)
|
||||
/// );
|
||||
/// ```
|
||||
pub fn get_value_as_json_patch_if_available(&self, field_name_or_pos: &str) -> Option<&Value> {
|
||||
self.changed_fields.get(field_name_or_pos)
|
||||
/// or None if the difference in JSON Patch format is not available for any reason.
|
||||
pub fn get_value_as_json_patch_if_available(
|
||||
&self,
|
||||
_field_name_or_pos: &str,
|
||||
) -> Option<String> {
|
||||
// Implementation pending
|
||||
None
|
||||
}
|
||||
|
||||
/// Inquiry method that asks whether the current update belongs to the item snapshot (which carries
|
||||
/// the current item state at the time of Subscription). Snapshot events are sent only if snapshot
|
||||
/// information was requested for the items through `Subscription::set_requested_snapshot()` and
|
||||
/// precede the real time events. Snapshot information take different forms in different subscription
|
||||
/// modes and can be spanned across zero, one or several update events. In particular:
|
||||
/// Inquiry method that asks whether the current update belongs to the item snapshot (which carries the current item state
|
||||
/// at the time of Subscription). Snapshot events are sent only if snapshot information was requested for the items through
|
||||
/// `Subscription.set_requested_snapshot()` and precede the real time events. Snapshot information takes different forms in
|
||||
/// different subscription modes and can be spanned across zero, one or several update events. In particular:
|
||||
///
|
||||
/// - If the item is subscribed to with the RAW subscription mode, then no snapshot is sent by
|
||||
/// the Server.
|
||||
/// - If the item is subscribed to with the MERGE subscription mode, then the snapshot consists
|
||||
/// of exactly one event, carrying the current value for all fields.
|
||||
/// - If the item is subscribed to with the DISTINCT subscription mode, then the snapshot consists
|
||||
/// of some of the most recent updates; these updates are as many as specified through
|
||||
/// `Subscription::set_requested_snapshot()`, unless fewer are available.
|
||||
/// - If the item is subscribed to with the COMMAND subscription mode, then the snapshot consists
|
||||
/// of an "ADD" event for each key that is currently present.
|
||||
/// - if the item is subscribed to with the RAW subscription mode, then no snapshot is sent by the Server;
|
||||
/// - if the item is subscribed to with the MERGE subscription mode, then the snapshot consists of exactly one event,
|
||||
/// carrying the current value for all fields;
|
||||
/// - if the item is subscribed to with the DISTINCT subscription mode, then the snapshot consists of some of the most recent
|
||||
/// updates; these updates are as many as specified through `Subscription.set_requested_snapshot()`, unless fewer are available;
|
||||
/// - if the item is subscribed to with the COMMAND subscription mode, then the snapshot consists of an "ADD" event for each key
|
||||
/// that is currently present.
|
||||
///
|
||||
/// Note that, in case of two-level behavior, snapshot-related updates for both the first-level
|
||||
/// item (which is in COMMAND mode) and any second-level items (which are in MERGE mode) are
|
||||
/// qualified with this flag.
|
||||
/// Note that, in case of two-level behavior, snapshot-related updates for both the first-level item (which is in COMMAND mode)
|
||||
/// and any second-level items (which are in MERGE mode) are qualified with this flag.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: HashMap::new(),
|
||||
/// fields: HashMap::new(),
|
||||
/// item_name: None,
|
||||
/// item_pos: 0,
|
||||
/// is_snapshot: true,
|
||||
/// prev_values: HashMap::new(),
|
||||
/// };
|
||||
/// assert!(item_update.is_snapshot());
|
||||
/// ```
|
||||
/// # Returns
|
||||
/// `true` if the current update event belongs to the item snapshot; `false` otherwise.
|
||||
pub fn is_snapshot(&self) -> bool {
|
||||
self.is_snapshot
|
||||
}
|
||||
|
||||
/// Inquiry method that asks whether the value for a field has changed after the reception of
|
||||
/// the last update from the Server for an item. If the Subscription mode is COMMAND then the
|
||||
/// change is meant as relative to the same key.
|
||||
/// Inquiry method that asks whether the value for a field has changed after the reception of the last update from the Server
|
||||
/// for an item. If the Subscription mode is COMMAND then the change is meant as relative to the same key.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `field_name_or_pos`: The field name or the 1-based position of the field within the field
|
||||
/// list or field schema.
|
||||
/// - `field_name_or_pos` – The field name or the 1-based position of the field within the field list or field schema.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Unless the Subscription mode is COMMAND, the return value is `true` in the following cases:
|
||||
///
|
||||
/// - It is the first update for the item.
|
||||
/// - The new field value is different than the previous field value received for the item.
|
||||
/// - It is the first update for the item;
|
||||
/// - the new field value is different than the previous field value received for the item.
|
||||
///
|
||||
/// If the Subscription mode is COMMAND, the return value is `true` in the following cases:
|
||||
///
|
||||
/// - It is the first update for the involved key value (i.e. the event carries an "ADD" command).
|
||||
/// - The new field value is different than the previous field value received for the item,
|
||||
/// relative to the same key value (the event must carry an "UPDATE" command).
|
||||
/// - The event carries a "DELETE" command (this applies to all fields other than the field used
|
||||
/// to carry key information).
|
||||
/// - it is the first update for the involved key value (i.e. the event carries an "ADD" command);
|
||||
/// - the new field value is different than the previous field value received for the item, relative to the same key value
|
||||
/// (the event must carry an "UPDATE" command);
|
||||
/// - the event carries a "DELETE" command (this applies to all fields other than the field used to carry key information).
|
||||
///
|
||||
/// In all other cases, the return value is `false`.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use serde_json::json;
|
||||
/// let item_update = ItemUpdate {
|
||||
/// changed_fields: vec![("foo".to_string(), json!(42))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// item_name: Some("item1".to_string()),
|
||||
/// item_pos: 1,
|
||||
/// is_snapshot: false,
|
||||
/// prev_values: vec![("foo".to_string(), json!(41))]
|
||||
/// .into_iter()
|
||||
/// .collect(),
|
||||
/// };
|
||||
/// assert!(item_update.is_value_changed("foo"));
|
||||
/// assert!(!item_update.is_value_changed("bar"));
|
||||
/// ```
|
||||
/// # Raises
|
||||
/// - `IllegalArgumentException` – if the specified field is not part of the Subscription.
|
||||
pub fn is_value_changed(&self, field_name_or_pos: &str) -> bool {
|
||||
if let Some(new_value) = self.fields.get(field_name_or_pos) {
|
||||
if let Some(prev_value) = self.prev_values.get(field_name_or_pos) {
|
||||
return new_value != prev_value;
|
||||
} else {
|
||||
// This is the first update for the item
|
||||
return true;
|
||||
}
|
||||
match field_name_or_pos.parse::<usize>() {
|
||||
Ok(pos) => self
|
||||
.changed_fields
|
||||
.iter()
|
||||
.any(|(name, _)| self.get_field_position(name) == pos),
|
||||
Err(_) => self.changed_fields.contains_key(field_name_or_pos),
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Helper method to get the 1-based position of a field within the field list or field schema.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `field_name` – The name of the field.
|
||||
///
|
||||
/// # Returns
|
||||
/// The 1-based position of the field within the field list or field schema.
|
||||
fn get_field_position(&self, field_name: &str) -> usize {
|
||||
// Implementation pending
|
||||
// This method should return the 1-based position of the field based on the field list or field schema
|
||||
// If the field is not found, it should raise an IllegalArgumentException
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
226
src/ls_client.rs
226
src/ls_client.rs
@ -3,11 +3,13 @@ use crate::client_message_listener::ClientMessageListener;
|
||||
use crate::connection_details::ConnectionDetails;
|
||||
use crate::connection_options::ConnectionOptions;
|
||||
use crate::error::IllegalStateException;
|
||||
use crate::subscription::Subscription;
|
||||
use crate::item_update::ItemUpdate;
|
||||
use crate::subscription::{Snapshot, Subscription, SubscriptionMode};
|
||||
use crate::util::*;
|
||||
|
||||
use cookie::Cookie;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
@ -126,7 +128,7 @@ impl LightstreamerClient {
|
||||
// Constants for WebSocket connection.
|
||||
//
|
||||
pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
|
||||
pub const SEC_WEBSOCKET_PROTOCOL: &'static str ="TLCP-2.4.0.lightstreamer.com";
|
||||
pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
|
||||
pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
|
||||
pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
|
||||
|
||||
@ -313,6 +315,7 @@ impl LightstreamerClient {
|
||||
let mut request_id: usize = 0;
|
||||
let mut _session_id: Option<String> = None;
|
||||
let mut subscription_id: usize = 0;
|
||||
let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
message = read_stream.next() => {
|
||||
@ -443,8 +446,225 @@ impl LightstreamerClient {
|
||||
//
|
||||
"u" => {
|
||||
println!("Received data update from server: '{}'", clean_text);
|
||||
// Parse arguments from the received message.
|
||||
let arguments = clean_text.split(",").collect::<Vec<&str>>();
|
||||
//
|
||||
// Extract the subscription from the first argument.
|
||||
//
|
||||
let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
|
||||
let subscription = match self.get_subscriptions().get(subscription_index-1) {
|
||||
Some(subscription) => subscription,
|
||||
None => {
|
||||
println!("Subscription not found for index: {}", subscription_index);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
//
|
||||
// Extract the item from the second argument.
|
||||
//
|
||||
let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
|
||||
let item = match subscription.get_items() {
|
||||
Some(items) => items.get(item_index),
|
||||
None => {
|
||||
println!("No items found in subscription: {:?}", subscription);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
//
|
||||
// Determine if the update is a snapshot or real-time update based on the subscription parameters.
|
||||
//
|
||||
let is_snapshot = match subscription.get_requested_snapshot() {
|
||||
Some(ls_snapshot) => {
|
||||
match ls_snapshot {
|
||||
Snapshot::No => false,
|
||||
Snapshot::Yes => {
|
||||
match subscription.get_mode() {
|
||||
SubscriptionMode::Merge => {
|
||||
if arguments.len() == 4 && arguments[3] == "$" {
|
||||
// EOS notification received
|
||||
true
|
||||
} else {
|
||||
// If item doesn't exist in item_updates yet, the first update
|
||||
// is always a snapshot.
|
||||
if let Some(item_updates) = item_updates.get(subscription_index) {
|
||||
if let Some(_) = item_updates.get(item_index) {
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
},
|
||||
SubscriptionMode::Distinct | SubscriptionMode::Command => {
|
||||
if !subscription.is_subscribed() {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
},
|
||||
_ => false,
|
||||
}
|
||||
},
|
||||
_ => false,
|
||||
}
|
||||
},
|
||||
None => false,
|
||||
};
|
||||
|
||||
},
|
||||
// Extract the field values from the third argument.
|
||||
let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
|
||||
|
||||
//
|
||||
// Get fields from subscription and create a HashMap of field names and values.
|
||||
//
|
||||
let subscription_fields = subscription.get_fields();
|
||||
let mut field_map: HashMap<String, Option<String>> = subscription_fields
|
||||
.map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut field_index = 0;
|
||||
for value in field_values {
|
||||
match value {
|
||||
"" => {
|
||||
// An empty value means the field is unchanged compared to the previous update of the same field.
|
||||
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
||||
field_map.insert(field_name.to_string(), None);
|
||||
}
|
||||
field_index += 1;
|
||||
}
|
||||
"#" | "$" => {
|
||||
// A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
|
||||
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
||||
field_map.insert(field_name.to_string(), Some("".to_string()));
|
||||
}
|
||||
field_index += 1;
|
||||
}
|
||||
value if value.starts_with('^') => {
|
||||
let command = value.chars().nth(1).unwrap_or(' ');
|
||||
match command {
|
||||
'0'..='9' => {
|
||||
let count = value[1..].parse().unwrap_or(0);
|
||||
for i in 0..count {
|
||||
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
|
||||
field_map.insert(field_name.to_string(), None);
|
||||
}
|
||||
}
|
||||
field_index += count;
|
||||
}
|
||||
'P' | 'T' => {
|
||||
let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
|
||||
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
||||
if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
|
||||
let new_value = match command {
|
||||
'P' => {
|
||||
// Apply JSON Patch
|
||||
let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
|
||||
let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
|
||||
let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
|
||||
let _ = json_patch::patch(&mut prev_json, &patch_operations);
|
||||
prev_json.to_string()
|
||||
}
|
||||
'T' => {
|
||||
// Apply TLCP-diff
|
||||
//tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
|
||||
unimplemented!("Implement TLCP-diff");
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
field_map.insert(field_name.to_string(), Some(new_value.to_string()));
|
||||
}
|
||||
}
|
||||
field_index += 1;
|
||||
}
|
||||
_ => {
|
||||
let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
|
||||
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
||||
field_map.insert(field_name.to_string(), Some(decoded_value));
|
||||
}
|
||||
field_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
|
||||
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
||||
field_map.insert(field_name.to_string(), Some(decoded_value));
|
||||
}
|
||||
field_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//println!("Field values: {:?}", field_map);
|
||||
|
||||
let changed_fields: HashMap<String, Option<String>> = field_map.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
//println!("Changed fields: {:?}", changed_fields);
|
||||
|
||||
//
|
||||
// Take the proper item_update from item_updates and update it with changed fields.
|
||||
// If the item_update doesn't exist yet, create a new one.
|
||||
//
|
||||
match item_updates.get_mut(subscription_index) {
|
||||
Some(item_updates) => match item_updates.get_mut(item_index) {
|
||||
Some(item_update) => {
|
||||
//
|
||||
// Iterate changed_fields and update existing item_update.fields assigning the new values.
|
||||
//
|
||||
for (field_name, new_value) in &changed_fields {
|
||||
if item_update.fields.contains_key(field_name) {
|
||||
item_update.fields.insert((*field_name).clone(), new_value.clone());
|
||||
}
|
||||
}
|
||||
item_update.changed_fields = changed_fields;
|
||||
},
|
||||
None => {
|
||||
// Create a new item_update and add it to item_updates.
|
||||
let item_update = ItemUpdate {
|
||||
item_name: item.cloned(),
|
||||
item_pos: item_index,
|
||||
fields: field_map,
|
||||
changed_fields: changed_fields,
|
||||
is_snapshot: is_snapshot,
|
||||
};
|
||||
item_updates.push(item_update);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// Create a new item_update and add it to item_updates.
|
||||
let item_update = ItemUpdate {
|
||||
item_name: item.cloned(),
|
||||
item_pos: item_index,
|
||||
fields: field_map,
|
||||
changed_fields: changed_fields,
|
||||
is_snapshot: is_snapshot,
|
||||
};
|
||||
item_updates.push(vec![item_update]);
|
||||
}
|
||||
};
|
||||
|
||||
//println!("Item updates: {:?}", item_updates);
|
||||
|
||||
println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap());
|
||||
println!("\n\n");
|
||||
|
||||
if item_updates.len() >= 3 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
/*
|
||||
if let Some(item_updates) = item_updates.get_mut(subscription_index) {
|
||||
if let Some(item_update) = item_updates.get_mut(item_index) {
|
||||
for (field_name, field_value) in changed_fields {
|
||||
item_update.set_field_value(field_name, field_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
//
|
||||
// Connection confirmation from server.
|
||||
//
|
||||
|
24
src/main.rs
24
src/main.rs
@ -60,11 +60,31 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let mut my_subscription = Subscription::new(
|
||||
SubscriptionMode::Merge,
|
||||
Some(vec![
|
||||
"item2".to_string(),
|
||||
"item1".to_string(),
|
||||
"item2".to_string(),
|
||||
"item3".to_string(),
|
||||
"item4".to_string(),
|
||||
"item5".to_string(),
|
||||
"item6".to_string(),
|
||||
"item7".to_string(),
|
||||
"item8".to_string(),
|
||||
"item9".to_string(),
|
||||
"item10".to_string(),
|
||||
]),
|
||||
Some(vec![
|
||||
"stock_name".to_string(),
|
||||
"last_price".to_string(),
|
||||
"time".to_string(),
|
||||
"pct_change".to_string(),
|
||||
"bid_quantity".to_string(),
|
||||
"bid".to_string(),
|
||||
"ask".to_string(),
|
||||
"ask_quantity".to_string(),
|
||||
"min".to_string(),
|
||||
"max".to_string(),
|
||||
"ref_price".to_string(),
|
||||
"open_price".to_string(),
|
||||
]),
|
||||
Some(vec!["stock_name".to_string(), "last_price".to_string()]),
|
||||
)?;
|
||||
|
||||
my_subscription.set_data_adapter(Some(String::from("QUOTE_ADAPTER")))?;
|
||||
|
Loading…
x
Reference in New Issue
Block a user