fix server metrics framing issue
This commit is contained in:
@@ -146,20 +146,14 @@ async fn main() -> std::io::Result<()> {
|
|||||||
}
|
}
|
||||||
println!();
|
println!();
|
||||||
|
|
||||||
// Identify this process (client) by the local socket address used to connect
|
|
||||||
let server_identifier = match stream.local_addr() {
|
|
||||||
Ok(addr) => addr.to_string(),
|
|
||||||
Err(_) => format!("localhost:{}", PORT),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut packet: HashMap<String, Value> = HashMap::new();
|
let mut packet: HashMap<String, Value> = HashMap::new();
|
||||||
packet.insert("server_ip".to_string(), Value::String(server_identifier));
|
|
||||||
packet.insert("cpu".to_string(), Value::from(cpu_usage)); // %
|
packet.insert("cpu".to_string(), Value::from(cpu_usage)); // %
|
||||||
packet.insert("mem".to_string(), Value::from(mem_usage)); // %
|
packet.insert("mem".to_string(), Value::from(mem_usage)); // %
|
||||||
packet.insert("net".to_string(), Value::from(net_usage_pct));
|
packet.insert("net".to_string(), Value::from(net_usage_pct));
|
||||||
packet.insert("io".to_string(), Value::from(io_usage));
|
packet.insert("io".to_string(), Value::from(io_usage));
|
||||||
|
|
||||||
let serialized_packet = serde_json::to_string(&packet)?;
|
let serialized_packet = serde_json::to_string(&packet)?;
|
||||||
|
serialized_packet.push('\n');
|
||||||
let _ = stream.write(serialized_packet.as_bytes());
|
let _ = stream.write(serialized_packet.as_bytes());
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(10));
|
thread::sleep(Duration::from_secs(10));
|
||||||
|
|||||||
76
src/main.rs
76
src/main.rs
@@ -8,7 +8,7 @@ use crate::proxy::tcp::proxy_tcp_connection;
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::{AsyncBufReadExt, AsyncReadExt};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::{IpAddr};
|
use std::net::{IpAddr};
|
||||||
@@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
|
|||||||
use crate::backend::health::ServerMetrics;
|
use crate::backend::health::ServerMetrics;
|
||||||
use rperf3::{Server, Config};
|
use rperf3::{Server, Config};
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
|
use std::io::{BufRead, BufReader};
|
||||||
|
|
||||||
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
|
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
|
||||||
|
|
||||||
@@ -27,32 +27,52 @@ async fn start_iperf_server() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_connection(mut stream: TcpStream, healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> std::io::Result<()> {
|
async fn handle_metrics_stream(stream: TcpStream, healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> std::io::Result<()> {
|
||||||
loop {
|
let server_ip = stream.peer_addr()?.ip();
|
||||||
let mut buffer = [0u8; 512];
|
let mut reader = tokio::io::BufReader::new(stream);
|
||||||
let bytes_read = stream.read(&mut buffer).await?;
|
let mut line = String::new();
|
||||||
let receiving_data = String::from_utf8((buffer[..bytes_read]).to_vec())
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let parsed_recdata : Value = serde_json::from_str(&receiving_data).unwrap();
|
|
||||||
let server_ip: IpAddr = parsed_recdata["server_ip"]
|
|
||||||
.as_str()
|
|
||||||
.unwrap()
|
|
||||||
.parse()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
healths.get(&server_ip)
|
loop {
|
||||||
.unwrap()
|
line.clear();
|
||||||
.write()
|
|
||||||
.unwrap()
|
match reader.read_line(&mut line).await {
|
||||||
.update(
|
Ok(0) => break,
|
||||||
parsed_recdata["cpu"].as_f64().unwrap(),
|
Ok(_) => {
|
||||||
parsed_recdata["mem"].as_f64().unwrap(),
|
if let Err(e) = process_metrics(server_ip, &line, healths) {
|
||||||
parsed_recdata["net"].as_f64().unwrap(),
|
eprintln!("skipping invalid packet: {}", e);
|
||||||
parsed_recdata["io"].as_f64().unwrap(),
|
}
|
||||||
);
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("connection error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#[warn(unreachable_code)]
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_metrics(server_ip: IpAddr, json_str: &str, healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> Result<(), String> {
|
||||||
|
let parsed: Value = serde_json::from_str(json_str)
|
||||||
|
.map_err(|e| format!("parse error: {}", e))?;
|
||||||
|
|
||||||
|
let metrics_lock = healths.get(&server_ip)
|
||||||
|
.ok_or_else(|| format!("unknown server: {}", server_ip))?;
|
||||||
|
|
||||||
|
let get_f64 = |key: &str| -> Result<f64, String> {
|
||||||
|
parsed.get(key)
|
||||||
|
.and_then(|v| v.as_f64())
|
||||||
|
.ok_or_else(|| format!("invalid '{}'", key))
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Ok(mut guard) = metrics_lock.write() {
|
||||||
|
guard.update(
|
||||||
|
get_f64("cpu")?,
|
||||||
|
get_f64("mem")?,
|
||||||
|
get_f64("net")?,
|
||||||
|
get_f64("io")?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,7 +87,7 @@ async fn start_healthcheck_listener(addr: &str, healths: HashMap<IpAddr, Arc<RwL
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = handle_connection(stream, &healths).await {
|
if let Err(e) = handle_metrics_stream(stream, &healths).await {
|
||||||
eprintln!("connection handler error: {}", e);
|
eprintln!("connection handler error: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -97,7 +117,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
handles.push(
|
handles.push(
|
||||||
tokio::spawn(async {
|
tokio::spawn(async {
|
||||||
start_healthcheck_listener("127.0.0.1:8080", healths).await.unwrap();
|
start_healthcheck_listener("0.0.0.0:8080", healths).await.unwrap();
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user