use crate::client_listener::ClientListener; use crate::client_message_listener::ClientMessageListener; use crate::connection_details::ConnectionDetails; use crate::connection_options::ConnectionOptions; use crate::error::IllegalStateException; use crate::item_update::ItemUpdate; use crate::subscription::{Snapshot, Subscription, SubscriptionMode}; use crate::util::*; use cookie::Cookie; use futures_util::{SinkExt, StreamExt}; use std::collections::HashMap; use std::error::Error; use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; use tokio::sync::Notify; use tokio_tungstenite::{ connect_async, tungstenite::{ http::{HeaderName, HeaderValue, Request}, Message, }, }; use tracing::{debug, error, info, instrument, trace, warn, Level}; use url::Url; /// Represents the current status of the `LightstreamerClient`. pub enum ClientStatus { Connecting, Connected(ConnectionType), Stalled, Disconnected(DisconnectionType), } pub enum ConnectionType { HttpPolling, HttpStreaming, StreamSensing, WsPolling, WsStreaming, } pub enum DisconnectionType { WillRetry, TryingRecovery, } pub enum LogType { TracingLogs, StdLogs, } /// Facade class for the management of the communication to Lightstreamer Server. Used to provide /// configuration settings, event handlers, operations for the control of the connection lifecycle, /// Subscription handling and to send messages. /// /// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a /// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions, /// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally, /// a single instance of `LightstreamerClient` is needed. /// /// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple /// endpoints. /// /// You can listen to the events generated by a session by registering an event listener, such as /// `ClientListener` or `SubscriptionListener`. 
These listeners allow you to handle various events, /// such as session creation, connection status, subscription updates, and server messages. However, /// you should be aware that the event notifications are dispatched by a single thread, the so-called /// event thread. This means that if the operations of a listener are slow or blocking, they will /// delay the processing of the other listeners and affect the performance of your application. /// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep /// the listener methods as fast and simple as possible. Note that even if you create multiple /// instances of `LightstreamerClient`, they will all use a single event thread, that is shared /// among them. /// /// # Parameters /// /// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient` /// will connect to. It is possible to specify it later by using `None` here. See /// `ConnectionDetails.setServerAddress()` for details. /// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle /// all requests in the Session associated with this `LightstreamerClient`. It is possible not to /// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()` /// for details. /// /// # Raises /// /// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()` /// for details. pub struct LightstreamerClient { /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect. server_address: Option, /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all /// requests in the Session associated with this `LightstreamerClient`. adapter_set: Option, /// Data object that contains the details needed to open a connection to a Lightstreamer Server. 
/// This instance is set up by the `LightstreamerClient` object at its own creation. Properties /// of this object can be overwritten by values received from a Lightstreamer Server. pub connection_details: ConnectionDetails, /// Data object that contains options and policies for the connection to the server. This instance /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object /// can be overwritten by values received from a Lightstreamer Server. pub connection_options: ConnectionOptions, /// A list of listeners that will receive events from the `LightstreamerClient` instance. listeners: Vec>, /// A list containing all the `Subscription` instances that are currently "active" on this /// `LightstreamerClient`. subscriptions: Vec, /// The current status of the client. status: ClientStatus, /// Logging Type to be used logging: LogType, } impl Debug for LightstreamerClient { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("LightstreamerClient") .field("server_address", &self.server_address) .field("adapter_set", &self.adapter_set) .field("connection_details", &self.connection_details) .field("connection_options", &self.connection_options) .field("listeners", &self.listeners) .field("subscriptions", &self.subscriptions) .finish() } } impl LightstreamerClient { /// A constant string representing the name of the library. pub const LIB_NAME: &'static str = "rust_client"; /// A constant string representing the version of the library. pub const LIB_VERSION: &'static str = "0.1.0"; // // Constants for WebSocket connection. // pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w=="; pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com"; pub const SEC_WEBSOCKET_VERSION: &'static str = "13"; pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket"; /// A constant string representing the version of the TLCP protocol used by the library. 
pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";

    /// Static method that can be used to share cookies between connections to the Server (performed by
    /// this library) and connections to other sites that are performed by the application. With this
    /// method, cookies received by the application can be added (or replaced if already present) to
    /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
    /// is compatible with the Server domain will be used internally.
    ///
    /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
    /// However it can be invoked at any time; it will affect the internal cookie set immediately
    /// and the sending of cookies on the next HTTP request or WebSocket establishment.
    ///
    /// # Parameters
    ///
    /// * `uri`: the URI from which the supplied cookies were received.
    /// * `cookies`: the `cookie::Cookie` to be added.
    ///
    /// # Panics
    ///
    /// Always panics: the cookie handling is not implemented yet.
    ///
    /// See also `getCookies()`
    pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
        // Implementation for add_cookies
        unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
    }

    /// Adds a listener that will receive events from the `LightstreamerClient` instance.
    ///
    /// The same listener can be added to several different `LightstreamerClient` instances.
    ///
    /// A listener can be added at any time. Note that the listener is appended unconditionally:
    /// no check for an already present listener is performed.
    ///
    /// # Parameters
    ///
    /// * `listener`: An object that will receive the events as documented in the `ClientListener`
    ///   interface.
    ///
    /// See also `removeListener()`
    pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
        self.listeners.push(listener);
    }

    /// Operation method that requests to open a Session against the configured Lightstreamer Server.
