applied hot reload to health check logic

psun256
2025-12-10 18:52:40 -05:00
parent 8212a1a762
commit 022c48a041
10 changed files with 421 additions and 144 deletions


@@ -3,115 +3,35 @@ mod balancer;
mod config;
mod proxy;
use std::collections::HashMap;
use crate::balancer::{ConnectionInfo};
use crate::backend::health::{start_healthcheck_listener, start_iperf_server, ServerMetrics};
use crate::balancer::ConnectionInfo;
use crate::config::loader::{build_lb, RoutingTable};
use crate::proxy::tcp::proxy_tcp_connection;
use std::fs::File;
use std::path::PathBuf;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncBufReadExt, AsyncReadExt};
use serde_json::Value;
use anywho::Error;
use std::collections::HashMap;
use std::net::{IpAddr};
use std::sync::{Arc, RwLock};
use crate::backend::health::ServerMetrics;
use rperf3::{Server, Config};
use std::io::Read;
use std::io::{BufRead, BufReader};
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
async fn start_iperf_server() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::server(5001);
let server = Server::new(config);
server.run().await?;
Ok(())
}
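// reads newline-delimited JSON metrics from a connected backend and applies each line
// to that backend's ServerMetrics entry, keyed by the peer's IP address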
async fn handle_metrics_stream(stream: TcpStream, healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> std::io::Result<()> {
let server_ip = stream.peer_addr()?.ip();
let mut reader = tokio::io::BufReader::new(stream);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
if let Err(e) = process_metrics(server_ip, &line, healths) {
eprintln!("skipping invalid packet: {}", e);
}
}
Err(e) => {
eprintln!("connection error: {}", e);
break;
}
}
}
Ok(())
}
fn process_metrics(server_ip: IpAddr, json_str: &str, healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> Result<(), String> {
let parsed: Value = serde_json::from_str(json_str)
.map_err(|e| format!("parse error: {}", e))?;
let metrics_lock = healths.get(&server_ip)
.ok_or_else(|| format!("unknown server: {}", server_ip))?;
let get_f64 = |key: &str| -> Result<f64, String> {
parsed.get(key)
.and_then(|v| v.as_f64())
.ok_or_else(|| format!("invalid '{}'", key))
};
if let Ok(mut guard) = metrics_lock.write() {
guard.update(
get_f64("cpu")?,
get_f64("mem")?,
get_f64("net")?,
get_f64("io")?,
);
}
Ok(())
}
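// a minimal sketch of what process_metrics expects on the wire: one JSON object per
// line, carrying the numeric keys read above (the values here are purely illustrative):
//   {"cpu": 12.5, "mem": 40.0, "net": 3.2, "io": 0.8}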
async fn start_healthcheck_listener(addr: &str, healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> std::io::Result<()> {
let listener = TcpListener::bind(addr).await?;
println!("TCP server listening on {}", addr);
loop {
let (stream, remote_addr) = match listener.accept().await {
Ok(v) => v,
Err(e) => {
continue;
}
};
if let Err(e) = handle_metrics_stream(stream, &healths).await {
eprintln!("connection handler error: {}", e);
}
}
Ok(())
use std::fs::File;
use std::hash::Hash;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use anywho::Error;
use tokio::io::AsyncBufReadExt;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use crate::backend::ServerMetrics;
use crate::config::loader::{build_lb, RoutingTable};
use notify::{Watcher, RecursiveMode, Event};
use clap::Parser;
use notify::{Event, RecursiveMode, Watcher};
use std::cmp;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
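// state shared with the config watcher: a routing-table sender per listening port,
// per-backend health metrics, and handles plus bound addresses for the restartable
// health-check listener and iperf server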
struct ProgramState {
tx_rt_map: HashMap<u16, mpsc::UnboundedSender<RoutingTable>>,
healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
health_listener: Option<tokio::task::JoinHandle<()>>,
iperf_server: Option<tokio::task::JoinHandle<()>>,
health_listener_addr: Option<String>,
iperf_server_addr: Option<String>,
}
#[derive(Parser, Debug)]
@@ -135,6 +55,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(Mutex::new(ProgramState {
tx_rt_map: HashMap::new(),
healths: HashMap::new(),
health_listener: None,
iperf_server: None,
health_listener_addr: None,
iperf_server_addr: None,
}));
if let Err(e) = load_config(&args.config, state.clone()).await {
@@ -143,48 +67,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config_path = args.config.clone();
let state_clone = state.clone();
handles.push(
tokio::spawn(async {
start_healthcheck_listener("0.0.0.0:8080", healths).await.unwrap();
})
);
handles.push(
tokio::spawn(async {
start_iperf_server().await;
})
);
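// watch the config file and hot-reload it whenever a modify event fires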
tokio::spawn(async move {
let (tx, mut rx) = mpsc::channel(1);
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res {
if event.kind.is_modify() {
let _ = tx.blocking_send(());
}
}
}).unwrap();
})
.unwrap();
watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap();
watcher
.watch(&config_path, RecursiveMode::NonRecursive)
.unwrap();
println!("watching for changes to {:?}", config_path);
while rx.recv().await.is_some() {
// saving in some text editors fires several modify events in quick succession, which
// would trigger repeated reloads; drain pending events and add a tiny delay to
// debounce this
while rx.try_recv().is_ok() {}
tokio::time::sleep(Duration::from_millis(50)).await;
while rx.try_recv().is_ok() {}
if let Err(e) = load_config(&config_path, state_clone.clone()).await {
eprintln!("loading config failed: {}", e);
}
}
});
loop { tokio::time::sleep(Duration::from_hours(1)).await; }
loop {
tokio::time::sleep(Duration::from_hours(1)).await;
}
}
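// parses the config file, rebuilds the listeners and health monitors, and swaps them
// into the shared ProgramState; called once at startup and again on every config change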
async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<(), Error> {
let f = File::open(path)?;
let app_config: config::AppConfig = match serde_saphyr::from_reader(f) {
Ok(app_config) => app_config,
Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); }
Err(e) => {
eprintln!("error parsing config {}", e);
return Ok(());
}
};
println!(
@@ -193,7 +121,7 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
app_config.rules.len()
);
let (mut listeners, health_monitors) = match build_lb(app_config) {
let (mut listeners, health_monitors) = match build_lb(&app_config) {
Ok(v) => v,
Err(e) => {
eprintln!("config has logical errors: {}", e);
@@ -202,7 +130,8 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
};
let mut prog_state = state.lock().unwrap();
let ports_to_remove: Vec<u16> = prog_state.tx_rt_map
let ports_to_remove: Vec<u16> = prog_state
.tx_rt_map
.keys()
.cloned()
.filter(|port| !listeners.contains_key(port))
@@ -212,6 +141,38 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
prog_state.tx_rt_map.remove(&port);
}
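// always restart the health-check listener so it binds to the address from the newly
// loaded config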
if let Some(handle) = prog_state.health_listener.take() {
handle.abort();
}
let health_map: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = health_monitors.clone();
let health_addr = app_config.healthcheck_addr.clone();
let health_addr_c = health_addr.clone();
let health_handle = tokio::spawn(async move {
if let Err(e) = start_healthcheck_listener(&health_addr, health_map).await {
eprintln!("health check listener failed: {}", e);
}
});
prog_state.health_listener = Some(health_handle);
prog_state.health_listener_addr = Some(health_addr_c);
// restart the iperf server only when its address in the config has changed (or on first load)
let iperf_addr = app_config.iperf_addr.clone();
if prog_state.iperf_server_addr.as_ref() != Some(&iperf_addr) {
if let Some(handle) = prog_state.iperf_server.take() {
handle.abort();
}
let iperf_addr_c = iperf_addr.clone();
let iperf_handle = tokio::spawn(async move {
if let Err(e) = start_iperf_server(iperf_addr.as_str()).await {
eprintln!("iperf server failed: {}", e);
}
});
prog_state.iperf_server = Some(iperf_handle);
prog_state.iperf_server_addr = Some(iperf_addr_c);
}
prog_state.healths = health_monitors;
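// push the updated routing table to any listener already running on this port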
for (port, routing_table) in listeners.drain() {
if let Some(x) = prog_state.tx_rt_map.get_mut(&port) {
@@ -232,7 +193,7 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
async fn run_listener(
port: u16,
mut rx_rt: mpsc::UnboundedReceiver<RoutingTable>,
mut current_table: RoutingTable
mut current_table: RoutingTable,
) {
let addr = format!("0.0.0.0:{}", port);
println!("Starting tcp listener on {}", addr);