aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLibravatar daniloaz <daniloaz@gmail.com>2024-04-13 21:24:12 +0200
committerLibravatar daniloaz <daniloaz@gmail.com>2024-04-13 21:24:12 +0200
commit65282048aefecda06f937ed670755b64b19ce9b1 (patch)
tree1a026ad96955820ab72b02dba79c3c3b8662cacd /src
parent5af8a69942aeed85b5cd2bd84cb5f33169834690 (diff)
⬆️ (Cargo.toml): Bump lightstreamer-client version from 0.1.8 to 0.1.9
✨ (Cargo.toml): Add colored dependency to enhance console output 📝 (README.md): Overhaul documentation to provide comprehensive details about the project, its features, usage, and contribution guidelines 💡 (client_listener.rs, client_message_listener.rs): Add newline at end of file to adhere to POSIX standards ♻️ (connection_options.rs): Refactor code to improve readability and maintainability 🐛 (connection_options.rs): Rename 'reduce_head' to '_reduce_head' to indicate unused variable ♻️ (error.rs): Reorder imports and adjust formatting for consistency ♻️ (item_update.rs): Refactor code to improve readability and maintainability ♻️ (lib.rs): Reorder module imports for better organization ♻️ (ls_client.rs): Refactor code to improve readability and maintainability 💡 (main.rs): Update print statements to use colored output for changed fields 🐛 (main.rs): Fix import order to follow Rust's idiomatic style 🐛 (proxy.rs, subscription.rs, subscription_listener.rs): Add newline at end of file to follow POSIX standard ♻️ (subscription.rs, subscription_listener.rs): Refactor code to improve readability and maintainability 📝 (util.rs): Add newline at end of file to adhere to POSIX standards
Diffstat (limited to 'src')
-rw-r--r--src/client_listener.rs2
-rw-r--r--src/client_message_listener.rs2
-rw-r--r--src/connection_options.rs80
-rw-r--r--src/error.rs12
-rw-r--r--src/item_update.rs14
-rw-r--r--src/lib.rs6
-rw-r--r--src/ls_client.rs5
-rw-r--r--src/main.rs43
-rw-r--r--src/proxy.rs2
-rw-r--r--src/subscription.rs73
-rw-r--r--src/subscription_listener.rs24
-rw-r--r--src/util.rs2
12 files changed, 167 insertions, 98 deletions
diff --git a/src/client_listener.rs b/src/client_listener.rs
index cb2a73f..e6e2fee 100644
--- a/src/client_listener.rs
+++ b/src/client_listener.rs
@@ -185,4 +185,4 @@ pub trait ClientListener: Debug + Send {
// Implementation for on_status_change
unimplemented!("Implement on_status_change method for ClientListener");
}
-} \ No newline at end of file
+}
diff --git a/src/client_message_listener.rs b/src/client_message_listener.rs
index e6f5fc0..1f77027 100644
--- a/src/client_message_listener.rs
+++ b/src/client_message_listener.rs
@@ -71,4 +71,4 @@ pub trait ClientMessageListener {
// Implementation for on_processed
unimplemented!("Implement on_processed method for ClientMessageListener.");
}
-} \ No newline at end of file
+}
diff --git a/src/connection_options.rs b/src/connection_options.rs
index e9b3ab0..3c3485f 100644
--- a/src/connection_options.rs
+++ b/src/connection_options.rs
@@ -30,7 +30,7 @@ pub struct ConnectionOptions {
slowing_enabled: bool,
stalled_timeout: u64,
send_sync: bool,
- reduce_head: bool,
+ _reduce_head: bool,
supported_diffs: Option<String>,
polling: bool,
ttl_millis: Option<u64>,
@@ -59,7 +59,7 @@ impl ConnectionOptions {
stalled_timeout: 2000,
server_instance_address_ignored: false,
send_sync: true,
- reduce_head: false,
+ _reduce_head: false,
supported_diffs: None,
polling: false,
ttl_millis: None,
@@ -334,7 +334,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative or zero value is configured
- pub fn set_content_length(&mut self, content_length: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_content_length(
+ &mut self,
+ content_length: u64,
+ ) -> Result<(), IllegalArgumentException> {
if content_length == 0 {
return Err(IllegalArgumentException::new(
"Content length cannot be zero",
@@ -369,7 +372,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative or zero value is configured
- pub fn set_first_retry_max_delay(&mut self, first_retry_max_delay: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_first_retry_max_delay(
+ &mut self,
+ first_retry_max_delay: u64,
+ ) -> Result<(), IllegalArgumentException> {
if first_retry_max_delay == 0 {
return Err(IllegalArgumentException::new(
"First retry max delay cannot be zero",
@@ -487,7 +493,8 @@ impl ConnectionOptions {
&mut self,
http_extra_headers_on_session_creation_only: bool,
) {
- self.http_extra_headers_on_session_creation_only = http_extra_headers_on_session_creation_only;
+ self.http_extra_headers_on_session_creation_only =
+ http_extra_headers_on_session_creation_only;
}
/// Setter method that sets the maximum time the Server is allowed to wait for any data to
@@ -521,9 +528,7 @@ impl ConnectionOptions {
/// * `IllegalArgumentException`: if a negative value is configured
pub fn set_idle_timeout(&mut self, idle_timeout: u64) -> Result<(), IllegalArgumentException> {
if idle_timeout == 0 {
- return Err(IllegalArgumentException::new(
- "Idle timeout cannot be zero",
- ));
+ return Err(IllegalArgumentException::new("Idle timeout cannot be zero"));
}
self.idle_timeout = idle_timeout;
@@ -558,13 +563,17 @@ impl ConnectionOptions {
/// See also `setStalledTimeout()`
///
/// See also `setReconnectTimeout()`
- pub fn set_keepalive_interval(&mut self, keepalive_interval: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_keepalive_interval(
+ &mut self,
+ keepalive_interval: u64,
+ ) -> Result<(), IllegalArgumentException> {
if keepalive_interval == 0 {
self.keepalive_interval = keepalive_interval;
return Ok(());
}
- if keepalive_interval < self.stalled_timeout || keepalive_interval < self.reconnect_timeout {
+ if keepalive_interval < self.stalled_timeout || keepalive_interval < self.reconnect_timeout
+ {
return Err(IllegalArgumentException::new(
"Keepalive interval should be greater than or equal to stalled timeout and reconnect timeout",
));
@@ -616,7 +625,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative value is configured
- pub fn set_polling_interval(&mut self, polling_interval: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_polling_interval(
+ &mut self,
+ polling_interval: u64,
+ ) -> Result<(), IllegalArgumentException> {
if polling_interval == 0 {
self.polling_interval = polling_interval;
return Ok(());
@@ -675,7 +687,10 @@ impl ConnectionOptions {
/// See also `setStalledTimeout()`
///
/// See also `setKeepaliveInterval()`
- pub fn set_reconnect_timeout(&mut self, reconnect_timeout: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_reconnect_timeout(
+ &mut self,
+ reconnect_timeout: u64,
+ ) -> Result<(), IllegalArgumentException> {
if reconnect_timeout == 0 {
return Err(IllegalArgumentException::new(
"Reconnect timeout cannot be zero",
@@ -719,7 +734,10 @@ impl ConnectionOptions {
/// values) is passed.
///
/// See also `get_real_max_bandwidth()`
- pub fn set_requested_max_bandwidth(&mut self, max_bandwidth: Option<f64>) -> Result<(), IllegalArgumentException> {
+ pub fn set_requested_max_bandwidth(
+ &mut self,
+ max_bandwidth: Option<f64>,
+ ) -> Result<(), IllegalArgumentException> {
if let Some(bandwidth) = max_bandwidth {
if bandwidth <= 0.0 {
return Err(IllegalArgumentException::new(
@@ -781,9 +799,7 @@ impl ConnectionOptions {
/// See also `setFirstRetryMaxDelay()`
pub fn set_retry_delay(&mut self, retry_delay: u64) -> Result<(), IllegalArgumentException> {
if retry_delay == 0 {
- return Err(IllegalArgumentException::new(
- "Retry delay cannot be zero",
- ));
+ return Err(IllegalArgumentException::new("Retry delay cannot be zero"));
}
self.retry_delay = retry_delay;
@@ -836,7 +852,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative value is configured
- pub fn set_reverse_heartbeat_interval(&mut self, reverse_heartbeat_interval: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_reverse_heartbeat_interval(
+ &mut self,
+ reverse_heartbeat_interval: u64,
+ ) -> Result<(), IllegalArgumentException> {
if reverse_heartbeat_interval == 0 {
self.reverse_heartbeat_interval = reverse_heartbeat_interval;
return Ok(());
@@ -919,7 +938,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative value is passed.
- pub fn set_session_recovery_timeout(&mut self, session_recovery_timeout: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_session_recovery_timeout(
+ &mut self,
+ session_recovery_timeout: u64,
+ ) -> Result<(), IllegalArgumentException> {
if session_recovery_timeout == 0 {
self.session_recovery_timeout = session_recovery_timeout;
return Ok(());
@@ -987,7 +1009,10 @@ impl ConnectionOptions {
/// See also `setReconnectTimeout()`
///
/// See also `setKeepaliveInterval()`
- pub fn set_stalled_timeout(&mut self, stalled_timeout: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_stalled_timeout(
+ &mut self,
+ stalled_timeout: u64,
+ ) -> Result<(), IllegalArgumentException> {
if stalled_timeout == 0 {
return Err(IllegalArgumentException::new(
"Stalled timeout cannot be zero",
@@ -1101,7 +1126,10 @@ impl Debug for ConnectionOptions {
.field("first_retry_max_delay", &self.first_retry_max_delay)
.field("forced_transport", &self.forced_transport)
.field("http_extra_headers", &self.http_extra_headers)
- .field("http_extra_headers_on_session_creation_only", &self.http_extra_headers_on_session_creation_only)
+ .field(
+ "http_extra_headers_on_session_creation_only",
+ &self.http_extra_headers_on_session_creation_only,
+ )
.field("idle_timeout", &self.idle_timeout)
.field("keepalive_interval", &self.keepalive_interval)
.field("polling_interval", &self.polling_interval)
@@ -1110,8 +1138,14 @@ impl Debug for ConnectionOptions {
.field("reconnect_timeout", &self.reconnect_timeout)
.field("requested_max_bandwidth", &self.requested_max_bandwidth)
.field("retry_delay", &self.retry_delay)
- .field("reverse_heartbeat_interval", &self.reverse_heartbeat_interval)
- .field("server_instance_address_ignored", &self.server_instance_address_ignored)
+ .field(
+ "reverse_heartbeat_interval",
+ &self.reverse_heartbeat_interval,
+ )
+ .field(
+ "server_instance_address_ignored",
+ &self.server_instance_address_ignored,
+ )
.field("session_recovery_timeout", &self.session_recovery_timeout)
.field("slowing_enabled", &self.slowing_enabled)
.field("stalled_timeout", &self.stalled_timeout)
@@ -1133,7 +1167,7 @@ impl Default for ConnectionOptions {
proxy: None,
real_max_bandwidth: None,
reconnect_timeout: 3000,
- reduce_head: false,
+ _reduce_head: false,
requested_max_bandwidth: None,
retry_delay: 4000,
reverse_heartbeat_interval: 0,
diff --git a/src/error.rs b/src/error.rs
index cd1d7c2..33c6ff8 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,5 +1,5 @@
-use std::fmt;
use std::error::Error;
+use std::fmt;
#[derive(Debug)]
pub struct IllegalArgumentException(String);
@@ -24,18 +24,20 @@ impl Error for IllegalArgumentException {
#[derive(Debug)]
pub struct IllegalStateException {
- details: String
+ details: String,
}
impl IllegalStateException {
pub fn new(msg: &str) -> IllegalStateException {
- IllegalStateException{details: msg.to_string()}
+ IllegalStateException {
+ details: msg.to_string(),
+ }
}
}
impl fmt::Display for IllegalStateException {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f,"{}",self.details)
+ write!(f, "{}", self.details)
}
}
@@ -43,4 +45,4 @@ impl Error for IllegalStateException {
fn description(&self) -> &str {
&self.details
}
-} \ No newline at end of file
+}
diff --git a/src/item_update.rs b/src/item_update.rs
index 7c43d93..fd95f90 100644
--- a/src/item_update.rs
+++ b/src/item_update.rs
@@ -133,7 +133,10 @@ impl ItemUpdate {
.iter()
.find(|(name, _)| self.get_field_position(name) == pos)
.and_then(|(_, value)| value.as_deref()),
- Err(_) => self.fields.get(field_name_or_pos).and_then(|v| v.as_deref()),
+ Err(_) => self
+ .fields
+ .get(field_name_or_pos)
+ .and_then(|v| v.as_deref()),
}
}
@@ -162,10 +165,7 @@ impl ItemUpdate {
/// # Returns
/// A JSON Patch structure representing the difference between the new value and the previous one,
/// or None if the difference in JSON Patch format is not available for any reason.
- pub fn get_value_as_json_patch_if_available(
- &self,
- _field_name_or_pos: &str,
- ) -> Option<String> {
+ pub fn get_value_as_json_patch_if_available(&self, _field_name_or_pos: &str) -> Option<String> {
// Implementation pending
None
}
@@ -232,10 +232,10 @@ impl ItemUpdate {
///
/// # Returns
/// The 1-based position of the field within the field list or field schema.
- fn get_field_position(&self, field_name: &str) -> usize {
+ fn get_field_position(&self, _field_name: &str) -> usize {
// Implementation pending
// This method should return the 1-based position of the field based on the field list or field schema
// If the field is not found, it should raise an IllegalArgumentException
unimplemented!()
}
-} \ No newline at end of file
+}
diff --git a/src/lib.rs b/src/lib.rs
index f42f13a..5200961 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,11 +1,11 @@
pub mod client_listener;
pub mod client_message_listener;
-pub mod error;
-pub mod item_update;
-pub mod subscription_listener;
pub mod connection_details;
pub mod connection_options;
+pub mod error;
+pub mod item_update;
pub mod ls_client;
pub mod proxy;
pub mod subscription;
+pub mod subscription_listener;
pub mod util;
diff --git a/src/ls_client.rs b/src/ls_client.rs
index b65885c..39d7c8b 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -315,7 +315,8 @@ impl LightstreamerClient {
let mut request_id: usize = 0;
let mut _session_id: Option<String> = None;
let mut subscription_id: usize = 0;
- let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new();
+ let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
+ HashMap::new();
loop {
tokio::select! {
message = read_stream.next() => {
@@ -488,7 +489,7 @@ impl LightstreamerClient {
// is always a snapshot.
if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
if let Some(_) = item_updates.get(&(item_index)) {
- // Item update already exists in item_updates, so it's not a snapshot.
+ // Item update already exists in item_updates, so it's not a snapshot.
false
} else {
// Item update doesn't exist in item_updates, so the first update is always a snapshot.
diff --git a/src/main.rs b/src/main.rs
index c284605..2da215c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,11 +3,12 @@ use lightstreamer_client::ls_client::{LightstreamerClient, Transport};
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
use lightstreamer_client::subscription_listener::SubscriptionListener;
+use colored::*;
use signal_hook::low_level::signal_name;
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
use std::sync::Arc;
-use tokio::sync::{Notify, Mutex};
+use tokio::sync::{Mutex, Notify};
const MAX_CONNECTION_ATTEMPTS: u64 = 1;
@@ -44,34 +45,33 @@ pub struct MySubscriptionListener {}
impl SubscriptionListener for MySubscriptionListener {
fn on_item_update(&self, update: &ItemUpdate) {
- println!(
- "UPDATE for item '{}' => '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}",
- update.item_name.as_ref().unwrap_or(&"N/A".to_string()),
+ let not_available = "N/A".to_string();
+ let item_name = update.item_name.clone().unwrap_or(not_available.clone());
+ let fields = vec![
"stock_name",
- update.get_value("stock_name").unwrap_or(&"N/A".to_string()),
"last_price",
- update.get_value("last_price").unwrap_or(&"N/A".to_string()),
"time",
- update.get_value("time").unwrap_or(&"N/A".to_string()),
"pct_change",
- update.get_value("pct_change").unwrap_or(&"N/A".to_string()),
"bid_quantity",
- update.get_value("bid_quantity").unwrap_or(&"N/A".to_string()),
"bid",
- update.get_value("bid").unwrap_or(&"N/A".to_string()),
"ask",
- update.get_value("ask").unwrap_or(&"N/A".to_string()),
"ask_quantity",
- update.get_value("ask_quantity").unwrap_or(&"N/A".to_string()),
"min",
- update.get_value("min").unwrap_or(&"N/A".to_string()),
"max",
- update.get_value("max").unwrap_or(&"N/A".to_string()),
"ref_price",
- update.get_value("ref_price").unwrap_or(&"N/A".to_string()),
"open_price",
- update.get_value("open_price").unwrap_or(&"N/A".to_string()),
- );
+ ];
+ let mut output = String::new();
+ for field in fields {
+ let value = update.get_value(field).unwrap_or(&not_available).clone();
+ let value_str = if update.changed_fields.contains_key(field) {
+ value.yellow().to_string()
+ } else {
+ value.to_string()
+ };
+ output.push_str(&format!("{}: {}, ", field, value_str));
+ }
+ println!("{}, {}", item_name, output);
}
}
@@ -128,7 +128,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
{
let mut client = client.lock().await;
client.subscribe(my_subscription);
- client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
+ client
+ .connection_options
+ .set_forced_transport(Some(Transport::WsStreaming));
}
// Create a new Notify instance to send a shutdown signal to the signal handler thread.
@@ -163,7 +165,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
if retry_counter == MAX_CONNECTION_ATTEMPTS {
- println!("Failed to connect after {} retries. Exiting...", retry_counter);
+ println!(
+ "Failed to connect after {} retries. Exiting...",
+ retry_counter
+ );
} else {
println!("Exiting orderly from Lightstreamer client...");
}
diff --git a/src/proxy.rs b/src/proxy.rs
index 9a8add8..a420e67 100644
--- a/src/proxy.rs
+++ b/src/proxy.rs
@@ -80,4 +80,4 @@ pub enum ProxyType {
Socks4,
/// SOCKS5 proxy.
Socks5,
-} \ No newline at end of file
+}
diff --git a/src/subscription.rs b/src/subscription.rs
index 9252ee6..135996b 100644
--- a/src/subscription.rs
+++ b/src/subscription.rs
@@ -39,18 +39,6 @@ pub enum SubscriptionMode {
Command,
}
-impl SubscriptionMode {
- fn from_str(s: &str) -> Result<SubscriptionMode, String> {
- match s.to_lowercase().as_str() {
- "merge" => Ok(SubscriptionMode::Merge),
- "distinct" => Ok(SubscriptionMode::Distinct),
- "raw" => Ok(SubscriptionMode::Raw),
- "command" => Ok(SubscriptionMode::Command),
- _ => Err(format!("Invalid subscription mode: {}", s)),
- }
- }
-}
-
impl ToString for SubscriptionMode {
fn to_string(&self) -> String {
match self {
@@ -171,15 +159,15 @@ impl Subscription {
/// # See also
/// `addListener()`
pub fn remove_listener<T>(&mut self, listener: &T)
- where
- T: SubscriptionListener,
- {
- self.listeners.retain(|l| {
- let l_ref = l.as_ref() as &dyn SubscriptionListener;
- let listener_ref = listener as &dyn SubscriptionListener;
- !(std::ptr::addr_of!(*l_ref) == std::ptr::addr_of!(*listener_ref))
- });
- }
+ where
+ T: SubscriptionListener,
+ {
+ self.listeners.retain(|l| {
+ let l_ref = l.as_ref() as &dyn SubscriptionListener;
+ let listener_ref = listener as &dyn SubscriptionListener;
+ !(std::ptr::addr_of!(*l_ref) == std::ptr::addr_of!(*listener_ref))
+ });
+ }
/// Returns a list containing the SubscriptionListener instances that were added to this client.
///
@@ -404,7 +392,10 @@ impl Subscription {
///
/// # See also
/// `Subscription.setCommandSecondLevelFieldSchema()`
- pub fn set_command_second_level_data_adapter(&mut self, adapter: Option<String>) -> Result<(), String> {
+ pub fn set_command_second_level_data_adapter(
+ &mut self,
+ adapter: Option<String>,
+ ) -> Result<(), String> {
if self.is_active {
return Err("Subscription is active".to_string());
}
@@ -457,7 +448,10 @@ impl Subscription {
///
/// # See also
/// `Subscription.setCommandSecondLevelFields()`
- pub fn set_command_second_level_field_schema(&mut self, schema: Option<String>) -> Result<(), String> {
+ pub fn set_command_second_level_field_schema(
+ &mut self,
+ schema: Option<String>,
+ ) -> Result<(), String> {
if self.is_active {
return Err("Subscription is active".to_string());
}
@@ -511,7 +505,10 @@ impl Subscription {
///
/// # See also
/// `Subscription.setCommandSecondLevelFieldSchema()`
- pub fn set_command_second_level_fields(&mut self, fields: Option<Vec<String>>) -> Result<(), String> {
+ pub fn set_command_second_level_fields(
+ &mut self,
+ fields: Option<Vec<String>>,
+ ) -> Result<(), String> {
if self.is_active {
return Err("Subscription is active".to_string());
}
@@ -763,7 +760,12 @@ impl Subscription {
///
/// # Returns
/// The current value for the specified field of the specified key within the specified item (possibly `None`), or `None` if the specified key has not been added yet (note that it might have been added and eventually deleted).
- pub fn get_command_value(&self, item_pos: usize, key: &str, field_pos: usize) -> Option<&String> {
+ pub fn get_command_value(
+ &self,
+ item_pos: usize,
+ key: &str,
+ field_pos: usize,
+ ) -> Option<&String> {
let key = format!("{}_{}", item_pos, key);
self.command_values
.get(&key)
@@ -842,7 +844,9 @@ impl Subscription {
return None;
}
if let Some(ref schema) = self.field_schema {
- return schema.split(',').position(|field| field.trim() == "command");
+ return schema
+ .split(',')
+ .position(|field| field.trim() == "command");
}
None
}
@@ -896,9 +900,18 @@ impl Debug for Subscription {
.field("field_schema", &self.field_schema)
.field("fields", &self.fields)
.field("data_adapter", &self.data_adapter)
- .field("command_second_level_data_adapter", &self.command_second_level_data_adapter)
- .field("command_second_level_field_schema", &self.command_second_level_field_schema)
- .field("command_second_level_fields", &self.command_second_level_fields)
+ .field(
+ "command_second_level_data_adapter",
+ &self.command_second_level_data_adapter,
+ )
+ .field(
+ "command_second_level_field_schema",
+ &self.command_second_level_field_schema,
+ )
+ .field(
+ "command_second_level_fields",
+ &self.command_second_level_fields,
+ )
.field("requested_buffer_size", &self.requested_buffer_size)
.field("requested_max_frequency", &self.requested_max_frequency)
.field("requested_snapshot", &self.requested_snapshot)
@@ -907,4 +920,4 @@ impl Debug for Subscription {
.field("is_subscribed", &self.is_subscribed)
.finish()
}
-} \ No newline at end of file
+}
diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs
index 72958fe..d05a6a3 100644
--- a/src/subscription_listener.rs
+++ b/src/subscription_listener.rs
@@ -57,7 +57,9 @@ pub trait SubscriptionListener: Send {
/// - `Subscription::set_command_second_level_field_schema()`
fn on_command_second_level_item_lost_updates(&mut self, _lost_updates: u32, _key: &str) {
// Default implementation does nothing.
- unimplemented!("Implement on_command_second_level_item_lost_updates method for SubscriptionListener.");
+ unimplemented!(
+ "Implement on_command_second_level_item_lost_updates method for SubscriptionListener."
+ );
}
/// Event handler that is called when the Server notifies an error on a second-level subscription.
@@ -89,9 +91,16 @@ pub trait SubscriptionListener: Send {
/// - `ConnectionDetails::set_adapter_set()`
/// - `Subscription::set_command_second_level_fields()`
/// - `Subscription::set_command_second_level_field_schema()`
- fn on_command_second_level_subscription_error(&mut self, _code: i32, _message: Option<&str>, _key: &str) {
+ fn on_command_second_level_subscription_error(
+ &mut self,
+ _code: i32,
+ _message: Option<&str>,
+ _key: &str,
+ ) {
// Default implementation does nothing.
- unimplemented!("Implement on_command_second_level_subscription_error method for SubscriptionListener.");
+ unimplemented!(
+ "Implement on_command_second_level_subscription_error method for SubscriptionListener."
+ );
}
/// Event handler that is called by Lightstreamer to notify that all snapshot events for an item
@@ -143,7 +152,12 @@ pub trait SubscriptionListener: Send {
/// # See also
///
/// - `Subscription::set_requested_max_frequency()`
- fn on_item_lost_updates(&mut self, _item_name: Option<&str>, _item_pos: usize, _lost_updates: u32) {
+ fn on_item_lost_updates(
+ &mut self,
+ _item_name: Option<&str>,
+ _item_pos: usize,
+ _lost_updates: u32,
+ ) {
// Default implementation does nothing.
unimplemented!("Implement on_item_lost_updates method for SubscriptionListener.");
}
@@ -273,4 +287,4 @@ pub trait SubscriptionListener: Send {
fn on_unsubscription(&mut self) {
// Default implementation does nothing.
}
-} \ No newline at end of file
+}
diff --git a/src/util.rs b/src/util.rs
index 8de27e3..60b2967 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,4 +1,4 @@
/// Clean the message from newlines and carriage returns and convert it to lowercase.
pub fn clean_message(text: &str) -> String {
text.replace("\n", "").replace("\r", "").to_lowercase()
-} \ No newline at end of file
+}