/// /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`, /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer /// for some seconds from the streaming connection, then it will automatically open a polling /// connection. /// /// A polling connection may also be opened if the environment is not suitable for a streaming /// connection. /// /// Note that as "polling connection" we mean a loop of polling requests, each of which requires /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server. /// /// Note that the request to connect is accomplished by the client in a separate thread; this /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change /// yet. /// /// When the request to connect is finally being executed, if the current status of the client /// is not `DISCONNECTED`, then nothing will be done. /// /// # Raises /// /// * `IllegalStateException`: if no server address was configured. /// /// See also `getStatus()` /// /// See also `disconnect()` /// /// See also `ClientListener.onStatusChange()` /// /// See also `ConnectionDetails.setServerAddress()` #[instrument] pub async fn connect(&mut self, shutdown_signal: Arc) -> Result<(), Box> { // Check if the server address is configured. if self.server_address.is_none() { return Err(Box::new(IllegalStateException::new( "No server address was configured.", ))); } // // Only WebSocket streaming transport is currently supported. // let forced_transport = self.connection_options.get_forced_transport(); if forced_transport.is_none() || *forced_transport.unwrap() /* unwrap() is safe here */ != Transport::WsStreaming { return Err(Box::new(IllegalStateException::new( "Only WebSocket streaming transport is currently supported.", ))); } // // Convert the HTTP URL to a WebSocket URL. 
// let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here. let mut url = Url::parse(&http_url) .expect("Failed to parse server address URL from connection details."); match url.scheme() { "http" => url .set_scheme("ws") .expect("Failed to set scheme to ws for WebSocket URL."), "https" => url .set_scheme("wss") .expect("Failed to set scheme to wss for WebSocket URL."), invalid_scheme => { return Err(Box::new(IllegalStateException::new(&format!( "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.", invalid_scheme )))); } } let ws_url = url.as_str(); // Build the WebSocket request with the necessary headers. let request = Request::builder() .uri(ws_url) .header( HeaderName::from_static("connection"), HeaderValue::from_static("Upgrade"), ) .header( HeaderName::from_static("host"), HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| { IllegalStateException::new(&format!( "Invalid header value for header with name 'host': {}", err )) })?, ) .header( HeaderName::from_static("sec-websocket-key"), HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY), ) .header( HeaderName::from_static("sec-websocket-protocol"), HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL), ) .header( HeaderName::from_static("sec-websocket-version"), HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION), ) .header( HeaderName::from_static("upgrade"), HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE), ) .body(())?; // Connect to the Lightstreamer server using WebSocket. 
let ws_stream = match connect_async(request).await { Ok((ws_stream, response)) => { if let Some(server_header) = response.headers().get("server") { self.make_log( Level::INFO, &format!( "Connected to Lightstreamer server: {}", server_header.to_str().unwrap_or("") ), ); } else { self.make_log(Level::INFO, "Connected to Lightstreamer server"); } ws_stream } Err(err) => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, format!( "Failed to connect to Lightstreamer server with WebSocket: {}", err ), ))); } }; // Split the WebSocket stream into a write and a read stream. let (mut write_stream, mut read_stream) = ws_stream.split(); // // Initiate communication with the server by sending a 'wsok' message. // write_stream.send(Message::Text("wsok".into())).await?; // // Start reading and processing messages from the server. // let mut request_id: usize = 0; let mut _session_id: Option = None; let mut subscription_id: usize = 0; let mut subscription_item_updates: HashMap> = HashMap::new(); loop { tokio::select! { message = read_stream.next() => { match message { Some(Ok(Message::Text(text))) => { // Messages could include multiple submessages separated by /r/n. // Split the message into submessages and process each one separately. let submessages: Vec<&str> = text.split("\r\n") .filter(|&line| !line.trim().is_empty()) // Filter out empty lines. .collect(); for submessage in submessages { let clean_text = clean_message(&submessage); let submessage_fields: Vec<&str> = clean_text.split(",").collect(); match *submessage_fields.first().unwrap_or(&"") { // // Errors from server. // "conerr" | "reqerr" => { self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) ); break; }, // // Session created successfully. 
// "conok" => { if let Some(session_id) = submessage_fields.get(1).as_deref() { self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) ); self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) ); // // Subscribe to the desired items. // while let Some(subscription) = self.subscriptions.get(subscription_id) { // // Gather all the necessary subscription parameters. // request_id += 1; let ls_req_id = request_id.to_string(); subscription_id += 1; let ls_sub_id = subscription_id.to_string(); let ls_mode = subscription.get_mode().to_string(); let ls_group = match subscription.get_item_group() { Some(item_group) => item_group.to_string(), None => match subscription.get_items() { Some(items) => { items.join(" ") }, None => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, "No item group or items found in subscription.", ))); }, }, }; let ls_schema = match subscription.get_field_schema() { Some(field_schema) => field_schema.to_string(), None => match subscription.get_fields() { Some(fields) => { fields.join(" ") }, None => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, "No field schema or fields found in subscription.", ))); }, }, }; let ls_data_adapter = match subscription.get_data_adapter() { Some(data_adapter) => data_adapter.to_string(), None => "".to_string(), }; let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string(); // // Prepare the subscription request. // let mut params: Vec<(&str, &str)> = vec![ ("LS_data_adapter", &ls_data_adapter), ("LS_reqId", &ls_req_id), ("LS_op", "add"), ("LS_subId", &ls_sub_id), ("LS_mode", &ls_mode), ("LS_group", &ls_group), ("LS_schema", &ls_schema), ("LS_ack", "false"), ]; // Remove the data adapter parameter if not specified. 
if ls_data_adapter == "" { params.remove(0); } if ls_snapshot != "" { params.push(("LS_snapshot", &ls_snapshot)); } let encoded_params = serde_urlencoded::to_string(¶ms)?; write_stream .send(Message::Text(format!("control\r\n{}", encoded_params).into())) .await?; info!("Sent subscription request: '{}'", encoded_params); } } else { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, "Session ID not found in 'conok' message from server", ))); } }, // // Notifications from server. // "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" => { self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) ); // Don't do anything with these notifications for now. }, "probe" => { self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) ); }, "reqok" => { self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) ); }, // // Subscription confirmation from server. // "subok" => { self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) ); }, // // Data updates from server. // "u" => { // Parse arguments from the received message. let arguments = clean_text.split(",").collect::>(); // // Extract the subscription from the first argument. // let subscription_index = arguments.get(1).unwrap_or(&"").parse::().unwrap_or(0); let subscription = match self.get_subscriptions().get(subscription_index-1) { Some(subscription) => subscription, None => { self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) ); continue; } }; // // Extract the item from the second argument. 
// let item_index = arguments.get(2).unwrap_or(&"").parse::().unwrap_or(0); let item = match subscription.get_items() { Some(items) => items.get(item_index-1), None => { self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) ); continue; } }; // // Determine if the update is a snapshot or real-time update based on the subscription parameters. // let is_snapshot = match subscription.get_requested_snapshot() { Some(ls_snapshot) => { match ls_snapshot { Snapshot::No => false, Snapshot::Yes => { match subscription.get_mode() { SubscriptionMode::Merge => { if arguments.len() == 4 && arguments[3] == "$" { // EOS notification received true } else { // If item doesn't exist in item_updates yet, the first update // is always a snapshot. if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) { if let Some(_) = item_updates.get(&(item_index)) { // Item update already exists in item_updates, so it's not a snapshot. false } else { // Item update doesn't exist in item_updates, so the first update is always a snapshot. true } } else { // Item updates not found for subscription, so the first update is always a snapshot. true } } }, SubscriptionMode::Distinct | SubscriptionMode::Command => { if !subscription.is_subscribed() { true } else { false } }, _ => false, } }, _ => false, } }, None => false, }; // Extract the field values from the third argument. let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect(); // // Get fields from subscription and create a HashMap of field names and values. // let subscription_fields = subscription.get_fields(); let mut field_map: HashMap> = subscription_fields .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect()) .unwrap_or_default(); let mut field_index = 0; for value in field_values { match value { "" => { // An empty value means the field is unchanged compared to the previous update of the same field. 
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { field_map.insert(field_name.to_string(), None); } field_index += 1; } "#" | "$" => { // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty. if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { field_map.insert(field_name.to_string(), Some("".to_string())); } field_index += 1; } value if value.starts_with('^') => { let command = value.chars().nth(1).unwrap_or(' '); match command { '0'..='9' => { let count = value[1..].parse().unwrap_or(0); for i in 0..count { if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) { field_map.insert(field_name.to_string(), None); } } field_index += count; } 'P' | 'T' => { let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string()); if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) { let new_value = match command { 'P' => { // Apply JSON Patch let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null); let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null); let patch_operations: Vec = serde_json::from_value(patch).unwrap_or_default(); let _ = json_patch::patch(&mut prev_json, &patch_operations); prev_json.to_string() } 'T' => { // Apply TLCP-diff //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string()) unimplemented!("Implement TLCP-diff"); } _ => unreachable!(), }; field_map.insert(field_name.to_string(), Some(new_value.to_string())); } } field_index += 1; } _ => { let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string()); if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { 
field_map.insert(field_name.to_string(), Some(decoded_value)); } field_index += 1; } } } _ => { let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string()); if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { field_map.insert(field_name.to_string(), Some(decoded_value)); } field_index += 1; } } } // Store only item_update's changed fields. let changed_fields: HashMap = field_map.iter() .filter_map(|(k, v)| { if let Some(v) = v { Some((k.clone(), v.clone())) } else { None } }) .collect(); // // Take the proper item_update from item_updates and update it with changed fields. // If the item_update doesn't exist yet, create a new one. // let current_item_update: ItemUpdate; match subscription_item_updates.get_mut(&(subscription_index)) { Some(item_updates) => match item_updates.get_mut(&(item_index)) { Some(item_update) => { // // Iterate changed_fields and update existing item_update.fields assigning the new values. // for (field_name, new_value) in &changed_fields { if item_update.fields.contains_key(field_name) { item_update.fields.insert((*field_name).clone(), Some(new_value.clone())); } } item_update.changed_fields = changed_fields; item_update.is_snapshot = is_snapshot; current_item_update = item_update.clone(); }, None => { // Create a new item_update and add it to item_updates. let item_update = ItemUpdate { item_name: item.cloned(), item_pos: item_index, fields: field_map, changed_fields: changed_fields, is_snapshot: is_snapshot, }; current_item_update = item_update.clone(); item_updates.insert(item_index, item_update); } }, None => { // Create a new item_update and add it to item_updates. 
let item_update = ItemUpdate { item_name: item.cloned(), item_pos: item_index, fields: field_map, changed_fields: changed_fields, is_snapshot: is_snapshot, }; current_item_update = item_update.clone(); let mut item_updates = HashMap::new(); item_updates.insert(item_index, item_update); subscription_item_updates.insert(subscription_index, item_updates); } }; // Get mutable subscription listeners directly. let subscription_listeners = subscription.get_listeners(); // Iterate subscription listeners and call on_item_update for each listener. for listener in subscription_listeners { listener.on_item_update(¤t_item_update); } } // // Connection confirmation from server. // "wsok" => { self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) ); // // Request session creation. // let ls_adapter_set = match self.connection_details.get_adapter_set() { Some(adapter_set) => adapter_set, None => { return Err(Box::new(IllegalStateException::new( "No adapter set found in connection details.", ))); }, }; let ls_send_sync = self.connection_options.get_send_sync().to_string(); let mut params: Vec<(&str, &str)> = vec![ ("LS_adapter_set", &ls_adapter_set), ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"), ("LS_send_sync", &ls_send_sync), ]; if let Some(user) = &self.connection_details.get_user() { params.push(("LS_user", user)); } if let Some(password) = &self.connection_details.get_password() { params.push(("LS_password", password)); } params.push(("LS_protocol", Self::TLCP_VERSION)); let encoded_params = serde_urlencoded::to_string(¶ms)?; write_stream .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into())) .await?; self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) ); }, unexpected_message => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, format!( "Unexpected message received from server: '{:?}'", unexpected_message ), ))); }, } } }, Some(Ok(non_text_message)) => { 
return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, format!( "Unexpected non-text message from server: {:?}", non_text_message ), ))); }, Some(Err(err)) => { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidData, format!("Error reading message from server: {}", err), ))); }, None => { self.make_log( Level::DEBUG, "No more messages from server" ); break; }, } }, _ = shutdown_signal.notified() => { self.make_log( Level::INFO, &format!("Received shutdown signal") ); break; }, } } Ok(()) } /// Operation method that requests to close the Session opened against the configured Lightstreamer /// Server (if any). /// /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped. /// /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance, /// are preserved to be re-subscribed to on future Sessions. /// /// Note that the request to disconnect is accomplished by the client in a separate thread; this /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the /// change yet. /// /// When the request to disconnect is finally being executed, if the status of the client is /// "DISCONNECTED", then nothing will be done. /// /// See also `connect()` #[instrument] pub async fn disconnect(&mut self) { // Implementation for disconnect self.make_log( Level::INFO, "Disconnecting from Lightstreamer server" ); } /// Static inquiry method that can be used to share cookies between connections to the Server /// (performed by this library) and connections to other sites that are performed by the application. /// With this method, cookies received from the Server can be extracted for sending through other /// connections, according with the URI to be accessed. /// /// See `addCookies()` for clarifications on when cookies are directly stored by the library and /// when not. 
///
    /// # Parameters
    ///
    /// * `uri`: the URI to which the cookies should be sent, or `None`.
    ///
    /// # Returns
    ///
    /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
    /// If a `None` URI was supplied, all available non-expired cookies will be returned.
    ///
    /// # Panics
    ///
    /// Always panics: the cookie handling is not implemented yet.
    pub fn get_cookies(_uri: Option<&str>) -> Cookie {
        // Implementation for get_cookies
        unimplemented!()
    }

    /// Returns a list containing the `ClientListener` instances that were added to this client.
    ///
    /// # Returns
    ///
    /// A list containing the listeners that were added to this client.
    ///
    /// See also `addListener()`
    pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
        &self.listeners
    }

    /// Inquiry method that gets the current client status and transport (when applicable).
    ///
    /// # Returns
    ///
    /// The current client status. It can be one of the following values:
    ///
    /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
    /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
    ///   and is currently verifying if a streaming connection is possible;
    /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
    /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
    /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
    /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
    /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
    ///   longer than a configured time;
    /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
    ///   (possibly after a timeout);
    /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
    ///   as soon as possible, as an attempt to recover the current session after a connection issue;
    /// - `"DISCONNECTED"`: no connection is currently active.
