aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs43
1 files changed, 24 insertions, 19 deletions
diff --git a/src/main.rs b/src/main.rs
index c284605..2da215c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,11 +3,12 @@ use lightstreamer_client::ls_client::{LightstreamerClient, Transport};
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
use lightstreamer_client::subscription_listener::SubscriptionListener;
+use colored::*;
use signal_hook::low_level::signal_name;
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
use std::sync::Arc;
-use tokio::sync::{Notify, Mutex};
+use tokio::sync::{Mutex, Notify};
const MAX_CONNECTION_ATTEMPTS: u64 = 1;
@@ -44,34 +45,33 @@ pub struct MySubscriptionListener {}
impl SubscriptionListener for MySubscriptionListener {
fn on_item_update(&self, update: &ItemUpdate) {
- println!(
- "UPDATE for item '{}' => '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}",
- update.item_name.as_ref().unwrap_or(&"N/A".to_string()),
+ let not_available = "N/A".to_string();
+ let item_name = update.item_name.clone().unwrap_or(not_available.clone());
+ let fields = vec![
"stock_name",
- update.get_value("stock_name").unwrap_or(&"N/A".to_string()),
"last_price",
- update.get_value("last_price").unwrap_or(&"N/A".to_string()),
"time",
- update.get_value("time").unwrap_or(&"N/A".to_string()),
"pct_change",
- update.get_value("pct_change").unwrap_or(&"N/A".to_string()),
"bid_quantity",
- update.get_value("bid_quantity").unwrap_or(&"N/A".to_string()),
"bid",
- update.get_value("bid").unwrap_or(&"N/A".to_string()),
"ask",
- update.get_value("ask").unwrap_or(&"N/A".to_string()),
"ask_quantity",
- update.get_value("ask_quantity").unwrap_or(&"N/A".to_string()),
"min",
- update.get_value("min").unwrap_or(&"N/A".to_string()),
"max",
- update.get_value("max").unwrap_or(&"N/A".to_string()),
"ref_price",
- update.get_value("ref_price").unwrap_or(&"N/A".to_string()),
"open_price",
- update.get_value("open_price").unwrap_or(&"N/A".to_string()),
- );
+ ];
+ let mut output = String::new();
+ for field in fields {
+ let value = update.get_value(field).unwrap_or(&not_available).clone();
+ let value_str = if update.changed_fields.contains_key(field) {
+ value.yellow().to_string()
+ } else {
+ value.to_string()
+ };
+ output.push_str(&format!("{}: {}, ", field, value_str));
+ }
+ println!("{}, {}", item_name, output);
}
}
@@ -128,7 +128,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
{
let mut client = client.lock().await;
client.subscribe(my_subscription);
- client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
+ client
+ .connection_options
+ .set_forced_transport(Some(Transport::WsStreaming));
}
// Create a new Notify instance to send a shutdown signal to the signal handler thread.
@@ -163,7 +165,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
if retry_counter == MAX_CONNECTION_ATTEMPTS {
- println!("Failed to connect after {} retries. Exiting...", retry_counter);
+ println!(
+ "Failed to connect after {} retries. Exiting...",
+ retry_counter
+ );
} else {
println!("Exiting orderly from Lightstreamer client...");
}