WARNING: unstable commit.
🔧 Update .gitignore to exclude .vscode directory ✨ Add futures-util and url dependencies to Cargo.toml ♻️ Refactor error handling into separate error module in Rust project 💡 Add get_password method documentation in connection_details.rs ♻️ Replace String with Transport enum for forced_transport in connection_options.rs ✨ Implement WebSocket connection logic in ls_client.rs with async support ✨ Add ClientStatus, ConnectionType, and DisconnectionType enums to manage client states in ls_client.rs ✨ (main.rs): add Transport enum to LightstreamerClient imports for WebSocket support ♻️ (main.rs): refactor signal handling to use SharedState struct for clean shutdown ✨ (main.rs): implement AtomicBool for graceful disconnect handling 📝 (main.rs): update comments to reflect new signal handling logic ✨ (main.rs): set forced transport to WebSocket streaming in Lightstreamer client options ✨ (util.rs): create new util module with clean_message function for message sanitization
This commit is contained in:
parent
c6745a22e7
commit
7af7a7626a
3
.gitignore
vendored
3
.gitignore
vendored
@ -12,3 +12,6 @@ Cargo.lock
|
||||
|
||||
# MSVC Windows builds of rustc generate these, which store debugging information
|
||||
*.pdb
|
||||
|
||||
# Visual Studio Code
|
||||
.vscode
|
||||
|
@ -13,6 +13,7 @@ documentation = "https://github.com/daniloaz/lightstreamer-client#readme"
|
||||
[dependencies]
|
||||
cookie = { version = "0", features = ["percent-encode"]}
|
||||
futures = "0"
|
||||
futures-util = "0"
|
||||
hyper = { version = "1", features = ["full"] }
|
||||
reqwest = { version = "0", features = ["json", "stream"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
@ -21,3 +22,4 @@ serde_urlencoded = "0"
|
||||
signal-hook = "0"
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||
tokio-tungstenite = { version = "0", features = ["native-tls"] }
|
||||
url = "2"
|
||||
|
@ -1,7 +1,5 @@
|
||||
use hyper::server;
|
||||
|
||||
use crate::client_listener::ClientListener;
|
||||
use crate::IllegalArgumentException;
|
||||
use crate::error::IllegalArgumentException;
|
||||
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
|
||||
@ -64,6 +62,22 @@ impl ConnectionDetails {
|
||||
self.client_ip.as_ref()
|
||||
}
|
||||
|
||||
/// Retrieves a reference to the password, if set.
|
||||
///
|
||||
/// This method is crucial for accessing sensitive information in a controlled manner. It returns
|
||||
/// an immutable reference to the password, encapsulated within an `Option`. The use of `Option`
|
||||
/// signifies that the password may or may not be present, thus providing flexibility in scenarios
|
||||
/// where a password is optional. By returning a reference, we avoid unnecessary cloning of the
|
||||
/// password data, which could have security implications and also incur a performance cost.
|
||||
///
|
||||
/// # Returns
|
||||
/// An `Option` containing a reference to the password `String` if it exists, or `None` if the
|
||||
/// password has not been set. This allows calling code to handle the presence or absence of a
|
||||
/// password appropriately without risking exposure of the password itself.
|
||||
pub fn get_password(&self) -> Option<&String> {
|
||||
self.password.as_ref()
|
||||
}
|
||||
|
||||
/// Inquiry method that gets the configured address of Lightstreamer Server.
|
||||
///
|
||||
/// # Returns
|
||||
|
@ -1,9 +1,9 @@
|
||||
use crate::error::IllegalArgumentException;
|
||||
use crate::ls_client::Transport;
|
||||
use crate::proxy::Proxy;
|
||||
use crate::IllegalArgumentException;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::hash::DefaultHasher;
|
||||
|
||||
/// Used by LightstreamerClient to provide an extra connection properties data object.
|
||||
/// Data struct that contains the policy settings used to connect to a Lightstreamer Server.
|
||||
@ -13,7 +13,7 @@ use std::hash::DefaultHasher;
|
||||
pub struct ConnectionOptions {
|
||||
content_length: Option<u64>,
|
||||
first_retry_max_delay: u64,
|
||||
forced_transport: Option<String>,
|
||||
forced_transport: Option<Transport>,
|
||||
http_extra_headers: Option<HashMap<String, String>>,
|
||||
http_extra_headers_on_session_creation_only: bool,
|
||||
idle_timeout: u64,
|
||||
@ -97,7 +97,7 @@ impl ConnectionOptions {
|
||||
/// The forced transport or `None`
|
||||
///
|
||||
/// See also `setForcedTransport()`
|
||||
pub fn get_forced_transport(&self) -> Option<&String> {
|
||||
pub fn get_forced_transport(&self) -> Option<&Transport> {
|
||||
self.forced_transport.as_ref()
|
||||
}
|
||||
|
||||
@ -423,25 +423,8 @@ impl ConnectionOptions {
|
||||
/// # Raises
|
||||
///
|
||||
/// * `IllegalArgumentException`: if the given value is not in the list of the admitted ones.
|
||||
pub fn set_forced_transport(&mut self, forced_transport: Option<String>) -> Result<(), IllegalArgumentException> {
|
||||
let valid_transports = vec![
|
||||
None,
|
||||
Some("WS".to_string()),
|
||||
Some("HTTP".to_string()),
|
||||
Some("WS-STREAMING".to_string()),
|
||||
Some("HTTP-STREAMING".to_string()),
|
||||
Some("WS-POLLING".to_string()),
|
||||
Some("HTTP-POLLING".to_string()),
|
||||
];
|
||||
|
||||
if !valid_transports.contains(&forced_transport) {
|
||||
return Err(IllegalArgumentException::new(
|
||||
"Invalid forced transport value",
|
||||
));
|
||||
}
|
||||
|
||||
pub fn set_forced_transport(&mut self, forced_transport: Option<Transport>) {
|
||||
self.forced_transport = forced_transport;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Setter method that enables/disables the setting of extra HTTP headers to all the request
|
||||
|
70
src/error.rs
Normal file
70
src/error.rs
Normal file
@ -0,0 +1,70 @@
|
||||
use std::fmt;
|
||||
use std::error::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IllegalArgumentException(String);
|
||||
|
||||
impl IllegalArgumentException {
|
||||
pub fn new(msg: &str) -> IllegalArgumentException {
|
||||
IllegalArgumentException(msg.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for IllegalArgumentException {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for IllegalArgumentException {
|
||||
fn description(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Box<dyn Error>> for IllegalArgumentException {
|
||||
fn from(error: Box<dyn Error>) -> Self {
|
||||
IllegalArgumentException::new(&error.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IllegalStateException {
|
||||
details: String
|
||||
}
|
||||
|
||||
impl IllegalStateException {
|
||||
pub fn new(msg: &str) -> IllegalStateException {
|
||||
IllegalStateException{details: msg.to_string()}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for IllegalStateException {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f,"{}",self.details)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for IllegalStateException {
|
||||
fn description(&self) -> &str {
|
||||
&self.details
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Box<dyn Error>> for IllegalStateException {
|
||||
fn from(error: Box<dyn Error>) -> Self {
|
||||
IllegalStateException::new(&error.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_urlencoded::ser::Error> for IllegalStateException {
|
||||
fn from(err: serde_urlencoded::ser::Error) -> Self {
|
||||
IllegalStateException::new(&format!("Serialization error: {}", err))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio_tungstenite::tungstenite::Error> for IllegalStateException {
|
||||
fn from(err: tokio_tungstenite::tungstenite::Error) -> Self {
|
||||
IllegalStateException::new(&format!("WebSocket error: {}", err))
|
||||
}
|
||||
}
|
49
src/lib.rs
49
src/lib.rs
@ -1,8 +1,6 @@
|
||||
use std::fmt;
|
||||
use std::error::Error;
|
||||
|
||||
pub mod client_listener;
|
||||
pub mod client_message_listener;
|
||||
pub mod error;
|
||||
pub mod item_update;
|
||||
pub mod subscription_listener;
|
||||
pub mod connection_details;
|
||||
@ -10,47 +8,4 @@ pub mod connection_options;
|
||||
pub mod ls_client;
|
||||
pub mod proxy;
|
||||
pub mod subscription;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IllegalArgumentException(String);
|
||||
|
||||
impl IllegalArgumentException {
|
||||
pub fn new(msg: &str) -> IllegalArgumentException {
|
||||
IllegalArgumentException(msg.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for IllegalArgumentException {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for IllegalArgumentException {
|
||||
fn description(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IllegalStateException {
|
||||
details: String
|
||||
}
|
||||
|
||||
impl IllegalStateException {
|
||||
pub fn new(msg: &str) -> IllegalStateException {
|
||||
IllegalStateException{details: msg.to_string()}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for IllegalStateException {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f,"{}",self.details)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for IllegalStateException {
|
||||
fn description(&self) -> &str {
|
||||
&self.details
|
||||
}
|
||||
}
|
||||
pub mod util;
|
||||
|
335
src/ls_client.rs
335
src/ls_client.rs
@ -3,10 +3,42 @@ use crate::client_message_listener::ClientMessageListener;
|
||||
use crate::connection_details::ConnectionDetails;
|
||||
use crate::connection_options::ConnectionOptions;
|
||||
use crate::subscription::Subscription;
|
||||
use crate::IllegalStateException;
|
||||
use crate::error::IllegalStateException;
|
||||
use crate::util::*;
|
||||
|
||||
use cookie::Cookie;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use tokio_tungstenite::{
|
||||
connect_async,
|
||||
tungstenite::{
|
||||
http::{HeaderName, HeaderValue, Request},
|
||||
Message,
|
||||
},
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
/// Represents the current status of the `LightstreamerClient`.
|
||||
pub enum ClientStatus {
|
||||
Connecting,
|
||||
Connected(ConnectionType),
|
||||
Stalled,
|
||||
Disconnected(DisconnectionType),
|
||||
}
|
||||
|
||||
pub enum ConnectionType {
|
||||
HttpPolling,
|
||||
HttpStreaming,
|
||||
StreamSensing,
|
||||
WsPolling,
|
||||
WsStreaming,
|
||||
}
|
||||
|
||||
pub enum DisconnectionType {
|
||||
WillRetry,
|
||||
TryingRecovery,
|
||||
}
|
||||
|
||||
/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
|
||||
/// configuration settings, event handlers, operations for the control of the connection lifecycle,
|
||||
@ -46,7 +78,10 @@ use std::fmt::{self, Debug, Formatter};
|
||||
/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
|
||||
/// for details.
|
||||
pub struct LightstreamerClient {
|
||||
/// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
|
||||
server_address: Option<String>,
|
||||
/// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
|
||||
/// requests in the Session associated with this `LightstreamerClient`.
|
||||
adapter_set: Option<String>,
|
||||
/// Data object that contains the details needed to open a connection to a Lightstreamer Server.
|
||||
/// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
|
||||
@ -56,8 +91,13 @@ pub struct LightstreamerClient {
|
||||
/// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
|
||||
/// can be overwritten by values received from a Lightstreamer Server.
|
||||
pub connection_options: ConnectionOptions,
|
||||
/// A list of listeners that will receive events from the `LightstreamerClient` instance.
|
||||
listeners: Vec<Box<dyn ClientListener>>,
|
||||
/// A list containing all the `Subscription` instances that are currently "active" on this
|
||||
/// `LightstreamerClient`.
|
||||
subscriptions: Vec<Subscription>,
|
||||
/// The current status of the client.
|
||||
status: ClientStatus,
|
||||
}
|
||||
|
||||
impl Debug for LightstreamerClient {
|
||||
@ -147,11 +187,239 @@ impl LightstreamerClient {
|
||||
/// See also `ClientListener.onStatusChange()`
|
||||
///
|
||||
/// See also `ConnectionDetails.setServerAddress()`
|
||||
pub fn connect(&mut self) -> Result<(), IllegalStateException> {
|
||||
pub async fn connect(&mut self) -> Result<(), IllegalStateException> {
|
||||
if self.server_address.is_none() {
|
||||
return Err(IllegalStateException::new("No server address was configured."));
|
||||
return Err(IllegalStateException::new(
|
||||
"No server address was configured.",
|
||||
));
|
||||
}
|
||||
|
||||
let forced_transport = self.connection_options.get_forced_transport();
|
||||
if forced_transport.is_none() || *forced_transport.unwrap() != Transport::WsStreaming {
|
||||
// unwrap() is safe here.
|
||||
return Err(IllegalStateException::new(
|
||||
"Only WebSocket streaming transport is currently supported.",
|
||||
));
|
||||
}
|
||||
|
||||
let mut params = HashMap::new();
|
||||
|
||||
//
|
||||
// Build the mandatory request parameters.
|
||||
//
|
||||
|
||||
params.insert("LS_protocol", "TLCP-2.5.0");
|
||||
params.insert("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg");
|
||||
|
||||
//
|
||||
// Add optional parameters
|
||||
//
|
||||
|
||||
/*
|
||||
|
||||
if let Some(user) = &self.connection_details.get_user() {
|
||||
params.insert("LS_user", user);
|
||||
}
|
||||
|
||||
if let Some(password) = &self.connection_details.get_password() {
|
||||
params.insert("LS_password", password);
|
||||
}
|
||||
|
||||
if let Some(adapter_set) = &self.adapter_set {
|
||||
params.insert("LS_adapter_set", adapter_set);
|
||||
}
|
||||
|
||||
if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() {
|
||||
params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string());
|
||||
}
|
||||
|
||||
if let Some(content_length) = self.connection_options.get_content_length() {
|
||||
params.insert("LS_content_length", &content_length.to_string());
|
||||
}
|
||||
|
||||
if let Some(supported_diffs) = &self.connection_options.get_supported_diffs() {
|
||||
params.insert("LS_supported_diffs", supported_diffs);
|
||||
}
|
||||
|
||||
if self.connection_options.is_polling() {
|
||||
params.insert("LS_polling", "true");
|
||||
if let Some(polling_millis) = self.connection_options.get_polling_millis() {
|
||||
params.insert("LS_polling_millis", &polling_millis.to_string());
|
||||
}
|
||||
|
||||
if let Some(idle_millis) = self.connection_options.get_idle_millis() {
|
||||
params.insert("LS_idle_millis", &idle_millis.to_string());
|
||||
}
|
||||
} else {
|
||||
if let Some(inactivity_millis) = self.connection_options.get_inactivity_millis() {
|
||||
params.insert("LS_inactivity_millis", &inactivity_millis.to_string());
|
||||
}
|
||||
|
||||
if let Some(keepalive_millis) = self.connection_options.get_keepalive_millis() {
|
||||
params.insert("LS_keepalive_millis", &keepalive_millis.to_string());
|
||||
}
|
||||
|
||||
if !self.connection_options.is_send_sync() {
|
||||
params.insert("LS_send_sync", "false");
|
||||
}
|
||||
}
|
||||
|
||||
if self.connection_options.is_reduce_head() {
|
||||
params.insert("LS_reduce_head", "true");
|
||||
}
|
||||
|
||||
if let Some(ttl_millis) = self.connection_options.get_ttl_millis() {
|
||||
params.insert("LS_ttl_millis", &ttl_millis.to_string());
|
||||
}
|
||||
|
||||
// Build the request body
|
||||
let request_body = build_request_body(¶ms);
|
||||
|
||||
// Send the create session request
|
||||
let response = send_create_session_request(
|
||||
&self.server_address.as_ref().unwrap(),
|
||||
&request_body,
|
||||
)?;
|
||||
|
||||
// Process the server response
|
||||
process_create_session_response(&response)?;
|
||||
|
||||
*/
|
||||
|
||||
//
|
||||
// Convert the HTTP URL to a WebSocket URL.
|
||||
//
|
||||
let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
|
||||
let mut url = Url::parse(&http_url)
|
||||
.expect("Failed to parse server address URL from connection details.");
|
||||
match url.scheme() {
|
||||
"http" => url
|
||||
.set_scheme("ws")
|
||||
.expect("Failed to set scheme to ws for WebSocket URL."),
|
||||
"https" => url
|
||||
.set_scheme("wss")
|
||||
.expect("Failed to set scheme to wss for WebSocket URL."),
|
||||
invalid_scheme => {
|
||||
return Err(IllegalStateException::new(&format!(
|
||||
"Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
|
||||
invalid_scheme
|
||||
)));
|
||||
}
|
||||
}
|
||||
let ws_url = url.as_str();
|
||||
|
||||
// Build the WebSocket request with the necessary headers.
|
||||
let request = Request::builder()
|
||||
.uri(ws_url)
|
||||
.header(
|
||||
HeaderName::from_static("connection"),
|
||||
HeaderValue::from_static("Upgrade"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("host"),
|
||||
HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
|
||||
IllegalStateException::new(&format!("Invalid header value for header with name 'host': {}", err))
|
||||
})?,
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("sec-websocket-key"),
|
||||
HeaderValue::from_static("PNDUibe9ex7PnsrLbt0N4w=="),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("sec-websocket-protocol"),
|
||||
HeaderValue::from_static("TLCP-2.5.0.lightstreamer.com"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("sec-websocket-version"),
|
||||
HeaderValue::from_static("13"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("upgrade"),
|
||||
HeaderValue::from_static("websocket"),
|
||||
)
|
||||
.body(())
|
||||
.unwrap();
|
||||
|
||||
// Connect to the Lightstreamer server using WebSocket.
|
||||
let ws_stream = match connect_async(request).await {
|
||||
Ok((ws_stream, response)) => {
|
||||
if let Some(server_header) = response.headers().get("server") {
|
||||
println!("Connected to Lightstreamer server: {}", server_header.to_str().unwrap_or(""));
|
||||
} else {
|
||||
println!("Connected to Lightstreamer server");
|
||||
}
|
||||
ws_stream
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(IllegalStateException::new(&format!(
|
||||
"Failed to connect to Lightstreamer server with WebSocket: {}",
|
||||
err
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
// Split the WebSocket stream into a write and read stream.
|
||||
let (mut write_stream, mut read_stream) = ws_stream.split();
|
||||
|
||||
//
|
||||
// Confirm the connection by sending a 'wsok' message to the server.
|
||||
//
|
||||
write_stream.send(Message::Text("wsok".into())).await.expect("Failed to send message");
|
||||
if let Some(result) = read_stream.next().await {
|
||||
match result? {
|
||||
Message::Text(text) => {
|
||||
let clean_text = clean_message(&text);
|
||||
if clean_text == "wsok" {
|
||||
println!("Connection confirmed by server");
|
||||
} else {
|
||||
return Err(IllegalStateException::new(&format!(
|
||||
"Unexpected message received from server: {}",
|
||||
clean_text
|
||||
)));
|
||||
}
|
||||
},
|
||||
non_text_message => {
|
||||
println!("Unexpected non-text message from server: {:?}", non_text_message);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// Session creation parameters
|
||||
let params = [
|
||||
("LS_op2", "create_session"),
|
||||
("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
|
||||
("LS_adapter_set", "DEMO"),
|
||||
];
|
||||
|
||||
let encoded_params = serde_urlencoded::to_string(¶ms)?;
|
||||
|
||||
// Send the create session message
|
||||
write_stream
|
||||
.send(Message::Text(format!("{}\n", encoded_params)))
|
||||
.await?;
|
||||
*/
|
||||
|
||||
// Listen for messages from the server
|
||||
while let Some(message) = read_stream.next().await {
|
||||
match message? {
|
||||
Message::Text(text) => {
|
||||
if text.starts_with("CONOK") {
|
||||
let session_info: Vec<&str> = text.split(",").collect();
|
||||
let session_id = session_info.get(1).unwrap_or(&"").to_string();
|
||||
println!("Session established with ID: {}", session_id);
|
||||
//subscribe_to_channel_ws(session_id, write_stream).await?;
|
||||
break; // Exit after successful subscription
|
||||
} else {
|
||||
println!("Received unexpected message from server: {}", text);
|
||||
}
|
||||
}
|
||||
msg => { println!("Received non-text message from server: {:?}", msg); }
|
||||
}
|
||||
}
|
||||
|
||||
println!("No more messages from server");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -173,6 +441,7 @@ impl LightstreamerClient {
|
||||
/// See also `connect()`
|
||||
pub fn disconnect(&mut self) {
|
||||
// Implementation for disconnect
|
||||
println!("Disconnecting from Lightstreamer server");
|
||||
}
|
||||
|
||||
/// Static inquiry method that can be used to share cookies between connections to the Server
|
||||
@ -229,9 +498,8 @@ impl LightstreamerClient {
|
||||
/// - `"DISCONNECTED"`: no connection is currently active.
|
||||
///
|
||||
/// See also `ClientListener.onStatusChange()`
|
||||
pub fn get_status(&self) -> &str {
|
||||
// Implementation for get_status
|
||||
unimplemented!()
|
||||
pub fn get_status(&self) -> &ClientStatus {
|
||||
&self.status
|
||||
}
|
||||
|
||||
/// Inquiry method that returns a list containing all the `Subscription` instances that are
|
||||
@ -299,6 +567,7 @@ impl LightstreamerClient {
|
||||
connection_options,
|
||||
listeners: Vec::new(),
|
||||
subscriptions: Vec::new(),
|
||||
status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
|
||||
})
|
||||
}
|
||||
|
||||
@ -401,7 +670,30 @@ impl LightstreamerClient {
|
||||
listener: Option<Box<dyn ClientMessageListener>>,
|
||||
enqueue_while_disconnected: bool,
|
||||
) {
|
||||
// Implementation for send_message
|
||||
let sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES");
|
||||
|
||||
// Handle the message based on the current connection status
|
||||
match &self.status {
|
||||
ClientStatus::Connected(connection_type) => {
|
||||
// Send the message to the server in a separate thread
|
||||
// ...
|
||||
}
|
||||
ClientStatus::Disconnected(disconnection_type) => {
|
||||
if enqueue_while_disconnected {
|
||||
// Enqueue the message to be sent when a connection is available
|
||||
// ...
|
||||
} else {
|
||||
// Abort the message and notify the listener
|
||||
if let Some(listener) = listener {
|
||||
listener.on_abort(message, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// Enqueue the message to be sent when a connection is available
|
||||
// ...
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Static method that permits to configure the logging system used by the library. The logging
|
||||
@ -522,3 +814,32 @@ impl LightstreamerClient {
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/// The transport type to be used by the client.
|
||||
/// - WS: the Stream-Sense algorithm is enabled as in the `None` case but the client will
|
||||
/// only use WebSocket based connections. If a connection over WebSocket is not possible
|
||||
/// because of the environment the client will not connect at all.
|
||||
/// - HTTP: the Stream-Sense algorithm is enabled as in the `None` case but the client
|
||||
/// will only use HTTP based connections. If a connection over HTTP is not possible because
|
||||
/// of the environment the client will not connect at all.
|
||||
/// - WS-STREAMING: the Stream-Sense algorithm is disabled and the client will only connect
|
||||
/// on Streaming over WebSocket. If Streaming over WebSocket is not possible because of
|
||||
/// the environment the client will not connect at all.
|
||||
/// - HTTP-STREAMING: the Stream-Sense algorithm is disabled and the client will only
|
||||
/// connect on Streaming over HTTP. If Streaming over HTTP is not possible because of the
|
||||
/// browser/environment the client will not connect at all.
|
||||
/// - WS-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
|
||||
/// on Polling over WebSocket. If Polling over WebSocket is not possible because of the
|
||||
/// environment the client will not connect at all.
|
||||
/// - HTTP-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
|
||||
/// on Polling over HTTP. If Polling over HTTP is not possible because of the environment
|
||||
/// the client will not connect at all.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Transport {
|
||||
Ws,
|
||||
Http,
|
||||
WsStreaming,
|
||||
HttpStreaming,
|
||||
WsPolling,
|
||||
HttpPolling,
|
||||
}
|
||||
|
211
src/main.rs
211
src/main.rs
@ -1,6 +1,6 @@
|
||||
use hyper::client;
|
||||
use lightstreamer_client::item_update::ItemUpdate;
|
||||
use lightstreamer_client::ls_client::LightstreamerClient;
|
||||
use lightstreamer_client::ls_client::{LightstreamerClient, Transport};
|
||||
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
|
||||
use lightstreamer_client::subscription_listener::SubscriptionListener;
|
||||
|
||||
@ -11,10 +11,16 @@ use serde_urlencoded;
|
||||
use signal_hook::low_level::signal_name;
|
||||
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
|
||||
use std::error::Error;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||
|
||||
struct SharedState {
|
||||
client: Arc<Mutex<LightstreamerClient>>,
|
||||
should_disconnect: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
/// Sets up a signal hook for SIGINT and SIGTERM.
|
||||
///
|
||||
/// Creates a signal hook for the specified signals and spawns a thread to handle them.
|
||||
@ -29,7 +35,7 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||
///
|
||||
/// The function panics if it fails to create the signal iterator.
|
||||
///
|
||||
async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
|
||||
async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) {
|
||||
// Create a signal set of signals to be handled and a signal iterator to monitor them.
|
||||
let signals = &[SIGINT, SIGTERM];
|
||||
let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator");
|
||||
@ -41,8 +47,12 @@ async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
|
||||
//
|
||||
// Clean up and prepare to exit...
|
||||
// ...
|
||||
let mut client = client.lock().await;
|
||||
{
|
||||
let shared_state = shared_state.lock().await;
|
||||
shared_state.should_disconnect.store(true, Ordering::Relaxed);
|
||||
let mut client = shared_state.client.lock().await;
|
||||
client.disconnect();
|
||||
}
|
||||
|
||||
// Exit with 0 code to indicate orderly shutdown.
|
||||
std::process::exit(0);
|
||||
@ -50,190 +60,6 @@ async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
|
||||
});
|
||||
}
|
||||
|
||||
async fn establish_persistent_http_connection(
|
||||
session_id_shared: Arc<Mutex<String>>,
|
||||
) -> Result<(), reqwest::Error> {
|
||||
let client = Client::new();
|
||||
let params = [
|
||||
("LS_adapter_set", "DEMO"),
|
||||
("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
|
||||
];
|
||||
let request_url =
|
||||
"http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0";
|
||||
|
||||
let response = client.post(request_url).form(¶ms).send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let mut stream = response.bytes_stream();
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
match item {
|
||||
Ok(bytes) => {
|
||||
let response_text = String::from_utf8(bytes.to_vec())
|
||||
.expect("Failed to convert bytes to string");
|
||||
if let Some(start) = response_text.find("CONOK,") {
|
||||
if let Some(end) = response_text.find(",50000,5000,*\r\n") {
|
||||
let session_id = &response_text[start + 6..end];
|
||||
println!("Session ID: {}", session_id);
|
||||
let mut session_id_lock = session_id_shared.lock().await;
|
||||
*session_id_lock = session_id.to_string();
|
||||
}
|
||||
} else {
|
||||
println!("New message: {}", response_text);
|
||||
}
|
||||
}
|
||||
Err(e) => println!("Error while receiving: {:?}", e),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("Response was not successful: {}", response.status());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
// Establish a persistent WebSocket connection and handle the session creation
|
||||
async fn establish_persistent_ws_connection(
|
||||
session_id_shared: Arc<Mutex<String>>,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let ws_url = "wss://push.lightstreamer.com/lightstreamer";
|
||||
|
||||
let (ws_stream, _) = tokio_tungstenite::connect_async_with_config(
|
||||
tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned()))
|
||||
).await.expect("Failed to connect");
|
||||
|
||||
let (mut write, mut read) = ws_stream.split();
|
||||
|
||||
// Session creation parameters
|
||||
let params = [
|
||||
("LS_op2", "create_session"),
|
||||
("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
|
||||
("LS_adapter_set", "DEMO"),
|
||||
];
|
||||
|
||||
let encoded_params = serde_urlencoded::to_string(¶ms)?;
|
||||
|
||||
// Send the create session message
|
||||
write
|
||||
.send(Message::Text(format!("{}\n", encoded_params)))
|
||||
.await?;
|
||||
|
||||
// Listen for messages from the server
|
||||
while let Some(message) = read.next().await {
|
||||
match message? {
|
||||
Message::Text(text) => {
|
||||
if text.starts_with("CONOK") {
|
||||
let session_info: Vec<&str> = text.split(",").collect();
|
||||
let session_id = session_info.get(1).unwrap_or(&"").to_string();
|
||||
*session_id_shared.lock().await = session_id.clone();
|
||||
println!("Session established with ID: {}", session_id);
|
||||
subscribe_to_channel_ws(session_id, write).await?;
|
||||
break; // Exit after successful subscription
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
*/
|
||||
|
||||
async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> {
|
||||
let client = Client::new();
|
||||
//let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt";
|
||||
//let params = [("LS_session", &session_id)];
|
||||
let subscribe_url =
|
||||
"http://push.lightstreamer.com/lightstreamer/control.txt?LS_protocol=TLCP-2.0.0";
|
||||
let params = [
|
||||
("LS_session", &session_id),
|
||||
("LS_op", &"add".to_string()),
|
||||
("LS_subId", &"1".to_string()),
|
||||
("LS_data_adapter", &"CHAT_ROOM".to_string()),
|
||||
("LS_group", &"chat_room".to_string()),
|
||||
("LS_schema", &"timestamp message".to_string()),
|
||||
("LS_mode", &"DISTINCT".to_string()),
|
||||
("LS_reqId", &"1".to_string()),
|
||||
];
|
||||
|
||||
let response = client.post(subscribe_url).form(¶ms).send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
println!("Subscription successful!");
|
||||
} else {
|
||||
println!("Subscription failed: {}", response.status());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Function to subscribe to a channel using WebSocket
|
||||
async fn subscribe_to_channel_ws(
|
||||
session_id: String,
|
||||
mut write: futures::stream::SplitSink<
|
||||
tokio_tungstenite::WebSocketStream<
|
||||
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
tokio_tungstenite::tungstenite::protocol::Message,
|
||||
>,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
// Example subscription to ITEM1 in MERGE mode from the DEMO adapter set
|
||||
let sub_params = [
|
||||
("LS_table", "1"),
|
||||
("LS_op2", "add"),
|
||||
("LS_session", &session_id),
|
||||
("LS_id", "item1"),
|
||||
("LS_schema", "stock_name last_price"),
|
||||
("LS_mode", "MERGE"),
|
||||
];
|
||||
|
||||
let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?;
|
||||
|
||||
// Send the subscription message
|
||||
write.send(Message::Text(encoded_sub_params)).await?;
|
||||
|
||||
println!("Subscribed to channel with session ID: {}", session_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
let session_id_shared = Arc::new(Mutex::new(String::new()));
|
||||
let session_id_shared_clone = session_id_shared.clone();
|
||||
|
||||
let task1 = tokio::spawn(async move {
|
||||
establish_persistent_http_connection(session_id_shared_clone).await.unwrap();
|
||||
});
|
||||
|
||||
println!("Established connection to Lightstreamer server");
|
||||
let task2 = tokio::spawn(async move {
|
||||
let mut session_established = false;
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let session_id;
|
||||
{
|
||||
session_id = session_id_shared.lock().await.clone();
|
||||
}
|
||||
|
||||
if !session_established && !session_id.is_empty() {
|
||||
println!("Accessed Session ID from another thread: {}", session_id);
|
||||
session_established = true;
|
||||
subscribe_to_channel(session_id).await.unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
task1.await?;
|
||||
task2.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
*/
|
||||
|
||||
pub struct MySubscriptionListener {}
|
||||
|
||||
impl SubscriptionListener for MySubscriptionListener {
|
||||
@ -277,10 +103,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
{
|
||||
let mut client = client.lock().await;
|
||||
client.subscribe(my_subscription);
|
||||
client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
|
||||
}
|
||||
|
||||
let should_disconnect = Arc::new(AtomicBool::new(false));
|
||||
let shared_state = Arc::new(Mutex::new(SharedState {
|
||||
client: client.clone(),
|
||||
should_disconnect: should_disconnect.clone(),
|
||||
}));
|
||||
|
||||
// Spawn a new thread to handle SIGINT and SIGTERM process signals.
|
||||
setup_signal_hook(client.clone()).await;
|
||||
setup_signal_hook(shared_state).await;
|
||||
|
||||
//
|
||||
// Infinite loop that will indefinitely retry failed connections unless
|
||||
@ -290,7 +123,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let mut retry_counter: u64 = 0;
|
||||
loop {
|
||||
let mut client = client.lock().await;
|
||||
match client.connect() {
|
||||
match client.connect().await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("Failed to connect: {:?}", e);
|
||||
|
4
src/util.rs
Normal file
4
src/util.rs
Normal file
@ -0,0 +1,4 @@
|
||||
/// Clean the message from newlines and carriage returns and convert it to lowercase.
|
||||
pub fn clean_message(text: &str) -> String {
|
||||
text.replace("\n", "").replace("\r", "").to_lowercase()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user