///
    /// See also `ClientListener.onStatusChange()`
    pub fn get_status(&self) -> &ClientStatus {
        &self.status
    }

    /// Inquiry method that returns a list containing all the `Subscription` instances that are
    /// currently "active" on this `LightstreamerClient`.
    ///
    /// Internal second-level `Subscription` are not included.
    ///
    /// # Returns
    ///
    /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
    /// The list can be empty.
    ///
    /// See also `subscribe()`
    pub fn get_subscriptions(&self) -> &Vec<Subscription> {
        &self.subscriptions
    }

    /// Creates a new instance of `LightstreamerClient`.
    ///
    /// The constructor initializes the client with the server address and adapter set, if provided.
    /// It sets up the connection details and options for the client. If no server address or
    /// adapter set is specified, those properties on the client will be `None`. This allows
    /// for late configuration of these details before connecting to the Lightstreamer server.
    ///
    /// # Arguments
    /// * `server_address` - An optional reference to a string slice that represents the server
    ///   address to connect to. If `None`, the server address must be set later.
    /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
    ///   If `None`, the adapter set must be set later.
    /// * `username` - An optional user name, forwarded to `ConnectionDetails::new`.
    /// * `password` - An optional password, forwarded to `ConnectionDetails::new`.
    ///
    /// # Returns
    /// A result containing the new `LightstreamerClient` instance if successful, or the error
    /// reported by `ConnectionDetails::new` if the connection details cannot be created.
    ///
    /// # Panics
    /// Does not panic under normal circumstances. However, unexpected internal errors during
    /// the creation of internal components could cause panics, which should be considered when
    /// using this function in production code.
    ///
    /// # Example
    /// ```
    /// // Example usage of `new` to create a LightstreamerClient with specified server address and
    /// // adapter set.
