diff options
author | 2024-03-30 20:59:54 +0100 | |
---|---|---|
committer | 2024-03-30 20:59:58 +0100 | |
commit | 7af7a7626a8e83fe3f9c3b0d2ad7d2b32da41d45 (patch) | |
tree | 319b9976f6274f42e748d1b501229bb219584f66 | |
parent | c6745a22e79f0556a19b0d44a181fb9d8ed78f90 (diff) |
WARNING: unstable commit.
🔧 Update .gitignore to exclude .vscode directory
✨ Add futures-util and url dependencies to Cargo.toml
♻️ Refactor error handling into separate error module in Rust project
💡 Add get_password method documentation in connection_details.rs
♻️ Replace String with Transport enum for forced_transport in connection_options.rs
✨ Implement WebSocket connection logic in ls_client.rs with async support
✨ Add ClientStatus, ConnectionType, and DisconnectionType enums to manage client states in ls_client.rs
✨ (main.rs): add Transport enum to LightstreamerClient imports for WebSocket support
♻️ (main.rs): refactor signal handling to use SharedState struct for clean shutdown
✨ (main.rs): implement AtomicBool for graceful disconnect handling
📝 (main.rs): update comments to reflect new signal handling logic
✨ (main.rs): set forced transport to WebSocket streaming in Lightstreamer client options
✨ (util.rs): create new util module with clean_message function for message sanitization
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/connection_details.rs | 20 | ||||
-rw-r--r-- | src/connection_options.rs | 29 | ||||
-rw-r--r-- | src/error.rs | 70 | ||||
-rw-r--r-- | src/lib.rs | 49 | ||||
-rw-r--r-- | src/ls_client.rs | 339 | ||||
-rw-r--r-- | src/main.rs | 213 | ||||
-rw-r--r-- | src/util.rs | 4 |
9 files changed, 457 insertions, 272 deletions
@@ -12,3 +12,6 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +# Visual Studio Code +.vscode @@ -13,6 +13,7 @@ documentation = "https://github.com/daniloaz/lightstreamer-client#readme" [dependencies] cookie = { version = "0", features = ["percent-encode"]} futures = "0" +futures-util = "0" hyper = { version = "1", features = ["full"] } reqwest = { version = "0", features = ["json", "stream"] } serde = { version = "1", features = ["derive"] } @@ -21,3 +22,4 @@ serde_urlencoded = "0" signal-hook = "0" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-tungstenite = { version = "0", features = ["native-tls"] } +url = "2" diff --git a/src/connection_details.rs b/src/connection_details.rs index 06b6d15..228a597 100644 --- a/src/connection_details.rs +++ b/src/connection_details.rs @@ -1,7 +1,5 @@ -use hyper::server; - use crate::client_listener::ClientListener; -use crate::IllegalArgumentException; +use crate::error::IllegalArgumentException; use std::fmt::{self, Debug, Formatter}; @@ -64,6 +62,22 @@ impl ConnectionDetails { self.client_ip.as_ref() } + /// Retrieves a reference to the password, if set. + /// + /// This method is crucial for accessing sensitive information in a controlled manner. It returns + /// an immutable reference to the password, encapsulated within an `Option`. The use of `Option` + /// signifies that the password may or may not be present, thus providing flexibility in scenarios + /// where a password is optional. By returning a reference, we avoid unnecessary cloning of the + /// password data, which could have security implications and also incur a performance cost. + /// + /// # Returns + /// An `Option` containing a reference to the password `String` if it exists, or `None` if the + /// password has not been set. This allows calling code to handle the presence or absence of a + /// password appropriately without risking exposure of the password itself. + pub fn get_password(&self) -> Option<&String> { + self.password.as_ref() + } + /// Inquiry method that gets the configured address of Lightstreamer Server. /// /// # Returns diff --git a/src/connection_options.rs b/src/connection_options.rs index c143fb9..e3218df 100644 --- a/src/connection_options.rs +++ b/src/connection_options.rs @@ -1,9 +1,9 @@ +use crate::error::IllegalArgumentException; +use crate::ls_client::Transport; use crate::proxy::Proxy; -use crate::IllegalArgumentException; use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; -use std::hash::DefaultHasher; /// Used by LightstreamerClient to provide an extra connection properties data object. /// Data struct that contains the policy settings used to connect to a Lightstreamer Server. @@ -13,7 +13,7 @@ use std::hash::DefaultHasher; pub struct ConnectionOptions { content_length: Option<u64>, first_retry_max_delay: u64, - forced_transport: Option<String>, + forced_transport: Option<Transport>, http_extra_headers: Option<HashMap<String, String>>, http_extra_headers_on_session_creation_only: bool, idle_timeout: u64, @@ -97,7 +97,7 @@ impl ConnectionOptions { /// The forced transport or `None` /// /// See also `setForcedTransport()` - pub fn get_forced_transport(&self) -> Option<&String> { + pub fn get_forced_transport(&self) -> Option<&Transport> { self.forced_transport.as_ref() } @@ -423,25 +423,8 @@ impl ConnectionOptions { /// # Raises /// /// * `IllegalArgumentException`: if the given value is not in the list of the admitted ones. - pub fn set_forced_transport(&mut self, forced_transport: Option<String>) -> Result<(), IllegalArgumentException> { - let valid_transports = vec![ - None, - Some("WS".to_string()), - Some("HTTP".to_string()), - Some("WS-STREAMING".to_string()), - Some("HTTP-STREAMING".to_string()), - Some("WS-POLLING".to_string()), - Some("HTTP-POLLING".to_string()), - ]; - - if !valid_transports.contains(&forced_transport) { - return Err(IllegalArgumentException::new( - "Invalid forced transport value", - )); - } - + pub fn set_forced_transport(&mut self, forced_transport: Option<Transport>) { self.forced_transport = forced_transport; - Ok(()) } /// Setter method that enables/disables the setting of extra HTTP headers to all the request @@ -1157,4 +1140,4 @@ impl Default for ConnectionOptions { supported_diffs: None, } } -}
\ No newline at end of file +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..9c3a916 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,70 @@ +use std::fmt; +use std::error::Error; + +#[derive(Debug)] +pub struct IllegalArgumentException(String); + +impl IllegalArgumentException { + pub fn new(msg: &str) -> IllegalArgumentException { + IllegalArgumentException(msg.to_string()) + } +} + +impl fmt::Display for IllegalArgumentException { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Error for IllegalArgumentException { + fn description(&self) -> &str { + &self.0 + } +} + +impl From<Box<dyn Error>> for IllegalArgumentException { + fn from(error: Box<dyn Error>) -> Self { + IllegalArgumentException::new(&error.to_string()) + } +} + +#[derive(Debug)] +pub struct IllegalStateException { + details: String +} + +impl IllegalStateException { + pub fn new(msg: &str) -> IllegalStateException { + IllegalStateException{details: msg.to_string()} + } +} + +impl fmt::Display for IllegalStateException { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f,"{}",self.details) + } +} + +impl Error for IllegalStateException { + fn description(&self) -> &str { + &self.details + } +} + +impl From<Box<dyn Error>> for IllegalStateException { + fn from(error: Box<dyn Error>) -> Self { + IllegalStateException::new(&error.to_string()) + } +} + +impl From<serde_urlencoded::ser::Error> for IllegalStateException { + fn from(err: serde_urlencoded::ser::Error) -> Self { + IllegalStateException::new(&format!("Serialization error: {}", err)) + } +} + +impl From<tokio_tungstenite::tungstenite::Error> for IllegalStateException { + fn from(err: tokio_tungstenite::tungstenite::Error) -> Self { + IllegalStateException::new(&format!("WebSocket error: {}", err)) + } +}
\ No newline at end of file @@ -1,8 +1,6 @@ -use std::fmt; -use std::error::Error; - pub mod client_listener; pub mod client_message_listener; +pub mod error; pub mod item_update; pub mod subscription_listener; pub mod connection_details; @@ -10,47 +8,4 @@ pub mod connection_options; pub mod ls_client; pub mod proxy; pub mod subscription; - -#[derive(Debug)] -pub struct IllegalArgumentException(String); - -impl IllegalArgumentException { - pub fn new(msg: &str) -> IllegalArgumentException { - IllegalArgumentException(msg.to_string()) - } -} - -impl fmt::Display for IllegalArgumentException { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Error for IllegalArgumentException { - fn description(&self) -> &str { - &self.0 - } -} - -#[derive(Debug)] -pub struct IllegalStateException { - details: String -} - -impl IllegalStateException { - pub fn new(msg: &str) -> IllegalStateException { - IllegalStateException{details: msg.to_string()} - } -} - -impl fmt::Display for IllegalStateException { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f,"{}",self.details) - } -} - -impl Error for IllegalStateException { - fn description(&self) -> &str { - &self.details - } -}
\ No newline at end of file +pub mod util; diff --git a/src/ls_client.rs b/src/ls_client.rs index 59dd247..26328f7 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -3,10 +3,42 @@ use crate::client_message_listener::ClientMessageListener; use crate::connection_details::ConnectionDetails; use crate::connection_options::ConnectionOptions; use crate::subscription::Subscription; -use crate::IllegalStateException; +use crate::error::IllegalStateException; +use crate::util::*; use cookie::Cookie; +use futures_util::{SinkExt, StreamExt}; +use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; +use tokio_tungstenite::{ + connect_async, + tungstenite::{ + http::{HeaderName, HeaderValue, Request}, + Message, + }, +}; +use url::Url; + +/// Represents the current status of the `LightstreamerClient`. +pub enum ClientStatus { + Connecting, + Connected(ConnectionType), + Stalled, + Disconnected(DisconnectionType), +} + +pub enum ConnectionType { + HttpPolling, + HttpStreaming, + StreamSensing, + WsPolling, + WsStreaming, +} + +pub enum DisconnectionType { + WillRetry, + TryingRecovery, +} /// Facade class for the management of the communication to Lightstreamer Server. Used to provide /// configuration settings, event handlers, operations for the control of the connection lifecycle, @@ -46,7 +78,10 @@ use std::fmt::{self, Debug, Formatter}; /// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()` /// for details. pub struct LightstreamerClient { + /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect. server_address: Option<String>, + /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all + /// requests in the Session associated with this `LightstreamerClient`. adapter_set: Option<String>, /// Data object that contains the details needed to open a connection to a Lightstreamer Server. /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties @@ -56,8 +91,13 @@ pub struct LightstreamerClient { /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object /// can be overwritten by values received from a Lightstreamer Server. pub connection_options: ConnectionOptions, + /// A list of listeners that will receive events from the `LightstreamerClient` instance. listeners: Vec<Box<dyn ClientListener>>, + /// A list containing all the `Subscription` instances that are currently "active" on this + /// `LightstreamerClient`. subscriptions: Vec<Subscription>, + /// The current status of the client. + status: ClientStatus, } impl Debug for LightstreamerClient { @@ -147,11 +187,239 @@ impl LightstreamerClient { /// See also `ClientListener.onStatusChange()` /// /// See also `ConnectionDetails.setServerAddress()` - pub fn connect(&mut self) -> Result<(), IllegalStateException> { + pub async fn connect(&mut self) -> Result<(), IllegalStateException> { if self.server_address.is_none() { - return Err(IllegalStateException::new("No server address was configured.")); + return Err(IllegalStateException::new( + "No server address was configured.", + )); + } + + let forced_transport = self.connection_options.get_forced_transport(); + if forced_transport.is_none() || *forced_transport.unwrap() != Transport::WsStreaming { + // unwrap() is safe here. + return Err(IllegalStateException::new( + "Only WebSocket streaming transport is currently supported.", + )); + } + + let mut params = HashMap::new(); + + // + // Build the mandatory request parameters. + // + + params.insert("LS_protocol", "TLCP-2.5.0"); + params.insert("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"); + + // + // Add optional parameters + // + + /* + + if let Some(user) = &self.connection_details.get_user() { + params.insert("LS_user", user); + } + + if let Some(password) = &self.connection_details.get_password() { + params.insert("LS_password", password); + } + + if let Some(adapter_set) = &self.adapter_set { + params.insert("LS_adapter_set", adapter_set); + } + + if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() { + params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string()); + } + + if let Some(content_length) = self.connection_options.get_content_length() { + params.insert("LS_content_length", &content_length.to_string()); + } + + if let Some(supported_diffs) = &self.connection_options.get_supported_diffs() { + params.insert("LS_supported_diffs", supported_diffs); + } + + if self.connection_options.is_polling() { + params.insert("LS_polling", "true"); + if let Some(polling_millis) = self.connection_options.get_polling_millis() { + params.insert("LS_polling_millis", &polling_millis.to_string()); + } + + if let Some(idle_millis) = self.connection_options.get_idle_millis() { + params.insert("LS_idle_millis", &idle_millis.to_string()); + } + } else { + if let Some(inactivity_millis) = self.connection_options.get_inactivity_millis() { + params.insert("LS_inactivity_millis", &inactivity_millis.to_string()); + } + + if let Some(keepalive_millis) = self.connection_options.get_keepalive_millis() { + params.insert("LS_keepalive_millis", &keepalive_millis.to_string()); + } + + if !self.connection_options.is_send_sync() { + params.insert("LS_send_sync", "false"); + } + } + + if self.connection_options.is_reduce_head() { + params.insert("LS_reduce_head", "true"); + } + + if let Some(ttl_millis) = self.connection_options.get_ttl_millis() { + params.insert("LS_ttl_millis", &ttl_millis.to_string()); + } + + // Build the request body + let request_body = build_request_body(¶ms); + + // Send the create session request + let response = send_create_session_request( + &self.server_address.as_ref().unwrap(), + &request_body, + )?; + + // Process the server response + process_create_session_response(&response)?; + + */ + + // + // Convert the HTTP URL to a WebSocket URL. + // + let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here. + let mut url = Url::parse(&http_url) + .expect("Failed to parse server address URL from connection details."); + match url.scheme() { + "http" => url + .set_scheme("ws") + .expect("Failed to set scheme to ws for WebSocket URL."), + "https" => url + .set_scheme("wss") + .expect("Failed to set scheme to wss for WebSocket URL."), + invalid_scheme => { + return Err(IllegalStateException::new(&format!( + "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.", + invalid_scheme + ))); + } + } + let ws_url = url.as_str(); + + // Build the WebSocket request with the necessary headers. + let request = Request::builder() + .uri(ws_url) + .header( + HeaderName::from_static("connection"), + HeaderValue::from_static("Upgrade"), + ) + .header( + HeaderName::from_static("host"), + HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| { + IllegalStateException::new(&format!("Invalid header value for header with name 'host': {}", err)) + })?, + ) + .header( + HeaderName::from_static("sec-websocket-key"), + HeaderValue::from_static("PNDUibe9ex7PnsrLbt0N4w=="), + ) + .header( + HeaderName::from_static("sec-websocket-protocol"), + HeaderValue::from_static("TLCP-2.5.0.lightstreamer.com"), + ) + .header( + HeaderName::from_static("sec-websocket-version"), + HeaderValue::from_static("13"), + ) + .header( + HeaderName::from_static("upgrade"), + HeaderValue::from_static("websocket"), + ) + .body(()) + .unwrap(); + + // Connect to the Lightstreamer server using WebSocket. + let ws_stream = match connect_async(request).await { + Ok((ws_stream, response)) => { + if let Some(server_header) = response.headers().get("server") { + println!("Connected to Lightstreamer server: {}", server_header.to_str().unwrap_or("")); + } else { + println!("Connected to Lightstreamer server"); + } + ws_stream + }, + Err(err) => { + return Err(IllegalStateException::new(&format!( + "Failed to connect to Lightstreamer server with WebSocket: {}", + err + ))); + } + }; + + // Split the WebSocket stream into a write and read stream. + let (mut write_stream, mut read_stream) = ws_stream.split(); + + // + // Confirm the connection by sending a 'wsok' message to the server. + // + write_stream.send(Message::Text("wsok".into())).await.expect("Failed to send message"); + if let Some(result) = read_stream.next().await { + match result? { + Message::Text(text) => { + let clean_text = clean_message(&text); + if clean_text == "wsok" { + println!("Connection confirmed by server"); + } else { + return Err(IllegalStateException::new(&format!( + "Unexpected message received from server: {}", + clean_text + ))); + } + }, + non_text_message => { + println!("Unexpected non-text message from server: {:?}", non_text_message); + }, + } + } + + /* + // Session creation parameters + let params = [ + ("LS_op2", "create_session"), + ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"), + ("LS_adapter_set", "DEMO"), + ]; + + let encoded_params = serde_urlencoded::to_string(¶ms)?; + + // Send the create session message + write_stream + .send(Message::Text(format!("{}\n", encoded_params))) + .await?; + */ + + // Listen for messages from the server + while let Some(message) = read_stream.next().await { + match message? { + Message::Text(text) => { + if text.starts_with("CONOK") { + let session_info: Vec<&str> = text.split(",").collect(); + let session_id = session_info.get(1).unwrap_or(&"").to_string(); + println!("Session established with ID: {}", session_id); + //subscribe_to_channel_ws(session_id, write_stream).await?; + break; // Exit after successful subscription + } else { + println!("Received unexpected message from server: {}", text); + } + } + msg => { println!("Received non-text message from server: {:?}", msg); } + } } + println!("No more messages from server"); + Ok(()) } @@ -173,6 +441,7 @@ impl LightstreamerClient { /// See also `connect()` pub fn disconnect(&mut self) { // Implementation for disconnect + println!("Disconnecting from Lightstreamer server"); } /// Static inquiry method that can be used to share cookies between connections to the Server @@ -229,9 +498,8 @@ impl LightstreamerClient { /// - `"DISCONNECTED"`: no connection is currently active. /// /// See also `ClientListener.onStatusChange()` - pub fn get_status(&self) -> &str { - // Implementation for get_status - unimplemented!() + pub fn get_status(&self) -> &ClientStatus { + &self.status } /// Inquiry method that returns a list containing all the `Subscription` instances that are @@ -291,7 +559,7 @@ impl LightstreamerClient { ) -> Result<LightstreamerClient, IllegalStateException> { let connection_details = ConnectionDetails::new(server_address, adapter_set); let connection_options = ConnectionOptions::new(); - + Ok(LightstreamerClient { server_address: server_address.map(|s| s.to_string()), adapter_set: adapter_set.map(|s| s.to_string()), @@ -299,6 +567,7 @@ impl LightstreamerClient { connection_options, listeners: Vec::new(), subscriptions: Vec::new(), + status: ClientStatus::Disconnected(DisconnectionType::WillRetry), }) } @@ -401,7 +670,30 @@ impl LightstreamerClient { listener: Option<Box<dyn ClientMessageListener>>, enqueue_while_disconnected: bool, ) { - // Implementation for send_message + let sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES"); + + // Handle the message based on the current connection status + match &self.status { + ClientStatus::Connected(connection_type) => { + // Send the message to the server in a separate thread + // ... + } + ClientStatus::Disconnected(disconnection_type) => { + if enqueue_while_disconnected { + // Enqueue the message to be sent when a connection is available + // ... + } else { + // Abort the message and notify the listener + if let Some(listener) = listener { + listener.on_abort(message, false); + } + } + } + _ => { + // Enqueue the message to be sent when a connection is available + // ... + } + } } /// Static method that permits to configure the logging system used by the library. The logging @@ -521,4 +813,33 @@ impl LightstreamerClient { } } */ -}
\ No newline at end of file +} + +/// The transport type to be used by the client. +/// - WS: the Stream-Sense algorithm is enabled as in the `None` case but the client will +/// only use WebSocket based connections. If a connection over WebSocket is not possible +/// because of the environment the client will not connect at all. +/// - HTTP: the Stream-Sense algorithm is enabled as in the `None` case but the client +/// will only use HTTP based connections. If a connection over HTTP is not possible because +/// of the environment the client will not connect at all. +/// - WS-STREAMING: the Stream-Sense algorithm is disabled and the client will only connect +/// on Streaming over WebSocket. If Streaming over WebSocket is not possible because of +/// the environment the client will not connect at all. +/// - HTTP-STREAMING: the Stream-Sense algorithm is disabled and the client will only +/// connect on Streaming over HTTP. If Streaming over HTTP is not possible because of the +/// browser/environment the client will not connect at all. +/// - WS-POLLING: the Stream-Sense algorithm is disabled and the client will only connect +/// on Polling over WebSocket. If Polling over WebSocket is not possible because of the +/// environment the client will not connect at all. +/// - HTTP-POLLING: the Stream-Sense algorithm is disabled and the client will only connect +/// on Polling over HTTP. If Polling over HTTP is not possible because of the environment +/// the client will not connect at all. +#[derive(Debug, PartialEq)] +pub enum Transport { + Ws, + Http, + WsStreaming, + HttpStreaming, + WsPolling, + HttpPolling, +} diff --git a/src/main.rs b/src/main.rs index 1c4b05a..87f9da4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use hyper::client; use lightstreamer_client::item_update::ItemUpdate; -use lightstreamer_client::ls_client::LightstreamerClient; +use lightstreamer_client::ls_client::{LightstreamerClient, Transport}; use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode}; use lightstreamer_client::subscription_listener::SubscriptionListener; @@ -11,10 +11,16 @@ use serde_urlencoded; use signal_hook::low_level::signal_name; use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals}; use std::error::Error; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::Mutex; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +struct SharedState { + client: Arc<Mutex<LightstreamerClient>>, + should_disconnect: Arc<AtomicBool>, +} + /// Sets up a signal hook for SIGINT and SIGTERM. /// /// Creates a signal hook for the specified signals and spawns a thread to handle them. @@ -29,7 +35,7 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; /// /// The function panics if it fails to create the signal iterator. /// -async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) { +async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) { // Create a signal set of signals to be handled and a signal iterator to monitor them. let signals = &[SIGINT, SIGTERM]; let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator"); @@ -41,198 +47,18 @@ async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) { // // Clean up and prepare to exit... // ... - let mut client = client.lock().await; - client.disconnect(); - - // Exit with 0 code to indicate orderly shutdown. - std::process::exit(0); - } - }); -} - -async fn establish_persistent_http_connection( - session_id_shared: Arc<Mutex<String>>, -) -> Result<(), reqwest::Error> { - let client = Client::new(); - let params = [ - ("LS_adapter_set", "DEMO"), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), - ]; - let request_url = - "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0"; - - let response = client.post(request_url).form(¶ms).send().await?; - - if response.status().is_success() { - let mut stream = response.bytes_stream(); - - while let Some(item) = stream.next().await { - match item { - Ok(bytes) => { - let response_text = String::from_utf8(bytes.to_vec()) - .expect("Failed to convert bytes to string"); - if let Some(start) = response_text.find("CONOK,") { - if let Some(end) = response_text.find(",50000,5000,*\r\n") { - let session_id = &response_text[start + 6..end]; - println!("Session ID: {}", session_id); - let mut session_id_lock = session_id_shared.lock().await; - *session_id_lock = session_id.to_string(); - } - } else { - println!("New message: {}", response_text); - } - } - Err(e) => println!("Error while receiving: {:?}", e), - } - } - } else { - println!("Response was not successful: {}", response.status()); - } - - Ok(()) -} - -/* -// Establish a persistent WebSocket connection and handle the session creation -async fn establish_persistent_ws_connection( - session_id_shared: Arc<Mutex<String>>, -) -> Result<(), Box<dyn Error>> { - let ws_url = "wss://push.lightstreamer.com/lightstreamer"; - - let (ws_stream, _) = tokio_tungstenite::connect_async_with_config( - tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned())) - ).await.expect("Failed to connect"); - - let (mut write, mut read) = ws_stream.split(); - - // Session creation parameters - let params = [ - ("LS_op2", "create_session"), - ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"), - ("LS_adapter_set", "DEMO"), - ]; - - let encoded_params = serde_urlencoded::to_string(¶ms)?; - - // Send the create session message - write - .send(Message::Text(format!("{}\n", encoded_params))) - .await?; - - // Listen for messages from the server - while let Some(message) = read.next().await { - match message? { - Message::Text(text) => { - if text.starts_with("CONOK") { - let session_info: Vec<&str> = text.split(",").collect(); - let session_id = session_info.get(1).unwrap_or(&"").to_string(); - *session_id_shared.lock().await = session_id.clone(); - println!("Session established with ID: {}", session_id); - subscribe_to_channel_ws(session_id, write).await?; - break; // Exit after successful subscription - } - } - _ => {} - } - } - - Ok(()) -} -*/ - -async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> { - let client = Client::new(); - //let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt"; - //let params = [("LS_session", &session_id)]; - let subscribe_url = - "http://push.lightstreamer.com/lightstreamer/control.txt?LS_protocol=TLCP-2.0.0"; - let params = [ - ("LS_session", &session_id), - ("LS_op", &"add".to_string()), - ("LS_subId", &"1".to_string()), - ("LS_data_adapter", &"CHAT_ROOM".to_string()), - ("LS_group", &"chat_room".to_string()), - ("LS_schema", &"timestamp message".to_string()), - ("LS_mode", &"DISTINCT".to_string()), - ("LS_reqId", &"1".to_string()), - ]; - - let response = client.post(subscribe_url).form(¶ms).send().await?; - - if response.status().is_success() { - println!("Subscription successful!"); - } else { - println!("Subscription failed: {}", response.status()); - } - - Ok(()) -} - -// Function to subscribe to a channel using WebSocket -async fn subscribe_to_channel_ws( - session_id: String, - mut write: futures::stream::SplitSink< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, - >, - tokio_tungstenite::tungstenite::protocol::Message, - >, -) -> Result<(), Box<dyn Error>> { - // Example subscription to ITEM1 in MERGE mode from the DEMO adapter set - let sub_params = [ - ("LS_table", "1"), - ("LS_op2", "add"), - ("LS_session", &session_id), - ("LS_id", "item1"), - ("LS_schema", "stock_name last_price"), - ("LS_mode", "MERGE"), - ]; - - let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?; - - // Send the subscription message - write.send(Message::Text(encoded_sub_params)).await?; - - println!("Subscribed to channel with session ID: {}", session_id); - - Ok(()) -} - -/* -#[tokio::main] -async fn main() -> Result<(), Box<dyn Error>> { - - let session_id_shared = Arc::new(Mutex::new(String::new())); - let session_id_shared_clone = session_id_shared.clone(); - - let task1 = tokio::spawn(async move { - establish_persistent_http_connection(session_id_shared_clone).await.unwrap(); - }); - - println!("Established connection to Lightstreamer server"); - let task2 = tokio::spawn(async move { - let mut session_established = false; - loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let session_id; { - session_id = session_id_shared.lock().await.clone(); + let shared_state = shared_state.lock().await; + shared_state.should_disconnect.store(true, Ordering::Relaxed); + let mut client = shared_state.client.lock().await; + client.disconnect(); } - if !session_established && !session_id.is_empty() { - println!("Accessed Session ID from another thread: {}", session_id); - session_established = true; - subscribe_to_channel(session_id).await.unwrap(); - } + // Exit with 0 code to indicate orderly shutdown. + std::process::exit(0); } }); - - task1.await?; - task2.await?; - - Ok(()) } -*/ pub struct MySubscriptionListener {} @@ -277,10 +103,17 @@ async fn main() -> Result<(), Box<dyn Error>> { { let mut client = client.lock().await; client.subscribe(my_subscription); + client.connection_options.set_forced_transport(Some(Transport::WsStreaming)); } + let should_disconnect = Arc::new(AtomicBool::new(false)); + let shared_state = Arc::new(Mutex::new(SharedState { + client: client.clone(), + should_disconnect: should_disconnect.clone(), + })); + // Spawn a new thread to handle SIGINT and SIGTERM process signals. - setup_signal_hook(client.clone()).await; + setup_signal_hook(shared_state).await; // // Infinite loop that will indefinitely retry failed connections unless @@ -290,7 +123,7 @@ async fn main() -> Result<(), Box<dyn Error>> { let mut retry_counter: u64 = 0; loop { let mut client = client.lock().await; - match client.connect() { + match client.connect().await { Ok(_) => {} Err(e) => { println!("Failed to connect: {:?}", e); diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..8de27e3 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,4 @@ +/// Clean the message from newlines and carriage returns and convert it to lowercase. +pub fn clean_message(text: &str) -> String { + text.replace("\n", "").replace("\r", "").to_lowercase() +}
\ No newline at end of file |