aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs87
1 files changed, 83 insertions, 4 deletions
diff --git a/src/main.rs b/src/main.rs
index 2e3c3c5..d782d2b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,11 @@
use futures::stream::StreamExt;
+use futures::SinkExt;
use reqwest::Client;
+use serde_urlencoded;
use std::error::Error;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
+use tokio::sync::Mutex;
async fn establish_persistent_http_connection(
session_id_shared: Arc<Mutex<String>>,
@@ -28,7 +32,7 @@ async fn establish_persistent_http_connection(
if let Some(end) = response_text.find(",50000,5000,*\r\n") {
let session_id = &response_text[start + 6..end];
println!("Session ID: {}", session_id);
- let mut session_id_lock = session_id_shared.lock().unwrap();
+ let mut session_id_lock = session_id_shared.lock().await;
*session_id_lock = session_id.to_string();
}
} else {
@@ -45,6 +49,54 @@ async fn establish_persistent_http_connection(
Ok(())
}
+/*
+// Establish a persistent WebSocket connection and handle the session creation
+async fn establish_persistent_ws_connection(
+ session_id_shared: Arc<Mutex<String>>,
+) -> Result<(), Box<dyn Error>> {
+ let ws_url = "wss://push.lightstreamer.com/lightstreamer";
+
+ let (ws_stream, _) = tokio_tungstenite::connect_async_with_config(
+ tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned()))
+ ).await.expect("Failed to connect");
+
+ let (mut write, mut read) = ws_stream.split();
+
+ // Session creation parameters
+ let params = [
+ ("LS_op2", "create_session"),
+ ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
+ ("LS_adapter_set", "DEMO"),
+ ];
+
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+
+ // Send the create session message
+ write
+ .send(Message::Text(format!("{}\n", encoded_params)))
+ .await?;
+
+ // Listen for messages from the server
+ while let Some(message) = read.next().await {
+ match message? {
+ Message::Text(text) => {
+ if text.starts_with("CONOK") {
+ let session_info: Vec<&str> = text.split(",").collect();
+ let session_id = session_info.get(1).unwrap_or(&"").to_string();
+ *session_id_shared.lock().await = session_id.clone();
+ println!("Session established with ID: {}", session_id);
+ subscribe_to_channel_ws(session_id, write).await?;
+ break; // Exit after successful subscription
+ }
+ }
+ _ => {}
+ }
+ }
+
+ Ok(())
+}
+*/
+
async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> {
let client = Client::new();
let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt";
@@ -61,6 +113,33 @@ async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error>
Ok(())
}
+// Function to subscribe to a channel using WebSocket
+async fn subscribe_to_channel_ws(
+ session_id: String,
+ mut write: futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tokio_tungstenite::tungstenite::protocol::Message>,
+) -> Result<(), Box<dyn Error>> {
+ // Example subscription to ITEM1 in MERGE mode from the DEMO adapter set
+ let sub_params = [
+ ("LS_table", "1"),
+ ("LS_op2", "add"),
+ ("LS_session", &session_id),
+ ("LS_id", "item1"),
+ ("LS_schema", "stock_name last_price"),
+ ("LS_mode", "MERGE"),
+ ];
+
+ let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?;
+
+ // Send the subscription message
+ write
+ .send(Message::Text(encoded_sub_params))
+ .await?;
+
+ println!("Subscribed to channel with session ID: {}", session_id);
+
+ Ok(())
+}
+
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
@@ -78,7 +157,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let session_id;
{
- session_id = session_id_shared.lock().unwrap().clone();
+ session_id = session_id_shared.lock().await.clone();
}
if !session_established && !session_id.is_empty() {
@@ -93,4 +172,4 @@ async fn main() -> Result<(), Box<dyn Error>> {
task2.await?;
Ok(())
-}
+} \ No newline at end of file