/// let server_address = Some("http://myserver.com");
    /// let adapter_set = Some("MY_ADAPTER_SET");
    /// // No user name and no password are supplied.
    /// let ls_client = LightstreamerClient::new(server_address, adapter_set, None, None);
    ///
    /// assert!(ls_client.is_ok());
    /// ```
    pub fn new(
        server_address: Option<&str>,
        adapter_set: Option<&str>,
        username: Option<&str>,
        password: Option<&str>,
    ) -> Result<LightstreamerClient, Box<dyn Error>> {
        let connection_details =
            ConnectionDetails::new(server_address, adapter_set, username, password)?;
        let connection_options = ConnectionOptions::default();

        Ok(LightstreamerClient {
            server_address: server_address.map(|s| s.to_string()),
            adapter_set: adapter_set.map(|s| s.to_string()),
            connection_details,
            connection_options,
            listeners: Vec::new(),
            subscriptions: Vec::new(),
            // A freshly created client starts disconnected and logs through `LogType::StdLogs`.
            status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
            logging: LogType::StdLogs,
        })
    }

    /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
    /// events anymore.
    ///
    /// A listener can be removed at any time.
    ///
    /// # Parameters
    ///
    /// * `listener`: The listener to be removed.
    ///
    /// # Panics
    ///
    /// Always panics: listener removal is not implemented yet.
    ///
    /// See also `addListener()`
    pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
        unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
    }

    /// Operation method that sends a message to the Server. The message is interpreted and handled
    /// by the Metadata Adapter associated to the current Session. This operation supports in-order
    /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
    /// to arrive exactly once and respecting the original order, whatever is the underlying transport
    /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
    /// necessary, to reduce network round trips.
