Merge branch 'main' into healthcheck

This commit is contained in:
Ning Qi (Paul) Sun
2025-12-10 16:49:16 -05:00
committed by GitHub
5 changed files with 253 additions and 69 deletions

68
Cargo.lock generated
View File

@@ -95,6 +95,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2" checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]] [[package]]
name = "arraydeque" name = "arraydeque"
version = "0.5.1" version = "0.5.1"
@@ -113,6 +119,12 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.10.0" version = "2.10.0"
@@ -297,6 +309,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.16" version = "0.2.16"
@@ -430,6 +451,7 @@ name = "l4lb"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anywho", "anywho",
"arc-swap",
"cidr", "cidr",
"rand 0.10.0-rc.5", "rand 0.10.0-rc.5",
"rperf3-rs", "rperf3-rs",
@@ -479,6 +501,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
@@ -489,6 +512,30 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
[[package]]
name = "notify"
version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3"
dependencies = [
"bitflags 2.10.0",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"notify-types",
"walkdir",
"windows-sys 0.60.2",
]
[[package]]
name = "notify-types"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@@ -640,7 +687,7 @@ version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.10.0",
] ]
[[package]] [[package]]
@@ -706,6 +753,15 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "saphyr-parser" name = "saphyr-parser"
version = "0.0.6" version = "0.0.6"
@@ -903,6 +959,16 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.1+wasi-snapshot-preview1" version = "0.11.1+wasi-snapshot-preview1"

View File

@@ -12,3 +12,6 @@ serde_json = "1.0.145"
rperf3-rs = "0.3.9" rperf3-rs = "0.3.9"
cidr = "0.3.1" cidr = "0.3.1"
serde-saphyr = "0.0.10" serde-saphyr = "0.0.10"
arc-swap = "1.7.1"
clap = { version = "4.5.53", features = ["derive"] }
notify = "8.2.0"

View File

@@ -24,7 +24,6 @@ rules:
- clients: - clients:
- "0.0.0.0/0:6767" - "0.0.0.0/0:6767"
- "0.0.0.0/0:6969"
targets: # no issues with duplicate servers or clusters targets: # no issues with duplicate servers or clusters
- "priority-api" - "priority-api"
- "priority-api" - "priority-api"

View File

@@ -15,24 +15,28 @@ pub struct RoutingTable {
pub entries: Vec<(IpCidr, usize)>, pub entries: Vec<(IpCidr, usize)>,
} }
fn parse_client(s: &str) -> (IpCidr, u16) {
// just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
let (ip_part, port_part) = s.rsplit_once(':').expect("badly formatted client");
let port: u16 = port_part.parse().expect("bad port");
let cidr: IpCidr = ip_part.parse().expect("bad ip/mask");
(cidr, port)
}
pub type PortListeners = HashMap<u16, RoutingTable>; pub type PortListeners = HashMap<u16, RoutingTable>;
pub fn build_lb(config: AppConfig) -> (PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) { fn parse_client(s: &str) -> Result<(IpCidr, u16), String> {
// just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
let (ip_part, port_part) = s.rsplit_once(':')
.ok_or_else(|| format!("badly formatted client: {}", s))?;
let port = port_part.parse()
.map_err(|_| format!("bad port: {}", s))?;
let cidr = ip_part.parse()
.map_err(|_| format!("bad ip/mask: {}", s))?;
Ok((cidr, port))
}
pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>), String> {
let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new(); let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new();
let mut backends: HashMap<String, Arc<Backend>> = HashMap::new(); let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();
for backend_cfg in config.backends { for backend_cfg in config.backends {
let ip: IpAddr = backend_cfg.ip.parse().unwrap(); let ip: IpAddr = backend_cfg.ip.parse()
.map_err(|_| format!("bad ip: {}", backend_cfg.ip))?;
let addr = SocketAddr::new(ip, backend_cfg.port); let addr = SocketAddr::new(ip, backend_cfg.port);
let health = healths let health = healths
@@ -81,7 +85,7 @@ pub fn build_lb(config: AppConfig) -> (PortListeners, HashMap<IpAddr, Arc<RwLock
let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new(); let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new();
for client_def in rule.clients { for client_def in rule.clients {
let (cidr, port) = parse_client(&client_def); let (cidr, port) = parse_client(&client_def)?;
port_groups.entry(port).or_default().push(cidr); port_groups.entry(port).or_default().push(cidr);
} }
@@ -117,5 +121,5 @@ pub fn build_lb(config: AppConfig) -> (PortListeners, HashMap<IpAddr, Arc<RwLock
.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length())); .sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length()));
} }
(listeners, healths) Ok((listeners, healths))
} }

View File

