1 Commits

Author SHA1 Message Date
psun256
5ad8539d7a added config hot reload 2025-12-10 15:37:02 -05:00
6 changed files with 431 additions and 188 deletions

251
Cargo.lock generated
View File

@@ -15,12 +15,68 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "anstream"
version = "0.6.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.61.2",
]
[[package]]
name = "anywho"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arraydeque"
version = "0.5.1"
@@ -39,6 +95,12 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.10.0"
@@ -74,6 +136,52 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60"
[[package]]
name = "clap"
version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "colorchoice"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -107,6 +215,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "getrandom"
version = "0.3.4"
@@ -137,18 +254,73 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "inotify"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
dependencies = [
"bitflags 2.10.0",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "kqueue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "l4lb"
version = "0.1.0"
dependencies = [
"anywho",
"arc-swap",
"cidr",
"clap",
"notify",
"rand",
"serde",
"serde-saphyr",
@@ -170,6 +342,12 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "memchr"
version = "2.7.6"
@@ -183,6 +361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.61.2",
]
@@ -193,6 +372,30 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
[[package]]
name = "notify"
version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3"
dependencies = [
"bitflags 2.10.0",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"notify-types",
"walkdir",
"windows-sys 0.60.2",
]
[[package]]
name = "notify-types"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
[[package]]
name = "num-traits"
version = "0.2.19"
@@ -208,6 +411,12 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "once_cell_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "parking_lot"
version = "0.12.5"
@@ -284,7 +493,7 @@ version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags",
"bitflags 2.10.0",
]
[[package]]
@@ -293,6 +502,15 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "saphyr-parser"
version = "0.0.6"
@@ -401,6 +619,12 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
version = "2.0.110"
@@ -446,12 +670,28 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
@@ -467,6 +707,15 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "windows-link"
version = "0.2.1"

View File

@@ -10,3 +10,6 @@ rand = "0.10.0-rc.5"
serde = { version = "1.0.228", features = ["derive"] }
cidr = "0.3.1"
serde-saphyr = "0.0.10"
arc-swap = "1.7.1"
clap = { version = "4.5.53", features = ["derive"] }
notify = "8.2.0"

View File

@@ -24,7 +24,6 @@ rules:
- clients:
- "0.0.0.0/0:6767"
- "0.0.0.0/0:6969"
targets: # no issues with duplicate servers or clusters
- "priority-api"
- "priority-api"

View File

@@ -27,7 +27,7 @@ impl AdaptiveWeightBalancer {
.iter()
.map(|b| AdaptiveNode {
backend: b.clone(),
weight: 1f64,
weight: 0f64,
})
.collect();
@@ -85,6 +85,7 @@ impl Balancer for AdaptiveWeightBalancer {
};
let ratio = risk / node.weight;
if ratio <= threshold {
return Some(node.backend.clone());
}
@@ -141,126 +142,3 @@ impl Balancer for AdaptiveWeightBalancer {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::Backend;
use std::net::SocketAddr;
fn backend_factory(id: &str, ip: &str, port: u16) -> Arc<Backend> {
Arc::new(Backend::new(
id.to_string(),
SocketAddr::new(ip.parse().unwrap(), port),
Arc::new(RwLock::new(ServerMetrics::default())),
))
}
fn unused_ctx() -> ConnectionInfo {
ConnectionInfo {
client_ip: ("0.0.0.0".parse().unwrap()),
}
}
#[test]
fn basic_weight_update_and_choose() {
let backends = BackendPool::new(vec![
backend_factory("server-0", "127.0.0.1", 3000),
backend_factory("server-1", "127.0.0.1", 3001),
]);
let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.5, 0.2, 0.2, 0.1], 0.5);
// initially equal weights
// update one backend to be heavily loaded
{
let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
sm0_guard.update(90.0, 80.0, 10.0, 5.0);
}
{
let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
sm1_guard.update(10.0, 5.0, 1.0, 1.0);
}
// Choose backend: should pick the less loaded host server1
let chosen = b
.choose_backend(unused_ctx())
.expect("should choose a backend");
let sm0: &ServerMetrics = &backends.backends.get(0).unwrap().metrics.read().unwrap();
let sm1: &ServerMetrics = &backends.backends.get(1).unwrap().metrics.read().unwrap();
println!("{:?}, {:?}", sm0, sm1);
assert_eq!(chosen.id, "server-1");
}
#[test]
fn choose_none_when_empty() {
let mut b =
AdaptiveWeightBalancer::new(BackendPool::new(vec![]), [0.5, 0.2, 0.2, 0.1], 0.5);
assert!(b.choose_backend(unused_ctx()).is_none());
}
#[test]
fn ratio_triggers_immediate_selection() {
// Arrange two servers where server 1 has composite load 0 and server 2 has composite load 100.
// With alpha = 1.0 and two servers, threshold = 1.0 * (r_sum / w_sum) = 1.0 * (100 / 2) = 50.
// Server 0 ratio = 0 / 1 = 0 <= 50 so it should be chosen immediately.
let backends = BackendPool::new(vec![
backend_factory("server-0", "127.0.0.1", 3000),
backend_factory("server-1", "127.0.0.1", 3001),
]);
let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.25, 0.25, 0.25, 0.25], 1.0);
{
let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
sm0_guard.update(0.0, 0.0, 0.0, 0.0);
}
{
let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
sm1_guard.update(100.0, 100.0, 100.0, 100.0);
}
let chosen = b
.choose_backend(unused_ctx())
.expect("should choose a backend");
assert_eq!(chosen.id, "server-0");
}
#[test]
fn choose_min_current_load_when_no_ratio() {
// Arrange three servers with identical composite loads so no server satisfies Ri/Wi <= threshold
// (set alpha < 1 so threshold < ratio). The implementation then falls back to picking the
// server with minimum current_load
let backends = BackendPool::new(vec![
backend_factory("server-0", "127.0.0.1", 3000),
backend_factory("server-1", "127.0.0.1", 3001),
backend_factory("server-2", "127.0.0.1", 3002),
]);
// set current_loads (field expected to be public)
{
let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
sm0_guard.update(10.0, 10.0, 10.0, 10.0);
}
{
let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
sm1_guard.update(5.0, 5.0, 5.0, 5.0);
}
{
let mut sm2_guard = backends.backends.get(2).unwrap().metrics.write().unwrap();
sm2_guard.update(20.0, 20.0, 20.0, 20.0);
}
// Use coeffs that only consider CPU so composite load is easy to reason about.
let mut bal = AdaptiveWeightBalancer::new(backends.clone(), [1.0, 0.0, 0.0, 0.0], 0.5);
// set identical composite loads > 0 for all so ratio = x and threshold = alpha * x < x
// you will have threshold = 25 for all 3 backend servers and ratio = 50
// so that forces to choose the smallest current load backend
let chosen = bal
.choose_backend(unused_ctx())
.expect("should choose a backend");
// expect server with smallest current_load server-1
assert_eq!(chosen.id, "server-1");
}
}