///
/// Upon subsequent calls to the method, the sequential management of the involved messages is
/// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
/// issued.
///
/// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
/// it will be resent; however, this may cause the subsequent messages to be delayed. For this
/// reason, each message can specify a "delayTimeout", which is the longest time the message,
/// after reaching the Server, can be kept waiting if one or more preceding messages haven't
/// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
/// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
/// Note that, because of the parallel transport of the messages, if a zero or very low timeout
/// is set for a message and the previous message was sent immediately before, it is possible
/// that the latter gets discarded even if no communication issues occur. The Server may also
/// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
/// a long time.
///
/// Sequence identifiers can also be associated with the messages. In this case, the sequential
/// management is restricted to all subsets of messages with the same sequence identifier associated.
///
/// Notifications of the operation outcome can be received by supplying a suitable listener. The
/// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
/// are guaranteed to be invoked sequentially.
///
/// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
/// processing is guaranteed, while strict ordering and even sequentialization of the processing
/// is not enforced. Likewise, strict ordering of the notifications is not enforced.
/// However, messages that, for any reason, should fail to reach the Server whereas subsequent
/// messages had succeeded, might still be discarded after a server-side timeout, in order to
/// ensure that the listener eventually gets a notification.
///
/// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
/// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
/// are performed at all, so as to optimize the processing and allow the highest possible throughput.
///
/// Since a message is handled by the Metadata Adapter associated to the current connection, a
/// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
/// flag is specified, it is possible to call the method at any time and the client will take
/// care of sending the message as soon as a connection is available; otherwise, if the current
/// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
/// event will be fired.
///
/// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
/// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
/// flag set to `true`.
///
/// Also note that forwarding of the message to the server is made in a separate thread, hence,
/// if a message is sent while the connection is active, it could be aborted because of a subsequent
/// disconnection. In the same way a message sent while the connection is not active might be
/// sent because of a subsequent connection.
///
/// # Parameters
///
/// * `message`: a text message, whose interpretation is entirely delegated to the Metadata Adapter
///   associated to the current connection.
/// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
///   in sequence; underscore characters are also allowed.
If the "UNORDERED_MESSAGES" identifier /// is supplied, the message will be processed in the special way described above. The parameter /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name. /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured /// timeout on missing messages, the latter will be used instead. The parameter is optional; if /// a negative value is supplied, the Server configured timeout on missing messages will be applied. /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side /// timeout on missing messages still applies. /// * `listener`: an object suitable for receiving notifications about the processing outcome. The /// parameter is optional; if not supplied, no notification will be available. /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected /// status when the provided message is handled, then the message is not aborted right away but /// is queued waiting for a new session. Note that the message can still be aborted later when /// a new session is established. pub fn send_message( &mut self, message: &str, sequence: Option<&str>, _delay_timeout: Option, listener: Option>, enqueue_while_disconnected: bool, ) { let _sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES"); // Handle the message based on the current connection status match &self.status { ClientStatus::Connected(_connection_type) => { // Send the message to the server in a separate thread // ... } ClientStatus::Disconnected(_disconnection_type) => { if enqueue_while_disconnected { // Enqueue the message to be sent when a connection is available // ... } else { // Abort the message and notify the listener if let Some(listener) = listener { listener.on_abort(message, false); } } } _ => { // Enqueue the message to be sent when a connection is available // ... 
} } unimplemented!("Complete mechanism to send message to LightstreamerClient."); } /// Static method that permits to configure the logging system used by the library. The logging /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any /// third-party logging system. /// /// If no logging system is specified, all the generated log is discarded. /// /// The following categories are available to be consumed: /// /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO /// level, socket operations are logged; at DEBUG level, read/write data exchange is logged. /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO /// level, requests are logged; at DEBUG level, request details and events from the Server are logged. /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events /// are logged; at DEBUG level, lifecycle event details are logged. /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related /// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions /// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged. /// - `lightstreamer.actions`: logs settings / API calls. /// /// # Parameters /// /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the /// library classes. pub fn set_logger_provider() { unimplemented!("Implement mechanism to set logger provider for LightstreamerClient."); } /* pub fn set_logger_provider(provider: LoggerProvider) { // Implementation for set_logger_provider } */ /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to /// accept untrusted ones. /// /// May be called only once before creating any `LightstreamerClient` instance. 
///
/// The sections below describe the intended contract. The present body is a stub: it accepts
/// no argument, returns nothing and panics through `unimplemented!`.
///
/// # Parameters
///
/// * `factory`: an instance of `ssl.SSLContext`
///
/// # Raises
///
/// * `IllegalArgumentException`: if the factory is `None`
/// * `IllegalStateException`: if a factory is already installed
pub fn set_trust_manager_factory() {
    unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
}
// Sketch of the intended implementation, kept for reference. The concrete type of `factory`
// is not fixed anywhere in this file; `SslContext` below is only a placeholder name.
//
// pub fn set_trust_manager_factory(
//     factory: Option<SslContext>,
// ) -> Result<(), IllegalArgumentException> {
//     if factory.is_none() {
//         return Err(IllegalArgumentException::new(
//             "Factory cannot be None",
//         ));
//     }
//
//     // Implementation for set_trust_manager_factory
//     Ok(())
// }

