diff options
author | 2024-03-24 20:39:38 +0100 | |
---|---|---|
committer | 2024-03-24 20:39:43 +0100 | |
commit | 7e1eb27a06e5545b3d1b77b5998dc0463df27d70 (patch) | |
tree | 8c9569d9574e3321390521bf855ad9d39a187db2 /src/main.rs | |
parent | dfd6b4a2b7494854096663be73dc9255ff14c7d8 (diff) |
Created structure and scaffolding for the Lightstreamer client.
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 87 |
1 files changed, 83 insertions, 4 deletions
diff --git a/src/main.rs b/src/main.rs index 2e3c3c5..d782d2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,11 @@ use futures::stream::StreamExt; +use futures::SinkExt; use reqwest::Client; +use serde_urlencoded; use std::error::Error; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use tokio::sync::Mutex; async fn establish_persistent_http_connection( session_id_shared: Arc<Mutex<String>>, @@ -28,7 +32,7 @@ async fn establish_persistent_http_connection( if let Some(end) = response_text.find(",50000,5000,*\r\n") { let session_id = &response_text[start + 6..end]; println!("Session ID: {}", session_id); - let mut session_id_lock = session_id_shared.lock().unwrap(); + let mut session_id_lock = session_id_shared.lock().await; *session_id_lock = session_id.to_string(); } } else { @@ -45,6 +49,54 @@ async fn establish_persistent_http_connection( Ok(()) } +/* +// Establish a persistent WebSocket connection and handle the session creation +async fn establish_persistent_ws_connection( + session_id_shared: Arc<Mutex<String>>, +) -> Result<(), Box<dyn Error>> { + let ws_url = "wss://push.lightstreamer.com/lightstreamer"; + + let (ws_stream, _) = tokio_tungstenite::connect_async_with_config( + tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned())) + ).await.expect("Failed to connect"); + + let (mut write, mut read) = ws_stream.split(); + + // Session creation parameters + let params = [ + ("LS_op2", "create_session"), + ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"), + ("LS_adapter_set", "DEMO"), + ]; + + let encoded_params = serde_urlencoded::to_string(¶ms)?; + + // Send the create session message + write + .send(Message::Text(format!("{}\n", encoded_params))) + .await?; + + // Listen for messages from the server + while let Some(message) = read.next().await { + match message? { + Message::Text(text) => { + if text.starts_with("CONOK") { + let session_info: Vec<&str> = text.split(",").collect(); + let session_id = session_info.get(1).unwrap_or(&"").to_string(); + *session_id_shared.lock().await = session_id.clone(); + println!("Session established with ID: {}", session_id); + subscribe_to_channel_ws(session_id, write).await?; + break; // Exit after successful subscription + } + } + _ => {} + } + } + + Ok(()) +} +*/ + async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> { let client = Client::new(); let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt"; @@ -61,6 +113,33 @@ async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> Ok(()) } +// Function to subscribe to a channel using WebSocket +async fn subscribe_to_channel_ws( + session_id: String, + mut write: futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tokio_tungstenite::tungstenite::protocol::Message>, +) -> Result<(), Box<dyn Error>> { + // Example subscription to ITEM1 in MERGE mode from the DEMO adapter set + let sub_params = [ + ("LS_table", "1"), + ("LS_op2", "add"), + ("LS_session", &session_id), + ("LS_id", "item1"), + ("LS_schema", "stock_name last_price"), + ("LS_mode", "MERGE"), + ]; + + let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?; + + // Send the subscription message + write + .send(Message::Text(encoded_sub_params)) + .await?; + + println!("Subscribed to channel with session ID: {}", session_id); + + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { @@ -78,7 +157,7 @@ async fn main() -> Result<(), Box<dyn Error>> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; let session_id; { - session_id = session_id_shared.lock().unwrap().clone(); + session_id = session_id_shared.lock().await.clone(); } if !session_established && !session_id.is_empty() { @@ -93,4 +172,4 @@ async fn main() -> Result<(), Box<dyn Error>> { task2.await?; Ok(()) -} +}
\ No newline at end of file |