From 5c80b291fc776a7114572cebff898a7b3bf0671b Mon Sep 17 00:00:00 2001 From: Daniel López Azaña Date: Thu, 28 Mar 2024 20:21:52 +0100 Subject: ✨ (lib.rs): introduce `ls_client` module as a cleaner naming convention ✨ (ls_client.rs): add `LightstreamerClient` struct and associated methods for managing communication with Lightstreamer Server ✨ (main.rs): update imports to use new `ls_client` module and add data adapter and snapshot configuration to subscription ♻️ (lib.rs): refactor `lightstreamer_client` to `ls_client` for consistency with new module name 📝 (ls_client.rs): add comprehensive documentation for `LightstreamerClient` and its methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✨ (subscription.rs): add Snapshot enum to define snapshot delivery preferences ♻️ (subscription.rs): refactor requested_snapshot to use Snapshot enum for clarity 💡 (subscription.rs): update comments to reflect changes in snapshot handling --- src/lib.rs | 2 +- src/lightstreamer_client.rs | 521 -------------------------------------------- src/ls_client.rs | 521 ++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 14 +- src/subscription.rs | 34 ++- 5 files changed, 553 insertions(+), 539 deletions(-) delete mode 100644 src/lightstreamer_client.rs create mode 100644 src/ls_client.rs (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index 3ae6463..cf5cc4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,7 @@ pub mod item_update; pub mod subscription_listener; pub mod connection_details; pub mod connection_options; -pub mod lightstreamer_client; +pub mod ls_client; pub mod proxy; pub mod subscription; diff --git a/src/lightstreamer_client.rs b/src/lightstreamer_client.rs deleted file mode 100644 index 214a722..0000000 --- a/src/lightstreamer_client.rs +++ /dev/null @@ -1,521 +0,0 @@ -use crate::client_listener::ClientListener; -use crate::client_message_listener::ClientMessageListener; -use crate::connection_details::ConnectionDetails; -use crate::connection_options::ConnectionOptions; -use crate::subscription::Subscription; -use crate::IllegalStateException; - -use cookie::Cookie; -use std::fmt::{self, Debug, Formatter}; - -/// Facade class for the management of the communication to Lightstreamer Server. Used to provide -/// configuration settings, event handlers, operations for the control of the connection lifecycle, -/// Subscription handling and to send messages. -/// -/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a -/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions, -/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally, -/// a single instance of `LightstreamerClient` is needed. -/// -/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple -/// endpoints. -/// -/// You can listen to the events generated by a session by registering an event listener, such as -/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events, -/// such as session creation, connection status, subscription updates, and server messages. However, -/// you should be aware that the event notifications are dispatched by a single thread, the so-called -/// event thread. This means that if the operations of a listener are slow or blocking, they will -/// delay the processing of the other listeners and affect the performance of your application. -/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep -/// the listener methods as fast and simple as possible. Note that even if you create multiple -/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared -/// among them. -/// -/// # Parameters -/// -/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient` -/// will connect to. It is possible to specify it later by using `None` here. See -/// `ConnectionDetails.setServerAddress()` for details. -/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle -/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to -/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()` -/// for details. -/// -/// # Raises -/// -/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()` -/// for details. -pub struct LightstreamerClient { - server_address: Option, - adapter_set: Option, - /// Data object that contains the details needed to open a connection to a Lightstreamer Server. - /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties - /// of this object can be overwritten by values received from a Lightstreamer Server. - pub connection_details: ConnectionDetails, - /// Data object that contains options and policies for the connection to the server. This instance - /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object - /// can be overwritten by values received from a Lightstreamer Server. - pub connection_options: ConnectionOptions, - listeners: Vec>, - subscriptions: Vec, -} - -impl Debug for LightstreamerClient { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("LightstreamerClient") - .field("server_address", &self.server_address) - .field("adapter_set", &self.adapter_set) - .field("connection_details", &self.connection_details) - .field("connection_options", &self.connection_options) - .field("listeners", &self.listeners) - .field("subscriptions", &self.subscriptions) - .finish() - } -} - -impl LightstreamerClient { - /// A constant string representing the name of the library. - pub const LIB_NAME: &'static str = "rust_client"; - - /// A constant string representing the version of the library. - pub const LIB_VERSION: &'static str = "0.1.0"; - - /// Static method that can be used to share cookies between connections to the Server (performed by - /// this library) and connections to other sites that are performed by the application. With this - /// method, cookies received by the application can be added (or replaced if already present) to - /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain - /// is compatible with the Server domain will be used internally. - /// - /// This method should be invoked before calling the `LightstreamerClient.connect()` method. - /// However it can be invoked at any time; it will affect the internal cookie set immediately - /// and the sending of cookies on the next HTTP request or WebSocket establishment. - /// - /// # Parameters - /// - /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`. - /// * `cookies`: an instance of `http.cookies.SimpleCookie`. - /// - /// See also `getCookies()` - pub fn add_cookies(uri: &str, cookies: &Cookie) { - // Implementation for add_cookies - } - - /// Adds a listener that will receive events from the `LightstreamerClient` instance. - /// - /// The same listener can be added to several different `LightstreamerClient` instances. - /// - /// A listener can be added at any time. A call to add a listener already present will be ignored. - /// - /// # Parameters - /// - /// * `listener`: An object that will receive the events as documented in the `ClientListener` - /// interface. - /// - /// See also `removeListener()` - pub fn add_listener(&mut self, listener: Box) { - self.listeners.push(listener); - } - - /// Operation method that requests to open a Session against the configured Lightstreamer Server. - /// - /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`, - /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer - /// for some seconds from the streaming connection, then it will automatically open a polling - /// connection. - /// - /// A polling connection may also be opened if the environment is not suitable for a streaming - /// connection. - /// - /// Note that as "polling connection" we mean a loop of polling requests, each of which requires - /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server. - /// - /// Note that the request to connect is accomplished by the client in a separate thread; this - /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change - /// yet. - /// - /// When the request to connect is finally being executed, if the current status of the client - /// is not `DISCONNECTED`, then nothing will be done. - /// - /// # Raises - /// - /// * `IllegalStateException`: if no server address was configured. - /// - /// See also `getStatus()` - /// - /// See also `disconnect()` - /// - /// See also `ClientListener.onStatusChange()` - /// - /// See also `ConnectionDetails.setServerAddress()` - pub fn connect(&mut self) -> Result<(), IllegalStateException> { - // Implementation for connect - Ok(()) - } - - /// Operation method that requests to close the Session opened against the configured Lightstreamer - /// Server (if any). - /// - /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped. - /// - /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance, - /// are preserved to be re-subscribed to on future Sessions. - /// - /// Note that the request to disconnect is accomplished by the client in a separate thread; this - /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the - /// change yet. - /// - /// When the request to disconnect is finally being executed, if the status of the client is - /// "DISCONNECTED", then nothing will be done. - /// - /// See also `connect()` - pub fn disconnect(&mut self) { - // Implementation for disconnect - } - - /// Static inquiry method that can be used to share cookies between connections to the Server - /// (performed by this library) and connections to other sites that are performed by the application. - /// With this method, cookies received from the Server can be extracted for sending through other - /// connections, according with the URI to be accessed. - /// - /// See `addCookies()` for clarifications on when cookies are directly stored by the library and - /// when not. - /// - /// # Parameters - /// - /// * `uri`: the URI to which the cookies should be sent, or `None`. - /// - /// # Returns - /// - /// A list with the various cookies that can be sent in a HTTP request for the specified URI. - /// If a `None` URI was supplied, all available non-expired cookies will be returned. - pub fn get_cookies(uri: Option<&str>) -> Cookie { - // Implementation for get_cookies - unimplemented!() - } - - /// Returns a list containing the `ClientListener` instances that were added to this client. - /// - /// # Returns - /// - /// A list containing the listeners that were added to this client. - /// - /// See also `addListener()` - pub fn get_listeners(&self) -> &Vec> { - &self.listeners - } - - /// Inquiry method that gets the current client status and transport (when applicable). - /// - /// # Returns - /// - /// The current client status. It can be one of the following values: - /// - /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection; - /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server - /// and is currently verifying if a streaming connection is possible; - /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active; - /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active; - /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress; - /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress; - /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for - /// longer than a configured time; - /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened - /// (possibly after a timeout); - /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened - /// as soon as possible, as an attempt to recover the current session after a connection issue; - /// - `"DISCONNECTED"`: no connection is currently active. - /// - /// See also `ClientListener.onStatusChange()` - pub fn get_status(&self) -> &str { - // Implementation for get_status - unimplemented!() - } - - /// Inquiry method that returns a list containing all the `Subscription` instances that are - /// currently "active" on this `LightstreamerClient`. - /// - /// Internal second-level `Subscription` are not included. - /// - /// # Returns - /// - /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`. - /// The list can be empty. - /// - /// See also `subscribe()` - pub fn get_subscriptions(&self) -> &Vec { - &self.subscriptions - } - - /// Creates a new instance of `LightstreamerClient`. - /// - /// The constructor initializes the client with the server address and adapter set, if provided. - /// It sets up the connection details and options for the client. If no server address or - /// adapter set is specified, those properties on the client will be `None`. This allows - /// for late configuration of these details before connecting to the Lightstreamer server. - /// - /// # Arguments - /// * `server_address` - An optional reference to a string slice that represents the server - /// address to connect to. If `None`, the server address must be set later. - /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name. - /// If `None`, the adapter set must be set later. - /// - /// # Returns - /// A result containing the new `LightstreamerClient` instance if successful, or an - /// `IllegalStateException` if the initialization fails due to invalid state conditions. - /// - /// # Panics - /// Does not panic under normal circumstances. However, unexpected internal errors during - /// the creation of internal components could cause panics, which should be considered when - /// using this function in production code. - /// - /// # Example - /// ``` - /// // Example usage of `new` to create a LightstreamerClient with specified server address and - /// // adapter set. - /// let server_address = Some("http://myserver.com"); - /// let adapter_set = Some("MY_ADAPTER_SET"); - /// let ls_client = LightstreamerClient::new(server_address, adapter_set); - /// - /// assert!(ls_client.is_ok()); - /// if let Ok(client) = ls_client { - /// assert_eq!(client.server_address.unwrap(), "http://myserver.com".to_string()); - /// assert_eq!(client.adapter_set.unwrap(), "MY_ADAPTER_SET".to_string()); - /// } - /// ``` - pub fn new( - server_address: Option<&str>, - adapter_set: Option<&str>, - ) -> Result { - let connection_details = ConnectionDetails::new(server_address, adapter_set); - let connection_options = ConnectionOptions::new(); - - Ok(LightstreamerClient { - server_address: server_address.map(|s| s.to_string()), - adapter_set: adapter_set.map(|s| s.to_string()), - connection_details, - connection_options, - listeners: Vec::new(), - subscriptions: Vec::new(), - }) - } - - /// Removes a listener from the `LightstreamerClient` instance so that it will not receive - /// events anymore. - /// - /// A listener can be removed at any time. - /// - /// # Parameters - /// - /// * `listener`: The listener to be removed. - /// - /// See also `addListener()` - pub fn remove_listener(&mut self, listener: Box) { - unimplemented!("Implement mechanism to remove listener from LightstreamerClient"); - //self.listeners.remove(&listener); - } - - /// Operation method that sends a message to the Server. The message is interpreted and handled - /// by the Metadata Adapter associated to the current Session. This operation supports in-order - /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed - /// to arrive exactly once and respecting the original order, whatever is the underlying transport - /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if - /// necessary, to reduce network round trips. - /// - /// Upon subsequent calls to the method, the sequential management of the involved messages is - /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are - /// issued. - /// - /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport), - /// it will be resent; however, this may cause the subsequent messages to be delayed. For this - /// reason, each message can specify a "delayTimeout", which is the longest time the message, - /// after reaching the Server, can be kept waiting if one of more preceding messages haven't - /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded; - /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`. - /// Note that, because of the parallel transport of the messages, if a zero or very low timeout - /// is set for a message and the previous message was sent immediately before, it is possible - /// that the latter gets discarded even if no communication issues occur. The Server may also - /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for - /// long time. - /// - /// Sequence identifiers can also be associated with the messages. In this case, the sequential - /// management is restricted to all subsets of messages with the same sequence identifier associated. - /// - /// Notifications of the operation outcome can be received by supplying a suitable listener. The - /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence - /// are guaranteed to be invoked sequentially. - /// - /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate - /// processing is guaranteed, while strict ordering and even sequentialization of the processing - /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However, - /// messages that, for any reason, should fail to reach the Server whereas subsequent messages - /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that - /// the listener eventually gets a notification. - /// - /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget" - /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages - /// are performed at all, so as to optimize the processing and allow the highest possible throughput. - /// - /// Since a message is handled by the Metadata Adapter associated to the current connection, a - /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected` - /// flag is specified it is possible to call the method at any time and the client will take - /// care of sending the message as soon as a connection is available, otherwise, if the current - /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()` - /// event will be fired. - /// - /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message - /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected` - /// flag set to `true`. - /// - /// Also note that forwarding of the message to the server is made in a separate thread, hence, - /// if a message is sent while the connection is active, it could be aborted because of a subsequent - /// disconnection. In the same way a message sent while the connection is not active might be - /// sent because of a subsequent connection. - /// - /// # Parameters - /// - /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter - /// associated to the current connection. - /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed - /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier - /// is supplied, the message will be processed in the special way described above. The parameter - /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name. - /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured - /// timeout on missing messages, the latter will be used instead. The parameter is optional; if - /// a negative value is supplied, the Server configured timeout on missing messages will be applied. - /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side - /// timeout on missing messages still applies. - /// * `listener`: an object suitable for receiving notifications about the processing outcome. The - /// parameter is optional; if not supplied, no notification will be available. - /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected - /// status when the provided message is handled, then the message is not aborted right away but - /// is queued waiting for a new session. Note that the message can still be aborted later when - /// a new session is established. - pub fn send_message( - &mut self, - message: &str, - sequence: Option<&str>, - delay_timeout: Option, - listener: Option>, - enqueue_while_disconnected: bool, - ) { - // Implementation for send_message - } - - /// Static method that permits to configure the logging system used by the library. The logging - /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any - /// third-party logging system. - /// - /// If no logging system is specified, all the generated log is discarded. - /// - /// The following categories are available to be consumed: - /// - /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO - /// level, socket operations are logged; at DEBUG level, read/write data exchange is logged. - /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO - /// level, requests are logged; at DEBUG level, request details and events from the Server are logged. - /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events - /// are logged; at DEBUG level, lifecycle event details are logged. - /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related - /// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions - /// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged. - /// - `lightstreamer.actions`: logs settings / API calls. - /// - /// # Parameters - /// - /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the - /// library classes. - pub fn set_logger_provider() { - unimplemented!("Implement mechanism to set logger provider for LightstreamerClient."); - } - /* - pub fn set_logger_provider(provider: LoggerProvider) { - // Implementation for set_logger_provider - } - */ - - /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to - /// accept untrusted ones. - /// - /// May be called only once before creating any `LightstreamerClient` instance. - /// - /// # Parameters - /// - /// * `factory`: an instance of `ssl.SSLContext` - /// - /// # Raises - /// - /// * `IllegalArgumentException`: if the factory is `None` - /// * `IllegalStateException`: if a factory is already installed - pub fn set_trust_manager_factory() { - unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient."); - } - /* - pub fn set_trust_manager_factory(factory: Option) -> Result<(), IllegalArgumentException> { - if factory.is_none() { - return Err(IllegalArgumentException::new( - "Factory cannot be None", - )); - } - - // Implementation for set_trust_manager_factory - Ok(()) - } - */ - - /// Operation method that adds a `Subscription` to the list of "active" Subscriptions. The `Subscription` - /// cannot already be in the "active" state. - /// - /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon - /// as there is a session available). Active `Subscription` are automatically persisted across different - /// sessions as long as a related unsubscribe call is not issued. - /// - /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription` - /// immediately enters the "active" state. - /// - /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient` - /// unless it is first removed from the "active" state through a call to `unsubscribe()`. - /// - /// Also note that forwarding of the subscription to the server is made in a separate thread. - /// - /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()` - /// event. - /// - /// # Parameters - /// - /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time - /// values. - /// - /// See also `unsubscribe()` - pub fn subscribe(&mut self, subscription: Subscription) { - self.subscriptions.push(subscription); - // Implementation for subscribe - } - - /// Operation method that removes a `Subscription` that is currently in the "active" state. - /// - /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its - /// items is requested to Lightstreamer Server. - /// - /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately - /// exits the "active" state. - /// - /// Note that forwarding of the unsubscription to the server is made in a separate thread. - /// - /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event. - /// - /// # Parameters - /// - /// * `subscription`: An "active" `Subscription` object that was activated by this `LightstreamerClient` - /// instance. - pub fn unsubscribe(&mut self, subscription: Subscription) { - unimplemented!("Implement mechanism to unsubscribe from LightstreamerClient."); - } - /* - pub fn unsubscribe(&mut self, subscription: &Subscription) { - if let Some(index) = self.subscriptions.iter().position(|s| s == subscription) { - self.subscriptions.remove(index); - // Implementation for unsubscribe - } - } - */ -} \ No newline at end of file diff --git a/src/ls_client.rs b/src/ls_client.rs new file mode 100644 index 0000000..214a722 --- /dev/null +++ b/src/ls_client.rs @@ -0,0 +1,521 @@ +use crate::client_listener::ClientListener; +use crate::client_message_listener::ClientMessageListener; +use crate::connection_details::ConnectionDetails; +use crate::connection_options::ConnectionOptions; +use crate::subscription::Subscription; +use crate::IllegalStateException; + +use cookie::Cookie; +use std::fmt::{self, Debug, Formatter}; + +/// Facade class for the management of the communication to Lightstreamer Server. Used to provide +/// configuration settings, event handlers, operations for the control of the connection lifecycle, +/// Subscription handling and to send messages. +/// +/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a +/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions, +/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally, +/// a single instance of `LightstreamerClient` is needed. +/// +/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple +/// endpoints. +/// +/// You can listen to the events generated by a session by registering an event listener, such as +/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events, +/// such as session creation, connection status, subscription updates, and server messages. However, +/// you should be aware that the event notifications are dispatched by a single thread, the so-called +/// event thread. This means that if the operations of a listener are slow or blocking, they will +/// delay the processing of the other listeners and affect the performance of your application. +/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep +/// the listener methods as fast and simple as possible. Note that even if you create multiple +/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared +/// among them. +/// +/// # Parameters +/// +/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient` +/// will connect to. It is possible to specify it later by using `None` here. See +/// `ConnectionDetails.setServerAddress()` for details. +/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle +/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to +/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()` +/// for details. +/// +/// # Raises +/// +/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()` +/// for details. +pub struct LightstreamerClient { + server_address: Option, + adapter_set: Option, + /// Data object that contains the details needed to open a connection to a Lightstreamer Server. + /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties + /// of this object can be overwritten by values received from a Lightstreamer Server. + pub connection_details: ConnectionDetails, + /// Data object that contains options and policies for the connection to the server. This instance + /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object + /// can be overwritten by values received from a Lightstreamer Server. + pub connection_options: ConnectionOptions, + listeners: Vec>, + subscriptions: Vec, +} + +impl Debug for LightstreamerClient { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("LightstreamerClient") + .field("server_address", &self.server_address) + .field("adapter_set", &self.adapter_set) + .field("connection_details", &self.connection_details) + .field("connection_options", &self.connection_options) + .field("listeners", &self.listeners) + .field("subscriptions", &self.subscriptions) + .finish() + } +} + +impl LightstreamerClient { + /// A constant string representing the name of the library. + pub const LIB_NAME: &'static str = "rust_client"; + + /// A constant string representing the version of the library. + pub const LIB_VERSION: &'static str = "0.1.0"; + + /// Static method that can be used to share cookies between connections to the Server (performed by + /// this library) and connections to other sites that are performed by the application. With this + /// method, cookies received by the application can be added (or replaced if already present) to + /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain + /// is compatible with the Server domain will be used internally. + /// + /// This method should be invoked before calling the `LightstreamerClient.connect()` method. + /// However it can be invoked at any time; it will affect the internal cookie set immediately + /// and the sending of cookies on the next HTTP request or WebSocket establishment. + /// + /// # Parameters + /// + /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`. + /// * `cookies`: an instance of `http.cookies.SimpleCookie`. + /// + /// See also `getCookies()` + pub fn add_cookies(uri: &str, cookies: &Cookie) { + // Implementation for add_cookies + } + + /// Adds a listener that will receive events from the `LightstreamerClient` instance. + /// + /// The same listener can be added to several different `LightstreamerClient` instances. + /// + /// A listener can be added at any time. A call to add a listener already present will be ignored. + /// + /// # Parameters + /// + /// * `listener`: An object that will receive the events as documented in the `ClientListener` + /// interface. + /// + /// See also `removeListener()` + pub fn add_listener(&mut self, listener: Box) { + self.listeners.push(listener); + } + + /// Operation method that requests to open a Session against the configured Lightstreamer Server. + /// + /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`, + /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer + /// for some seconds from the streaming connection, then it will automatically open a polling + /// connection. + /// + /// A polling connection may also be opened if the environment is not suitable for a streaming + /// connection. + /// + /// Note that as "polling connection" we mean a loop of polling requests, each of which requires + /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server. + /// + /// Note that the request to connect is accomplished by the client in a separate thread; this + /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change + /// yet. + /// + /// When the request to connect is finally being executed, if the current status of the client + /// is not `DISCONNECTED`, then nothing will be done. + /// + /// # Raises + /// + /// * `IllegalStateException`: if no server address was configured. + /// + /// See also `getStatus()` + /// + /// See also `disconnect()` + /// + /// See also `ClientListener.onStatusChange()` + /// + /// See also `ConnectionDetails.setServerAddress()` + pub fn connect(&mut self) -> Result<(), IllegalStateException> { + // Implementation for connect + Ok(()) + } + + /// Operation method that requests to close the Session opened against the configured Lightstreamer + /// Server (if any). + /// + /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped. + /// + /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance, + /// are preserved to be re-subscribed to on future Sessions. + /// + /// Note that the request to disconnect is accomplished by the client in a separate thread; this + /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the + /// change yet. + /// + /// When the request to disconnect is finally being executed, if the status of the client is + /// "DISCONNECTED", then nothing will be done. + /// + /// See also `connect()` + pub fn disconnect(&mut self) { + // Implementation for disconnect + } + + /// Static inquiry method that can be used to share cookies between connections to the Server + /// (performed by this library) and connections to other sites that are performed by the application. + /// With this method, cookies received from the Server can be extracted for sending through other + /// connections, according with the URI to be accessed. + /// + /// See `addCookies()` for clarifications on when cookies are directly stored by the library and + /// when not. + /// + /// # Parameters + /// + /// * `uri`: the URI to which the cookies should be sent, or `None`. + /// + /// # Returns + /// + /// A list with the various cookies that can be sent in a HTTP request for the specified URI. + /// If a `None` URI was supplied, all available non-expired cookies will be returned. + pub fn get_cookies(uri: Option<&str>) -> Cookie { + // Implementation for get_cookies + unimplemented!() + } + + /// Returns a list containing the `ClientListener` instances that were added to this client. + /// + /// # Returns + /// + /// A list containing the listeners that were added to this client. + /// + /// See also `addListener()` + pub fn get_listeners(&self) -> &Vec> { + &self.listeners + } + + /// Inquiry method that gets the current client status and transport (when applicable). + /// + /// # Returns + /// + /// The current client status. It can be one of the following values: + /// + /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection; + /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server + /// and is currently verifying if a streaming connection is possible; + /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active; + /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active; + /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress; + /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress; + /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for + /// longer than a configured time; + /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened + /// (possibly after a timeout); + /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened + /// as soon as possible, as an attempt to recover the current session after a connection issue; + /// - `"DISCONNECTED"`: no connection is currently active. + /// + /// See also `ClientListener.onStatusChange()` + pub fn get_status(&self) -> &str { + // Implementation for get_status + unimplemented!() + } + + /// Inquiry method that returns a list containing all the `Subscription` instances that are + /// currently "active" on this `LightstreamerClient`. + /// + /// Internal second-level `Subscription` are not included. + /// + /// # Returns + /// + /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`. + /// The list can be empty. + /// + /// See also `subscribe()` + pub fn get_subscriptions(&self) -> &Vec { + &self.subscriptions + } + + /// Creates a new instance of `LightstreamerClient`. + /// + /// The constructor initializes the client with the server address and adapter set, if provided. + /// It sets up the connection details and options for the client. If no server address or + /// adapter set is specified, those properties on the client will be `None`. This allows + /// for late configuration of these details before connecting to the Lightstreamer server. + /// + /// # Arguments + /// * `server_address` - An optional reference to a string slice that represents the server + /// address to connect to. If `None`, the server address must be set later. + /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name. + /// If `None`, the adapter set must be set later. + /// + /// # Returns + /// A result containing the new `LightstreamerClient` instance if successful, or an + /// `IllegalStateException` if the initialization fails due to invalid state conditions. + /// + /// # Panics + /// Does not panic under normal circumstances. However, unexpected internal errors during + /// the creation of internal components could cause panics, which should be considered when + /// using this function in production code. + /// + /// # Example + /// ``` + /// // Example usage of `new` to create a LightstreamerClient with specified server address and + /// // adapter set. + /// let server_address = Some("http://myserver.com"); + /// let adapter_set = Some("MY_ADAPTER_SET"); + /// let ls_client = LightstreamerClient::new(server_address, adapter_set); + /// + /// assert!(ls_client.is_ok()); + /// if let Ok(client) = ls_client { + /// assert_eq!(client.server_address.unwrap(), "http://myserver.com".to_string()); + /// assert_eq!(client.adapter_set.unwrap(), "MY_ADAPTER_SET".to_string()); + /// } + /// ``` + pub fn new( + server_address: Option<&str>, + adapter_set: Option<&str>, + ) -> Result { + let connection_details = ConnectionDetails::new(server_address, adapter_set); + let connection_options = ConnectionOptions::new(); + + Ok(LightstreamerClient { + server_address: server_address.map(|s| s.to_string()), + adapter_set: adapter_set.map(|s| s.to_string()), + connection_details, + connection_options, + listeners: Vec::new(), + subscriptions: Vec::new(), + }) + } + + /// Removes a listener from the `LightstreamerClient` instance so that it will not receive + /// events anymore. + /// + /// A listener can be removed at any time. + /// + /// # Parameters + /// + /// * `listener`: The listener to be removed. + /// + /// See also `addListener()` + pub fn remove_listener(&mut self, listener: Box) { + unimplemented!("Implement mechanism to remove listener from LightstreamerClient"); + //self.listeners.remove(&listener); + } + + /// Operation method that sends a message to the Server. The message is interpreted and handled + /// by the Metadata Adapter associated to the current Session. This operation supports in-order + /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed + /// to arrive exactly once and respecting the original order, whatever is the underlying transport + /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if + /// necessary, to reduce network round trips. + /// + /// Upon subsequent calls to the method, the sequential management of the involved messages is + /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are + /// issued. + /// + /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport), + /// it will be resent; however, this may cause the subsequent messages to be delayed. For this + /// reason, each message can specify a "delayTimeout", which is the longest time the message, + /// after reaching the Server, can be kept waiting if one of more preceding messages haven't + /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded; + /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`. + /// Note that, because of the parallel transport of the messages, if a zero or very low timeout + /// is set for a message and the previous message was sent immediately before, it is possible + /// that the latter gets discarded even if no communication issues occur. The Server may also + /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for + /// long time. + /// + /// Sequence identifiers can also be associated with the messages. In this case, the sequential + /// management is restricted to all subsets of messages with the same sequence identifier associated. + /// + /// Notifications of the operation outcome can be received by supplying a suitable listener. The + /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence + /// are guaranteed to be invoked sequentially. + /// + /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate + /// processing is guaranteed, while strict ordering and even sequentialization of the processing + /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However, + /// messages that, for any reason, should fail to reach the Server whereas subsequent messages + /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that + /// the listener eventually gets a notification. + /// + /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget" + /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages + /// are performed at all, so as to optimize the processing and allow the highest possible throughput. + /// + /// Since a message is handled by the Metadata Adapter associated to the current connection, a + /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected` + /// flag is specified it is possible to call the method at any time and the client will take + /// care of sending the message as soon as a connection is available, otherwise, if the current + /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()` + /// event will be fired. + /// + /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message + /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected` + /// flag set to `true`. + /// + /// Also note that forwarding of the message to the server is made in a separate thread, hence, + /// if a message is sent while the connection is active, it could be aborted because of a subsequent + /// disconnection. In the same way a message sent while the connection is not active might be + /// sent because of a subsequent connection. + /// + /// # Parameters + /// + /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter + /// associated to the current connection. + /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed + /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier + /// is supplied, the message will be processed in the special way described above. The parameter + /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name. + /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured + /// timeout on missing messages, the latter will be used instead. The parameter is optional; if + /// a negative value is supplied, the Server configured timeout on missing messages will be applied. + /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side + /// timeout on missing messages still applies. + /// * `listener`: an object suitable for receiving notifications about the processing outcome. The + /// parameter is optional; if not supplied, no notification will be available. + /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected + /// status when the provided message is handled, then the message is not aborted right away but + /// is queued waiting for a new session. Note that the message can still be aborted later when + /// a new session is established. + pub fn send_message( + &mut self, + message: &str, + sequence: Option<&str>, + delay_timeout: Option, + listener: Option>, + enqueue_while_disconnected: bool, + ) { + // Implementation for send_message + } + + /// Static method that permits to configure the logging system used by the library. The logging + /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any + /// third-party logging system. + /// + /// If no logging system is specified, all the generated log is discarded. + /// + /// The following categories are available to be consumed: + /// + /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO + /// level, socket operations are logged; at DEBUG level, read/write data exchange is logged. + /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO + /// level, requests are logged; at DEBUG level, request details and events from the Server are logged. + /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events + /// are logged; at DEBUG level, lifecycle event details are logged. + /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related + /// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions + /// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged. + /// - `lightstreamer.actions`: logs settings / API calls. + /// + /// # Parameters + /// + /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the + /// library classes. + pub fn set_logger_provider() { + unimplemented!("Implement mechanism to set logger provider for LightstreamerClient."); + } + /* + pub fn set_logger_provider(provider: LoggerProvider) { + // Implementation for set_logger_provider + } + */ + + /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to + /// accept untrusted ones. + /// + /// May be called only once before creating any `LightstreamerClient` instance. + /// + /// # Parameters + /// + /// * `factory`: an instance of `ssl.SSLContext` + /// + /// # Raises + /// + /// * `IllegalArgumentException`: if the factory is `None` + /// * `IllegalStateException`: if a factory is already installed + pub fn set_trust_manager_factory() { + unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient."); + } + /* + pub fn set_trust_manager_factory(factory: Option) -> Result<(), IllegalArgumentException> { + if factory.is_none() { + return Err(IllegalArgumentException::new( + "Factory cannot be None", + )); + } + + // Implementation for set_trust_manager_factory + Ok(()) + } + */ + + /// Operation method that adds a `Subscription` to the list of "active" Subscriptions. The `Subscription` + /// cannot already be in the "active" state. + /// + /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon + /// as there is a session available). Active `Subscription` are automatically persisted across different + /// sessions as long as a related unsubscribe call is not issued. + /// + /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription` + /// immediately enters the "active" state. + /// + /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient` + /// unless it is first removed from the "active" state through a call to `unsubscribe()`. + /// + /// Also note that forwarding of the subscription to the server is made in a separate thread. + /// + /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()` + /// event. + /// + /// # Parameters + /// + /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time + /// values. + /// + /// See also `unsubscribe()` + pub fn subscribe(&mut self, subscription: Subscription) { + self.subscriptions.push(subscription); + // Implementation for subscribe + } + + /// Operation method that removes a `Subscription` that is currently in the "active" state. + /// + /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its + /// items is requested to Lightstreamer Server. + /// + /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately + /// exits the "active" state. + /// + /// Note that forwarding of the unsubscription to the server is made in a separate thread. + /// + /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event. + /// + /// # Parameters + /// + /// * `subscription`: An "active" `Subscription` object that was activated by this `LightstreamerClient` + /// instance. + pub fn unsubscribe(&mut self, subscription: Subscription) { + unimplemented!("Implement mechanism to unsubscribe from LightstreamerClient."); + } + /* + pub fn unsubscribe(&mut self, subscription: &Subscription) { + if let Some(index) = self.subscriptions.iter().position(|s| s == subscription) { + self.subscriptions.remove(index); + // Implementation for unsubscribe + } + } + */ +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 2dddb0d..5c2acc5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ -use crate::item_update::ItemUpdate; -use crate::subscription::{Subscription, SubscriptionMode}; -use crate::subscription_listener::SubscriptionListener; +use lightstreamer_client::item_update::ItemUpdate; +use lightstreamer_client::ls_client::LightstreamerClient; +use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode}; +use lightstreamer_client::subscription_listener::SubscriptionListener; use futures::stream::StreamExt; use futures::SinkExt; -use lightstreamer_client::lightstreamer_client::LightstreamerClient; use reqwest::Client; use serde_urlencoded; use std::error::Error; @@ -12,10 +12,6 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -mod item_update; -mod subscription; -mod subscription_listener; - async fn establish_persistent_http_connection( session_id_shared: Arc>, ) -> Result<(), reqwest::Error> { @@ -225,6 +221,8 @@ async fn main() -> Result<(), Box> { )?; subscription.add_listener(Box::new(MySubscriptionListener {})); + subscription.set_data_adapter(Some(String::from("QUOTE_ADAPTER")))?; + subscription.set_requested_snapshot(Some(Snapshot::Yes))?; let client = LightstreamerClient::new( Some("http://push.lightstreamer.com/lightstreamer"), diff --git a/src/subscription.rs b/src/subscription.rs index f7e0d88..be10380 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -2,7 +2,16 @@ use crate::subscription_listener::SubscriptionListener; use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; -/// Enum representing the subscription mode +/// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription. +#[derive(Debug)] +pub enum Snapshot { + Yes, + No, + Number(usize), + None, +} + +/// Enum representing the subscription mode. #[derive(Debug, PartialEq, Eq)] pub enum SubscriptionMode { Merge, @@ -49,7 +58,7 @@ pub struct Subscription { /// The maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription. requested_max_frequency: Option, /// The snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription. - requested_snapshot: Option, + requested_snapshot: Option, /// The selector name for all the items in the Subscription, used as a filter on the updates received. selector: Option, /// A list of SubscriptionListener instances that will receive events from this Subscription. @@ -618,15 +627,22 @@ impl Subscription { /// /// # See also /// `ItemUpdate.isSnapshot()` - pub fn set_requested_snapshot(&mut self, snapshot: Option) -> Result<(), String> { + pub fn set_requested_snapshot(&mut self, snapshot: Option) -> Result<(), String> { if self.is_active { return Err("Subscription is active".to_string()); } - if self.mode == SubscriptionMode::Raw && snapshot.is_some() { - return Err("Cannot request snapshot for Raw mode".to_string()); - } - if self.mode != SubscriptionMode::Distinct && snapshot.is_some() && snapshot.as_ref().unwrap().parse::().is_ok() { - return Err("Cannot specify snapshot length for non-Distinct mode".to_string()); + match snapshot { + Some(Snapshot::None) => { + if self.mode == SubscriptionMode::Raw { + return Err("Cannot request snapshot for Raw mode".to_string()); + } + } + Some(Snapshot::Number(_)) => { + if self.mode != SubscriptionMode::Distinct { + return Err("Cannot specify snapshot length for non-Distinct mode".to_string()); + } + } + _ => {} } self.requested_snapshot = snapshot; Ok(()) @@ -639,7 +655,7 @@ impl Subscription { /// /// # Returns /// "yes", "no", `None`, or an integer number. - pub fn get_requested_snapshot(&self) -> Option<&String> { + pub fn get_requested_snapshot(&self) -> Option<&Snapshot> { self.requested_snapshot.as_ref() } -- cgit v1.2.3