/// Registers a `Subscription` with this client, making it one of its "active" Subscriptions.
/// The `Subscription` is expected not to be in the "active" state already.
///
/// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
/// as there is a session available). Active `Subscription` are automatically persisted across
/// different sessions as long as a related unsubscribe call is not issued.
///
/// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the
/// `Subscription` immediately enters the "active" state.
///
/// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
/// unless it is first removed from the "active" state through a call to `unsubscribe()`.
///
/// Also note that forwarding of the subscription to the server is made in a separate thread.
///
/// A successful subscription to the server will be notified through a
/// `SubscriptionListener.onSubscription()` event.
///
/// At present the method only appends the subscription to the client's internal list.
///
/// # Parameters
///
/// * `subscription`: A `Subscription` object, carrying all the information needed to process
///   real-time values.
///
/// See also `unsubscribe()`
pub fn subscribe(&mut self, subscription: Subscription) {
    let active = &mut self.subscriptions;
    active.push(subscription);
    // Implementation for subscribe
}

/// Operation method that removes a `Subscription` that is currently in the "active" state.
///
/// Returning a `Subscription` to the "inactive" state requests, from Lightstreamer Server, the
/// unsubscription from all of its items.
///
/// A Subscription can be unsubscribed from at any time. Once done the `Subscription`
/// immediately exits the "active" state.
///
/// Note that forwarding of the unsubscription to the server is made in a separate thread.
///
/// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
///
/// The present body is a stub that panics through `unimplemented!`.
///
/// # Parameters
///
/// * `subscription`: An "active" `Subscription` object that was activated by this
///   `LightstreamerClient` instance.
pub fn unsubscribe(&mut self, _subscription: Subscription) {
    unimplemented!("Implement mechanism to unsubscribe from LightstreamerClient.");
}
/*
pub fn unsubscribe(&mut self, subscription: &Subscription) {
    if let Some(index) = self.subscriptions.iter().position(|s| s == subscription) {
        self.subscriptions.remove(index);
        // Implementation for unsubscribe
    }
}
*/

