diff options
author | 2024-02-18 12:57:11 +0100 | |
---|---|---|
committer | 2024-02-18 12:57:11 +0100 | |
commit | cd3ffbf75a12f4e804c240552ee4f8b68a02a4cc (patch) | |
tree | d52dd4070055c61c475b5e8d89b10023638a1ee8 | |
parent | 3cf9cf9ecdf6dd2649300a753c0dd59ad389f13a (diff) |
Added function to subscribe to channel on Lightstreamer server and started using tokio for multithreading.
-rw-r--r-- | src/main.rs | 179 |
1 files changed, 49 insertions, 130 deletions
diff --git a/src/main.rs b/src/main.rs index 481bc9c..2e3c3c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,20 @@ +use futures::stream::StreamExt; use reqwest::Client; -use futures::stream::StreamExt; // Actualizado para usar futures::StreamExt -use tokio::runtime::Runtime; +use std::error::Error; use std::sync::{Arc, Mutex}; -use std::thread; -async fn establish_persistent_http_connection(session_id_shared: Arc<Mutex<String>>) -> Result<(), reqwest::Error> { +async fn establish_persistent_http_connection( + session_id_shared: Arc<Mutex<String>>, +) -> Result<(), reqwest::Error> { let client = Client::new(); - let params = [("LS_adapter_set", "DEMO"), ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg")]; - let request_url = "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0"; + let params = [ + ("LS_adapter_set", "DEMO"), + ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"), + ]; + let request_url = + "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0"; - let response = client.post(request_url) - .form(¶ms) - .send() - .await?; + let response = client.post(request_url).form(¶ms).send().await?; if response.status().is_success() { let mut stream = response.bytes_stream(); @@ -20,18 +22,19 @@ async fn establish_persistent_http_connection(session_id_shared: Arc<Mutex<Strin while let Some(item) = stream.next().await { match item { Ok(bytes) => { - let response_text = String::from_utf8(bytes.to_vec()).expect("Failed to convert bytes to string"); - // Intenta extraer el ID de sesión de la primera respuesta + let response_text = String::from_utf8(bytes.to_vec()) + .expect("Failed to convert bytes to string"); if let Some(start) = response_text.find("CONOK,") { if let Some(end) = response_text.find(",50000,5000,*\r\n") { let session_id = &response_text[start + 6..end]; println!("Session ID: {}", session_id); - // Guardar el session_id en la variable compartida let mut session_id_lock = session_id_shared.lock().unwrap(); *session_id_lock = session_id.to_string(); } + } else { + println!("New message: {}", response_text); } - }, + } Err(e) => println!("Error while receiving: {:?}", e), } } @@ -42,136 +45,52 @@ async fn establish_persistent_http_connection(session_id_shared: Arc<Mutex<Strin Ok(()) } -fn main() { - let rt = Runtime::new().unwrap(); +async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> { + let client = Client::new(); + let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt"; + let params = [("LS_session", &session_id)]; + + let response = client.post(subscribe_url).form(¶ms).send().await?; + + if response.status().is_success() { + println!("Subscription successful!"); + } else { + println!("Subscription failed: {}", response.status()); + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { - // Crear una variable compartida para almacenar el session_id let session_id_shared = Arc::new(Mutex::new(String::new())); let session_id_shared_clone = session_id_shared.clone(); - // Lanzar la función establecer conexión en un nuevo hilo de Tokio - rt.spawn(async move { + let task1 = tokio::spawn(async move { establish_persistent_http_connection(session_id_shared_clone).await.unwrap(); }); - // Crear otro hilo para acceder al session_id - thread::spawn(move || { + println!("Established connection to Lightstreamer server"); + let task2 = tokio::spawn(async move { + let mut session_established = false; loop { - // Simular un delay para esperar a que el session_id esté listo - thread::sleep(std::time::Duration::from_secs(5)); - let session_id = session_id_shared.lock().unwrap(); - if !session_id.is_empty() { - println!("Accessed Session ID from another thread: {}", *session_id); - break; // Salir del bucle si ya se obtuvo el session_id + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let session_id; + { + session_id = session_id_shared.lock().unwrap().clone(); } - } - }) - .join() - .unwrap(); -} - - -/* -use futures::stream::StreamExt; -use serde::{Deserialize, Serialize}; -use reqwest::Client; -use reqwest::Error; -use tokio::time::sleep; -use std::time::Duration; -use std::string::FromUtf8Error; -use std::sync::Arc; - -#[derive(Debug, Deserialize, Serialize, Clone)] -struct Session { - key: String, - server: String, -} - -async fn establish_connection(client: &Client, server: &str) -> Result<Session, Error> { - - println!("Establishing connection to server: {}", server); - let response = client.post(format!("{}/lightstreamer/create_session.txt", server)) - .body("LS_adapter_set=DEMO&LS_cid=mgQkwtwdysogQz2BJ4Ji%20kOj2Bg") - .send() - .await?; - - let mut stream = response.bytes_stream(); - - while let Some(item) = stream.next().await { - match item { - Ok(bytes) => { - // Convierte cualquier error de FromUtf8Error a Box<dyn Error> - let message = String::from_utf8(bytes.to_vec()).map_err(|e| Box::new(e) as Box<dyn std::error::Error>); - println!("Received: {:?}", message); - }, - Err(e) => { - eprintln!("Error while reading from stream: {}", e); - break; + if !session_established && !session_id.is_empty() { + println!("Accessed Session ID from another thread: {}", session_id); + session_established = true; + subscribe_to_channel(session_id).await.unwrap(); } } - } - - let session = Session { - key: "S2520d1412903a84dM42fT4356206".to_string(), - server: server.to_string(), - }; - - Ok(session) -} - -async fn keep_alive(session: Arc<Session>) { - let client = Client::new(); - - loop { - let response = client - .post(&format!("{}/lightstreamer/control.txt", session.server)) - .body("LS_op=probe") - .send() - .await; - - if let Err(error) = response { - eprintln!("Error en keep_alive: {}", error); - } - - sleep(Duration::from_secs(5)).await; - } -} - -async fn subscribe_to_channel(session: &Session, channel_id: &str) -> Result<(), Error> { - let client = Client::new(); - - let response = client - .post(&format!("{}/lightstreamer/control.txt", session.server)) - .body(format!( - "LS_op=subscribe&LS_session={}&LS_id={}&LS_mode=DISTINCT", - session.key, channel_id - )) - .send() - .await?; - - let messages = response.json::<Vec<serde_json::Value>>().await?; - - for message in messages { - println!("Message: {:?}", message); - } - - Ok(()) -} - -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - let client = Client::new(); - let session = establish_connection(&client, "http://push.lightstreamer.com").await?; - let session_arc = Arc::new(session); - - let keep_alive_session = session_arc.clone(); - tokio::spawn(async move { - keep_alive(keep_alive_session).await; }); - subscribe_to_channel(&session_arc, "chat_room").await?; + task1.await?; + task2.await?; Ok(()) } -*/
\ No newline at end of file |