View File

@@ -14,24 +14,29 @@ pub struct RoutingTable {
pub entries: Vec<(IpCidr, usize)>,
}
fn parse_client(s: &str) -> (IpCidr, u16) {
// just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
let (ip_part, port_part) = s.rsplit_once(':').expect("badly formatted client");
let port: u16 = port_part.parse().expect("bad port");
let cidr: IpCidr = ip_part.parse().expect("bad ip/mask");
(cidr, port)
}
pub type PortListeners = HashMap<u16, RoutingTable>;
pub fn build_lb(config: AppConfig) -> PortListeners {
fn parse_client(s: &str) -> Result<(IpCidr, u16), String> {
// just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
let (ip_part, port_part) = s.rsplit_once(':')
.ok_or_else(|| format!("badly formatted client: {}", s))?;
let port = port_part.parse()
.map_err(|_| format!("bad port: {}", s))?;
let cidr = ip_part.parse()
.map_err(|_| format!("bad ip/mask: {}", s))?;
Ok((cidr, port))
}
pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>), String> {
let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new();
let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();
for backend_cfg in config.backends {
let ip: IpAddr = backend_cfg.ip.parse().unwrap();
let ip: IpAddr = backend_cfg.ip.parse()
.map_err(|_| format!("bad ip: {}", backend_cfg.ip))?;
let addr = SocketAddr::new(ip, backend_cfg.port);
let health = healths
@@ -80,7 +85,7 @@ pub fn build_lb(config: AppConfig) -> PortListeners {
let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new();
for client_def in rule.clients {
let (cidr, port) = parse_client(&client_def);
let (cidr, port) = parse_client(&client_def)?;
port_groups.entry(port).or_default().push(cidr);
}
@@ -116,5 +121,5 @@ pub fn build_lb(config: AppConfig) -> PortListeners {
.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length()));
}
listeners
Ok((listeners, healths))
}

View File