/// Selects the logging backend used by this instance.
///
/// The default logging type is `StdLogs`, corresponding to `stdout`.
///
/// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
/// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
/// and its configuration and formatting.
///
/// # Parameters
///
/// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
pub fn set_logging_type(&mut self, logging: LogType) {
    self.logging = logging;
}

/// Emits a log message through the backend selected for this instance.
///
/// With `LogType::StdLogs` the text is printed to stdout and `loglevel` is ignored; with
/// `LogType::TracingLogs` the text is handed to the `tracing` macro matching `loglevel`.
///
/// # Parameters
///
/// * `loglevel`: the `tracing` level to log at; only consulted for `LogType::TracingLogs`.
/// * `log`: the text to log.
pub fn make_log(&mut self, loglevel: Level, log: &str) {
    match self.logging {
        // Plain stdout logging: the level is deliberately ignored.
        LogType::StdLogs => {
            println!("{}", log);
        }
        // A bare identifier passed to a `tracing` macro is recorded as a *field* named `log`,
        // not as the event message. Formatting it explicitly makes the text the message,
        // matching what the stdout branch prints.
        LogType::TracingLogs => match loglevel {
            Level::INFO => {
                info!("{}", log);
            }
            Level::WARN => {
                warn!("{}", log);
            }
            Level::ERROR => {
                error!("{}", log);
            }
            Level::TRACE => {
                trace!("{}", log);
            }
            Level::DEBUG => {
                debug!("{}", log);
            }
        },
    }
}
}

