1235 lines
68 KiB
Rust
1235 lines
68 KiB
Rust
use crate::client_listener::ClientListener;
|
|
use crate::client_message_listener::ClientMessageListener;
|
|
use crate::connection_details::ConnectionDetails;
|
|
use crate::connection_options::ConnectionOptions;
|
|
use crate::error::IllegalStateException;
|
|
use crate::item_update::ItemUpdate;
|
|
use crate::subscription::{Snapshot, Subscription, SubscriptionMode};
|
|
use crate::util::*;
|
|
|
|
use cookie::Cookie;
|
|
use futures_util::{SinkExt, StreamExt};
|
|
use std::collections::HashMap;
|
|
use std::error::Error;
|
|
use std::fmt::{self, Debug, Formatter};
|
|
use std::sync::Arc;
|
|
use tokio::sync::Notify;
|
|
use tokio_tungstenite::{
|
|
connect_async,
|
|
tungstenite::{
|
|
http::{HeaderName, HeaderValue, Request},
|
|
Message,
|
|
},
|
|
};
|
|
use tracing::{debug, error, info, instrument, trace, warn, Level};
|
|
use url::Url;
|
|
|
|
/// Represents the current status of the `LightstreamerClient`.
|
|
pub enum ClientStatus {
|
|
Connecting,
|
|
Connected(ConnectionType),
|
|
Stalled,
|
|
Disconnected(DisconnectionType),
|
|
}
|
|
|
|
pub enum ConnectionType {
|
|
HttpPolling,
|
|
HttpStreaming,
|
|
StreamSensing,
|
|
WsPolling,
|
|
WsStreaming,
|
|
}
|
|
|
|
pub enum DisconnectionType {
|
|
WillRetry,
|
|
TryingRecovery,
|
|
}
|
|
|
|
pub enum LogType {
|
|
TracingLogs,
|
|
StdLogs,
|
|
}
|
|
|
|
/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
|
|
/// configuration settings, event handlers, operations for the control of the connection lifecycle,
|
|
/// Subscription handling and to send messages.
|
|
///
|
|
/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
|
|
/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
|
|
/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
|
|
/// a single instance of `LightstreamerClient` is needed.
|
|
///
|
|
/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
|
|
/// endpoints.
|
|
///
|
|
/// You can listen to the events generated by a session by registering an event listener, such as
|
|
/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
|
|
/// such as session creation, connection status, subscription updates, and server messages. However,
|
|
/// you should be aware that the event notifications are dispatched by a single thread, the so-called
|
|
/// event thread. This means that if the operations of a listener are slow or blocking, they will
|
|
/// delay the processing of the other listeners and affect the performance of your application.
|
|
/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
|
|
/// the listener methods as fast and simple as possible. Note that even if you create multiple
|
|
/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
|
|
/// among them.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
|
|
/// will connect to. It is possible to specify it later by using `None` here. See
|
|
/// `ConnectionDetails.setServerAddress()` for details.
|
|
/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
|
|
/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to
|
|
/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
|
|
/// for details.
|
|
///
|
|
/// # Raises
|
|
///
|
|
/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
|
|
/// for details.
|
|
pub struct LightstreamerClient {
|
|
/// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
|
|
server_address: Option<String>,
|
|
/// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
|
|
/// requests in the Session associated with this `LightstreamerClient`.
|
|
adapter_set: Option<String>,
|
|
/// Data object that contains the details needed to open a connection to a Lightstreamer Server.
|
|
/// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
|
|
/// of this object can be overwritten by values received from a Lightstreamer Server.
|
|
pub connection_details: ConnectionDetails,
|
|
/// Data object that contains options and policies for the connection to the server. This instance
|
|
/// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
|
|
/// can be overwritten by values received from a Lightstreamer Server.
|
|
pub connection_options: ConnectionOptions,
|
|
/// A list of listeners that will receive events from the `LightstreamerClient` instance.
|
|
listeners: Vec<Box<dyn ClientListener>>,
|
|
/// A list containing all the `Subscription` instances that are currently "active" on this
|
|
/// `LightstreamerClient`.
|
|
subscriptions: Vec<Subscription>,
|
|
/// The current status of the client.
|
|
status: ClientStatus,
|
|
/// Logging Type to be used
|
|
logging: LogType,
|
|
}
|
|
|
|
impl Debug for LightstreamerClient {
|
|
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
|
f.debug_struct("LightstreamerClient")
|
|
.field("server_address", &self.server_address)
|
|
.field("adapter_set", &self.adapter_set)
|
|
.field("connection_details", &self.connection_details)
|
|
.field("connection_options", &self.connection_options)
|
|
.field("listeners", &self.listeners)
|
|
.field("subscriptions", &self.subscriptions)
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl LightstreamerClient {
|
|
/// A constant string representing the name of the library.
|
|
pub const LIB_NAME: &'static str = "rust_client";
|
|
|
|
/// A constant string representing the version of the library.
|
|
pub const LIB_VERSION: &'static str = "0.1.0";
|
|
|
|
//
|
|
// Constants for WebSocket connection.
|
|
//
|
|
pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
|
|
pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
|
|
pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
|
|
pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
|
|
|
|
/// A constant string representing the version of the TLCP protocol used by the library.
|
|
pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
|
|
|
|
/// Static method that can be used to share cookies between connections to the Server (performed by
|
|
/// this library) and connections to other sites that are performed by the application. With this
|
|
/// method, cookies received by the application can be added (or replaced if already present) to
|
|
/// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
|
|
/// is compatible with the Server domain will be used internally.
|
|
///
|
|
/// This method should be invoked before calling the `LightstreamerClient.connect()` method.
|
|
/// However it can be invoked at any time; it will affect the internal cookie set immediately
|
|
/// and the sending of cookies on the next HTTP request or WebSocket establishment.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
|
|
/// * `cookies`: an instance of `http.cookies.SimpleCookie`.
|
|
///
|
|
/// See also `getCookies()`
|
|
pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
|
|
// Implementation for add_cookies
|
|
unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
|
|
}
|
|
|
|
/// Adds a listener that will receive events from the `LightstreamerClient` instance.
|
|
///
|
|
/// The same listener can be added to several different `LightstreamerClient` instances.
|
|
///
|
|
/// A listener can be added at any time. A call to add a listener already present will be ignored.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `listener`: An object that will receive the events as documented in the `ClientListener`
|
|
/// interface.
|
|
///
|
|
/// See also `removeListener()`
|
|
pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
|
|
self.listeners.push(listener);
|
|
}
|
|
|
|
/// Operation method that requests to open a Session against the configured Lightstreamer Server.
|
|
///
|
|
/// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
|
|
/// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
|
|
/// for some seconds from the streaming connection, then it will automatically open a polling
|
|
/// connection.
|
|
///
|
|
/// A polling connection may also be opened if the environment is not suitable for a streaming
|
|
/// connection.
|
|
///
|
|
/// Note that as "polling connection" we mean a loop of polling requests, each of which requires
|
|
/// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
|
|
///
|
|
/// Note that the request to connect is accomplished by the client in a separate thread; this
|
|
/// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
|
|
/// yet.
|
|
///
|
|
/// When the request to connect is finally being executed, if the current status of the client
|
|
/// is not `DISCONNECTED`, then nothing will be done.
|
|
///
|
|
/// # Raises
|
|
///
|
|
/// * `IllegalStateException`: if no server address was configured.
|
|
///
|
|
/// See also `getStatus()`
|
|
///
|
|
/// See also `disconnect()`
|
|
///
|
|
/// See also `ClientListener.onStatusChange()`
|
|
///
|
|
/// See also `ConnectionDetails.setServerAddress()`
|
|
#[instrument]
|
|
pub async fn connect(&mut self, shutdown_signal: Arc<Notify>) -> Result<(), Box<dyn Error>> {
|
|
// Check if the server address is configured.
|
|
if self.server_address.is_none() {
|
|
return Err(Box::new(IllegalStateException::new(
|
|
"No server address was configured.",
|
|
)));
|
|
}
|
|
//
|
|
// Only WebSocket streaming transport is currently supported.
|
|
//
|
|
let forced_transport = self.connection_options.get_forced_transport();
|
|
if forced_transport.is_none()
|
|
|| *forced_transport.unwrap() /* unwrap() is safe here */ != Transport::WsStreaming
|
|
{
|
|
return Err(Box::new(IllegalStateException::new(
|
|
"Only WebSocket streaming transport is currently supported.",
|
|
)));
|
|
}
|
|
//
|
|
// Convert the HTTP URL to a WebSocket URL.
|
|
//
|
|
let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
|
|
let mut url = Url::parse(&http_url)
|
|
.expect("Failed to parse server address URL from connection details.");
|
|
match url.scheme() {
|
|
"http" => url
|
|
.set_scheme("ws")
|
|
.expect("Failed to set scheme to ws for WebSocket URL."),
|
|
"https" => url
|
|
.set_scheme("wss")
|
|
.expect("Failed to set scheme to wss for WebSocket URL."),
|
|
invalid_scheme => {
|
|
return Err(Box::new(IllegalStateException::new(&format!(
|
|
"Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
|
|
invalid_scheme
|
|
))));
|
|
}
|
|
}
|
|
let ws_url = url.as_str();
|
|
|
|
// Build the WebSocket request with the necessary headers.
|
|
let request = Request::builder()
|
|
.uri(ws_url)
|
|
.header(
|
|
HeaderName::from_static("connection"),
|
|
HeaderValue::from_static("Upgrade"),
|
|
)
|
|
.header(
|
|
HeaderName::from_static("host"),
|
|
HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
|
|
IllegalStateException::new(&format!(
|
|
"Invalid header value for header with name 'host': {}",
|
|
err
|
|
))
|
|
})?,
|
|
)
|
|
.header(
|
|
HeaderName::from_static("sec-websocket-key"),
|
|
HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
|
|
)
|
|
.header(
|
|
HeaderName::from_static("sec-websocket-protocol"),
|
|
HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
|
|
)
|
|
.header(
|
|
HeaderName::from_static("sec-websocket-version"),
|
|
HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
|
|
)
|
|
.header(
|
|
HeaderName::from_static("upgrade"),
|
|
HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
|
|
)
|
|
.body(())?;
|
|
|
|
// Connect to the Lightstreamer server using WebSocket.
|
|
let ws_stream = match connect_async(request).await {
|
|
Ok((ws_stream, response)) => {
|
|
if let Some(server_header) = response.headers().get("server") {
|
|
self.make_log(
|
|
Level::INFO,
|
|
&format!(
|
|
"Connected to Lightstreamer server: {}",
|
|
server_header.to_str().unwrap_or("")
|
|
),
|
|
);
|
|
} else {
|
|
self.make_log(Level::INFO, "Connected to Lightstreamer server");
|
|
}
|
|
ws_stream
|
|
}
|
|
Err(err) => {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::ConnectionRefused,
|
|
format!(
|
|
"Failed to connect to Lightstreamer server with WebSocket: {}",
|
|
err
|
|
),
|
|
)));
|
|
}
|
|
};
|
|
|
|
// Split the WebSocket stream into a write and a read stream.
|
|
let (mut write_stream, mut read_stream) = ws_stream.split();
|
|
|
|
//
|
|
// Initiate communication with the server by sending a 'wsok' message.
|
|
//
|
|
write_stream.send(Message::Text("wsok".into())).await?;
|
|
|
|
//
|
|
// Start reading and processing messages from the server.
|
|
//
|
|
let mut request_id: usize = 0;
|
|
let mut _session_id: Option<String> = None;
|
|
let mut subscription_id: usize = 0;
|
|
let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
|
|
HashMap::new();
|
|
loop {
|
|
tokio::select! {
|
|
message = read_stream.next() => {
|
|
match message {
|
|
Some(Ok(Message::Text(text))) => {
|
|
// Messages could include multiple submessages separated by /r/n.
|
|
// Split the message into submessages and process each one separately.
|
|
let submessages: Vec<&str> = text.split("\r\n")
|
|
.filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
|
|
.collect();
|
|
for submessage in submessages {
|
|
let clean_text = clean_message(&submessage);
|
|
let submessage_fields: Vec<&str> = clean_text.split(",").collect();
|
|
match *submessage_fields.first().unwrap_or(&"") {
|
|
//
|
|
// Errors from server.
|
|
//
|
|
"conerr" | "reqerr" => {
|
|
self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
|
|
break;
|
|
},
|
|
//
|
|
// Session created successfully.
|
|
//
|
|
"conok" => {
|
|
if let Some(session_id) = submessage_fields.get(1).as_deref() {
|
|
self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
|
|
self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
|
|
//
|
|
// Subscribe to the desired items.
|
|
//
|
|
while let Some(subscription) = self.subscriptions.get(subscription_id) {
|
|
//
|
|
// Gather all the necessary subscription parameters.
|
|
//
|
|
request_id += 1;
|
|
let ls_req_id = request_id.to_string();
|
|
subscription_id += 1;
|
|
let ls_sub_id = subscription_id.to_string();
|
|
let ls_mode = subscription.get_mode().to_string();
|
|
let ls_group = match subscription.get_item_group() {
|
|
Some(item_group) => item_group.to_string(),
|
|
None => match subscription.get_items() {
|
|
Some(items) => {
|
|
items.join(" ")
|
|
},
|
|
None => {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
"No item group or items found in subscription.",
|
|
)));
|
|
},
|
|
},
|
|
};
|
|
let ls_schema = match subscription.get_field_schema() {
|
|
Some(field_schema) => field_schema.to_string(),
|
|
None => match subscription.get_fields() {
|
|
Some(fields) => {
|
|
fields.join(" ")
|
|
},
|
|
None => {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
"No field schema or fields found in subscription.",
|
|
)));
|
|
},
|
|
},
|
|
};
|
|
let ls_data_adapter = match subscription.get_data_adapter() {
|
|
Some(data_adapter) => data_adapter.to_string(),
|
|
None => "".to_string(),
|
|
};
|
|
let ls_snapshot = subscription.get_requested_snapshot().unwrap_or_default().to_string();
|
|
//
|
|
// Prepare the subscription request.
|
|
//
|
|
let mut params: Vec<(&str, &str)> = vec![
|
|
("LS_data_adapter", &ls_data_adapter),
|
|
("LS_reqId", &ls_req_id),
|
|
("LS_op", "add"),
|
|
("LS_subId", &ls_sub_id),
|
|
("LS_mode", &ls_mode),
|
|
("LS_group", &ls_group),
|
|
("LS_schema", &ls_schema),
|
|
("LS_ack", "false"),
|
|
];
|
|
// Remove the data adapter parameter if not specified.
|
|
if ls_data_adapter == "" {
|
|
params.remove(0);
|
|
}
|
|
if ls_snapshot != "" {
|
|
params.push(("LS_snapshot", &ls_snapshot));
|
|
}
|
|
let encoded_params = serde_urlencoded::to_string(¶ms)?;
|
|
write_stream
|
|
.send(Message::Text(format!("control\r\n{}", encoded_params).into()))
|
|
.await?;
|
|
info!("Sent subscription request: '{}'", encoded_params);
|
|
}
|
|
} else {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
"Session ID not found in 'conok' message from server",
|
|
)));
|
|
}
|
|
},
|
|
//
|
|
// Notifications from server.
|
|
//
|
|
"conf" | "cons" | "clientip" | "servname" | "prog" | "sync" => {
|
|
self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
|
|
// Don't do anything with these notifications for now.
|
|
},
|
|
"probe" => {
|
|
self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
|
|
},
|
|
"reqok" => {
|
|
self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
|
|
},
|
|
//
|
|
// Subscription confirmation from server.
|
|
//
|
|
"subok" => {
|
|
self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
|
|
},
|
|
//
|
|
// Data updates from server.
|
|
//
|
|
"u" => {
|
|
// Parse arguments from the received message.
|
|
let arguments = clean_text.split(",").collect::<Vec<&str>>();
|
|
//
|
|
// Extract the subscription from the first argument.
|
|
//
|
|
let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
|
|
let subscription = match self.get_subscriptions().get(subscription_index-1) {
|
|
Some(subscription) => subscription,
|
|
None => {
|
|
self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
|
|
continue;
|
|
|
|
}
|
|
};
|
|
//
|
|
// Extract the item from the second argument.
|
|
//
|
|
let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
|
|
let item = match subscription.get_items() {
|
|
Some(items) => items.get(item_index-1),
|
|
None => {
|
|
self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
|
|
continue;
|
|
}
|
|
};
|
|
//
|
|
// Determine if the update is a snapshot or real-time update based on the subscription parameters.
|
|
//
|
|
let is_snapshot = match subscription.get_requested_snapshot() {
|
|
Some(ls_snapshot) => {
|
|
match ls_snapshot {
|
|
Snapshot::No => false,
|
|
Snapshot::Yes => {
|
|
match subscription.get_mode() {
|
|
SubscriptionMode::Merge => {
|
|
if arguments.len() == 4 && arguments[3] == "$" {
|
|
// EOS notification received
|
|
true
|
|
} else {
|
|
// If item doesn't exist in item_updates yet, the first update
|
|
// is always a snapshot.
|
|
if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
|
|
if let Some(_) = item_updates.get(&(item_index)) {
|
|
// Item update already exists in item_updates, so it's not a snapshot.
|
|
false
|
|
} else {
|
|
// Item update doesn't exist in item_updates, so the first update is always a snapshot.
|
|
true
|
|
}
|
|
} else {
|
|
// Item updates not found for subscription, so the first update is always a snapshot.
|
|
true
|
|
}
|
|
}
|
|
},
|
|
SubscriptionMode::Distinct | SubscriptionMode::Command => {
|
|
if !subscription.is_subscribed() {
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
},
|
|
_ => false,
|
|
}
|
|
},
|
|
_ => false,
|
|
}
|
|
},
|
|
None => false,
|
|
};
|
|
|
|
// Extract the field values from the third argument.
|
|
let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
|
|
|
|
//
|
|
// Get fields from subscription and create a HashMap of field names and values.
|
|
//
|
|
let subscription_fields = subscription.get_fields();
|
|
let mut field_map: HashMap<String, Option<String>> = subscription_fields
|
|
.map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
|
|
.unwrap_or_default();
|
|
|
|
let mut field_index = 0;
|
|
for value in field_values {
|
|
match value {
|
|
"" => {
|
|
// An empty value means the field is unchanged compared to the previous update of the same field.
|
|
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
|
field_map.insert(field_name.to_string(), None);
|
|
}
|
|
field_index += 1;
|
|
}
|
|
"#" | "$" => {
|
|
// A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
|
|
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
|
field_map.insert(field_name.to_string(), Some("".to_string()));
|
|
}
|
|
field_index += 1;
|
|
}
|
|
value if value.starts_with('^') => {
|
|
let command = value.chars().nth(1).unwrap_or(' ');
|
|
match command {
|
|
'0'..='9' => {
|
|
let count = value[1..].parse().unwrap_or(0);
|
|
for i in 0..count {
|
|
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
|
|
field_map.insert(field_name.to_string(), None);
|
|
}
|
|
}
|
|
field_index += count;
|
|
}
|
|
'P' | 'T' => {
|
|
let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
|
|
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
|
if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
|
|
let new_value = match command {
|
|
'P' => {
|
|
// Apply JSON Patch
|
|
let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
|
|
let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
|
|
let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
|
|
let _ = json_patch::patch(&mut prev_json, &patch_operations);
|
|
prev_json.to_string()
|
|
}
|
|
'T' => {
|
|
// Apply TLCP-diff
|
|
//tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
|
|
unimplemented!("Implement TLCP-diff");
|
|
}
|
|
_ => unreachable!(),
|
|
};
|
|
field_map.insert(field_name.to_string(), Some(new_value.to_string()));
|
|
}
|
|
}
|
|
field_index += 1;
|
|
}
|
|
_ => {
|
|
let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
|
|
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
|
field_map.insert(field_name.to_string(), Some(decoded_value));
|
|
}
|
|
field_index += 1;
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
|
|
if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
|
|
field_map.insert(field_name.to_string(), Some(decoded_value));
|
|
}
|
|
field_index += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Store only item_update's changed fields.
|
|
let changed_fields: HashMap<String, String> = field_map.iter()
|
|
.filter_map(|(k, v)| {
|
|
if let Some(v) = v {
|
|
Some((k.clone(), v.clone()))
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
//
|
|
// Take the proper item_update from item_updates and update it with changed fields.
|
|
// If the item_update doesn't exist yet, create a new one.
|
|
//
|
|
let current_item_update: ItemUpdate;
|
|
match subscription_item_updates.get_mut(&(subscription_index)) {
|
|
Some(item_updates) => match item_updates.get_mut(&(item_index)) {
|
|
Some(item_update) => {
|
|
//
|
|
// Iterate changed_fields and update existing item_update.fields assigning the new values.
|
|
//
|
|
for (field_name, new_value) in &changed_fields {
|
|
if item_update.fields.contains_key(field_name) {
|
|
item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
|
|
}
|
|
}
|
|
item_update.changed_fields = changed_fields;
|
|
item_update.is_snapshot = is_snapshot;
|
|
current_item_update = item_update.clone();
|
|
},
|
|
None => {
|
|
// Create a new item_update and add it to item_updates.
|
|
let item_update = ItemUpdate {
|
|
item_name: item.cloned(),
|
|
item_pos: item_index,
|
|
fields: field_map,
|
|
changed_fields: changed_fields,
|
|
is_snapshot: is_snapshot,
|
|
};
|
|
current_item_update = item_update.clone();
|
|
item_updates.insert(item_index, item_update);
|
|
}
|
|
},
|
|
None => {
|
|
// Create a new item_update and add it to item_updates.
|
|
let item_update = ItemUpdate {
|
|
item_name: item.cloned(),
|
|
item_pos: item_index,
|
|
fields: field_map,
|
|
changed_fields: changed_fields,
|
|
is_snapshot: is_snapshot,
|
|
};
|
|
current_item_update = item_update.clone();
|
|
let mut item_updates = HashMap::new();
|
|
item_updates.insert(item_index, item_update);
|
|
subscription_item_updates.insert(subscription_index, item_updates);
|
|
}
|
|
};
|
|
|
|
// Get mutable subscription listeners directly.
|
|
let subscription_listeners = subscription.get_listeners();
|
|
|
|
// Iterate subscription listeners and call on_item_update for each listener.
|
|
for listener in subscription_listeners {
|
|
listener.on_item_update(¤t_item_update);
|
|
}
|
|
}
|
|
//
|
|
// Connection confirmation from server.
|
|
//
|
|
"wsok" => {
|
|
self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
|
|
//
|
|
// Request session creation.
|
|
//
|
|
let ls_adapter_set = match self.connection_details.get_adapter_set() {
|
|
Some(adapter_set) => adapter_set,
|
|
None => {
|
|
return Err(Box::new(IllegalStateException::new(
|
|
"No adapter set found in connection details.",
|
|
)));
|
|
},
|
|
};
|
|
let ls_send_sync = self.connection_options.get_send_sync().to_string();
|
|
let mut params: Vec<(&str, &str)> = vec![
|
|
("LS_adapter_set", &ls_adapter_set),
|
|
("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
|
|
("LS_send_sync", &ls_send_sync),
|
|
];
|
|
if let Some(user) = &self.connection_details.get_user() {
|
|
params.push(("LS_user", user));
|
|
}
|
|
if let Some(password) = &self.connection_details.get_password() {
|
|
params.push(("LS_password", password));
|
|
}
|
|
params.push(("LS_protocol", Self::TLCP_VERSION));
|
|
let encoded_params = serde_urlencoded::to_string(¶ms)?;
|
|
write_stream
|
|
.send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
|
|
.await?;
|
|
self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
|
|
},
|
|
unexpected_message => {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
format!(
|
|
"Unexpected message received from server: '{:?}'",
|
|
unexpected_message
|
|
),
|
|
)));
|
|
},
|
|
}
|
|
}
|
|
},
|
|
Some(Ok(non_text_message)) => {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
format!(
|
|
"Unexpected non-text message from server: {:?}",
|
|
non_text_message
|
|
),
|
|
)));
|
|
},
|
|
Some(Err(err)) => {
|
|
return Err(Box::new(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
format!("Error reading message from server: {}", err),
|
|
)));
|
|
},
|
|
None => {
|
|
self.make_log( Level::DEBUG, "No more messages from server" );
|
|
break;
|
|
},
|
|
}
|
|
},
|
|
_ = shutdown_signal.notified() => {
|
|
self.make_log( Level::INFO, &format!("Received shutdown signal") );
|
|
break;
|
|
},
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Operation method that requests to close the Session opened against the configured Lightstreamer
|
|
/// Server (if any).
|
|
///
|
|
/// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
|
|
///
|
|
/// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
|
|
/// are preserved to be re-subscribed to on future Sessions.
|
|
///
|
|
/// Note that the request to disconnect is accomplished by the client in a separate thread; this
|
|
/// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
|
|
/// change yet.
|
|
///
|
|
/// When the request to disconnect is finally being executed, if the status of the client is
|
|
/// "DISCONNECTED", then nothing will be done.
|
|
///
|
|
/// See also `connect()`
|
|
#[instrument]
|
|
pub async fn disconnect(&mut self) {
|
|
// Implementation for disconnect
|
|
self.make_log( Level::INFO, "Disconnecting from Lightstreamer server" );
|
|
}
|
|
|
|
/// Static inquiry method that can be used to share cookies between connections to the Server
|
|
/// (performed by this library) and connections to other sites that are performed by the application.
|
|
/// With this method, cookies received from the Server can be extracted for sending through other
|
|
/// connections, according with the URI to be accessed.
|
|
///
|
|
/// See `addCookies()` for clarifications on when cookies are directly stored by the library and
|
|
/// when not.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `uri`: the URI to which the cookies should be sent, or `None`.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// A list with the various cookies that can be sent in a HTTP request for the specified URI.
|
|
/// If a `None` URI was supplied, all available non-expired cookies will be returned.
|
|
pub fn get_cookies(_uri: Option<&str>) -> Cookie {
|
|
// Implementation for get_cookies
|
|
unimplemented!()
|
|
}
|
|
|
|
/// Returns a list containing the `ClientListener` instances that were added to this client.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// A list containing the listeners that were added to this client.
|
|
///
|
|
/// See also `addListener()`
|
|
pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
|
|
&self.listeners
|
|
}
|
|
|
|
/// Inquiry method that gets the current client status and transport (when applicable).
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// The current client status. It can be one of the following values:
|
|
///
|
|
/// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
|
|
/// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
|
|
/// and is currently verifying if a streaming connection is possible;
|
|
/// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
|
|
/// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
|
|
/// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
|
|
/// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
|
|
/// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
|
|
/// longer than a configured time;
|
|
/// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
|
|
/// (possibly after a timeout);
|
|
/// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
|
|
/// as soon as possible, as an attempt to recover the current session after a connection issue;
|
|
/// - `"DISCONNECTED"`: no connection is currently active.
|
|
///
|
|
/// See also `ClientListener.onStatusChange()`
|
|
pub fn get_status(&self) -> &ClientStatus {
|
|
&self.status
|
|
}
|
|
|
|
/// Inquiry method that returns a list containing all the `Subscription` instances that are
|
|
/// currently "active" on this `LightstreamerClient`.
|
|
///
|
|
/// Internal second-level `Subscription` are not included.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
|
|
/// The list can be empty.
|
|
///
|
|
/// See also `subscribe()`
|
|
pub fn get_subscriptions(&self) -> &Vec<Subscription> {
|
|
&self.subscriptions
|
|
}
|
|
|
|
/// Creates a new instance of `LightstreamerClient`.
|
|
///
|
|
/// The constructor initializes the client with the server address and adapter set, if provided.
|
|
/// It sets up the connection details and options for the client. If no server address or
|
|
/// adapter set is specified, those properties on the client will be `None`. This allows
|
|
/// for late configuration of these details before connecting to the Lightstreamer server.
|
|
///
|
|
/// # Arguments
|
|
/// * `server_address` - An optional reference to a string slice that represents the server
|
|
/// address to connect to. If `None`, the server address must be set later.
|
|
/// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
|
|
/// If `None`, the adapter set must be set later.
|
|
///
|
|
/// # Returns
|
|
/// A result containing the new `LightstreamerClient` instance if successful, or an
|
|
/// `IllegalStateException` if the initialization fails due to invalid state conditions.
|
|
///
|
|
/// # Panics
|
|
/// Does not panic under normal circumstances. However, unexpected internal errors during
|
|
/// the creation of internal components could cause panics, which should be considered when
|
|
/// using this function in production code.
|
|
///
|
|
/// # Example
|
|
/// ```
|
|
/// // Example usage of `new` to create a LightstreamerClient with specified server address and
|
|
/// // adapter set.
|
|
/// let server_address = Some("http://myserver.com");
|
|
/// let adapter_set = Some("MY_ADAPTER_SET");
|
|
/// let ls_client = LightstreamerClient::new(server_address, adapter_set);
|
|
///
|
|
/// assert!(ls_client.is_ok());
|
|
/// if let Ok(client) = ls_client {
|
|
/// assert_eq!(client.server_address.unwrap(), "http://myserver.com".to_string());
|
|
/// assert_eq!(client.adapter_set.unwrap(), "MY_ADAPTER_SET".to_string());
|
|
/// }
|
|
/// ```
|
|
pub fn new(
|
|
server_address: Option<&str>,
|
|
adapter_set: Option<&str>,
|
|
username: Option<&str>,
|
|
password: Option<&str>,
|
|
) -> Result<LightstreamerClient, Box<dyn Error>> {
|
|
let connection_details =
|
|
ConnectionDetails::new(server_address, adapter_set, username, password)?;
|
|
let connection_options = ConnectionOptions::default();
|
|
|
|
Ok(LightstreamerClient {
|
|
server_address: server_address.map(|s| s.to_string()),
|
|
adapter_set: adapter_set.map(|s| s.to_string()),
|
|
connection_details,
|
|
connection_options,
|
|
listeners: Vec::new(),
|
|
subscriptions: Vec::new(),
|
|
status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
|
|
logging: LogType::StdLogs,
|
|
})
|
|
}
|
|
|
|
/// Removes a listener from the `LightstreamerClient` instance so that it will not receive
|
|
/// events anymore.
|
|
///
|
|
/// A listener can be removed at any time.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `listener`: The listener to be removed.
|
|
///
|
|
/// See also `addListener()`
|
|
pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
|
|
unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
|
|
//self.listeners.remove(&listener);
|
|
}
|
|
|
|
/// Operation method that sends a message to the Server. The message is interpreted and handled
|
|
/// by the Metadata Adapter associated to the current Session. This operation supports in-order
|
|
/// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
|
|
/// to arrive exactly once and respecting the original order, whatever is the underlying transport
|
|
/// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
|
|
/// necessary, to reduce network round trips.
|
|
///
|
|
/// Upon subsequent calls to the method, the sequential management of the involved messages is
|
|
/// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
|
|
/// issued.
|
|
///
|
|
/// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
|
|
/// it will be resent; however, this may cause the subsequent messages to be delayed. For this
|
|
/// reason, each message can specify a "delayTimeout", which is the longest time the message,
|
|
/// after reaching the Server, can be kept waiting if one of more preceding messages haven't
|
|
/// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
|
|
/// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
|
|
/// Note that, because of the parallel transport of the messages, if a zero or very low timeout
|
|
/// is set for a message and the previous message was sent immediately before, it is possible
|
|
/// that the latter gets discarded even if no communication issues occur. The Server may also
|
|
/// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
|
|
/// long time.
|
|
///
|
|
/// Sequence identifiers can also be associated with the messages. In this case, the sequential
|
|
/// management is restricted to all subsets of messages with the same sequence identifier associated.
|
|
///
|
|
/// Notifications of the operation outcome can be received by supplying a suitable listener. The
|
|
/// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
|
|
/// are guaranteed to be invoked sequentially.
|
|
///
|
|
/// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
|
|
/// processing is guaranteed, while strict ordering and even sequentialization of the processing
|
|
/// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
|
|
/// messages that, for any reason, should fail to reach the Server whereas subsequent messages
|
|
/// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
|
|
/// the listener eventually gets a notification.
|
|
///
|
|
/// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
|
|
/// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
|
|
/// are performed at all, so as to optimize the processing and allow the highest possible throughput.
|
|
///
|
|
/// Since a message is handled by the Metadata Adapter associated to the current connection, a
|
|
/// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
|
|
/// flag is specified it is possible to call the method at any time and the client will take
|
|
/// care of sending the message as soon as a connection is available, otherwise, if the current
|
|
/// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
|
|
/// event will be fired.
|
|
///
|
|
/// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
|
|
/// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
|
|
/// flag set to `true`.
|
|
///
|
|
/// Also note that forwarding of the message to the server is made in a separate thread, hence,
|
|
/// if a message is sent while the connection is active, it could be aborted because of a subsequent
|
|
/// disconnection. In the same way a message sent while the connection is not active might be
|
|
/// sent because of a subsequent connection.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
|
|
/// associated to the current connection.
|
|
/// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
|
|
/// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
|
|
/// is supplied, the message will be processed in the special way described above. The parameter
|
|
/// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
|
|
/// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
|
|
/// timeout on missing messages, the latter will be used instead. The parameter is optional; if
|
|
/// a negative value is supplied, the Server configured timeout on missing messages will be applied.
|
|
/// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
|
|
/// timeout on missing messages still applies.
|
|
/// * `listener`: an object suitable for receiving notifications about the processing outcome. The
|
|
/// parameter is optional; if not supplied, no notification will be available.
|
|
/// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
|
|
/// status when the provided message is handled, then the message is not aborted right away but
|
|
/// is queued waiting for a new session. Note that the message can still be aborted later when
|
|
/// a new session is established.
|
|
pub fn send_message(
|
|
&mut self,
|
|
message: &str,
|
|
sequence: Option<&str>,
|
|
_delay_timeout: Option<u64>,
|
|
listener: Option<Box<dyn ClientMessageListener>>,
|
|
enqueue_while_disconnected: bool,
|
|
) {
|
|
let _sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES");
|
|
|
|
// Handle the message based on the current connection status
|
|
match &self.status {
|
|
ClientStatus::Connected(_connection_type) => {
|
|
// Send the message to the server in a separate thread
|
|
// ...
|
|
}
|
|
ClientStatus::Disconnected(_disconnection_type) => {
|
|
if enqueue_while_disconnected {
|
|
// Enqueue the message to be sent when a connection is available
|
|
// ...
|
|
} else {
|
|
// Abort the message and notify the listener
|
|
if let Some(listener) = listener {
|
|
listener.on_abort(message, false);
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
// Enqueue the message to be sent when a connection is available
|
|
// ...
|
|
}
|
|
}
|
|
unimplemented!("Complete mechanism to send message to LightstreamerClient.");
|
|
}
|
|
|
|
/// Static method that permits to configure the logging system used by the library. The logging
|
|
/// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
|
|
/// third-party logging system.
|
|
///
|
|
/// If no logging system is specified, all the generated log is discarded.
|
|
///
|
|
/// The following categories are available to be consumed:
|
|
///
|
|
/// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
|
|
/// level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
|
|
/// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
|
|
/// level, requests are logged; at DEBUG level, request details and events from the Server are logged.
|
|
/// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
|
|
/// are logged; at DEBUG level, lifecycle event details are logged.
|
|
/// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
|
|
/// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
|
|
/// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
|
|
/// - `lightstreamer.actions`: logs settings / API calls.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
|
|
/// library classes.
|
|
pub fn set_logger_provider() {
|
|
unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
|
|
}
|
|
/*
|
|
pub fn set_logger_provider(provider: LoggerProvider) {
|
|
// Implementation for set_logger_provider
|
|
}
|
|
*/
|
|
|
|
/// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
|
|
/// accept untrusted ones.
|
|
///
|
|
/// May be called only once before creating any `LightstreamerClient` instance.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `factory`: an instance of `ssl.SSLContext`
|
|
///
|
|
/// # Raises
|
|
///
|
|
/// * `IllegalArgumentException`: if the factory is `None`
|
|
/// * `IllegalStateException`: if a factory is already installed
|
|
pub fn set_trust_manager_factory() {
|
|
unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
|
|
}
|
|
/*
|
|
pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
|
|
if factory.is_none() {
|
|
return Err(IllegalArgumentException::new(
|
|
"Factory cannot be None",
|
|
));
|
|
}
|
|
|
|
// Implementation for set_trust_manager_factory
|
|
Ok(())
|
|
}
|
|
*/
|
|
|
|
/// Operation method that adds a `Subscription` to the list of "active" Subscriptions. The `Subscription`
|
|
/// cannot already be in the "active" state.
|
|
///
|
|
/// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
|
|
/// as there is a session available). Active `Subscription` are automatically persisted across different
|
|
/// sessions as long as a related unsubscribe call is not issued.
|
|
///
|
|
/// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
|
|
/// immediately enters the "active" state.
|
|
///
|
|
/// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
|
|
/// unless it is first removed from the "active" state through a call to `unsubscribe()`.
|
|
///
|
|
/// Also note that forwarding of the subscription to the server is made in a separate thread.
|
|
///
|
|
/// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
|
|
/// event.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
|
|
/// values.
|
|
///
|
|
/// See also `unsubscribe()`
|
|
pub fn subscribe(&mut self, subscription: Subscription) {
|
|
self.subscriptions.push(subscription);
|
|
// Implementation for subscribe
|
|
}
|
|
|
|
/// Operation method that removes a `Subscription` that is currently in the "active" state.
|
|
///
|
|
/// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
|
|
/// items is requested to Lightstreamer Server.
|
|
///
|
|
/// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
|
|
/// exits the "active" state.
|
|
///
|
|
/// Note that forwarding of the unsubscription to the server is made in a separate thread.
|
|
///
|
|
/// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `subscription`: An "active" `Subscription` object that was activated by this `LightstreamerClient`
|
|
/// instance.
|
|
pub fn unsubscribe(&mut self, _subscription: Subscription) {
|
|
unimplemented!("Implement mechanism to unsubscribe from LightstreamerClient.");
|
|
}
|
|
/*
|
|
pub fn unsubscribe(&mut self, subscription: &Subscription) {
|
|
if let Some(index) = self.subscriptions.iter().position(|s| s == subscription) {
|
|
self.subscriptions.remove(index);
|
|
// Implementation for unsubscribe
|
|
}
|
|
}
|
|
*/
|
|
|
|
/// Method setting enum for the logging of this instance.
|
|
///
|
|
/// Default logging type is StdLogs, corresponding to `stdout`
|
|
///
|
|
/// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
|
|
/// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
|
|
/// and its configuration and formatting.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
|
|
pub fn set_logging_type(&mut self, logging: LogType) {
|
|
self.logging = logging;
|
|
}
|
|
|
|
/// Method for logging messages
|
|
///
|
|
/// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
|
|
///
|
|
/// # Parameters
|
|
///
|
|
/// * `loglevel` Enum determining use of stdout or Tracing subscriber.
|
|
pub fn make_log(&mut self, loglevel: Level, log: &str) {
|
|
match self.logging {
|
|
LogType::StdLogs => {
|
|
println!("{}", log);
|
|
}
|
|
LogType::TracingLogs => match loglevel {
|
|
Level::INFO => {
|
|
info!(log);
|
|
}
|
|
Level::WARN => {
|
|
warn!(log);
|
|
}
|
|
Level::ERROR => {
|
|
error!(log);
|
|
}
|
|
Level::TRACE => {
|
|
trace!(log);
|
|
}
|
|
Level::DEBUG => {
|
|
debug!(log);
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
/// The transport type to be used by the client.
|
|
/// - WS: the Stream-Sense algorithm is enabled as in the `None` case but the client will
|
|
/// only use WebSocket based connections. If a connection over WebSocket is not possible
|
|
/// because of the environment the client will not connect at all.
|
|
/// - HTTP: the Stream-Sense algorithm is enabled as in the `None` case but the client
|
|
/// will only use HTTP based connections. If a connection over HTTP is not possible because
|
|
/// of the environment the client will not connect at all.
|
|
/// - WS-STREAMING: the Stream-Sense algorithm is disabled and the client will only connect
|
|
/// on Streaming over WebSocket. If Streaming over WebSocket is not possible because of
|
|
/// the environment the client will not connect at all.
|
|
/// - HTTP-STREAMING: the Stream-Sense algorithm is disabled and the client will only
|
|
/// connect on Streaming over HTTP. If Streaming over HTTP is not possible because of the
|
|
/// browser/environment the client will not connect at all.
|
|
/// - WS-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
|
|
/// on Polling over WebSocket. If Polling over WebSocket is not possible because of the
|
|
/// environment the client will not connect at all.
|
|
/// - HTTP-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
|
|
/// on Polling over HTTP. If Polling over HTTP is not possible because of the environment
|
|
/// the client will not connect at all.
|
|
#[derive(Debug, PartialEq)]
|
|
pub enum Transport {
|
|
Ws,
|
|
Http,
|
|
WsStreaming,
|
|
HttpStreaming,
|
|
WsPolling,
|
|
HttpPolling,
|
|
}
|