diff --git a/Cargo.lock b/Cargo.lock index c908bfa..d775ae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,11 +26,44 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chacha20" +version = "0.10.0-rc.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cbf41c6ec3c4b9eaf7f8f5c11a72cd7d3aa0428125c20d5ef4d09907a0f019" +dependencies = [ + "cfg-if", + "cpufeatures", + "rand_core", +] + +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", +] + [[package]] name = "l4lb" version = "0.1.0" dependencies = [ "anywho", + "rand", "tokio", ] @@ -107,6 +140,29 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rand" +version = "0.10.0-rc.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be866deebbade98028b705499827ad6967c8bb1e21f96a2609913c8c076e9307" +dependencies = [ + "chacha20", + "getrandom", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.10.0-rc-2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "104a23e4e8b77312a823b6b5613edbac78397e2f34320bc7ac4277013ec4478e" + [[package]] name = "redox_syscall" version = "0.5.18" @@ -198,6 +254,15 @@ version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" +[[package]] +name = "wasip2" +version = "1.0.1+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "windows-link" version = "0.2.1" @@ -286,3 +351,9 @@ name = "windows_x86_64_msvc" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" diff --git a/Cargo.toml b/Cargo.toml index ad17095..a4a9d43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,4 @@ edition = "2024" [dependencies] anywho = "0.1.2" tokio = { version = "1.48.0", features = ["full"] } +rand = "0.10.0-rc.5" diff --git a/src/backend/health.rs b/src/backend/health.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 9efcfc2..1058fc3 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -1,22 +1,53 @@ +pub mod health; + use core::fmt; use std::net::SocketAddr; use std::sync::RwLock; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +// Physical server information +#[derive(Debug)] +pub struct Server { + pub endpoints: Arc>>, + pub metrics: Arc>, +} + +// Physical server health statistics, used for certain load balancing algorithms +#[derive(Debug, Default)] +pub struct ServerHealth { + pub cpu: f64, + pub mem: f64, + pub net: f64, + pub io: f64, +} + +impl ServerHealth { + pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) { + self.cpu = cpu; + self.mem = mem; + self.net = net; + self.io = io; + } +} + +// A possible endpoint for a proxied connection. +// Note that multiple may live on the same server, hence the Arc> #[derive(Debug)] pub struct Backend { pub id: String, pub address: SocketAddr, pub active_connections: AtomicUsize, + pub metrics: Arc>, } impl Backend { - pub fn new(id: String, address: SocketAddr) -> Self { + pub fn new(id: String, address: SocketAddr, server_metrics: Arc>) -> Self { Self { id: id.to_string(), address, active_connections: AtomicUsize::new(0), + metrics: server_metrics, } } @@ -40,19 +71,18 @@ impl fmt::Display for Backend { } } +// A set of endpoints that can be load balanced around. +// Each Balancer owns one of these. Backend instances may be shared +// with other Balancer instances, hence Arc. #[derive(Clone, Debug)] pub struct BackendPool { - pub backends: Arc>>>, + pub backends: Arc>>, } impl BackendPool { - pub fn new() -> Self { + pub fn new(backends: Vec>) -> Self { BackendPool { - backends: Arc::new(RwLock::new(Vec::new())), + backends: Arc::new(backends), } } - - pub fn add(&self, backend: Backend) { - self.backends.write().unwrap().push(Arc::new(backend)); - } -} \ No newline at end of file +} diff --git a/src/balancer/adaptive_weight.rs b/src/balancer/adaptive_weight.rs new file mode 100644 index 0000000..cffd531 --- /dev/null +++ b/src/balancer/adaptive_weight.rs @@ -0,0 +1,135 @@ +use std::sync::{Arc, RwLock}; +use std::fmt::Debug; +use std::fs::Metadata; +use crate::backend::{Backend, BackendPool, ServerHealth}; +use crate::balancer::Balancer; +use rand::prelude::*; +use rand::rngs::SmallRng; + +#[derive(Debug)] +struct AdaptiveNode { + backend: Arc, + weight: f64, +} + +#[derive(Debug)] +pub struct AdaptiveWeightBalancer { + pool: Vec, + coefficients: [f64; 4], + alpha: f64, + rng: SmallRng, +} + +impl AdaptiveWeightBalancer { + pub fn new(pool: BackendPool, coefficients: [f64; 4], alpha: f64) -> Self { + let nodes = pool.backends + .iter() + .map(|b| AdaptiveNode { + backend: b.clone(), + weight: 0f64, + }) + .collect(); + + AdaptiveWeightBalancer { + pool: nodes, + coefficients, + alpha, + rng: SmallRng::from_rng(&mut rand::rng()) + } + } + + pub fn metrics_to_weight(&self, metrics: &ServerHealth) -> f64 { + self.coefficients[0] * metrics.cpu + + self.coefficients[1] * metrics.mem + + self.coefficients[2] * metrics.net + + self.coefficients[3] * metrics.io + } +} + +impl Balancer for AdaptiveWeightBalancer { + fn choose_backend(&mut self) -> Option> { + if self.pool.is_empty() { + return None; + } + + // Compute remaining capacity R_i = 100 - composite_load + let mut r_sum = 0.0; + let mut w_sum = 0.0; + let mut l_sum = 0; + + for node in &self.pool { + if let Ok(health) = node.backend.metrics.read() { + r_sum += self.metrics_to_weight(&health); + } + w_sum += node.weight; + l_sum += node.backend.active_connections + .load(std::sync::atomic::Ordering::Relaxed); + } + + let safe_w_sum = w_sum.max(1e-12); + let threshold = self.alpha * (r_sum / safe_w_sum); + + for idx in 0..self.pool.len() { + let node = &self.pool[idx]; + + if node.weight <= 0.001 { continue; } + + let risk = match node.backend.metrics.read() { + Ok(h) => self.metrics_to_weight(&h), + Err(_) => f64::MAX, + }; + + let ratio = risk / node.weight; + + if ratio <= threshold { + return Some(node.backend.clone()); + } + } + + // If any server satisfies Ri/Wi <= threshold, it means the server + // is relatively overloaded, and we must adjust its weight using + // formula (6). + let mut total_lwi = 0.0; + let l_sum_f64 = l_sum as f64; + + for node in &self.pool { + let load = node.backend.active_connections + .load(std::sync::atomic::Ordering::Relaxed) as f64; + let weight = node.weight.max(1e-12); + let lwi = load * (safe_w_sum / weight) * l_sum_f64; + total_lwi += lwi; + } + + let avg_lwi = (total_lwi / self.pool.len() as f64).max(1e-12); + + // Compute Li = Wi / Ri and choose server minimizing Li. + let mut best_backend: Option> = None; + let mut min_load = usize::MAX; + + for node in &mut self.pool { + let load = node.backend.active_connections + .load(std::sync::atomic::Ordering::Relaxed); + let load_f64 = load as f64; + let weight = node.weight.max(1e-12); + + let lwi = load_f64 * (safe_w_sum / weight) * l_sum_f64; + + let adj = 1.0 - (lwi / avg_lwi); + node.weight += adj; + + node.weight = node.weight.clamp(0.1, 100.0); + if load < min_load { + min_load = load; + best_backend = Some(node.backend.clone()); + } + } + + match best_backend { + Some(backend) => Some(backend), + None => { + let i = (self.rng.next_u32() as usize) % self.pool.len(); + Some(self.pool[i].backend.clone()) + } + } + } +} \ No newline at end of file diff --git a/src/balancer/ip_hashing.rs b/src/balancer/ip_hashing.rs new file mode 100644 index 0000000..7edf986 --- /dev/null +++ b/src/balancer/ip_hashing.rs @@ -0,0 +1,4 @@ +use super::*; +pub fn test() { + println!("Hello from RR"); +} diff --git a/src/balancer/least_connections.rs b/src/balancer/least_connections.rs new file mode 100644 index 0000000..4563e55 --- /dev/null +++ b/src/balancer/least_connections.rs @@ -0,0 +1 @@ +use super::*; diff --git a/src/balancer/mod.rs b/src/balancer/mod.rs index 555f6c2..e5a280e 100644 --- a/src/balancer/mod.rs +++ b/src/balancer/mod.rs @@ -1,4 +1,7 @@ pub mod round_robin; +pub mod adaptive_weight; +pub mod least_connections; +pub mod ip_hashing; use std::fmt::Debug; use std::sync::Arc; @@ -6,4 +9,4 @@ use crate::backend::Backend; pub trait Balancer: Debug + Send + Sync + 'static { fn choose_backend(&mut self) -> Option>; -} \ No newline at end of file +} diff --git a/src/balancer/round_robin.rs b/src/balancer/round_robin.rs index 3269a69..dc41510 100644 --- a/src/balancer/round_robin.rs +++ b/src/balancer/round_robin.rs @@ -23,11 +23,11 @@ impl RoundRobinBalancer { impl Balancer for RoundRobinBalancer { fn choose_backend(&mut self) -> Option> { - let backends = self.pool.backends.read().unwrap(); + let backends = self.pool.backends.clone(); if backends.is_empty() { return None; } let backend = backends[self.index % backends.len()].clone(); self.index = self.index.wrapping_add(1); Some(backend) } -} \ No newline at end of file +} diff --git a/src/config.rs b/src/config.rs index 6c59f64..adbbc17 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,4 +3,4 @@ // define sets of backends // allowed set operations for now is just union // rules are ip + mask and ports, maps to some of the sets -// defined earlier, along with a routing strategy \ No newline at end of file +// defined earlier, along with a routing strategy diff --git a/src/main.rs b/src/main.rs index 2b4a373..136bf49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,9 +7,9 @@ mod proxy; use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use crate::backend::{Backend, BackendPool}; +use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicU64, Ordering}; +use crate::backend::{Backend, BackendPool, ServerHealth}; use crate::balancer::Balancer; use crate::balancer::round_robin::RoundRobinBalancer; use crate::proxy::tcp::proxy_tcp_connection; @@ -18,26 +18,29 @@ static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); #[tokio::main] async fn main() -> Result<(), Box> { - let pool = BackendPool::new(); - - pool.add(Backend::new( + let mut pool: Vec> = Vec::new(); + let server_metric = Arc::new(RwLock::new(ServerHealth::default())); + + pool.push(Arc::new(Backend::new( "backend 1".into(), "127.0.0.1:8081".parse().unwrap(), - )); + server_metric.clone() + ))); - pool.add(Backend::new( + pool.push(Arc::new(Backend::new( "backend 2".into(), "127.0.0.1:8082".parse().unwrap(), - )); + server_metric.clone() + ))); - let mut balancer = RoundRobinBalancer::new(pool.clone()); + let mut balancer = RoundRobinBalancer::new(BackendPool::new(pool)); let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (socket, _) = listener.accept().await?; - let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed); if let Some(backend) = balancer.choose_backend() { tokio::spawn(async move { diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 7b8e6df..e202bda 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -40,4 +40,4 @@ impl Drop for ConnectionContext { duration.as_secs_f64() ); } -} \ No newline at end of file +} diff --git a/src/proxy/tcp.rs b/src/proxy/tcp.rs index c03bfb6..b8eb0b2 100644 --- a/src/proxy/tcp.rs +++ b/src/proxy/tcp.rs @@ -23,4 +23,4 @@ pub async fn proxy_tcp_connection(connection_id: u64, mut client_stream: TcpStre ctx.bytes_transferred = tx + rx; Ok(()) -} \ No newline at end of file +}