/// The transport type to be used by the client.
///
/// Each variant corresponds to one of the following forced-transport settings:
///
/// - WS: the Stream-Sense algorithm is enabled as in the `None` case but the client will
///   only use WebSocket based connections. If a connection over WebSocket is not possible
///   because of the environment the client will not connect at all.
/// - HTTP: the Stream-Sense algorithm is enabled as in the `None` case but the client
///   will only use HTTP based connections. If a connection over HTTP is not possible because
///   of the environment the client will not connect at all.
/// - WS-STREAMING: the Stream-Sense algorithm is disabled and the client will only connect
///   on Streaming over WebSocket. If Streaming over WebSocket is not possible because of
///   the environment the client will not connect at all.
/// - HTTP-STREAMING: the Stream-Sense algorithm is disabled and the client will only
///   connect on Streaming over HTTP. If Streaming over HTTP is not possible because of the
///   browser/environment the client will not connect at all.
/// - WS-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
///   on Polling over WebSocket. If Polling over WebSocket is not possible because of the
///   environment the client will not connect at all.
/// - HTTP-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
///   on Polling over HTTP. If Polling over HTTP is not possible because of the environment
///   the client will not connect at all.
#[derive(Debug, PartialEq)]
pub enum Transport {
    /// WS
    Ws,
    /// HTTP
    Http,
    /// WS-STREAMING
    WsStreaming,
    /// HTTP-STREAMING
    HttpStreaming,
    /// WS-POLLING
    WsPolling,
    /// HTTP-POLLING
    HttpPolling,
}