@@ -3,9 +3,12 @@ mod balancer;
mod config; mod config;
mod proxy; mod proxy;
use crate::balancer::{Balancer, ConnectionInfo}; use std::collections::HashMap;
use crate::balancer::{ConnectionInfo};
use crate::proxy::tcp::proxy_tcp_connection; use crate::proxy::tcp::proxy_tcp_connection;
use std::fs::File; use std::fs::File;
use std::path::PathBuf;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncBufReadExt, AsyncReadExt}; use tokio::io::{AsyncBufReadExt, AsyncReadExt};
@@ -93,27 +96,53 @@ async fn start_healthcheck_listener(addr: &str, healths: HashMap<IpAddr, Arc<RwL
} }
Ok(()) Ok(())
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use anywho::Error;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use crate::backend::ServerMetrics;
use crate::config::loader::{build_lb, RoutingTable};
use notify::{Watcher, RecursiveMode, Event};
use clap::Parser;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
struct ProgramState {
tx_rt_map: HashMap<u16, mpsc::UnboundedSender<RoutingTable>>,
healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
}
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(short, long, default_value = "config.yaml")]
config: PathBuf,
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let f = File::open("config.yaml").expect("couldn't open config.yaml"); let args = Args::parse();
let app_config: config::AppConfig = serde_saphyr::from_reader(f)?;
println!( if !args.config.is_file() {
"Loaded {} backends, {} rules.", eprintln!("config file not found or not accessible");
app_config.backends.len(), std::process::exit(1);
app_config.rules.len()
);
let (listeners, healths) = config::loader::build_lb(app_config);
if listeners.is_empty() {
eprintln!("its a lawless land");
return Ok(());
} }
let mut handles = Vec::new(); println!("reading config from {:?}", args.config);
let state = Arc::new(Mutex::new(ProgramState {
tx_rt_map: HashMap::new(),
healths: HashMap::new(),
}));
if let Err(e) = load_config(&args.config, state.clone()).await {
eprintln!("config file loading failed: {}", e);
}
let config_path = args.config.clone();
let state_clone = state.clone();
handles.push( handles.push(
tokio::spawn(async { tokio::spawn(async {
@@ -127,30 +156,113 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}) })
); );
for (port, mut routing_table) in listeners { tokio::spawn(async move {
handles.push(tokio::spawn(async move { let (tx, mut rx) = mpsc::channel(1);
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res {
if event.kind.is_modify() {
let _ = tx.blocking_send(());
}
}
}).unwrap();
watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap();
println!("watching for changes to {:?}", config_path);
while rx.recv().await.is_some() {
if let Err(e) = load_config(&config_path, state_clone.clone()).await {
eprintln!("loading config failed: {}", e);
}
}
});
loop { tokio::time::sleep(Duration::from_hours(1)).await; }
}
async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<(), Error> {
let f = File::open(path)?;
let app_config: config::AppConfig = match serde_saphyr::from_reader(f) {
Ok(app_config) => app_config,
Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); }
};
println!(
"Loaded config, with {} backends, {} rules.",
app_config.backends.len(),
app_config.rules.len()
);
let (mut listeners, health_monitors) = match build_lb(app_config) {
Ok(v) => v,
Err(e) => {
eprintln!("config has logical errors: {}", e);
return Ok(());
}
};
let mut prog_state = state.lock().unwrap();
let ports_to_remove: Vec<u16> = prog_state.tx_rt_map
.keys()
.cloned()
.filter(|port| !listeners.contains_key(port))
.collect();
for port in ports_to_remove {
prog_state.tx_rt_map.remove(&port);
}
prog_state.healths = health_monitors;
for (port, routing_table) in listeners.drain() {
if let Some(x) = prog_state.tx_rt_map.get_mut(&port) {
x.send(routing_table)?;
println!("updated rules on port {}", port);
} else {
let (tx_rt, rx_rt) = mpsc::unbounded_channel();
prog_state.tx_rt_map.insert(port, tx_rt);
tokio::spawn(run_listener(port, rx_rt, routing_table));
}
}
println!("reload complete");
Ok(())
}
async fn run_listener(
port: u16,
mut rx_rt: mpsc::UnboundedReceiver<RoutingTable>,
mut current_table: RoutingTable
) {
let addr = format!("0.0.0.0:{}", port); let addr = format!("0.0.0.0:{}", port);
println!("Starting tcp listener on {}", addr); println!("Starting tcp listener on {}", addr);
let listener = TcpListener::bind(&addr).await.expect("Failed to bind port"); let listener = TcpListener::bind(&addr).await.expect("Failed to bind port");
loop { loop {
let (socket, remote_addr) = match listener.accept().await { tokio::select! {
Ok(v) => v, msg = rx_rt.recv() => {
Err(e) => { match msg {
eprintln!("error: listener port {}: {}", port, e); Some(new_table) => {
continue; current_table = new_table;
} }
}; None => {
println!("Unbinding listener on port {}", port);
break;
}
}
}
accept_result = listener.accept() => {
match accept_result {
Ok((socket, remote_addr)) => {
let remote_ip = remote_addr.ip(); let remote_ip = remote_addr.ip();
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed); let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
let mut chosen_backend = None; let mut chosen_backend = None;
for (cidr, balancer_idx) in &mut routing_table.entries { for (cidr, balancer_idx) in &mut current_table.entries {
if cidr.contains(&remote_ip) { if cidr.contains(&remote_ip) {
let balancer = &mut routing_table.balancers[*balancer_idx]; let balancer = &mut current_table.balancers[*balancer_idx];
chosen_backend = balancer.choose_backend(ConnectionInfo { chosen_backend = balancer.choose_backend(ConnectionInfo {
client_ip: remote_ip, client_ip: remote_ip,
}); });
@@ -168,12 +280,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("error: no matching rule for {} on port {}", remote_ip, port); println!("error: no matching rule for {} on port {}", remote_ip, port);
} }
} }
})); Err(e) => {
eprintln!("error: listener port {}: {}", port, e);
continue;
}
}
}
} }
for h in handles {
let _ = h.await;
} }
Ok(())
} }