From b5547acb4263779815ac0c375741aa4aebb52884 Mon Sep 17 00:00:00 2001 From: psun256 Date: Wed, 10 Dec 2025 16:04:14 -0500 Subject: [PATCH] fix server metrics framing issue --- infra/enginewhy-server.rs | 8 +---- src/main.rs | 76 ++++++++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/infra/enginewhy-server.rs b/infra/enginewhy-server.rs index 8786484..4614649 100644 --- a/infra/enginewhy-server.rs +++ b/infra/enginewhy-server.rs @@ -146,20 +146,14 @@ async fn main() -> std::io::Result<()> { } println!(); - // Identify this process (client) by the local socket address used to connect - let server_identifier = match stream.local_addr() { - Ok(addr) => addr.to_string(), - Err(_) => format!("localhost:{}", PORT), - }; - let mut packet: HashMap = HashMap::new(); - packet.insert("server_ip".to_string(), Value::String(server_identifier)); packet.insert("cpu".to_string(), Value::from(cpu_usage)); // % packet.insert("mem".to_string(), Value::from(mem_usage)); // % packet.insert("net".to_string(), Value::from(net_usage_pct)); packet.insert("io".to_string(), Value::from(io_usage)); let serialized_packet = serde_json::to_string(&packet)?; + serialized_packet.push('\n'); let _ = stream.write(serialized_packet.as_bytes()); thread::sleep(Duration::from_secs(10)); diff --git a/src/main.rs b/src/main.rs index aaefa85..2395f2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use crate::proxy::tcp::proxy_tcp_connection; use std::fs::File; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncBufReadExt, AsyncReadExt}; use serde_json::Value; use std::collections::HashMap; use std::net::{IpAddr}; @@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock}; use crate::backend::health::ServerMetrics; use rperf3::{Server, Config}; use std::io::Read; - +use std::io::{BufRead, BufReader}; static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); @@ -27,32 +27,52 @@ async fn start_iperf_server() -> Result<(), Box> { Ok(()) } -async fn handle_connection(mut stream: TcpStream, healths: &HashMap>>) -> std::io::Result<()> { - loop { - let mut buffer = [0u8; 512]; - let bytes_read = stream.read(&mut buffer).await?; - let receiving_data = String::from_utf8((buffer[..bytes_read]).to_vec()) - .unwrap(); - - let parsed_recdata : Value = serde_json::from_str(&receiving_data).unwrap(); - let server_ip: IpAddr = parsed_recdata["server_ip"] - .as_str() - .unwrap() - .parse() - .unwrap(); +async fn handle_metrics_stream(stream: TcpStream, healths: &HashMap>>) -> std::io::Result<()> { + let server_ip = stream.peer_addr()?.ip(); + let mut reader = tokio::io::BufReader::new(stream); + let mut line = String::new(); - healths.get(&server_ip) - .unwrap() - .write() - .unwrap() - .update( - parsed_recdata["cpu"].as_f64().unwrap(), - parsed_recdata["mem"].as_f64().unwrap(), - parsed_recdata["net"].as_f64().unwrap(), - parsed_recdata["io"].as_f64().unwrap(), - ); + loop { + line.clear(); + + match reader.read_line(&mut line).await { + Ok(0) => break, + Ok(_) => { + if let Err(e) = process_metrics(server_ip, &line, healths) { + eprintln!("skipping invalid packet: {}", e); + } + } + Err(e) => { + eprintln!("connection error: {}", e); + break; + } + } } - #[warn(unreachable_code)] + Ok(()) +} + +fn process_metrics(server_ip: IpAddr, json_str: &str, healths: &HashMap>>) -> Result<(), String> { + let parsed: Value = serde_json::from_str(json_str) + .map_err(|e| format!("parse error: {}", e))?; + + let metrics_lock = healths.get(&server_ip) + .ok_or_else(|| format!("unknown server: {}", server_ip))?; + + let get_f64 = |key: &str| -> Result { + parsed.get(key) + .and_then(|v| v.as_f64()) + .ok_or_else(|| format!("invalid '{}'", key)) + }; + + if let Ok(mut guard) = metrics_lock.write() { + guard.update( + get_f64("cpu")?, + get_f64("mem")?, + get_f64("net")?, + get_f64("io")?, + ); + } + Ok(()) } @@ -67,7 +87,7 @@ async fn start_healthcheck_listener(addr: &str, healths: HashMap Result<(), Box> { handles.push( tokio::spawn(async { - start_healthcheck_listener("127.0.0.1:8080", healths).await.unwrap(); + start_healthcheck_listener("0.0.0.0:8080", healths).await.unwrap(); }) );