From 5ad8539d7a3caab284c039c9262bfe7cf4f33730 Mon Sep 17 00:00:00 2001 From: psun256 Date: Wed, 10 Dec 2025 15:37:02 -0500 Subject: [PATCH] added config hot reload --- Cargo.lock | 251 ++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 + config.yaml | 1 - src/config/loader.rs | 33 +++--- src/main.rs | 203 ++++++++++++++++++++++++++-------- 5 files changed, 428 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25cc39d..e33b721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,12 +15,68 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anywho" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arraydeque" version = "0.5.1" @@ -39,6 +95,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -74,6 +136,52 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60" +[[package]] +name = "clap" +version = "4.5.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -107,6 +215,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -137,18 +254,73 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.10.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itoa" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "l4lb" version = "0.1.0" dependencies = [ "anywho", + "arc-swap", "cidr", + "clap", + "notify", "rand", "serde", "serde-saphyr", @@ -170,6 +342,12 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + [[package]] name = "memchr" version = "2.7.6" @@ -183,6 +361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -193,6 +372,30 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.10.0", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "num-traits" version = "0.2.19" @@ -208,6 +411,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "parking_lot" version = "0.12.5" @@ -284,7 +493,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -293,6 +502,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "saphyr-parser" version = "0.0.6" @@ -401,6 +619,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.110" @@ -446,12 +670,28 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -467,6 +707,15 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-link" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index f55a2b5..614de44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,6 @@ rand = "0.10.0-rc.5" serde = { version = "1.0.228", features = ["derive"] } cidr = "0.3.1" serde-saphyr = "0.0.10" +arc-swap = "1.7.1" +clap = { version = "4.5.53", features = ["derive"] } +notify = "8.2.0" diff --git a/config.yaml b/config.yaml index 214f970..17c7353 100644 --- a/config.yaml +++ b/config.yaml @@ -24,7 +24,6 @@ rules: - clients: - "0.0.0.0/0:6767" - - "0.0.0.0/0:6969" targets: # no issues with duplicate servers or clusters - "priority-api" - "priority-api" diff --git a/src/config/loader.rs b/src/config/loader.rs index 81c7314..c4eb678 100644 --- a/src/config/loader.rs +++ b/src/config/loader.rs @@ -14,24 +14,29 @@ pub struct RoutingTable { pub entries: Vec<(IpCidr, usize)>, } -fn parse_client(s: &str) -> (IpCidr, u16) { - // just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80) - let (ip_part, port_part) = s.rsplit_once(':').expect("badly formatted client"); - - let port: u16 = port_part.parse().expect("bad port"); - let cidr: IpCidr = ip_part.parse().expect("bad ip/mask"); - - (cidr, port) -} - pub type PortListeners = HashMap; -pub fn build_lb(config: AppConfig) -> PortListeners { +fn parse_client(s: &str) -> Result<(IpCidr, u16), String> { + // just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80) + let (ip_part, port_part) = s.rsplit_once(':') + .ok_or_else(|| format!("badly formatted client: {}", s))?; + + let port = port_part.parse() + .map_err(|_| format!("bad port: {}", s))?; + let cidr = ip_part.parse() + .map_err(|_| format!("bad ip/mask: {}", s))?; + + Ok((cidr, port)) +} + + +pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap>>), String> { let mut healths: HashMap>> = HashMap::new(); let mut backends: HashMap> = HashMap::new(); for backend_cfg in config.backends { - let ip: IpAddr = backend_cfg.ip.parse().unwrap(); + let ip: IpAddr = backend_cfg.ip.parse() + .map_err(|_| format!("bad ip: {}", backend_cfg.ip))?; let addr = SocketAddr::new(ip, backend_cfg.port); let health = healths @@ -80,7 +85,7 @@ pub fn build_lb(config: AppConfig) -> PortListeners { let mut port_groups: HashMap> = HashMap::new(); for client_def in rule.clients { - let (cidr, port) = parse_client(&client_def); + let (cidr, port) = parse_client(&client_def)?; port_groups.entry(port).or_default().push(cidr); } @@ -116,5 +121,5 @@ pub fn build_lb(config: AppConfig) -> PortListeners { .sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length())); } - listeners + Ok((listeners, healths)) } diff --git a/src/main.rs b/src/main.rs index b459cde..28c9ad2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,81 +3,190 @@ mod balancer; mod config; mod proxy; -use crate::balancer::{Balancer, ConnectionInfo}; +use std::collections::HashMap; +use crate::balancer::{ConnectionInfo}; use crate::proxy::tcp::proxy_tcp_connection; use std::fs::File; +use std::path::PathBuf; +use std::net::IpAddr; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; +use anywho::Error; use tokio::net::TcpListener; +use tokio::sync::mpsc; +use crate::backend::ServerMetrics; +use crate::config::loader::{build_lb, RoutingTable}; + +use notify::{Watcher, RecursiveMode, Event}; +use clap::Parser; static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); +struct ProgramState { + tx_rt_map: HashMap>, + healths: HashMap>>, +} + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + #[arg(short, long, default_value = "config.yaml")] + config: PathBuf, +} + #[tokio::main] async fn main() -> Result<(), Box> { - let f = File::open("config.yaml").expect("couldn't open config.yaml"); - let app_config: config::AppConfig = serde_saphyr::from_reader(f)?; + let args = Args::parse(); + + if !args.config.is_file() { + eprintln!("config file not found or not accessible"); + std::process::exit(1); + } + + println!("reading config from {:?}", args.config); + + let state = Arc::new(Mutex::new(ProgramState { + tx_rt_map: HashMap::new(), + healths: HashMap::new(), + })); + + if let Err(e) = load_config(&args.config, state.clone()).await { + eprintln!("config file loading failed: {}", e); + } + + let config_path = args.config.clone(); + let state_clone = state.clone(); + tokio::spawn(async move { + let (tx, mut rx) = mpsc::channel(1); + + let mut watcher = notify::recommended_watcher(move |res: Result| { + if let Ok(event) = res { + if event.kind.is_modify() { + let _ = tx.blocking_send(()); + } + } + }).unwrap(); + + watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap(); + println!("watching for changes to {:?}", config_path); + + while rx.recv().await.is_some() { + if let Err(e) = load_config(&config_path, state_clone.clone()).await { + eprintln!("loading config failed: {}", e); + } + } + }); + + loop { tokio::time::sleep(Duration::from_hours(1)).await; } +} + +async fn load_config(path: &PathBuf, state: Arc>) -> Result<(), Error> { + let f = File::open(path)?; + let app_config: config::AppConfig = match serde_saphyr::from_reader(f) { + Ok(app_config) => app_config, + Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); } + }; println!( - "Loaded {} backends, {} rules.", + "Loaded config, with {} backends, {} rules.", app_config.backends.len(), app_config.rules.len() ); - let listeners = config::loader::build_lb(app_config); + let (mut listeners, health_monitors) = match build_lb(app_config) { + Ok(v) => v, + Err(e) => { + eprintln!("config has logical errors: {}", e); + return Ok(()); + } + }; + let mut prog_state = state.lock().unwrap(); - if listeners.is_empty() { - eprintln!("its a lawless land"); - return Ok(()); + let ports_to_remove: Vec = prog_state.tx_rt_map + .keys() + .cloned() + .filter(|port| !listeners.contains_key(port)) + .collect(); + + for port in ports_to_remove { + prog_state.tx_rt_map.remove(&port); } - let mut handles = Vec::new(); + prog_state.healths = health_monitors; + for (port, routing_table) in listeners.drain() { + if let Some(x) = prog_state.tx_rt_map.get_mut(&port) { + x.send(routing_table)?; + println!("updated rules on port {}", port); + } else { + let (tx_rt, rx_rt) = mpsc::unbounded_channel(); + prog_state.tx_rt_map.insert(port, tx_rt); - for (port, mut routing_table) in listeners { - handles.push(tokio::spawn(async move { - let addr = format!("0.0.0.0:{}", port); - println!("Starting tcp listener on {}", addr); + tokio::spawn(run_listener(port, rx_rt, routing_table)); + } + } - let listener = TcpListener::bind(&addr).await.expect("Failed to bind port"); + println!("reload complete"); + Ok(()) +} - loop { - let (socket, remote_addr) = match listener.accept().await { - Ok(v) => v, - Err(e) => { - eprintln!("error: listener port {}: {}", port, e); - continue; +async fn run_listener( + port: u16, + mut rx_rt: mpsc::UnboundedReceiver, + mut current_table: RoutingTable +) { + let addr = format!("0.0.0.0:{}", port); + println!("Starting tcp listener on {}", addr); + + let listener = TcpListener::bind(&addr).await.expect("Failed to bind port"); + + loop { + tokio::select! { + msg = rx_rt.recv() => { + match msg { + Some(new_table) => { + current_table = new_table; } - }; - - let remote_ip = remote_addr.ip(); - let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed); - - let mut chosen_backend = None; - - for (cidr, balancer_idx) in &mut routing_table.entries { - if cidr.contains(&remote_ip) { - let balancer = &mut routing_table.balancers[*balancer_idx]; - chosen_backend = balancer.choose_backend(ConnectionInfo { - client_ip: remote_ip, - }); + None => { + println!("Unbinding listener on port {}", port); break; } } + } + accept_result = listener.accept() => { + match accept_result { + Ok((socket, remote_addr)) => { + let remote_ip = remote_addr.ip(); + let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed); - if let Some(backend) = chosen_backend { - tokio::spawn(async move { - if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await { - eprintln!("error: conn_id={} proxy failed: {}", conn_id, e); + let mut chosen_backend = None; + + for (cidr, balancer_idx) in &mut current_table.entries { + if cidr.contains(&remote_ip) { + let balancer = &mut current_table.balancers[*balancer_idx]; + chosen_backend = balancer.choose_backend(ConnectionInfo { + client_ip: remote_ip, + }); + break; + } } - }); - } else { - println!("error: no matching rule for {} on port {}", remote_ip, port); + + if let Some(backend) = chosen_backend { + tokio::spawn(async move { + if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await { + eprintln!("error: conn_id={} proxy failed: {}", conn_id, e); + } + }); + } else { + println!("error: no matching rule for {} on port {}", remote_ip, port); + } + } + Err(e) => { + eprintln!("error: listener port {}: {}", port, e); + continue; + } } } - })); + } } - - for h in handles { - let _ = h.await; - } - - Ok(()) }