aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar daniloaz <daniloaz@gmail.com>2024-03-24 20:39:38 +0100
committerLibravatar daniloaz <daniloaz@gmail.com>2024-03-24 20:39:43 +0100
commit7e1eb27a06e5545b3d1b77b5998dc0463df27d70 (patch)
tree8c9569d9574e3321390521bf855ad9d39a187db2
parentdfd6b4a2b7494854096663be73dc9255ff14c7d8 (diff)
Created structure and scaffolding for the Lightstreamer client.
-rw-r--r--Cargo.toml5
-rw-r--r--src/client_listener.rs183
-rw-r--r--src/client_message_listener.rs69
-rw-r--r--src/connection_details.rs3
-rw-r--r--src/connection_options.rs147
-rw-r--r--src/item_update.rs428
-rw-r--r--src/lib.rs35
-rw-r--r--src/lightstreamer_client.rs458
-rw-r--r--src/main.rs87
-rw-r--r--src/proxy.rs3
-rw-r--r--src/subscription.rs840
-rw-r--r--src/subscription_listener.rs244
12 files changed, 2497 insertions, 5 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 140be77..a4354c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,12 @@ documentation = "https://github.com/daniloaz/lightstreamer-client#readme"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+cookie = { version = "0", features = ["percent-encode"]}
futures = "0"
hyper = { version = "1", features = ["full"] }
reqwest = { version = "0", features = ["json", "stream"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
-tokio = { version = "1", features = ["full"] }
+serde_urlencoded = "0"
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+tokio-tungstenite = { version = "0", features = ["native-tls"] }
diff --git a/src/client_listener.rs b/src/client_listener.rs
new file mode 100644
index 0000000..06902aa
--- /dev/null
+++ b/src/client_listener.rs
@@ -0,0 +1,183 @@
+/// Interface to be implemented to listen to `LightstreamerClient` events comprehending notifications
+/// of connection activity and errors.
+///
+/// Events for these listeners are dispatched by a different thread than the one that generates them.
+/// This means that, upon reception of an event, it is possible that the internal state of the client
+/// has changed. On the other hand, all the notifications for a single `LightstreamerClient`,
+/// including notifications to `ClientListener`, `SubscriptionListener` and `ClientMessageListener`
+/// will be dispatched by the same thread.
+pub trait ClientListener {
+ /// Event handler that receives a notification when the `ClientListener` instance is removed
+ /// from a `LightstreamerClient` through `LightstreamerClient.removeListener()`. This is the
+ /// last event to be fired on the listener.
+ fn on_listen_end(&self) {
+ // Implementation for on_listen_end
+ }
+
+ /// Event handler that receives a notification when the `ClientListener` instance is added
+ /// to a `LightstreamerClient` through `LightstreamerClient.addListener()`. This is the first
+ /// event to be fired on the listener.
+ fn on_listen_start(&self) {
+ // Implementation for on_listen_start
+ }
+
+ /// Event handler that receives a notification each time the value of a property of
+ /// `LightstreamerClient.connectionDetails` or `LightstreamerClient.connectionOptions` is changed.
+ ///
+ /// Properties of these objects can be modified by direct calls to them or by server sent events.
+ ///
+ /// # Parameters
+ ///
+ /// * `property`: the name of the changed property.
+ ///
+ /// Possible values are:
+ ///
+ /// - `adapterSet`
+ /// - `serverAddress`
+ /// - `user`
+ /// - `password`
+ /// - `serverInstanceAddress`
+ /// - `serverSocketName`
+ /// - `clientIp`
+ /// - `sessionId`
+ /// - `contentLength`
+ /// - `idleTimeout`
+ /// - `keepaliveInterval`
+ /// - `requestedMaxBandwidth`
+ /// - `realMaxBandwidth`
+ /// - `pollingInterval`
+ /// - `reconnectTimeout`
+ /// - `stalledTimeout`
+ /// - `retryDelay`
+ /// - `firstRetryMaxDelay`
+ /// - `slowingEnabled`
+ /// - `forcedTransport`
+ /// - `serverInstanceAddressIgnored`
+ /// - `reverseHeartbeatInterval`
+ /// - `earlyWSOpenEnabled`
+ /// - `httpExtraHeaders`
+ /// - `httpExtraHeadersOnSessionCreationOnly`
+ ///
+ /// See also `LightstreamerClient.connectionDetails`
+ ///
+ /// See also `LightstreamerClient.connectionOptions`
+ fn on_property_change(&self, property: &str) {
+ // Implementation for on_property_change
+ }
+
+ /// Event handler that is called when the Server notifies a refusal on the client attempt
+ /// to open a new connection or the interruption of a streaming connection. In both cases,
+ /// the `onStatusChange()` event handler has already been invoked with a "DISCONNECTED" status
+ /// and no recovery attempt has been performed. By setting a custom handler, however, it is
+ /// possible to override this and perform custom recovery actions.
+ ///
+ /// # Parameters
+ ///
+ /// * `code`: The error code. It can be one of the following:
+ /// - `1`: user/password check failed
+ /// - `2`: requested Adapter Set not available
+ /// - `7`: licensed maximum number of sessions reached (this can only happen with some licenses)
+ /// - `8`: configured maximum number of sessions reached
+ /// - `9`: configured maximum server load reached
+ /// - `10`: new sessions temporarily blocked
+ /// - `11`: streaming is not available because of Server license restrictions (this can only happen with special licenses).
+ /// - `21`: a request for this session has unexpectedly reached a wrong Server instance, which suggests that a routing issue may be in place.
+ /// - `30-41`: the current connection or the whole session has been closed by external agents; the possible cause may be:
+ /// - The session was closed on the Server side (via software or by the administrator) (32), or through a client "destroy" request (31);
+ /// - The Metadata Adapter imposes limits on the overall open sessions for the current user and has requested the closure of the current session upon opening of a new session for the same user on a different browser window (35);
+ /// - An unexpected error occurred on the Server while the session was in activity (33, 34);
+ /// - An unknown or unexpected cause; any code different from the ones identified in the above cases could be issued. A detailed description for the specific cause is currently not supplied (i.e. `errorMessage` is `None` in this case).
+ /// - `60`: this version of the client is not allowed by the current license terms.
+ /// - `61`: there was an error in the parsing of the server response thus the client cannot continue with the current session.
+ /// - `66`: an unexpected exception was thrown by the Metadata Adapter while authorizing the connection.
+ /// - `68`: the Server could not open or continue with the session because of an internal error.
+ /// - `70`: an unusable port was configured on the server address.
+ /// - `71`: this kind of client is not allowed by the current license terms.
+ /// - `<= 0`: the Metadata Adapter has refused the user connection; the code value is dependent on the specific Metadata Adapter implementation
+ /// * `message`: The description of the error as sent by the Server.
+ ///
+ /// See also `onStatusChange()`
+ ///
+ /// See also `ConnectionDetails.setAdapterSet()`
+ fn on_server_error(&self, code: i32, message: &str) {
+ // Implementation for on_server_error
+ }
+
+ /// Event handler that receives a notification each time the `LightstreamerClient` status has changed.
+ /// The status changes may be originated either by custom actions (e.g. by calling `LightstreamerClient.disconnect()`)
+ /// or by internal actions.
+ ///
+ /// The normal cases are the following:
+ ///
+ /// After issuing `connect()` when the current status is `DISCONNECTED*`, the client will switch to `CONNECTING`
+ /// first and to `CONNECTED:STREAM-SENSING` as soon as the pre-flight request receives its answer. As soon as
+ /// the new session is established, it will switch to `CONNECTED:WS-STREAMING` if the environment permits WebSockets;
+ /// otherwise it will switch to `CONNECTED:HTTP-STREAMING` if the environment permits streaming or to
+ /// `CONNECTED:HTTP-POLLING` as a last resort.
+ ///
+ /// On the other hand, after issuing `connect` when the status is already `CONNECTED:*` a switch to `CONNECTING`
+ /// is usually not needed and the current session is kept.
+ ///
+ /// After issuing `LightstreamerClient.disconnect()`, the status will switch to `DISCONNECTED`.
+ ///
+ /// In case of a server connection refusal, the status may switch from `CONNECTING` directly to `DISCONNECTED`.
+ /// After that, the `onServerError()` event handler will be invoked.
+ ///
+ /// Possible special cases are the following:
+ ///
+ /// - In case of Server unavailability during streaming, the status may switch from `CONNECTED:*-STREAMING` to
+ /// `STALLED` (see `ConnectionOptions.setStalledTimeout()`). If the unavailability ceases, the status will
+ /// switch back to `CONNECTED:*-STREAMING`; otherwise, if the unavailability persists (see `ConnectionOptions.setReconnectTimeout()`),
+ /// the status will switch to `DISCONNECTED:TRYING-RECOVERY` and eventually to `CONNECTED:*-STREAMING`.
+ /// - In case the connection or the whole session is forcibly closed by the Server, the status may switch from
+ /// `CONNECTED:*-STREAMING` or `CONNECTED:*-POLLING` directly to `DISCONNECTED`. After that, the `onServerError()`
+ /// event handler will be invoked.
+ /// - Depending on the setting in `ConnectionOptions.setSlowingEnabled()`, in case of slow update processing,
+ /// the status may switch from `CONNECTED:WS-STREAMING` to `CONNECTED:WS-POLLING` or from `CONNECTED:HTTP-STREAMING`
+ /// to `CONNECTED:HTTP-POLLING`.
+ /// - If the status is `CONNECTED:*-POLLING` and any problem during an intermediate poll occurs, the status may
+ /// switch to `CONNECTING` and eventually to `CONNECTED:*-POLLING`. The same may hold for the `CONNECTED:*-STREAMING`
+ /// case, when a rebind is needed.
+ /// - In case a forced transport was set through `ConnectionOptions.setForcedTransport()`, only the related final
+ /// status or statuses are possible.
+ /// - In case of connection problems, the status may switch from any value to `DISCONNECTED:WILL-RETRY`
+ /// (see `ConnectionOptions.setRetryDelay()`), then to `CONNECTING` and a new attempt will start. However,
+ /// in most cases, the client will try to recover the current session; hence, the `DISCONNECTED:TRYING-RECOVERY`
+ /// status will be entered and the recovery attempt will start.
+ /// - In case of connection problems during a recovery attempt, the status may stay in `DISCONNECTED:TRYING-RECOVERY`
+ /// for long time, while further attempts are made. If the recovery is no longer possible, the current session
+ /// will be abandoned and the status will switch to `DISCONNECTED:WILL-RETRY` before the next attempts.
+ ///
+ /// By setting a custom handler it is possible to perform actions related to connection and disconnection
+ /// occurrences. Note that `LightstreamerClient.connect()` and `LightstreamerClient.disconnect()`, as any other
+ /// method, can be issued directly from within a handler.
+ ///
+ /// # Parameters
+ ///
+ /// * `status`: The new status. It can be one of the following values:
+ /// - `CONNECTING`: the client has started a connection attempt and is waiting for a Server answer.
+ /// - `CONNECTED:STREAM-SENSING`: the client received a first response from the server and is now evaluating
+ /// if a streaming connection is fully functional.
+ /// - `CONNECTED:WS-STREAMING`: a streaming connection over WebSocket has been established.
+ /// - `CONNECTED:HTTP-STREAMING`: a streaming connection over HTTP has been established.
+ /// - `CONNECTED:WS-POLLING`: a polling connection over WebSocket has been started. Note that, unlike polling
+ /// over HTTP, in this case only one connection is actually opened (see `ConnectionOptions.setSlowingEnabled()`).
+ /// - `CONNECTED:HTTP-POLLING`: a polling connection over HTTP has been started.
+ /// - `STALLED`: a streaming session has been silent for a while, the status will eventually return to its
+ /// previous `CONNECTED:*-STREAMING` status or will switch to `DISCONNECTED:WILL-RETRY` / `DISCONNECTED:TRYING-RECOVERY`.
+ /// - `DISCONNECTED:WILL-RETRY`: a connection or connection attempt has been closed; a new attempt will be
+ /// performed (possibly after a timeout).
+ /// - `DISCONNECTED:TRYING-RECOVERY`: a connection has been closed and the client has started a connection
+ /// attempt and is waiting for a Server answer; if successful, the underlying session will be kept.
+ /// - `DISCONNECTED`: a connection or connection attempt has been closed. The client will not connect anymore
+ /// until a new `LightstreamerClient.connect()` call is issued.
+ ///
+ /// See also `LightstreamerClient.connect()`
+ ///
+ /// See also `LightstreamerClient.disconnect()`
+ ///
+ /// See also `LightstreamerClient.getStatus()`
+ fn on_status_change(&self, status: &str) {
+ // Implementation for on_status_change
+ }
+} \ No newline at end of file
diff --git a/src/client_message_listener.rs b/src/client_message_listener.rs
new file mode 100644
index 0000000..ccf5eda
--- /dev/null
+++ b/src/client_message_listener.rs
@@ -0,0 +1,69 @@
+/// Interface to be implemented to listen to `LightstreamerClient.sendMessage()` events reporting
+/// a message processing outcome. Events for these listeners are dispatched by a different
+/// thread than the one that generates them. All the notifications for a single `LightstreamerClient`,
+/// including notifications to `ClientListener`, `SubscriptionListener` and `ClientMessageListener`
+/// will be dispatched by the same thread. Only one event per message is fired on this listener.
+pub trait ClientMessageListener {
+ /// Event handler that is called by Lightstreamer when any notifications of the processing
+ /// outcome of the related message haven't been received yet and can no longer be received.
+ /// Typically, this happens after the session has been closed. In this case, the client has
+ /// no way of knowing the processing outcome and any outcome is possible.
+ ///
+ /// # Parameters
+ ///
+ /// * `msg`: the message to which this notification is related.
+ /// * `sent_on_network`: `true` if the message was sent on the network, `false` otherwise.
+ /// Even if the flag is `true`, it is not possible to infer whether the message actually
+ /// reached the Lightstreamer Server or not.
+ fn on_abort(&self, msg: &str, sent_on_network: bool) {
+ // Implementation for on_abort
+ }
+
+ /// Event handler that is called by Lightstreamer when the related message has been processed
+ /// by the Server but the expected processing outcome could not be achieved for any reason.
+ ///
+ /// # Parameters
+ ///
+ /// * `msg`: the message to which this notification is related.
+ /// * `code`: the error code sent by the Server. It can be one of the following:
+ /// - `<= 0`: the Metadata Adapter has refused the message; the code value is dependent
+ /// on the specific Metadata Adapter implementation.
+ /// * `error`: the description of the error sent by the Server.
+ fn on_deny(&self, msg: &str, code: i32, error: &str) {
+ // Implementation for on_deny
+ }
+
+ /// Event handler that is called by Lightstreamer to notify that the related message has
+ /// been discarded by the Server. This means that the message has not reached the Metadata
+ /// Adapter and the message next in the sequence is considered enabled for processing.
+ ///
+ /// # Parameters
+ ///
+ /// * `msg`: the message to which this notification is related.
+ fn on_discarded(&self, msg: &str) {
+ // Implementation for on_discarded
+ }
+
+ /// Event handler that is called by Lightstreamer when the related message has been processed
+ /// by the Server but the processing has failed for any reason. The level of completion of
+ /// the processing by the Metadata Adapter cannot be determined.
+ ///
+ /// # Parameters
+ ///
+ /// * `msg`: the message to which this notification is related.
+ fn on_error(&self, msg: &str) {
+ // Implementation for on_error
+ }
+
+ /// Event handler that is called by Lightstreamer when the related message has been processed
+ /// by the Server with success.
+ ///
+ /// # Parameters
+ ///
+ /// * `msg`: the message to which this notification is related.
+ /// * `response`: the response from the Metadata Adapter. If not supplied (i.e. supplied as `None`),
+ /// an empty message is received here.
+ fn on_processed(&self, msg: &str, response: Option<&str>) {
+ // Implementation for on_processed
+ }
+} \ No newline at end of file
diff --git a/src/connection_details.rs b/src/connection_details.rs
new file mode 100644
index 0000000..03f33f5
--- /dev/null
+++ b/src/connection_details.rs
@@ -0,0 +1,3 @@
+pub struct ConnectionDetails {
+
+} \ No newline at end of file
diff --git a/src/connection_options.rs b/src/connection_options.rs
new file mode 100644
index 0000000..aefb0c0
--- /dev/null
+++ b/src/connection_options.rs
@@ -0,0 +1,147 @@
+use crate::proxy::Proxy;
+
+/// Used by LightstreamerClient to provide an extra connection properties data object.
+/// Data struct that contains the policy settings used to connect to a Lightstreamer Server.
+/// An instance of this struct is attached to every LightstreamerClient as connection_options.
+pub struct ConnectionOptions {
+ content_length: Option<usize>,
+ first_retry_max_delay: Option<u64>,
+ forced_transport: Option<String>,
+ http_extra_headers: Option<Vec<(String, String)>>,
+ http_extra_headers_on_session_creation_only: Option<bool>,
+ idle_timeout: Option<u64>,
+ keepalive_interval: Option<u64>,
+ polling_interval: Option<u64>,
+ proxy: Option<Proxy>,
+ real_max_bandwidth: Option<u64>,
+ reconnect_timeout: Option<u64>,
+ requested_max_bandwidth: Option<u64>,
+ retry_delay: Option<u64>,
+ reverse_heartbeat_interval: Option<u64>,
+ server_instance_address_ignored: Option<bool>,
+ session_recovery_timeout: Option<u64>,
+ slowing_enabled: Option<bool>,
+ stalled_timeout: Option<u64>,
+}
+
+impl ConnectionOptions {
+ /// Creates a new ConnectionOptions object with default values.
+ pub fn new() -> ConnectionOptions {
+ ConnectionOptions::default()
+ }
+
+ /// Sets the content length.
+ pub fn set_content_length(&mut self, content_length: usize) {
+ self.content_length = Some(content_length);
+ }
+
+ /// Sets the first retry max delay.
+ pub fn set_first_retry_max_delay(&mut self, first_retry_max_delay: u64) {
+ self.first_retry_max_delay = Some(first_retry_max_delay);
+ }
+
+ /// Sets the forced transport.
+ pub fn set_forced_transport(&mut self, forced_transport: String) {
+ self.forced_transport = Some(forced_transport);
+ }
+
+ /// Sets the HTTP extra headers.
+ pub fn set_http_extra_headers(&mut self, http_extra_headers: Vec<(String, String)>) {
+ self.http_extra_headers = Some(http_extra_headers);
+ }
+
+ /// Sets the HTTP extra headers on session creation only.
+ pub fn set_http_extra_headers_on_session_creation_only(&mut self, http_extra_headers_on_session_creation_only: bool) {
+ self.http_extra_headers_on_session_creation_only = Some(http_extra_headers_on_session_creation_only);
+ }
+
+ /// Sets the idle timeout.
+ pub fn set_idle_timeout(&mut self, idle_timeout: u64) {
+ self.idle_timeout = Some(idle_timeout);
+ }
+
+ /// Sets the keepalive interval.
+ pub fn set_keepalive_interval(&mut self, keepalive_interval: u64) {
+ self.keepalive_interval = Some(keepalive_interval);
+ }
+
+ /// Sets the polling interval.
+ pub fn set_polling_interval(&mut self, polling_interval: u64) {
+ self.polling_interval = Some(polling_interval);
+ }
+
+ /// Sets the proxy.
+ pub fn set_proxy(&mut self, proxy: Proxy) {
+ self.proxy = Some(proxy);
+ }
+
+ /// Sets the real max bandwidth.
+ pub fn set_real_max_bandwidth(&mut self, real_max_bandwidth: u64) {
+ self.real_max_bandwidth = Some(real_max_bandwidth);
+ }
+
+ /// Sets the reconnect timeout.
+ pub fn set_reconnect_timeout(&mut self, reconnect_timeout: u64) {
+ self.reconnect_timeout = Some(reconnect_timeout);
+ }
+
+ /// Sets the requested max bandwidth.
+ pub fn set_requested_max_bandwidth(&mut self, requested: u64) {
+ self.requested_max_bandwidth = Some(requested);
+ }
+
+ /// Sets the retry delay.
+ pub fn set_retry_delay(&mut self, retry_delay: u64) {
+ self.retry_delay = Some(retry_delay);
+ }
+
+ /// Sets the reverse heartbeat interval.
+ pub fn set_reverse_heartbeat_interval(&mut self, reverse_heartbeat_interval: u64) {
+ self.reverse_heartbeat_interval = Some(reverse_heartbeat_interval);
+ }
+
+ /// Sets the server instance address ignored.
+ pub fn set_server_instance_address_ignored(&mut self, server_instance_address_ignored: bool) {
+ self.server_instance_address_ignored = Some(server_instance_address_ignored);
+ }
+
+ /// Sets the session recovery timeout.
+ pub fn set_session_recovery_timeout(&mut self, session_recovery_timeout: u64) {
+ self.session_recovery_timeout = Some(session_recovery_timeout);
+ }
+
+ /// Sets the slowing enabled.
+ pub fn set_slowing_enabled(&mut self, slowing_enabled: bool) {
+ self.slowing_enabled = Some(slowing_enabled);
+ }
+
+ /// Sets the stalled timeout.
+ pub fn set_stalled_timeout(&mut self, stalled_timeout: u64) {
+ self.stalled_timeout = Some(stalled_timeout);
+ }
+}
+
+impl Default for ConnectionOptions {
+ fn default() -> Self {
+ ConnectionOptions {
+ content_length: None,
+ first_retry_max_delay: Some(100),
+ forced_transport: None,
+ http_extra_headers: None,
+ http_extra_headers_on_session_creation_only: Some(false),
+ idle_timeout: Some(19000),
+ keepalive_interval: Some(0),
+ polling_interval: Some(0),
+ proxy: None,
+ real_max_bandwidth: None,
+ reconnect_timeout: Some(3000),
+ requested_max_bandwidth: None,
+ retry_delay: Some(4000),
+ reverse_heartbeat_interval: Some(0),
+ server_instance_address_ignored: Some(false),
+ session_recovery_timeout: Some(15000),
+ slowing_enabled: Some(false),
+ stalled_timeout: Some(2000),
+ }
+ }
+} \ No newline at end of file
diff --git a/src/item_update.rs b/src/item_update.rs
new file mode 100644
index 0000000..041526e
--- /dev/null
+++ b/src/item_update.rs
@@ -0,0 +1,428 @@
+use std::collections::HashMap;
+use serde_json::Value;
+
+/// Contains all the information related to an update of the field values for an item.
+/// It reports all the new values of the fields.
+///
+/// If the involved Subscription is a COMMAND Subscription, then the values for the current update
+/// are meant as relative to the same key.
+///
+/// Moreover, if the involved Subscription has a two-level behavior enabled, then each update may
+/// be associated with either a first-level or a second-level item. In this case, the reported fields
+/// are always the union of the first-level and second-level fields, and each single update can only
+/// change either the first-level or the second-level fields (but for the "command" field, which is
+/// first-level and is always set to "UPDATE" upon a second-level update). When the two-level behavior
+/// is enabled, in all methods where a field name has to be supplied, the following convention should
+/// be followed:
+///
+/// - The field name can always be used, both for the first-level and the second-level fields.
+/// In case of name conflict, the first-level field is meant.
+/// - The field position can always be used; however, the field positions for the second-level fields
+/// start at the highest position of the first-level field list + 1. If a field schema had been
+/// specified for either first-level or second-level Subscriptions, then client-side knowledge of
+/// the first-level schema length would be required.
+pub struct ItemUpdate {
+ changed_fields: HashMap<String, Value>,
+ fields: HashMap<String, Value>,
+ item_name: Option<String>,
+ item_pos: usize,
+ is_snapshot: bool,
+ prev_values: HashMap<String, Value>,
+}
+
+impl ItemUpdate {
+ /// Returns a map containing the values for each field changed with the last server update.
+ /// The related field name is used as key for the values in the map.
+ ///
+ /// Note that if the Subscription mode of the involved Subscription is COMMAND, then changed
+ /// fields are meant as relative to the previous update for the same key. On such tables if a
+ /// DELETE command is received, all the fields, excluding the key field, will be present as
+ /// changed, with None value. All of this is also true on tables that have the two-level behavior
+ /// enabled, but in case of DELETE commands second-level fields will not be iterated.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `IllegalStateException` if the Subscription was initialized using a field schema.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::json;
+ /// let item_update = ItemUpdate {
+ /// changed_fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
+ /// .into_iter()
+ /// .collect(),
+ /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
+ /// .into_iter()
+ /// .collect(),
+ /// item_name: Some("item1".to_string()),
+ /// item_pos: 1,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// let changed_fields = item_update.get_changed_fields();
+ /// assert_eq!(changed_fields.len(), 2);
+ /// assert_eq!(changed_fields.get("foo"), Some(&json!(42)));
+ /// assert_eq!(changed_fields.get("bar"), Some(&json!("baz")));
+ /// ```
+ pub fn get_changed_fields(&self) -> &HashMap<String, Value> {
+ &self.changed_fields
+ }
+
+ /// Returns a map containing the values for each field changed with the last server update.
+ /// The 1-based field position within the field schema or field list is used as key for the
+ /// values in the map.
+ ///
+ /// Note that if the Subscription mode of the involved Subscription is COMMAND, then changed
+ /// fields are meant as relative to the previous update for the same key. On such tables if a
+ /// DELETE command is received, all the fields, excluding the key field, will be present as
+ /// changed, with None value. All of this is also true on tables that have the two-level behavior
+ /// enabled, but in case of DELETE commands second-level fields will not be iterated.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::json;
+ /// use std::collections::HashMap;
+ /// let mut changed_fields_by_pos = HashMap::new();
+ /// changed_fields_by_pos.insert(1, json!(42));
+ /// changed_fields_by_pos.insert(2, json!("baz"));
+ /// let item_update = ItemUpdate {
+ /// changed_fields: changed_fields_by_pos,
+ /// fields: HashMap::new(),
+ /// item_name: None,
+ /// item_pos: 0,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// let changed_fields = item_update.get_changed_fields_by_position();
+ /// assert_eq!(changed_fields.len(), 2);
+ /// assert_eq!(changed_fields.get(&1), Some(&json!(42)));
+ /// assert_eq!(changed_fields.get(&2), Some(&json!("baz")));
+ /// ```
+ pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Value> {
+ // Convert the changed_fields HashMap to a HashMap with usize keys
+ let changed_fields_by_pos: HashMap<usize, Value> = self
+ .changed_fields
+ .iter()
+ .enumerate()
+ .map(|(i, (k, v))| (i + 1, v.clone()))
+ .collect();
+ changed_fields_by_pos
+ }
+
+ /// Returns a map containing the values for each field in the Subscription.
+ /// The related field name is used as key for the values in the map.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `IllegalStateException` if the Subscription was initialized using a field schema.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::json;
+ /// let item_update = ItemUpdate {
+ /// changed_fields: HashMap::new(),
+ /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
+ /// .into_iter()
+ /// .collect(),
+ /// item_name: Some("item1".to_string()),
+ /// item_pos: 1,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// let fields = item_update.get_fields();
+ /// assert_eq!(fields.len(), 2);
+ /// assert_eq!(fields.get("foo"), Some(&json!(42)));
+ /// assert_eq!(fields.get("bar"), Some(&json!("baz")));
+ /// ```
+ pub fn get_fields(&self) -> &HashMap<String, Value> {
+ &self.fields
+ }
+
+ /// Returns a map containing the values for each field in the Subscription.
+ /// The 1-based field position within the field schema or field list is used as key for the
+ /// values in the map.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::json;
+ /// use std::collections::HashMap;
+ /// let mut fields_by_pos = HashMap::new();
+ /// fields_by_pos.insert(1, json!(42));
+ /// fields_by_pos.insert(2, json!("baz"));
+ /// let item_update = ItemUpdate {
+ /// changed_fields: HashMap::new(),
+ /// fields: fields_by_pos,
+ /// item_name: None,
+ /// item_pos: 0,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// let fields = item_update.get_fields_by_position();
+ /// assert_eq!(fields.len(), 2);
+ /// assert_eq!(fields.get(&1), Some(&json!(42)));
+ /// assert_eq!(fields.get(&2), Some(&json!("baz")));
+ /// ```
+ pub fn get_fields_by_position(&self) -> &HashMap<String, Value> {
+ &self.fields
+ }
+
+ /// Inquiry method that retrieves the name of the item to which this update pertains.
+ ///
+ /// The name will be `None` if the related Subscription was initialized using an "Item Group".
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let item_update = ItemUpdate {
+ /// changed_fields: HashMap::new(),
+ /// fields: HashMap::new(),
+ /// item_name: Some("item1".to_string()),
+ /// item_pos: 1,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// assert_eq!(item_update.get_item_name(), Some("item1".to_string()));
+ /// ```
+ pub fn get_item_name(&self) -> Option<&String> {
+ self.item_name.as_ref()
+ }
+
+ /// Inquiry method that retrieves the position in the "Item List" or "Item Group" of the item
+ /// to which this update pertains.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let item_update = ItemUpdate {
+ /// changed_fields: HashMap::new(),
+ /// fields: HashMap::new(),
+ /// item_name: None,
+ /// item_pos: 5,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// assert_eq!(item_update.get_item_pos(), 5);
+ /// ```
+ pub fn get_item_pos(&self) -> usize {
+ self.item_pos
+ }
+
+ /// Inquiry method that gets the value for a specified field, as received from the Server with
+ /// the current or previous update.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription.
+ ///
+ /// # Parameters
+ ///
+ /// - `field_name_or_pos`: The field name or the 1-based position of the field within the "Field
+ /// List" or "Field Schema".
+ ///
+ /// # Returns
+ ///
+ /// The value of the specified field; it can be `None` in the following cases:
+ /// - A `None` value has been received from the Server, as `None` is a possible value for a field.
+ /// - No value has been received for the field yet.
+ /// - The item is subscribed to with the COMMAND mode and a DELETE command is received (only the
+ /// fields used to carry key and command information are valued).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::json;
+ /// let item_update = ItemUpdate {
+ /// changed_fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
+ /// .into_iter()
+ /// .collect(),
+ /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
+ /// .into_iter()
+ /// .collect(),
+ /// item_name: Some("item1".to_string()),
+ /// item_pos: 1,
+ /// is_snapshot: false,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// assert_eq!(item_update.get_value("foo"), Some(json!(42)));
+ /// assert_eq!(item_update.get_value("bar"), Some(json!("baz")));
+ /// assert_eq!(item_update.get_value(1), Some(json!(42)));
+ /// assert_eq!(item_update.get_value(2), Some(json!("baz")));
+ /// ```
+ pub fn get_value(&self, field_name_or_pos: &str) -> Option<&Value> {
+ self.fields.get(field_name_or_pos)
+ }
+
+ /// Inquiry method that gets the difference between the new value and the previous one as a
+ /// JSON Patch structure, provided that the Server has used the JSON Patch format to send this
+ /// difference, as part of the "delta delivery" mechanism. This, in turn, requires that:
+ ///
+ /// - The Data Adapter has explicitly indicated JSON Patch as the privileged type of compression
+ /// for this field.
+ /// - Both the previous and new value are suitable for the JSON Patch computation (i.e. they are
+ /// valid JSON representations).
+ /// - The item was subscribed to in MERGE or DISTINCT mode (note that, in case of two-level
+ /// behavior, this holds for all fields related with second-level items, as these items are in
+ /// MERGE mode).
+ /// - Sending the JSON Patch difference has been evaluated by the Server as more efficient than
+ /// sending the full new value.
+ ///
+ /// Note that the last condition can be enforced by leveraging the Server's `<jsonpatch_min_length>`
+ /// configuration flag, so that the availability of the JSON Patch form would only depend on the
+ /// Client and the Data Adapter.
+ ///
+ /// When the above conditions are not met, the method just returns `None`; in this case, the
+ /// new value can only be determined through `ItemUpdate::get_value()`. For instance, this will
+ /// always be needed to get the first value received.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription.
+ ///
+ /// # Parameters
+ ///
+ /// - `field_name_or_pos`: The field name or the 1-based position of the field within the "Field
+ /// List" or "Field Schema".
+ ///
+ /// # Returns
+ ///
+ /// A JSON Patch structure representing the difference between the new value and the previous one,
+ /// or `None` if the difference in JSON Patch format is not available for any reason.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::{json, Value};
+ /// let mut item_update = ItemUpdate {
+ /// changed_fields: vec![("foo".to_string(), json!(42))]
+ /// .into_iter()
+ /// .collect(),
+ /// fields: vec![("foo".to_string(), json!(42))]
+ /// .into_iter()
+ /// .collect(),
+ /// item_name: Some("item1".to_string()),
+ /// item_pos: 1,
+ /// is_snapshot: false,
+ /// prev_values: vec![("foo".to_string(), json!(41))]
+ /// .into_iter()
+ /// .collect(),
+ /// };
+ ///
+ /// // Assuming the Server sends a JSON Patch for the "foo" field
+ /// let json_patch: Value = json!([
+ /// { "op": "replace", "path": "/foo", "value": 42 }
+ /// ]);
+ /// item_update.changed_fields.insert("foo".to_string(), json_patch.clone());
+ ///
+ /// assert_eq!(
+ /// item_update.get_value_as_json_patch_if_available("foo"),
+ /// Some(&json_patch)
+ /// );
+ /// ```
+ pub fn get_value_as_json_patch_if_available(&self, field_name_or_pos: &str) -> Option<&Value> {
+ self.changed_fields.get(field_name_or_pos)
+ }
+
+ /// Inquiry method that asks whether the current update belongs to the item snapshot (which carries
+ /// the current item state at the time of Subscription). Snapshot events are sent only if snapshot
+ /// information was requested for the items through `Subscription::set_requested_snapshot()` and
+ /// precede the real time events. Snapshot information take different forms in different subscription
+ /// modes and can be spanned across zero, one or several update events. In particular:
+ ///
+ /// - If the item is subscribed to with the RAW subscription mode, then no snapshot is sent by
+ /// the Server.
+ /// - If the item is subscribed to with the MERGE subscription mode, then the snapshot consists
+ /// of exactly one event, carrying the current value for all fields.
+ /// - If the item is subscribed to with the DISTINCT subscription mode, then the snapshot consists
+ /// of some of the most recent updates; these updates are as many as specified through
+ /// `Subscription::set_requested_snapshot()`, unless fewer are available.
+ /// - If the item is subscribed to with the COMMAND subscription mode, then the snapshot consists
+ /// of an "ADD" event for each key that is currently present.
+ ///
+ /// Note that, in case of two-level behavior, snapshot-related updates for both the first-level
+ /// item (which is in COMMAND mode) and any second-level items (which are in MERGE mode) are
+ /// qualified with this flag.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let item_update = ItemUpdate {
+ /// changed_fields: HashMap::new(),
+ /// fields: HashMap::new(),
+ /// item_name: None,
+ /// item_pos: 0,
+ /// is_snapshot: true,
+ /// prev_values: HashMap::new(),
+ /// };
+ /// assert!(item_update.is_snapshot());
+ /// ```
+ pub fn is_snapshot(&self) -> bool {
+ self.is_snapshot
+ }
+
+ /// Inquiry method that asks whether the value for a field has changed after the reception of
+ /// the last update from the Server for an item. If the Subscription mode is COMMAND then the
+ /// change is meant as relative to the same key.
+ ///
+ /// # Parameters
+ ///
+ /// - `field_name_or_pos`: The field name or the 1-based position of the field within the field
+ /// list or field schema.
+ ///
+ /// # Returns
+ ///
+ /// Unless the Subscription mode is COMMAND, the return value is `true` in the following cases:
+ ///
+ /// - It is the first update for the item.
+ /// - The new field value is different than the previous field value received for the item.
+ ///
+ /// If the Subscription mode is COMMAND, the return value is `true` in the following cases:
+ ///
+ /// - It is the first update for the involved key value (i.e. the event carries an "ADD" command).
+ /// - The new field value is different than the previous field value received for the item,
+ /// relative to the same key value (the event must carry an "UPDATE" command).
+ /// - The event carries a "DELETE" command (this applies to all fields other than the field used
+ /// to carry key information).
+ ///
+ /// In all other cases, the return value is `false`.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use serde_json::json;
+ /// let item_update = ItemUpdate {
+ /// changed_fields: vec![("foo".to_string(), json!(42))]
+ /// .into_iter()
+ /// .collect(),
+ /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))]
+ /// .into_iter()
+ /// .collect(),
+ /// item_name: Some("item1".to_string()),
+ /// item_pos: 1,
+ /// is_snapshot: false,
+ /// prev_values: vec![("foo".to_string(), json!(41))]
+ /// .into_iter()
+ /// .collect(),
+ /// };
+ /// assert!(item_update.is_value_changed("foo"));
+ /// assert!(!item_update.is_value_changed("bar"));
+ /// ```
+ pub fn is_value_changed(&self, field_name_or_pos: &str) -> bool {
+ if let Some(new_value) = self.fields.get(field_name_or_pos) {
+ if let Some(prev_value) = self.prev_values.get(field_name_or_pos) {
+ return new_value != prev_value;
+ } else {
+ // This is the first update for the item
+ return true;
+ }
+ }
+ false
+ }
+} \ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..c7ea164
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,35 @@
+use std::fmt;
+use std::error::Error;
+
+pub mod client_listener;
+pub mod client_message_listener;
+pub mod item_update;
+pub mod subscription_listener;
+pub mod connection_details;
+pub mod connection_options;
+pub mod lightstreamer_client;
+pub mod proxy;
+pub mod subscription;
+
+#[derive(Debug)]
+pub struct IllegalStateException {
+ details: String
+}
+
+impl IllegalStateException {
+ pub fn new(msg: &str) -> IllegalStateException {
+ IllegalStateException{details: msg.to_string()}
+ }
+}
+
+impl fmt::Display for IllegalStateException {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f,"{}",self.details)
+ }
+}
+
+impl Error for IllegalStateException {
+ fn description(&self) -> &str {
+ &self.details
+ }
+} \ No newline at end of file
diff --git a/src/lightstreamer_client.rs b/src/lightstreamer_client.rs
new file mode 100644
index 0000000..2012c28
--- /dev/null
+++ b/src/lightstreamer_client.rs
@@ -0,0 +1,458 @@
+use crate::client_listener::ClientListener;
+use crate::client_message_listener::ClientMessageListener;
+use crate::connection_details::ConnectionDetails;
+use crate::connection_options::ConnectionOptions;
+use crate::subscription::Subscription;
+use crate::IllegalStateException;
+
+use cookie::Cookie;
+
+/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
+/// configuration settings, event handlers, operations for the control of the connection lifecycle,
+/// Subscription handling and to send messages.
+///
+/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
+/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
+/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
+/// a single instance of `LightstreamerClient` is needed.
+///
+/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
+/// endpoints.
+///
+/// You can listen to the events generated by a session by registering an event listener, such as
+/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
+/// such as session creation, connection status, subscription updates, and server messages. However,
+/// you should be aware that the event notifications are dispatched by a single thread, the so-called
+/// event thread. This means that if the operations of a listener are slow or blocking, they will
+/// delay the processing of the other listeners and affect the performance of your application.
+/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
+/// the listener methods as fast and simple as possible. Note that even if you create multiple
+/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
+/// among them.
+///
+/// # Parameters
+///
+/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
+/// will connect to. It is possible to specify it later by using `None` here. See
+/// `ConnectionDetails.setServerAddress()` for details.
+/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
+/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to
+/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
+/// for details.
+///
+/// # Raises
+///
+/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
+/// for details.
+pub struct LightstreamerClient {
+ server_address: Option<String>,
+ adapter_set: Option<String>,
+ /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
+ /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
+ /// of this object can be overwritten by values received from a Lightstreamer Server.
+ pub connection_details: ConnectionDetails,
+ /// Data object that contains options and policies for the connection to the server. This instance
+ /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
+ /// can be overwritten by values received from a Lightstreamer Server.
+ pub connection_options: ConnectionOptions,
+ listeners: Vec<Box<dyn ClientListener>>,
+ subscriptions: Vec<Subscription>,
+}
+
+impl LightstreamerClient {
+ /// A constant string representing the name of the library.
+ pub const LIB_NAME: &'static str = "rust_client";
+
+ /// A constant string representing the version of the library.
+ pub const LIB_VERSION: &'static str = "0.1.0";
+
+ /// Static method that can be used to share cookies between connections to the Server (performed by
+ /// this library) and connections to other sites that are performed by the application. With this
+ /// method, cookies received by the application can be added (or replaced if already present) to
+ /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
+ /// is compatible with the Server domain will be used internally.
+ ///
+ /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
+ /// However it can be invoked at any time; it will affect the internal cookie set immediately
+ /// and the sending of cookies on the next HTTP request or WebSocket establishment.
+ ///
+ /// # Parameters
+ ///
+ /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
+ /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
+ ///
+ /// See also `getCookies()`
+ pub fn add_cookies(uri: &str, cookies: &Cookie) {
+ // Implementation for add_cookies
+ }
+
+ /// Adds a listener that will receive events from the `LightstreamerClient` instance.
+ ///
+ /// The same listener can be added to several different `LightstreamerClient` instances.
+ ///
+ /// A listener can be added at any time. A call to add a listener already present will be ignored.
+ ///
+ /// # Parameters
+ ///
+ /// * `listener`: An object that will receive the events as documented in the `ClientListener`
+ /// interface.
+ ///
+ /// See also `removeListener()`
+ pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
+ self.listeners.push(listener);
+ }
+
+ /// Operation method that requests to open a Session against the configured Lightstreamer Server.
+ ///
+ /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
+ /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
+ /// for some seconds from the streaming connection, then it will automatically open a polling
+ /// connection.
+ ///
+ /// A polling connection may also be opened if the environment is not suitable for a streaming
+ /// connection.
+ ///
+ /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
+ /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
+ ///
+ /// Note that the request to connect is accomplished by the client in a separate thread; this
+ /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
+ /// yet.
+ ///
+ /// When the request to connect is finally being executed, if the current status of the client
+ /// is not `DISCONNECTED`, then nothing will be done.
+ ///
+ /// # Raises
+ ///
+ /// * `IllegalStateException`: if no server address was configured.
+ ///
+ /// See also `getStatus()`
+ ///
+ /// See also `disconnect()`
+ ///
+ /// See also `ClientListener.onStatusChange()`
+ ///
+ /// See also `ConnectionDetails.setServerAddress()`
+ pub fn connect(&mut self) -> Result<(), IllegalStateException> {
+ if self.server_address.is_none() {
+ return Err(IllegalStateException::new("No server address was configured."));
+ }
+
+ // Implementation for connect
+ Ok(())
+ }
+
+ /// Operation method that requests to close the Session opened against the configured Lightstreamer
+ /// Server (if any).
+ ///
+ /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
+ ///
+ /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
+ /// are preserved to be re-subscribed to on future Sessions.
+ ///
+ /// Note that the request to disconnect is accomplished by the client in a separate thread; this
+ /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
+ /// change yet.
+ ///
+ /// When the request to disconnect is finally being executed, if the status of the client is
+ /// "DISCONNECTED", then nothing will be done.
+ ///
+ /// See also `connect()`
+ pub fn disconnect(&mut self) {
+ // Implementation for disconnect
+ }
+
+ /// Static inquiry method that can be used to share cookies between connections to the Server
+ /// (performed by this library) and connections to other sites that are performed by the application.
+ /// With this method, cookies received from the Server can be extracted for sending through other
+ /// connections, according with the URI to be accessed.
+ ///
+ /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
+ /// when not.
+ ///
+ /// # Parameters
+ ///
+ /// * `uri`: the URI to which the cookies should be sent, or `None`.
+ ///
+ /// # Returns
+ ///
+ /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
+ /// If a `None` URI was supplied, all available non-expired cookies will be returned.
+ pub fn get_cookies(uri: Option<&str>) -> Cookie {
+ // Implementation for get_cookies
+ unimplemented!()
+ }
+
+ /// Returns a list containing the `ClientListener` instances that were added to this client.
+ ///
+ /// # Returns
+ ///
+ /// A list containing the listeners that were added to this client.
+ ///
+ /// See also `addListener()`
+ pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
+ &self.listeners
+ }
+
+ /// Inquiry method that gets the current client status and transport (when applicable).
+ ///
+ /// # Returns
+ ///
+ /// The current client status. It can be one of the following values:
+ ///
+ /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
+ /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
+ /// and is currently verifying if a streaming connection is possible;
+ /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
+ /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
+ /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
+ /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
+ /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
+ /// longer than a configured time;
+ /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
+ /// (possibly after a timeout);
+ /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
+ /// as soon as possible, as an attempt to recover the current session after a connection issue;
+ /// - `"DISCONNECTED"`: no connection is currently active.
+ ///
+ /// See also `ClientListener.onStatusChange()`
+ pub fn get_status(&self) -> &str {
+ // Implementation for get_status
+ unimplemented!()
+ }
+
+ /// Inquiry method that returns a list containing all the `Subscription` instances that are
+ /// currently "active" on this `LightstreamerClient`.
+ ///
+ /// Internal second-level `Subscription` are not included.
+ ///
+ /// # Returns
+ ///
+ /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
+ /// The list can be empty.
+ ///
+ /// See also `subscribe()`
+ pub fn get_subscriptions(&self) -> &Vec<Subscription> {
+ &self.subscriptions
+ }
+
+ /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
+ /// events anymore.
+ ///
+ /// A listener can be removed at any time.
+ ///
+ /// # Parameters
+ ///
+ /// * `listener`: The listener to be removed.
+ ///
+ /// See also `addListener()`
+ pub fn remove_listener(&mut self, listener: Box<dyn ClientListener>) {
+ unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
+ //self.listeners.remove(&listener);
+ }
+
+ /// Operation method that sends a message to the Server. The message is interpreted and handled
+ /// by the Metadata Adapter associated to the current Session. This operation supports in-order
+ /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
+ /// to arrive exactly once and respecting the original order, whatever is the underlying transport
+ /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
+ /// necessary, to reduce network round trips.
+ ///
+ /// Upon subsequent calls to the method, the sequential management of the involved messages is
+ /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
+ /// issued.
+ ///
+ /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
+ /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
+ /// reason, each message can specify a "delayTimeout", which is the longest time the message,
+ /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
+ /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
+ /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
+ /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
+ /// is set for a message and the previous message was sent immediately before, it is possible
+ /// that the latter gets discarded even if no communication issues occur. The Server may also
+ /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
+ /// long time.
+ ///
+ /// Sequence identifiers can also be associated with the messages. In this case, the sequential
+ /// management is restricted to all subsets of messages with the same sequence identifier associated.
+ ///
+ /// Notifications of the operation outcome can be received by supplying a suitable listener. The
+ /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
+ /// are guaranteed to be invoked sequentially.
+ ///
+ /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
+ /// processing is guaranteed, while strict ordering and even sequentialization of the processing
+ /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
+ /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
+ /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
+ /// the listener eventually gets a notification.
+ ///
+ /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
+ /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
+ /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
+ ///
+ /// Since a message is handled by the Metadata Adapter associated to the current connection, a
+ /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
+ /// flag is specified it is possible to call the method at any time and the client will take
+ /// care of sending the message as soon as a connection is available, otherwise, if the current
+ /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
+ /// event will be fired.
+ ///
+ /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
+ /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
+ /// flag set to `true`.
+ ///
+ /// Also note that forwarding of the message to the server is made in a separate thread, hence,
+ /// if a message is sent while the connection is active, it could be aborted because of a subsequent
+ /// disconnection. In the same way a message sent while the connection is not active might be
+ /// sent because of a subsequent connection.
+ ///
+ /// # Parameters
+ ///
+ /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
+ /// associated to the current connection.
+ /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
+ /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
+ /// is supplied, the message will be processed in the special way described above. The parameter
+ /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
+ /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
+ /// timeout on missing messages, the latter will be used instead. The parameter is optional; if
+ /// a negative value is supplied, the Server configured timeout on missing messages will be applied.
+ /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
+ /// timeout on missing messages still applies.
+ /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
+ /// parameter is optional; if not supplied, no notification will be available.
+ /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
+ /// status when the provided message is handled, then the message is not aborted right away but
+ /// is queued waiting for a new session. Note that the message can still be aborted later when
+ /// a new session is established.
+ pub fn send_message(
+ &mut self,
+ message: &str,
+ sequence: Option<&str>,
+ delay_timeout: Option<u64>,
+ listener: Option<Box<dyn ClientMessageListener>>,
+ enqueue_while_disconnected: bool,
+ ) {
+ // Implementation for send_message
+ }
+
+ /// Static method that permits to configure the logging system used by the library. The logging
+ /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
+ /// third-party logging system.
+ ///
+ /// If no logging system is specified, all the generated log is discarded.
+ ///
+ /// The following categories are available to be consumed:
+ ///
+ /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
+ /// level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
+ /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
+ /// level, requests are logged; at DEBUG level, request details and events from the Server are logged.
+ /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
+ /// are logged; at DEBUG level, lifecycle event details are logged.
+ /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
+ /// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
+ /// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
+ /// - `lightstreamer.actions`: logs settings / API calls.
+ ///
+ /// # Parameters
+ ///
+ /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
+ /// library classes.
+ pub fn set_logger_provider() {
+ unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
+ }
+ /*
+ pub fn set_logger_provider(provider: LoggerProvider) {
+ // Implementation for set_logger_provider
+ }
+ */
+
+ /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
+ /// accept untrusted ones.
+ ///
+ /// May be called only once before creating any `LightstreamerClient` instance.
+ ///
+ /// # Parameters
+ ///
+ /// * `factory`: an instance of `ssl.SSLContext`
+ ///
+ /// # Raises
+ ///
+ /// * `IllegalArgumentException`: if the factory is `None`
+ /// * `IllegalStateException`: if a factory is already installed
+ pub fn set_trust_manager_factory() {
+ unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
+ }
+ /*
+ pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
+ if factory.is_none() {
+ return Err(IllegalArgumentException::new(
+ "Factory cannot be None",
+ ));
+ }
+
+ // Implementation for set_trust_manager_factory
+ Ok(())
+ }
+ */
+
+ /// Operation method that adds a `Subscription` to the list of "active" Subscriptions. The `Subscription`
+ /// cannot already be in the "active" state.
+ ///
+ /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
+ /// as there is a session available). Active `Subscription` are automatically persisted across different
+ /// sessions as long as a related unsubscribe call is not issued.
+ ///
+ /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
+ /// immediately enters the "active" state.
+ ///
+ /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
+ /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
+ ///
+ /// Also note that forwarding of the subscription to the server is made in a separate thread.
+ ///
+ /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
+ /// event.
+ ///
+ /// # Parameters
+ ///
+ /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
+ /// values.
+ ///
+ /// See also `unsubscribe()`
+ pub fn subscribe(&mut self, subscription: Subscription) {
+ self.subscriptions.push(subscription);
+ // Implementation for subscribe
+ }
+
+ /// Operation method that removes a `Subscription` that is currently in the "active" state.
+ ///
+ /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
+ /// items is requested to Lightstreamer Server.
+ ///
+ /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
+ /// exits the "active" state.
+ ///
+ /// Note that forwarding of the unsubscription to the server is made in a separate thread.
+ ///
+ /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
+ ///
+ /// # Parameters
+ ///
+ /// * `subscription`: An "active" `Subscription` object that was activated by this `LightstreamerClient`
+ /// instance.
+ pub fn unsubscribe(&mut self, subscription: Subscription) {
+ unimplemented!("Implement mechanism to unsubscribe from LightstreamerClient.");
+ }
+ /*
+ pub fn unsubscribe(&mut self, subscription: &Subscription) {
+ if let Some(index) = self.subscriptions.iter().position(|s| s == subscription) {
+ self.subscriptions.remove(index);
+ // Implementation for unsubscribe
+ }
+ }
+ */
+} \ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index 2e3c3c5..d782d2b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,11 @@
use futures::stream::StreamExt;
+use futures::SinkExt;
use reqwest::Client;
+use serde_urlencoded;
use std::error::Error;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
+use tokio::sync::Mutex;
async fn establish_persistent_http_connection(
session_id_shared: Arc<Mutex<String>>,
@@ -28,7 +32,7 @@ async fn establish_persistent_http_connection(
if let Some(end) = response_text.find(",50000,5000,*\r\n") {
let session_id = &response_text[start + 6..end];
println!("Session ID: {}", session_id);
- let mut session_id_lock = session_id_shared.lock().unwrap();
+ let mut session_id_lock = session_id_shared.lock().await;
*session_id_lock = session_id.to_string();
}
} else {
@@ -45,6 +49,54 @@ async fn establish_persistent_http_connection(
Ok(())
}
+/*
+// Establish a persistent WebSocket connection and handle the session creation
+async fn establish_persistent_ws_connection(
+ session_id_shared: Arc<Mutex<String>>,
+) -> Result<(), Box<dyn Error>> {
+ let ws_url = "wss://push.lightstreamer.com/lightstreamer";
+
+ let (ws_stream, _) = tokio_tungstenite::connect_async_with_config(
+ tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned()))
+ ).await.expect("Failed to connect");
+
+ let (mut write, mut read) = ws_stream.split();
+
+ // Session creation parameters
+ let params = [
+ ("LS_op2", "create_session"),
+ ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
+ ("LS_adapter_set", "DEMO"),
+ ];
+
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+
+ // Send the create session message
+ write
+ .send(Message::Text(format!("{}\n", encoded_params)))
+ .await?;
+
+ // Listen for messages from the server
+ while let Some(message) = read.next().await {
+ match message? {
+ Message::Text(text) => {
+ if text.starts_with("CONOK") {
+ let session_info: Vec<&str> = text.split(",").collect();
+ let session_id = session_info.get(1).unwrap_or(&"").to_string();
+ *session_id_shared.lock().await = session_id.clone();
+ println!("Session established with ID: {}", session_id);
+ subscribe_to_channel_ws(session_id, write).await?;
+ break; // Exit after successful subscription
+ }
+ }
+ _ => {}
+ }
+ }
+
+ Ok(())
+}
+*/
+
async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> {
let client = Client::new();
let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt";
@@ -61,6 +113,33 @@ async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error>
Ok(())
}
+// Function to subscribe to a channel using WebSocket
+async fn subscribe_to_channel_ws(
+ session_id: String,
+ mut write: futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tokio_tungstenite::tungstenite::protocol::Message>,
+) -> Result<(), Box<dyn Error>> {
+ // Example subscription to ITEM1 in MERGE mode from the DEMO adapter set
+ let sub_params = [
+ ("LS_table", "1"),
+ ("LS_op2", "add"),
+ ("LS_session", &session_id),
+ ("LS_id", "item1"),
+ ("LS_schema", "stock_name last_price"),
+ ("LS_mode", "MERGE"),
+ ];
+
+ let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?;
+
+ // Send the subscription message
+ write
+ .send(Message::Text(encoded_sub_params))
+ .await?;
+
+ println!("Subscribed to channel with session ID: {}", session_id);
+
+ Ok(())
+}
+
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
@@ -78,7 +157,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let session_id;
{
- session_id = session_id_shared.lock().unwrap().clone();
+ session_id = session_id_shared.lock().await.clone();
}
if !session_established && !session_id.is_empty() {
@@ -93,4 +172,4 @@ async fn main() -> Result<(), Box<dyn Error>> {
task2.await?;
Ok(())
-}
+} \ No newline at end of file
diff --git a/src/proxy.rs b/src/proxy.rs
new file mode 100644
index 0000000..48b2758
--- /dev/null
+++ b/src/proxy.rs
@@ -0,0 +1,3 @@
+pub struct Proxy {
+
+} \ No newline at end of file
diff --git a/src/subscription.rs b/src/subscription.rs
new file mode 100644
index 0000000..035614d
--- /dev/null
+++ b/src/subscription.rs
@@ -0,0 +1,840 @@
+use crate::subscription_listener::SubscriptionListener;
+use std::collections::HashMap;
+
+/// Enum representing the subscription mode
+#[derive(Debug, PartialEq, Eq)]
+pub enum SubscriptionMode {
+ Merge,
+ Distinct,
+ Raw,
+ Command,
+}
+
+impl SubscriptionMode {
+ fn from_str(s: &str) -> Result<SubscriptionMode, String> {
+ match s.to_lowercase().as_str() {
+ "merge" => Ok(SubscriptionMode::Merge),
+ "distinct" => Ok(SubscriptionMode::Distinct),
+ "raw" => Ok(SubscriptionMode::Raw),
+ "command" => Ok(SubscriptionMode::Command),
+ _ => Err(format!("Invalid subscription mode: {}", s)),
+ }
+ }
+}
+
+/// Struct representing a Subscription to be submitted to a Lightstreamer Server.
+/// It contains subscription details and the listeners needed to process the real-time data.
+pub struct Subscription {
+ /// The subscription mode for the items, required by Lightstreamer Server.
+ mode: SubscriptionMode,
+ /// An array of items to be subscribed to through Lightstreamer server.
+ items: Option<Vec<String>>,
+ /// An "Item Group" identifier representing a list of items.
+ item_group: Option<String>,
+ /// An array of fields for the items to be subscribed to through Lightstreamer Server.
+ fields: Option<Vec<String>>,
+ /// A "Field Schema" identifier representing a list of fields.
+ field_schema: Option<String>,
+ /// The name of the Data Adapter that supplies all the items for this Subscription.
+ data_adapter: Option<String>,
+ /// The name of the second-level Data Adapter for a COMMAND Subscription.
+ command_second_level_data_adapter: Option<String>,
+ /// The "Field List" to be subscribed to through Lightstreamer Server for the second-level items in a COMMAND Subscription.
+ command_second_level_fields: Option<Vec<String>>,
+ /// The "Field Schema" to be subscribed to through Lightstreamer Server for the second-level items in a COMMAND Subscription.
+ command_second_level_field_schema: Option<String>,
+ /// The length to be requested to Lightstreamer Server for the internal queuing buffers for the items in the Subscription.
+ requested_buffer_size: Option<usize>,
+ /// The maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription.
+ requested_max_frequency: Option<f64>,
+ /// The snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription.
+ requested_snapshot: Option<String>,
+ /// The selector name for all the items in the Subscription, used as a filter on the updates received.
+ selector: Option<String>,
+ /// A list of SubscriptionListener instances that will receive events from this Subscription.
+ listeners: Vec<Box<dyn SubscriptionListener>>,
+ /// A HashMap storing the latest values received for each item/field pair.
+ values: HashMap<(usize, usize), String>,
+ /// A HashMap storing the latest values received for each key/field pair in a COMMAND Subscription.
+ command_values: HashMap<String, HashMap<usize, String>>,
+ /// A flag indicating whether the Subscription is currently active or not.
+ is_active: bool,
+ /// A flag indicating whether the Subscription is currently subscribed to through the server or not.
+ is_subscribed: bool,
+}
+
+impl Subscription {
+ /// Constructor for creating a new Subscription instance.
+ ///
+ /// # Parameters
+ /// - `mode`: The subscription mode for the items, required by Lightstreamer Server.
+ /// - `items`: An array of items to be subscribed to through Lightstreamer server. It is also possible to specify the "Item List" or "Item Group" later.
+ /// - `fields`: An array of fields for the items to be subscribed to through Lightstreamer Server. It is also possible to specify the "Field List" or "Field Schema" later.
+ ///
+ /// # Errors
+ /// Returns an error if no items or fields are provided.
+ pub fn new(
+ mode: SubscriptionMode,
+ items: Option<Vec<String>>,
+ fields: Option<Vec<String>>,
+ ) -> Result<Subscription, String> {
+ if items.is_none() || fields.is_none() {
+ return Err("Items and fields must be provided".to_string());
+ }
+
+ Ok(Subscription {
+ mode,
+ items,
+ item_group: None,
+ fields,
+ field_schema: None,
+ data_adapter: None,
+ command_second_level_data_adapter: None,
+ command_second_level_fields: None,
+ command_second_level_field_schema: None,
+ requested_buffer_size: None,
+ requested_max_frequency: None,
+ requested_snapshot: None,
+ selector: None,
+ listeners: Vec::new(),
+ values: HashMap::new(),
+ command_values: HashMap::new(),
+ is_active: false,
+ is_subscribed: false,
+ })
+ }
+
+ /// Adds a listener that will receive events from the Subscription instance.
+ ///
+ /// The same listener can be added to several different Subscription instances.
+ ///
+ /// # Lifecycle
+ /// A listener can be added at any time. A call to add a listener already present will be ignored.
+ ///
+ /// # Parameters
+ /// - `listener`: An object that will receive the events as documented in the SubscriptionListener interface.
+ ///
+ /// # See also
+ /// `removeListener()`
+ pub fn add_listener(&mut self, listener: Box<dyn SubscriptionListener>) {
+ self.listeners.push(listener);
+ }
+
+ /// Removes a listener from the Subscription instance so that it will not receive events anymore.
+ ///
+ /// # Lifecycle
+ /// A listener can be removed at any time.
+ ///
+ /// # Parameters
+ /// - `listener`: The listener to be removed.
+ ///
+ /// # See also
+ /// `addListener()`
+ pub fn remove_listener<T>(&mut self, listener: &T)
+ where
+ T: SubscriptionListener,
+ {
+ self.listeners.retain(|l| {
+ let l_ref = l.as_ref() as &dyn SubscriptionListener;
+ let listener_ref = listener as &dyn SubscriptionListener;
+ !(l_ref as *const dyn SubscriptionListener == listener_ref as *const dyn SubscriptionListener)
+ });
+ }
+
+ /// Returns a list containing the SubscriptionListener instances that were added to this client.
+ ///
+ /// # Returns
+ /// A list containing the listeners that were added to this client.
+ ///
+ /// # See also
+ /// `addListener()`
+ pub fn get_listeners(&self) -> &Vec<Box<dyn SubscriptionListener>> {
+ &self.listeners
+ }
+
+ /// Inquiry method that can be used to read the mode specified for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// The Subscription mode specified in the constructor.
+ pub fn get_mode(&self) -> &SubscriptionMode {
+ &self.mode
+ }
+
+ /// Setter method that sets the "Item Group" to be subscribed to through Lightstreamer Server.
+ ///
+ /// Any call to this method will override any "Item List" or "Item Group" previously specified.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription is currently "active".
+ ///
+ /// # Parameters
+ /// - `group`: A String to be expanded into an item list by the Metadata Adapter.
+ pub fn set_item_group(&mut self, group: String) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ self.item_group = Some(group);
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the item group specified for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can only be called if the Subscription has been initialized using an "Item Group"
+ ///
+ /// # Returns
+ /// The "Item Group" to be subscribed to through the server, or `None` if the Subscription was initialized with an "Item List" or was not initialized at all.
+ pub fn get_item_group(&self) -> Option<&String> {
+ self.item_group.as_ref()
+ }
+
+ /// Setter method that sets the "Item List" to be subscribed to through Lightstreamer Server.
+ ///
+ /// Any call to this method will override any "Item List" or "Item Group" previously specified.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if any of the item names in the "Item List" contains a space, is a number, or is empty/None.
+ ///
+ /// # Parameters
+ /// - `items`: An array of items to be subscribed to through the server.
+ pub fn set_items(&mut self, items: Vec<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ for item in &items {
+ if item.contains(" ") || item.parse::<usize>().is_ok() || item.is_empty() {
+ return Err("Invalid item name".to_string());
+ }
+ }
+ self.items = Some(items);
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the "Item List" specified for this Subscription.
+ /// Note that if the single-item-constructor was used, this method will return an array of length 1 containing such item.
+ ///
+ /// # Lifecycle
+ /// This method can only be called if the Subscription has been initialized with an "Item List".
+ ///
+ /// # Returns
+ /// The "Item List" to be subscribed to through the server, or `None` if the Subscription was initialized with an "Item Group" or was not initialized at all.
+ pub fn get_items(&self) -> Option<&Vec<String>> {
+ self.items.as_ref()
+ }
+
+ /// Setter method that sets the "Field Schema" to be subscribed to through Lightstreamer Server.
+ ///
+ /// Any call to this method will override any "Field List" or "Field Schema" previously specified.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription is currently "active".
+ ///
+ /// # Parameters
+ /// - `schema`: A String to be expanded into a field list by the Metadata Adapter.
+ pub fn set_field_schema(&mut self, schema: String) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ self.field_schema = Some(schema);
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the field schema specified for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can only be called if the Subscription has been initialized using a "Field Schema"
+ ///
+ /// # Returns
+ /// The "Field Schema" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field List" or was not initialized at all.
+ pub fn get_field_schema(&self) -> Option<&String> {
+ self.field_schema.as_ref()
+ }
+
+ /// Setter method that sets the "Field List" to be subscribed to through Lightstreamer Server.
+ ///
+ /// Any call to this method will override any "Field List" or "Field Schema" previously specified.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if any of the field names in the list contains a space or is empty/None.
+ ///
+ /// # Parameters
+ /// - `fields`: An array of fields to be subscribed to through the server.
+ pub fn set_fields(&mut self, fields: Vec<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ for field in &fields {
+ if field.contains(" ") || field.is_empty() {
+ return Err("Invalid field name".to_string());
+ }
+ }
+ self.fields = Some(fields);
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the "Field List" specified for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can only be called if the Subscription has been initialized using a "Field List".
+ ///
+ /// # Returns
+ /// The "Field List" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field Schema" or was not initialized at all.
+ pub fn get_fields(&self) -> Option<&Vec<String>> {
+ self.fields.as_ref()
+ }
+
+ /// Setter method that sets the name of the Data Adapter (within the Adapter Set used by the current session) that supplies all the items for this Subscription.
+ ///
+ /// The Data Adapter name is configured on the server side through the "name" attribute of the `<data_provider>` element, in the "adapters.xml" file that defines the Adapter Set (a missing attribute configures the "DEFAULT" name).
+ ///
+ /// Note that if more than one Data Adapter is needed to supply all the items in a set of items, then it is not possible to group all the items of the set in a single Subscription. Multiple Subscriptions have to be defined.
+ ///
+ /// # Default
+ /// The default Data Adapter for the Adapter Set, configured as "DEFAULT" on the Server.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription is currently "active".
+ ///
+ /// # Parameters
+ /// - `adapter`: The name of the Data Adapter. A `None` value is equivalent to the "DEFAULT" name.
+ ///
+ /// # See also
+ /// `ConnectionDetails.setAdapterSet()`
+ pub fn set_data_adapter(&mut self, adapter: Option<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ self.data_adapter = adapter;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the name of the Data Adapter specified for this Subscription through `setDataAdapter()`.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// The name of the Data Adapter; returns `None` if no name has been configured, so that the "DEFAULT" Adapter Set is used.
+ pub fn get_data_adapter(&self) -> Option<&String> {
+ self.data_adapter.as_ref()
+ }
+
+ /// Setter method that sets the name of the second-level Data Adapter (within the Adapter Set used by the current session)
+ /// Setter method that sets the name of the second-level Data Adapter (within the Adapter Set used by the current session) that supplies all the second-level items for a COMMAND Subscription.
+ ///
+ /// All the possible second-level items should be supplied in "MERGE" mode with snapshot available.
+ ///
+ /// The Data Adapter name is configured on the server side through the "name" attribute of the `<data_provider>` element, in the "adapters.xml" file that defines the Adapter Set (a missing attribute configures the "DEFAULT" name).
+ ///
+ /// # Default
+ /// The default Data Adapter for the Adapter Set, configured as "DEFAULT" on the Server.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if the Subscription mode is not "COMMAND".
+ ///
+ /// # Parameters
+ /// - `adapter`: The name of the Data Adapter. A `None` value is equivalent to the "DEFAULT" name.
+ ///
+ /// # See also
+ /// `Subscription.setCommandSecondLevelFields()`
+ ///
+ /// # See also
+ /// `Subscription.setCommandSecondLevelFieldSchema()`
+ pub fn set_command_second_level_data_adapter(&mut self, adapter: Option<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ if self.mode != SubscriptionMode::Command {
+ return Err("Subscription mode is not Command".to_string());
+ }
+ self.command_second_level_data_adapter = adapter;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the name of the second-level Data Adapter specified for this Subscription through `setCommandSecondLevelDataAdapter()`.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription mode is not COMMAND.
+ ///
+ /// # Returns
+ /// The name of the second-level Data Adapter.
+ ///
+ /// # See also
+ /// `setCommandSecondLevelDataAdapter()`
+ pub fn get_command_second_level_data_adapter(&self) -> Option<&String> {
+ if self.mode != SubscriptionMode::Command {
+ return None;
+ }
+ self.command_second_level_data_adapter.as_ref()
+ }
+
+ /// Setter method that sets the "Field Schema" to be subscribed to through Lightstreamer Server for the second-level items. It can only be used on COMMAND Subscriptions.
+ ///
+ /// Any call to this method will override any "Field List" or "Field Schema" previously specified for the second-level.
+ ///
+ /// Calling this method enables the two-level behavior:
+ ///
+ /// In synthesis, each time a new key is received on the COMMAND Subscription, the key value is treated as an Item name and an underlying Subscription for this Item is created and subscribed to automatically, to feed fields specified by this method. This mono-item Subscription is specified through an "Item List" containing only the Item name received. As a consequence, all the conditions provided for subscriptions through Item Lists have to be satisfied. The item is subscribed to in "MERGE" mode, with snapshot request and with the same maximum frequency setting as for the first-level items (including the "unfiltered" case). All other Subscription properties are left as the default. When the key is deleted by a DELETE command on the first-level Subscription, the associated second-level Subscription is also unsubscribed from.
+ ///
+ /// Specify `None` as parameter will disable the two-level behavior.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if the Subscription mode is not "COMMAND".
+ ///
+ /// # Parameters
+ /// - `schema`: A String to be expanded into a field list by the Metadata Adapter.
+ ///
+ /// # See also
+ /// `Subscription.setCommandSecondLevelFields()`
+ pub fn set_command_second_level_field_schema(&mut self, schema: Option<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ if self.mode != SubscriptionMode::Command {
+ return Err("Subscription mode is not Command".to_string());
+ }
+ self.command_second_level_field_schema = schema;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the "Field Schema" specified for second-level Subscriptions.
+ ///
+ /// # Lifecycle
+ /// This method can only be called if the second-level of this Subscription has been initialized using a "Field Schema".
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription mode is not COMMAND.
+ ///
+ /// # Returns
+ /// The "Field Schema" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field List" or was not initialized at all.
+ ///
+ /// # See also
+ /// `Subscription.setCommandSecondLevelFieldSchema()`
+ pub fn get_command_second_level_field_schema(&self) -> Option<&String> {
+ if self.mode != SubscriptionMode::Command {
+ return None;
+ }
+ self.command_second_level_field_schema.as_ref()
+ }
+
+ /// Setter method that sets the "Field List" to be subscribed to through Lightstreamer Server for the second-level items. It can only be used on COMMAND Subscriptions.
+ ///
+ /// Any call to this method will override any "Field List" or "Field Schema" previously specified for the second-level.
+ ///
+ /// Calling this method enables the two-level behavior:
+ ///
+ /// In synthesis, each time a new key is received on the COMMAND Subscription, the key value is treated as an Item name and an underlying Subscription for this Item is created and subscribed to automatically, to feed fields specified by this method. This mono-item Subscription is specified through an "Item List" containing only the Item name received. As a consequence, all the conditions provided for subscriptions through Item Lists have to be satisfied. The item is subscribed to in "MERGE" mode, with snapshot request and with the same maximum frequency setting as for the first-level items (including the "unfiltered" case). All other Subscription properties are left as the default. When the key is deleted by a DELETE command on the first-level Subscription, the associated second-level Subscription is also unsubscribed from.
+ ///
+ /// Specifying `None` as parameter will disable the two-level behavior.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if the Subscription mode is not "COMMAND".
+ /// - Returns an error if any of the field names in the "Field List" contains a space or is empty/None.
+ ///
+ /// # Parameters
+ /// - `fields`: An array of Strings containing a list of fields to be subscribed to through the server. Ensure that no name conflict is generated between first-level and second-level fields. In case of conflict, the second-level field will not be accessible by name, but only by position.
+ ///
+ /// # See also
+ /// `Subscription.setCommandSecondLevelFieldSchema()`
+ pub fn set_command_second_level_fields(&mut self, fields: Option<Vec<String>>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ if self.mode != SubscriptionMode::Command {
+ return Err("Subscription mode is not Command".to_string());
+ }
+ if let Some(ref fields) = fields {
+ for field in fields {
+ if field.contains(" ") || field.is_empty() {
+ return Err("Invalid field name".to_string());
+ }
+ }
+ }
+ self.command_second_level_fields = fields;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the "Field List" specified for second-level Subscriptions.
+ ///
+ /// # Lifecycle
+ /// This method can only be called if the second-level of this Subscription has been initialized using a "Field List"
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription mode is not COMMAND.
+ ///
+ /// # Returns
+ /// The list of fields to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field Schema" or was not initialized at all.
+ ///
+ /// # See also
+ /// `Subscription.setCommandSecondLevelFields()`
+ pub fn get_command_second_level_fields(&self) -> Option<&Vec<String>> {
+ if self.mode != SubscriptionMode::Command {
+ return None;
+ }
+ self.command_second_level_fields.as_ref()
+ }
+
+ /// Setter method that sets the length to be requested to Lightstreamer Server for the internal queuing buffers for the items in the Subscription. A Queuing buffer is used by the Server to accumulate a burst of updates for an item, so that they can all be sent to the client, despite of bandwidth or frequency limits. It can be used only when the subscription mode is MERGE or DISTINCT and unfiltered dispatching has not been requested. Note that the Server may pose an upper limit on the size of its internal buffers.
+ ///
+ /// # Default
+ /// `None`, meaning to lean on the Server default based on the subscription mode. This means that the buffer size will be 1 for MERGE subscriptions and "unlimited" for DISTINCT subscriptions. See the "General Concepts" document for further details.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if the specified value is not `None` nor "unlimited" nor a valid positive integer number.
+ ///
+ /// # Parameters
+ /// - `size`: An integer number, representing the length of the internal queuing buffers to be used in the Server. If the string "unlimited" is supplied, then no buffer size limit is requested (the check is case insensitive). It is also possible to supply a `None` value to stick to the Server default (which currently depends on the subscription mode).
+ ///
+ /// # See also
+ /// `Subscription.setRequestedMaxFrequency()`
+ pub fn set_requested_buffer_size(&mut self, size: Option<usize>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ self.requested_buffer_size = size;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the buffer size, configured though `setRequestedBufferSize()`, to be requested to the Server for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// An integer number, representing the buffer size to be requested to the server, or the string "unlimited", or `None`.
+ pub fn get_requested_buffer_size(&self) -> Option<&usize> {
+ self.requested_buffer_size.as_ref()
+ }
+
+ /// Setter method that sets the maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription. It can be used only if the Subscription mode is MERGE, DISTINCT or COMMAND (in the latter case, the frequency limitation applies to the UPDATE events for each single key). For Subscriptions with two-level behavior (see `Subscription.setCommandSecondLevelFields()` and `Subscription.setCommandSecondLevelFieldSchema()`), the specified frequency limit applies to both first-level and second-level items.
+ ///
+ /// Note that frequency limits on the items can also be set on the server side and this request can only be issued in order to furtherly reduce the frequency, not to rise it beyond these limits.
+ ///
+ /// This method can also be used to request unfiltered dispatching for the items in the Subscription. However, unfiltered dispatching requests may be refused if any frequency limit is posed on the server side for some item.
+ ///
+ /// # General Edition Note
+ /// A further global frequency limit could also be imposed by the Server, depending on Edition and License Type; this specific limit also applies to RAW mode and to unfiltered dispatching. To know what features are enabled by your license, please see the License tab of the Monitoring Dashboard (by default, available at /dashboard).
+ ///
+ /// # Default
+ /// `None`, meaning to lean on the Server default based on the subscription mode. This consists, for all modes, in not applying any frequency limit to the subscription (the same as "unlimited"); see the "General Concepts" document for further details.
+ ///
+ /// # Lifecycle
+ /// This method can can be called at any time with some differences based on the Subscription status:
+ ///
+ /// - If the Subscription instance is in its "inactive" state then this method can be called at will.
+ ///
+ /// - If the Subscription instance is in its "active" state then the method can still be called unless the current value is "unfiltered" or the supplied value is "unfiltered" or `None`. If the Subscription instance is in its "active" state and the connection to the server is currently open, then a request to change the frequency of the Subscription on the fly is sent to the server.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active" and the current value of this property is "unfiltered".
+ /// - Returns an error if the Subscription is currently "active" and the given parameter is `None` or "unfiltered".
+ /// - Returns an error if the specified value is not `None` nor one of the special "unlimited" and "unfiltered" values nor a valid positive number.
+ ///
+ /// # Parameters
+ /// - `freq`: A decimal number, representing the maximum update frequency (expressed in updates per second) for each item in the Subscription; for instance, with a setting of 0.5, for each single item, no more than one update every 2 seconds will be received. If the string "unlimited" is supplied, then no frequency limit is requested. It is also possible to supply the string "unfiltered", to ask for unfiltered dispatching, if it is allowed for the items, or a `None` value to stick to the Server default (which currently corresponds to "unlimited"). The check for the string constants is case insensitive.
+ pub fn set_requested_max_frequency(&mut self, freq: Option<f64>) -> Result<(), String> {
+ if self.is_active && self.requested_max_frequency.is_none() {
+ return Err("Subscription is active and current value is unfiltered".to_string());
+ }
+ if self.is_active && freq.is_none() {
+ return Err("Cannot set unfiltered while active".to_string());
+ }
+ if self.is_active && freq.is_none() {
+ return Err("Cannot set None while active".to_string());
+ }
+ self.requested_max_frequency = freq;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the max frequency, configured through `setRequestedMaxFrequency()`, to be requested to the Server for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// A decimal number, representing the max frequency to be requested to the server (expressed in updates per second), or the strings "unlimited" or "unfiltered", or `None`.
+ pub fn get_requested_max_frequency(&self) -> Option<&f64> {
+ self.requested_max_frequency.as_ref()
+ }
+
+ /// Setter method that enables/disables snapshot delivery request for the items in the Subscription. The snapshot can be requested only if the Subscription mode is MERGE, DISTINCT or COMMAND.
+ ///
+ /// # Default
+ /// "yes" if the Subscription mode is not "RAW", `None` otherwise.
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription is currently "active".
+ /// - Returns an error if the specified value is not "yes" nor "no" nor `None` nor a valid integer positive number.
+ /// - Returns an error if the specified value is not compatible with the mode of the Subscription:
+ /// - In case of a RAW Subscription only `None` is a valid value;
+ /// - In case of a non-DISTINCT Subscription only `None` "yes" and "no" are valid values.
+ ///
+ /// # Parameters
+ /// - `snapshot`: "yes"/"no" to request/not request snapshot delivery (the check is case insensitive). If the Subscription mode is DISTINCT, instead of "yes", it is also possible to supply an integer number, to specify the requested length of the snapshot (though the length of the received snapshot may be less than
+ /// requested, because of insufficient data or server side limits); passing "yes" means that the snapshot length should be determined only by the Server. `None` is also a valid value; if specified, no snapshot preference will be sent to the server that will decide itself whether or not to send any snapshot.
+ ///
+ /// # See also
+ /// `ItemUpdate.isSnapshot()`
+ pub fn set_requested_snapshot(&mut self, snapshot: Option<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ if self.mode == SubscriptionMode::Raw && snapshot.is_some() {
+ return Err("Cannot request snapshot for Raw mode".to_string());
+ }
+ if self.mode != SubscriptionMode::Distinct && snapshot.is_some() && snapshot.as_ref().unwrap().parse::<usize>().is_ok() {
+ return Err("Cannot specify snapshot length for non-Distinct mode".to_string());
+ }
+ self.requested_snapshot = snapshot;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the snapshot preferences, configured through `setRequestedSnapshot()`, to be requested to the Server for this Subscription.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// "yes", "no", `None`, or an integer number.
+ pub fn get_requested_snapshot(&self) -> Option<&String> {
+ self.requested_snapshot.as_ref()
+ }
+
+ /// Setter method that sets the selector name for all the items in the Subscription. The selector is a filter on the updates received. It is executed on the Server and implemented by the Metadata Adapter.
+ ///
+ /// # Default
+ /// `None` (no selector).
+ ///
+ /// # Lifecycle
+ /// This method can only be called while the Subscription instance is in its "inactive" state.
+ ///
+ /// # Errors
+ /// Returns an error if the Subscription is currently "active".
+ ///
+ /// # Parameters
+ /// - `selector`: The name of a selector, to be recognized by the Metadata Adapter, or `None` to unset the selector.
+ pub fn set_selector(&mut self, selector: Option<String>) -> Result<(), String> {
+ if self.is_active {
+ return Err("Subscription is active".to_string());
+ }
+ self.selector = selector;
+ Ok(())
+ }
+
+ /// Inquiry method that can be used to read the selector name specified for this Subscription through `setSelector()`.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// The name of the selector.
+ pub fn get_selector(&self) -> Option<&String> {
+ self.selector.as_ref()
+ }
+
+ /// Returns the latest value received for the specified item/field pair.
+ ///
+ /// It is suggested to consume real-time data by implementing and adding a proper SubscriptionListener rather than probing this method. In case of COMMAND Subscriptions, the value returned by this method may be misleading, as in COMMAND mode all the keys received, being part of the same item, will overwrite each other; for COMMAND Subscriptions, use `Subscription.getCommandValue()` instead.
+ ///
+ /// Note that internal data is cleared when the Subscription is unsubscribed from.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time; if called to retrieve a value that has not been received yet, then it will return `None`.
+ ///
+ /// # Errors
+ /// Returns an error if an invalid item name or field name is specified or if the specified item position or field position is out of bounds.
+ ///
+ /// # Parameters
+ /// - `item_pos`: A String representing an item in the configured item list or a Number representing the 1-based position of the item in the specified item group. (In case an item list was specified, passing the item position is also possible).
+ /// - `field_pos`: A String representing a field in the configured field list or a Number representing the 1-based position of the field in the specified field schema. (In case a field list was specified, passing the field position is also possible).
+ ///
+ /// # Returns
+ /// The current value for the specified field of the specified item(possibly `None`), or `None` if no value has been received yet.
+ pub fn get_value(&self, item_pos: usize, field_pos: usize) -> Option<&String> {
+ self.values.get(&(item_pos, field_pos))
+ }
+
+ /// Returns the latest value received for the specified item/key/field combination in a COMMAND Subscription. This method can only be used if the Subscription mode is COMMAND. Subscriptions with two-level behavior are also supported, hence the specified field can be either a first-level or a second-level one.
+ ///
+ /// It is suggested to consume real-time data by implementing and adding a proper SubscriptionListener rather than probing this method.
+ ///
+ /// Note that internal data is cleared when the Subscription is unsubscribed from.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time; if called to retrieve a value that has not been received yet, then it will return `None`.
+ ///
+ /// # Errors
+ /// - Returns an error if an invalid item name or field name is specified or if the specified item position or field position is out of bounds.
+ /// - Returns an error if the Subscription mode is not COMMAND.
+ ///
+ /// # Parameters
+ /// - `item_pos`: A String representing an item in the configured item list or a Number representing the 1-based position of the item in the specified item group. (In case an item list was specified, passing the item position is also possible).
+ /// - `key`: A String containing the value of a key received on the COMMAND subscription.
+ /// - `field_pos`: A String representing a field in the configured field list or a Number representing the 1-based position of the field in the specified field schema. (In case a field list was specified, passing the field position is also possible).
+ ///
+ /// # Returns
+ /// The current value for the specified field of the specified key within the specified item (possibly `None`), or `None` if the specified key has not been added yet (note that it might have been added and eventually deleted).
+ pub fn get_command_value(&self, item_pos: usize, key: &str, field_pos: usize) -> Option<&String> {
+ self.command_values
+ .get(key)
+ .and_then(|fields| fields.get(&field_pos))
+ }
+
+ /// Inquiry method that checks if the Subscription is currently "active" or not. Most of the Subscription properties cannot be modified if a Subscription is "active".
+ ///
+ /// The status of a Subscription is changed to "active" through the `LightstreamerClient.subscribe()` method and back to "inactive" through the `LightstreamerClient.unsubscribe()` one.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// `true`/`false` if the Subscription is "active" or not.
+ ///
+ /// # See also
+ /// `LightstreamerClient.subscribe()`
+ ///
+ /// # See also
+ /// `LightstreamerClient.unsubscribe()`
+ pub fn is_active(&self) -> bool {
+ self.is_active
+ }
+
+ /// Inquiry method that checks if the Subscription is currently subscribed to through the server or not.
+ ///
+ /// This flag is switched to true by server sent Subscription events, and back to false in case of client disconnection, `LightstreamerClient.unsubscribe()` calls and server sent unsubscription events.
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time.
+ ///
+ /// # Returns
+ /// `true`/`false` if the Subscription is subscribed to through the server or not.
+ pub fn is_subscribed(&self) -> bool {
+ self.is_subscribed
+ }
+
+ /// Returns the position of the "key" field in a COMMAND Subscription.
+ ///
+ /// This method can only be used if the Subscription mode is COMMAND and the Subscription was initialized using a "Field Schema".
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time after the first `SubscriptionListener.onSubscription()` event.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription mode is not COMMAND or if the `SubscriptionListener.onSubscription()` event for this Subscription was not yet fired.
+ /// - Returns an error if a "Field List" was specified.
+ ///
+ /// # Returns
+ /// The 1-based position of the "key" field within the "Field Schema".
+ pub fn get_key_position(&self) -> Option<usize> {
+ if self.mode != SubscriptionMode::Command || !self.is_subscribed {
+ return None;
+ }
+ if let Some(ref schema) = self.field_schema {
+ return schema.split(',').position(|field| field.trim() == "key");
+ }
+ None
+ }
+
+ /// Returns the position of the "command" field in a COMMAND Subscription.
+ ///
+ /// This method can only be used if the Subscription mode is COMMAND and the Subscription was initialized using a "Field Schema".
+ ///
+ /// # Lifecycle
+ /// This method can be called at any time after the first `SubscriptionListener.onSubscription()` event.
+ ///
+ /// # Errors
+ /// - Returns an error if the Subscription mode is not COMMAND or if the `SubscriptionListener.onSubscription()` event for this Subscription was not yet fired.
+ ///
+ /// # Returns
+ /// The 1-based position of the "command" field within the "Field Schema".
+ pub fn get_command_position(&self) -> Option<usize> {
+ if self.mode != SubscriptionMode::Command || !self.is_subscribed {
+ return None;
+ }
+ if let Some(ref schema) = self.field_schema {
+ return schema.split(',').position(|field| field.trim() == "command");
+ }
+ None
+ }
+
+ /*
+ /// Handles the subscription event.
+ pub fn on_subscription(&mut self) {
+ self.is_subscribed = true;
+ for listener in &mut self.listeners {
+ listener.on_subscription();
+ }
+ }
+
+ /// Handles the unsubscription event.
+ pub fn on_unsubscription(&mut self) {
+ self.is_subscribed = false;
+ self.values.clear();
+ self.command_values.clear();
+ for listener in &mut self.listeners {
+ listener.on_unsubscription();
+ }
+ }
+
+ /// Handles an update event for a regular Subscription.
+ pub fn on_update(&mut self, item_pos: usize, field_pos: usize, value: String, is_snapshot: bool) {
+ self.values.insert((item_pos, field_pos), value.clone());
+ for listener in &mut self.listeners {
+ listener.on_update(item_pos, field_pos, &value, is_snapshot);
+ }
+ }
+
+ /// Handles an update event for a COMMAND Subscription.
+ pub fn on_command_update(&mut self, key: String, item_pos: usize, field_pos: usize, value: String, is_snapshot: bool) {
+ self.command_values
+ .entry(key.clone())
+ .or_insert_with(HashMap::new)
+ .insert(field_pos, value.clone());
+ for listener in &mut self.listeners {
+ listener.on_command_update(&key, item_pos, field_pos, &value, is_snapshot);
+ }
+ }
+ */
+} \ No newline at end of file
diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs
new file mode 100644
index 0000000..5fb6f85
--- /dev/null
+++ b/src/subscription_listener.rs
@@ -0,0 +1,244 @@
+use crate::item_update::ItemUpdate;
+
+/// Interface to be implemented to listen to Subscription events comprehending notifications
+/// of subscription/unsubscription, updates, errors and others.
+///
+/// Events for these listeners are dispatched by a different thread than the one that generates them.
+/// This means that, upon reception of an event, it is possible that the internal state of the client
+/// has changed. On the other hand, all the notifications for a single LightstreamerClient,
+/// including notifications to ClientListener, SubscriptionListener and ClientMessageListener
+/// will be dispatched by the same thread.
+pub trait SubscriptionListener {
+ /// Event handler that is called by Lightstreamer each time a request to clear the snapshot
+ /// pertaining to an item in the Subscription has been received from the Server.
+ /// More precisely, this kind of request can occur in two cases:
+ ///
+ /// - For an item delivered in COMMAND mode, to notify that the state of the item becomes empty;
+ /// this is equivalent to receiving an update carrying a DELETE command once for each key
+ /// that is currently active.
+ ///
+ /// - For an item delivered in DISTINCT mode, to notify that all the previous updates received
+ /// for the item should be considered as obsolete; hence, if the listener were showing a list
+ /// of recent updates for the item, it should clear the list in order to keep a coherent view.
+ ///
+ /// Note that, if the involved Subscription has a two-level behavior enabled
+ /// (see `Subscription::set_command_second_level_fields()` and
+ /// `Subscription::set_command_second_level_field_schema()`), the notification refers to the
+ /// first-level item (which is in COMMAND mode). This kind of notification is not possible
+ /// for second-level items (which are in MERGE mode).
+ ///
+ /// # Parameters
+ ///
+ /// - `item_name`: name of the involved item. If the Subscription was initialized using an
+ /// "Item Group" then a `None` value is supplied.
+ /// - `item_pos`: 1-based position of the item within the "Item List" or "Item Group".
+ fn on_clear_snapshot(&mut self, item_name: Option<&str>, item_pos: usize);
+
+ /// Event handler that is called by Lightstreamer to notify that, due to internal resource
+ /// limitations, Lightstreamer Server dropped one or more updates for an item that was
+ /// subscribed to as a second-level subscription. Such notifications are sent only if the
+ /// Subscription was configured in unfiltered mode (second-level items are always in "MERGE"
+ /// mode and inherit the frequency configuration from the first-level Subscription).
+ ///
+ /// By implementing this method it is possible to perform recovery actions.
+ ///
+ /// # Parameters
+ ///
+ /// - `lost_updates`: The number of consecutive updates dropped for the item.
+ /// - `key`: The value of the key that identifies the second-level item.
+ ///
+ /// # See also
+ ///
+ /// - `Subscription::set_requested_max_frequency()`
+ /// - `Subscription::set_command_second_level_fields()`
+ /// - `Subscription::set_command_second_level_field_schema()`
+ fn on_command_second_level_item_lost_updates(&mut self, lost_updates: u32, key: &str);
+
+ /// Event handler that is called when the Server notifies an error on a second-level subscription.
+ ///
+ /// By implementing this method it is possible to perform recovery actions.
+ ///
+ /// # Parameters
+ ///
+ /// - `code`: The error code sent by the Server. It can be one of the following:
+ /// - 14 - the key value is not a valid name for the Item to be subscribed; only in this case,
+ /// the error is detected directly by the library before issuing the actual request to the Server
+ /// - 17 - bad Data Adapter name or default Data Adapter not defined for the current Adapter Set
+ /// - 21 - bad Group name
+ /// - 22 - bad Group name for this Schema
+ /// - 23 - bad Schema name
+ /// - 24 - mode not allowed for an Item
+ /// - 26 - unfiltered dispatching not allowed for an Item, because a frequency limit is associated to the item
+ /// - 27 - unfiltered dispatching not supported for an Item, because a frequency prefiltering is applied for the item
+ /// - 28 - unfiltered dispatching is not allowed by the current license terms (for special licenses only)
+ /// - 66 - an unexpected exception was thrown by the Metadata Adapter while authorizing the connection
+ /// - 68 - the Server could not fulfill the request because of an internal error.
+ /// - `<= 0` - the Metadata Adapter has refused the subscription or unsubscription request;
+ /// the code value is dependent on the specific Metadata Adapter implementation
+ /// - `message`: The description of the error sent by the Server; it can be `None`.
+ /// - `key`: The value of the key that identifies the second-level item.
+ ///
+ /// # See also
+ ///
+ /// - `ConnectionDetails::set_adapter_set()`
+ /// - `Subscription::set_command_second_level_fields()`
+ /// - `Subscription::set_command_second_level_field_schema()`
+ fn on_command_second_level_subscription_error(&mut self, code: i32, message: Option<&str>, key: &str);
+
+ /// Event handler that is called by Lightstreamer to notify that all snapshot events for an item
+ /// in the Subscription have been received, so that real time events are now going to be received.
+ /// The received snapshot could be empty. Such notifications are sent only if the items are delivered
+ /// in DISTINCT or COMMAND subscription mode and snapshot information was indeed requested for the items.
+ /// By implementing this method it is possible to perform actions which require that all the initial
+ /// values have been received.
+ ///
+ /// Note that, if the involved Subscription has a two-level behavior enabled
+ /// (see `Subscription::set_command_second_level_fields()` and
+ /// `Subscription::set_command_second_level_field_schema()`), the notification refers to the
+ /// first-level item (which is in COMMAND mode). Snapshot-related updates for the second-level
+ /// items (which are in MERGE mode) can be received both before and after this notification.
+ ///
+ /// # Parameters
+ ///
+ /// - `item_name`: name of the involved item. If the Subscription was initialized using an
+ /// "Item Group" then a `None` value is supplied.
+ /// - `item_pos`: 1-based position of the item within the "Item List" or "Item Group".
+ ///
+ /// # See also
+ ///
+ /// - `Subscription::set_requested_snapshot()`
+ /// - `ItemUpdate::is_snapshot()`
+ fn on_end_of_snapshot(&mut self, item_name: Option<&str>, item_pos: usize);
+
+ /// Event handler that is called by Lightstreamer to notify that, due to internal resource
+ /// limitations, Lightstreamer Server dropped one or more updates for an item in the Subscription.
+ /// Such notifications are sent only if the items are delivered in an unfiltered mode; this occurs if the subscription mode is:
+ ///
+ /// - RAW
+ /// - MERGE or DISTINCT, with unfiltered dispatching specified
+ /// - COMMAND, with unfiltered dispatching specified
+ /// - COMMAND, without unfiltered dispatching specified (in this case, notifications apply to ADD and DELETE events only)
+ ///
+ /// By implementing this method it is possible to perform recovery actions.
+ ///
+ /// # Parameters
+ ///
+ /// - `item_name`: name of the involved item. If the Subscription was initialized using an
+ /// "Item Group" then a `None` value is supplied.
+ /// - `item_pos`: 1-based position of the item within the "Item List" or "Item Group".
+ /// - `lost_updates`: The number of consecutive updates dropped for the item.
+ ///
+ /// # See also
+ ///
+ /// - `Subscription::set_requested_max_frequency()`
+ fn on_item_lost_updates(&mut self, item_name: Option<&str>, item_pos: usize, lost_updates: u32);
+
+ /// Event handler that is called by Lightstreamer each time an update pertaining to an item
+ /// in the Subscription has been received from the Server.
+ ///
+ /// # Parameters
+ ///
+ /// - `update`: a value object containing the updated values for all the fields, together with
+ /// meta-information about the update itself and some helper methods that can be used to
+ /// iterate through all or new values.
+ fn on_item_update(&mut self, update: ItemUpdate);
+
+ /// Event handler that receives a notification when the `SubscriptionListener` instance is
+ /// removed from a `Subscription` through `Subscription::remove_listener()`. This is the last
+ /// event to be fired on the listener.
+ fn on_listen_end(&mut self);
+
+ /// Event handler that receives a notification when the `SubscriptionListener` instance is
+ /// added to a `Subscription` through `Subscription::add_listener()`. This is the first event
+ /// to be fired on the listener.
+ fn on_listen_start(&mut self);
+
+ /// Event handler that is called by Lightstreamer to notify the client with the real maximum
+ /// update frequency of the Subscription. It is called immediately after the Subscription is
+ /// established and in response to a requested change (see `Subscription::set_requested_max_frequency()`).
+ /// Since the frequency limit is applied on an item basis and a Subscription can involve multiple
+ /// items, this is actually the maximum frequency among all items. For Subscriptions with two-level
+ /// behavior (see `Subscription::set_command_second_level_fields()` and
+ /// `Subscription::set_command_second_level_field_schema()`), the reported frequency limit applies
+ /// to both first-level and second-level items.
+ ///
+ /// The value may differ from the requested one because of restrictions operated on the server side,
+ /// but also because of number rounding.
+ ///
+ /// Note that a maximum update frequency (that is, a non-unlimited one) may be applied by the Server
+ /// even when the subscription mode is RAW or the Subscription was done with unfiltered dispatching.
+ ///
+ /// # Parameters
+ ///
+ /// - `frequency`: A decimal number, representing the maximum frequency applied by the Server
+ /// (expressed in updates per second), or the string "unlimited". A `None` value is possible in
+ /// rare cases, when the frequency can no longer be determined.
+ fn on_real_max_frequency(&mut self, frequency: Option<f64>);
+
+ /// Event handler that is called by Lightstreamer to notify that a Subscription has been successfully
+ /// subscribed to through the Server. This can happen multiple times in the life of a Subscription
+ /// instance, in case the Subscription is performed multiple times through `LightstreamerClient::unsubscribe()`
+ /// and `LightstreamerClient::subscribe()`. This can also happen multiple times in case of automatic
+ /// recovery after a connection restart.
+ ///
+ /// This notification is always issued before the other ones related to the same subscription.
+ /// It invalidates all data that has been received previously.
+ ///
+ /// Note that two consecutive calls to this method are not possible, as before a second
+ /// `on_subscription` event is fired an `on_unsubscription()` event is eventually fired.
+ ///
+ /// If the involved Subscription has a two-level behavior enabled
+ /// (see `Subscription::set_command_second_level_fields()` and
+ /// `Subscription::set_command_second_level_field_schema()`), second-level subscriptions are not notified.
+ fn on_subscription(&mut self);
+
+ /// Event handler that is called when the Server notifies an error on a Subscription.
+ /// By implementing this method it is possible to perform recovery actions.
+ ///
+ /// Note that, in order to perform a new subscription attempt, `LightstreamerClient::unsubscribe()`
+ /// and `LightstreamerClient::subscribe()` should be issued again, even if no change to the
+ /// Subscription attributes has been applied.
+ ///
+ /// # Parameters
+ ///
+ /// - `code`: The error code sent by the Server. It can be one of the following:
+ /// - 15 - "key" field not specified in the schema for a COMMAND mode subscription
+ /// - 16 - "command" field not specified in the schema for a COMMAND mode subscription
+ /// - 17 - bad Data Adapter name or default Data Adapter not defined for the current Adapter Set
+ /// - 21 - bad Group name
+ /// - 22 - bad Group name for this Schema
+ /// - 23 - bad Schema name
+ /// - 24 - mode not allowed for an Item
+ /// - 25 - bad Selector name
+ /// - 26 - unfiltered dispatching not allowed for an Item, because a frequency limit is associated to the item
+ /// - 27 - unfiltered dispatching not supported for an Item, because a frequency prefiltering is applied for the item
+ /// - 28 - unfiltered dispatching is not allowed by the current license terms (for special licenses only)
+ /// - 29 - RAW mode is not allowed by the current license terms (for special licenses only)
+ /// - 30 - subscriptions are not allowed by the current license terms (for special licenses only)
+ /// - 66 - an unexpected exception was thrown by the Metadata Adapter while authorizing the connection
+ /// - 68 - the Server could not fulfill the request because of an internal error.
+ /// - `<= 0` - the Metadata Adapter has refused the subscription or unsubscription request;
+ /// the code value is dependent on the specific Metadata Adapter implementation
+ /// - `message`: The description of the error sent by the Server; it can be `None`.
+ ///
+ /// # See also
+ ///
+ /// - `ConnectionDetails::set_adapter_set()`
+ fn on_subscription_error(&mut self, code: i32, message: Option<&str>);
+
+ /// Event handler that is called by Lightstreamer to notify that a Subscription has been successfully
+ /// unsubscribed from. This can happen multiple times in the life of a Subscription instance, in case
+ /// the Subscription is performed multiple times through `LightstreamerClient::unsubscribe()` and
+ /// `LightstreamerClient::subscribe()`. This can also happen multiple times in case of automatic
+ /// recovery after a connection restart.
+ ///
+ /// After this notification no more events can be received until a new `on_subscription` event.
+ ///
+ /// Note that two consecutive calls to this method are not possible, as before a second
+ /// `on_unsubscription` event is fired an `on_subscription()` event is eventually fired.
+ ///
+ /// If the involved Subscription has a two-level behavior enabled
+ /// (see `Subscription::set_command_second_level_fields()` and
+ /// `Subscription::set_command_second_level_field_schema()`), second-level unsubscriptions are not notified.
+ fn on_unsubscription(&mut self);
+} \ No newline at end of file