From 8170d2a6bf9f6379b50d0bd0e7f5455bbf8ce2e5 Mon Sep 17 00:00:00 2001 From: psun256 Date: Wed, 10 Dec 2025 01:49:45 -0500 Subject: [PATCH] implemented better routing system, config parsing from yaml. --- Cargo.lock | 226 ++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 + config.yaml | 35 +++++++ src/config.rs | 6 -- src/config/loader.rs | 124 ++++++++++++++++++++++++ src/config/mod.rs | 49 ++++++++++ src/main.rs | 100 +++++++++++-------- 7 files changed, 498 insertions(+), 45 deletions(-) create mode 100644 config.yaml delete mode 100644 src/config.rs create mode 100644 src/config/loader.rs create mode 100644 src/config/mod.rs diff --git a/Cargo.lock b/Cargo.lock index d775ae5..25cc39d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,43 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "anywho" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2" +[[package]] +name = "arraydeque" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "2.10.0" @@ -37,6 +68,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "cidr" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -46,6 +83,30 @@ dependencies = [ "libc", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "encoding_rs_io" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cc3c5651fb62ab8aa3103998dade57efdd028544bd300516baa31840c252a83" +dependencies = [ + "encoding_rs", +] + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "getrandom" version = "0.3.4" @@ -58,12 +119,39 @@ dependencies = [ "wasip2", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown", +] + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + [[package]] name = "l4lb" version = "0.1.0" dependencies = [ "anywho", + "cidr", "rand", + "serde", + "serde-saphyr", "tokio", ] @@ -82,6 +170,12 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "memchr" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" + [[package]] name = "mio" version = "1.1.0" @@ -93,6 +187,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + [[package]] name = "parking_lot" version = "0.12.5" @@ -112,7 +227,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "smallvec", + "smallvec 1.15.1", "windows-link", ] @@ -172,12 +287,89 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "saphyr-parser" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb771b59f6b1985d1406325ec28f97cfb14256abcec4fdfb37b36a1766d6af7" +dependencies = [ + "arraydeque", + "hashlink", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde-saphyr" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b9e06cddad47cc6214c0c456cf209b99a58b54223e7af2f6d4b88a5a9968499" +dependencies = [ + "ahash", + "base64", + "encoding_rs_io", + "nohash-hasher", + "num-traits", + "ryu", + "saphyr-parser", + "serde", + "serde_json", + "smallvec 2.0.0-alpha.12", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.145" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", + "serde_core", +] + [[package]] name = "signal-hook-registry" version = "1.4.6" @@ -193,6 +385,12 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "smallvec" +version = "2.0.0-alpha.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef784004ca8777809dcdad6ac37629f0a97caee4c685fcea805278d81dd8b857" + [[package]] name = "socket2" version = "0.6.1" @@ -248,6 +446,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -357,3 +561,23 @@ name = "wit-bindgen" version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + +[[package]] +name = "zerocopy" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index a4a9d43..f55a2b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,6 @@ edition = "2024" anywho = "0.1.2" tokio = { version = "1.48.0", features = ["full"] } rand = "0.10.0-rc.5" +serde = { version = "1.0.228", features = ["derive"] } +cidr = "0.3.1" +serde-saphyr = "0.0.10" diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..214f970 --- /dev/null +++ b/config.yaml @@ -0,0 +1,35 @@ +backends: + - id: "srv-1" + ip: "127.0.0.1" + port: 8081 + + - id: "srv-2" + ip: "127.0.0.1" + port: 8082 + +clusters: + main-api: + - "srv-1" + - "srv-2" + priority-api: + - "srv-1" + +rules: + - clients: + - "0.0.0.0/0:8080" + targets: + - "main-api" + strategy: + type: "RoundRobin" + + - clients: + - "0.0.0.0/0:6767" + - "0.0.0.0/0:6969" + targets: # no issues with duplicate servers or clusters + - "priority-api" + - "priority-api" + - "priority-api" + strategy: + type: "Adaptive" + coefficients: [ 1.5, 1.0, 0.5, 0.1 ] + alpha: 0.75 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index adbbc17..0000000 --- a/src/config.rs +++ /dev/null @@ -1,6 +0,0 @@ -// TODO: "routing" rules -// backends defined as ip + port -// define sets of backends -// allowed set operations for now is just union -// rules are ip + mask and ports, maps to some of the sets -// defined earlier, along with a routing strategy diff --git a/src/config/loader.rs b/src/config/loader.rs new file mode 100644 index 0000000..81d240e --- /dev/null +++ b/src/config/loader.rs @@ -0,0 +1,124 @@ +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{Arc, RwLock}; +use cidr::IpCidr; + +use crate::backend::*; +use crate::balancer::Balancer; +use crate::balancer::round_robin::RoundRobinBalancer; +use crate::balancer::adaptive_weight::AdaptiveWeightBalancer; +use crate::config::*; + +pub struct RoutingTable { + pub balancers: Vec>, + pub entries: Vec<(IpCidr, usize)>, +} + +fn parse_client(s: &str) -> (IpCidr, u16) { + // just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80) + let (ip_part, port_part) = s.rsplit_once(':').expect("badly formatted client"); + + let port: u16 = port_part.parse().expect("bad port"); + let cidr: IpCidr = ip_part.parse().expect("bad ip/mask"); + + (cidr, port) +} + +pub type PortListeners = HashMap; + +pub fn build_lb(config: AppConfig) -> PortListeners { + let mut healths: HashMap>> = HashMap::new(); + let mut backends: HashMap> = HashMap::new(); + + for backend_cfg in config.backends { + let ip: IpAddr = backend_cfg.ip.parse().unwrap(); + let addr = SocketAddr::new(ip, backend_cfg.port); + + let health = healths + .entry(ip) + .or_insert_with(|| Arc::new(RwLock::new(ServerHealth::default()))) + .clone(); + + let backend = Arc::new(Backend::new( + backend_cfg.id.clone(), + addr, + health, + )); + + backends.insert(backend_cfg.id, backend); + } + + let mut listeners: PortListeners = HashMap::new(); + + for rule in config.rules { + let mut target_backends = Vec::new(); + + for target_name in &rule.targets { + if let Some(members) = config.clusters.get(target_name) { + for member_id in members { + if let Some(backend) = backends.get(member_id) { + target_backends.push(backend.clone()); + } + } + } + else if let Some(backend) = backends.get(target_name) { + target_backends.push(backend.clone()); + } else { + eprintln!("warning: target {} not found", target_name); + } + } + + // possible for multiple targets of the same rule to have common backends. + target_backends.sort_by(|a, b| a.id.cmp(&b.id)); + target_backends.dedup_by(|a, b| a.id == b.id); + + if target_backends.is_empty() { + eprintln!("warning: rule has no valid targets, skipping."); + continue; + } + + // for each different client port on this rule, we unfortunately need to make a new + // Balancer, since Balancer is not thread safe, requires &mut self for the backend + // selection. + // a good enough compromise to make a new one for each port, avoids using Mutex, at the + // cost of minor penalty to load balancing "quality" when you have several client ports. + let mut port_groups: HashMap> = HashMap::new(); + + for client_def in rule.clients { + let (cidr, port) = parse_client(&client_def); + port_groups.entry(port).or_default().push(cidr); + } + + for (port, cidrs) in port_groups { + let table = listeners.entry(port).or_insert_with(|| RoutingTable { + balancers: Vec::new(), + entries: Vec::new(), + }); + + let pool = BackendPool::new(target_backends.clone()); + + let balancer: Box = match &rule.strategy { + LoadBalancerStrategy::RoundRobin => { + Box::new(RoundRobinBalancer::new(pool)) + }, + LoadBalancerStrategy::Adaptive { coefficients, alpha } => { + Box::new(AdaptiveWeightBalancer::new(pool, *coefficients, *alpha)) + } + }; + + let balancer_idx = table.balancers.len(); + table.balancers.push(balancer); + + for cidr in cidrs { + table.entries.push((cidr, balancer_idx)); + } + } + } + + // sort to make most specific first, so that first match == longest prefix match + for table in listeners.values_mut() { + table.entries.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length())); + } + + listeners +} diff --git a/src/config/mod.rs b/src/config/mod.rs new file mode 100644 index 0000000..634e3e1 --- /dev/null +++ b/src/config/mod.rs @@ -0,0 +1,49 @@ +// config is written as a YAML file, the path will be passed to the program. +// +// the high level structure of the config is that we +// first define the individual backends (ip + port) we are going +// to load balance around. +// +// next we define some clusters, which are really more like a short +// alias for a group of backends. +// +// next we define the rules. these are written as a list of +// "ip/subnet:port" for the clients, and then a list of clusters +// for which backends these are balanced around. and of course +// specify which algorithm to use. +pub mod loader; + +use std::collections::HashMap; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct AppConfig { + pub backends: Vec, + #[serde(default)] + pub clusters: HashMap>, + pub rules: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct BackendConfig { + pub id: String, + pub ip: String, + pub port: u16, +} + +#[derive(Debug, Deserialize)] +pub struct RuleConfig { + pub clients: Vec, + pub targets: Vec, + pub strategy: LoadBalancerStrategy, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type")] +pub enum LoadBalancerStrategy { + RoundRobin, + Adaptive { + coefficients: [f64; 4], + alpha: f64, + }, +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 136bf49..f23a8ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,55 +1,79 @@ -extern crate core; - mod balancer; mod config; mod backend; mod proxy; -use tokio::net::TcpListener; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use std::sync::{Arc, RwLock}; +use std::fs::File; use std::sync::atomic::{AtomicU64, Ordering}; -use crate::backend::{Backend, BackendPool, ServerHealth}; -use crate::balancer::Balancer; -use crate::balancer::round_robin::RoundRobinBalancer; +use tokio::net::TcpListener; use crate::proxy::tcp::proxy_tcp_connection; static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); #[tokio::main] async fn main() -> Result<(), Box> { - let mut pool: Vec> = Vec::new(); - let server_metric = Arc::new(RwLock::new(ServerHealth::default())); - - pool.push(Arc::new(Backend::new( - "backend 1".into(), - "127.0.0.1:8081".parse().unwrap(), - server_metric.clone() - ))); + let f = File::open("config.yaml").expect("couldn't open config.yaml"); + let app_config: config::AppConfig = serde_saphyr::from_reader(f)?; - pool.push(Arc::new(Backend::new( - "backend 2".into(), - "127.0.0.1:8082".parse().unwrap(), - server_metric.clone() - ))); + println!("Loaded {} backends, {} rules.", + app_config.backends.len(), + app_config.rules.len() + ); - let mut balancer = RoundRobinBalancer::new(BackendPool::new(pool)); + let listeners = config::loader::build_lb(app_config); - let listener = TcpListener::bind("127.0.0.1:8080").await?; - - loop { - let (socket, _) = listener.accept().await?; - - let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed); - - if let Some(backend) = balancer.choose_backend() { - tokio::spawn(async move { - if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await { - eprintln!("error: conn_id={} proxy failed: {}", conn_id, e); - } - }); - } else { - eprintln!("error: no backendsd for conn_id={}", conn_id); - } + if listeners.is_empty() { + eprintln!("its a lawless land"); + return Ok(()); } + + let mut handles = Vec::new(); + + for (port, mut routing_table) in listeners { + handles.push(tokio::spawn(async move { + let addr = format!("0.0.0.0:{}", port); + println!("Starting tcp listener on {}", addr); + + let listener = TcpListener::bind(&addr).await.expect("Failed to bind port"); + + loop { + let (socket, remote_addr) = match listener.accept().await { + Ok(v) => v, + Err(e) => { + eprintln!("error: listener port {}: {}", port, e); + continue; + } + }; + + let remote_ip = remote_addr.ip(); + let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed); + + let mut chosen_backend = None; + + for (cidr, balancer_idx) in &mut routing_table.entries { + if cidr.contains(&remote_ip) { + let balancer = &mut routing_table.balancers[*balancer_idx]; + chosen_backend = balancer.choose_backend(); + break; + } + } + + if let Some(backend) = chosen_backend { + tokio::spawn(async move { + if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await { + eprintln!("error: conn_id={} proxy failed: {}", conn_id, e); + } + }); + } else { + println!("error: no matching rule for {} on port {}", remote_ip, port); + } + } + })); + } + + for h in handles { + let _ = h.await; + } + + Ok(()) }