@@ -3,81 +3,190 @@ mod balancer;
mod config;
mod proxy;
use crate::balancer::{Balancer, ConnectionInfo};
use std::collections::HashMap;
use crate::balancer::{ConnectionInfo};
use crate::proxy::tcp::proxy_tcp_connection;
use std::fs::File;
use std::path::PathBuf;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use anywho::Error;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use crate::backend::ServerMetrics;
use crate::config::loader::{build_lb, RoutingTable};
use notify::{Watcher, RecursiveMode, Event};
use clap::Parser;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
struct ProgramState {
tx_rt_map: HashMap<u16, mpsc::UnboundedSender<RoutingTable>>,
healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
}
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(short, long, default_value = "config.yaml")]
config: PathBuf,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let f = File::open("config.yaml").expect("couldn't open config.yaml");
let app_config: config::AppConfig = serde_saphyr::from_reader(f)?;
let args = Args::parse();
if !args.config.is_file() {
eprintln!("config file not found or not accessible");
std::process::exit(1);
}
println!("reading config from {:?}", args.config);
let state = Arc::new(Mutex::new(ProgramState {
tx_rt_map: HashMap::new(),
healths: HashMap::new(),
}));
if let Err(e) = load_config(&args.config, state.clone()).await {
eprintln!("config file loading failed: {}", e);
}
let config_path = args.config.clone();
let state_clone = state.clone();
tokio::spawn(async move {
let (tx, mut rx) = mpsc::channel(1);
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res {
if event.kind.is_modify() {
let _ = tx.blocking_send(());
}
}
}).unwrap();
watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap();
println!("watching for changes to {:?}", config_path);
while rx.recv().await.is_some() {
if let Err(e) = load_config(&config_path, state_clone.clone()).await {
eprintln!("loading config failed: {}", e);
}
}
});
loop { tokio::time::sleep(Duration::from_hours(1)).await; }
}
async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<(), Error> {
let f = File::open(path)?;
let app_config: config::AppConfig = match serde_saphyr::from_reader(f) {
Ok(app_config) => app_config,
Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); }
};
println!(
"Loaded {} backends, {} rules.",
"Loaded config, with {} backends, {} rules.",
app_config.backends.len(),
app_config.rules.len()
);
let listeners = config::loader::build_lb(app_config);
let (mut listeners, health_monitors) = match build_lb(app_config) {
Ok(v) => v,
Err(e) => {
eprintln!("config has logical errors: {}", e);
return Ok(());
}
};
let mut prog_state = state.lock().unwrap();
if listeners.is_empty() {
eprintln!("its a lawless land");
return Ok(());
let ports_to_remove: Vec<u16> = prog_state.tx_rt_map
.keys()
.cloned()
.filter(|port| !listeners.contains_key(port))
.collect();
for port in ports_to_remove {
prog_state.tx_rt_map.remove(&port);
}
let mut handles = Vec::new();
prog_state.healths = health_monitors;
for (port, routing_table) in listeners.drain() {
if let Some(x) = prog_state.tx_rt_map.get_mut(&port) {
x.send(routing_table)?;
println!("updated rules on port {}", port);
} else {
let (tx_rt, rx_rt) = mpsc::unbounded_channel();
prog_state.tx_rt_map.insert(port, tx_rt);
for (port, mut routing_table) in listeners {
handles.push(tokio::spawn(async move {
let addr = format!("0.0.0.0:{}", port);
println!("Starting tcp listener on {}", addr);
tokio::spawn(run_listener(port, rx_rt, routing_table));
}
}
let listener = TcpListener::bind(&addr).await.expect("Failed to bind port");
println!("reload complete");
Ok(())
}
loop {
let (socket, remote_addr) = match listener.accept().await {
Ok(v) => v,
Err(e) => {
eprintln!("error: listener port {}: {}", port, e);
continue;
async fn run_listener(
port: u16,
mut rx_rt: mpsc::UnboundedReceiver<RoutingTable>,
mut current_table: RoutingTable
) {
let addr = format!("0.0.0.0:{}", port);
println!("Starting tcp listener on {}", addr);
let listener = TcpListener::bind(&addr).await.expect("Failed to bind port");
loop {
tokio::select! {
msg = rx_rt.recv() => {
match msg {
Some(new_table) => {
current_table = new_table;
}
};
let remote_ip = remote_addr.ip();
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
let mut chosen_backend = None;
for (cidr, balancer_idx) in &mut routing_table.entries {
if cidr.contains(&remote_ip) {
let balancer = &mut routing_table.balancers[*balancer_idx];
chosen_backend = balancer.choose_backend(ConnectionInfo {
client_ip: remote_ip,
});
None => {
println!("Unbinding listener on port {}", port);
break;
}
}
}
accept_result = listener.accept() => {
match accept_result {
Ok((socket, remote_addr)) => {
let remote_ip = remote_addr.ip();
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
if let Some(backend) = chosen_backend {
tokio::spawn(async move {
if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await {
eprintln!("error: conn_id={} proxy failed: {}", conn_id, e);
let mut chosen_backend = None;
for (cidr, balancer_idx) in &mut current_table.entries {
if cidr.contains(&remote_ip) {
let balancer = &mut current_table.balancers[*balancer_idx];
chosen_backend = balancer.choose_backend(ConnectionInfo {
client_ip: remote_ip,
});
break;
}
}
});
} else {
println!("error: no matching rule for {} on port {}", remote_ip, port);
if let Some(backend) = chosen_backend {
tokio::spawn(async move {
if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await {
eprintln!("error: conn_id={} proxy failed: {}", conn_id, e);
}
});
} else {
println!("error: no matching rule for {} on port {}", remote_ip, port);
}
}
Err(e) => {
eprintln!("error: listener port {}: {}", port, e);
continue;
}
}
}
}));
}
}
for h in handles {
let _ = h.await;
}
Ok(())
}