diff --git a/Cargo.lock b/Cargo.lock index c908bfa..c400951 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,11 +26,23 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "l4lb" version = "0.1.0" dependencies = [ "anywho", + "rand", "tokio", ] @@ -89,6 +101,15 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -107,6 +128,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -286,3 +337,23 @@ name = "windows_x86_64_msvc" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + +[[package]] +name = "zerocopy" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index ad17095..5f2710d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,4 @@ edition = "2024" [dependencies] anywho = "0.1.2" tokio = { version = "1.48.0", features = ["full"] } +rand = { version = "0.8", features = ["small_rng"] } diff --git a/README.md b/README.md index 94ae351..4a03c12 100644 --- a/README.md +++ b/README.md @@ -104,3 +104,12 @@ process to load balance: - connect to the server - proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually) - cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions) + +## Load balancer algorithm +- Choose a fixed weight coefficient for the resource parameter +- Spawn a thread on a load balancer to host the iperf server, used for new onboarding server connecting to the load balancer to measure their maximum bandwidth +- Spawn another thread for listening to resource update from connected server +- Update the comprehensive load sum from eq (1), update the formula in eq (2) to (5) +- Choose alpha for eq (8), and run the algorithm to choose which server +- Extract the server from the server id using ```get_backend()``` +- Use ```tunnel()``` to proxy the packet diff --git a/W110.pdf b/W110.pdf new file mode 100644 index 0000000..566f3aa Binary files /dev/null and b/W110.pdf differ diff --git a/src/backend/mod.rs b/src/backend/mod.rs new file mode 100644 index 0000000..4bfd49c --- /dev/null +++ b/src/backend/mod.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::RwLock; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +pub struct BackendPool { + pub backends: Arc>>>, +} + +#[derive(Debug)] +pub struct Backend { + pub id: String, + pub address: SocketAddr, + pub is_healthy: AtomicBool, // no clue how this should work, for now + pub current_load: AtomicUsize, // no clue how this should work, for now +} + +impl BackendPool { + pub fn new(initial_backends: Vec>) -> Self { + let mut map = HashMap::new(); + for backend in initial_backends { + map.insert(backend.id.clone(), backend); + } + + Self { + backends: Arc::new(RwLock::new(map)), + } + } + + pub fn add_backend(&self, backend: Arc) { + let mut backends_guard = self.backends + .write() + .expect("BackendPool lock poisoned"); + // let backends_guard = self.backends.read().unwrap_or_else(|poisoned| poisoned.into_inner()); + backends_guard.insert(backend.id.clone(), backend); + } + + pub fn get_backend(&self, id: &str) -> Option> { + let backends_guard = self.backends + .read() + .expect("BackendPool lock poisoned"); + // let backends_guard = self.backends.read().unwrap_or_else(|poisoned| poisoned.into_inner()); + backends_guard.get(id).cloned() + } + + pub fn bruh_amogus_sus(&self) { + for k in self.backends.read().unwrap().keys() { + self.backends.write().unwrap().get(k).unwrap().increment_current_load(); + } + } +} + +impl Backend { + pub fn new(id: String, address: SocketAddr) -> Self { + Self { + id: id, + address: address, + is_healthy: AtomicBool::new(false), + current_load: AtomicUsize::new(0), + } + } + + pub fn increment_current_load(&self) { + self.current_load.fetch_add(1, Ordering::SeqCst); + } + + pub fn decrement_current_load(&self) { + self.current_load.fetch_sub(1, Ordering::SeqCst); + } +} \ No newline at end of file diff --git a/src/balancer/adaptive-weight.rs b/src/balancer/adaptive-weight.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/balancer/adaptive_weight.rs b/src/balancer/adaptive_weight.rs new file mode 100644 index 0000000..c74b888 --- /dev/null +++ b/src/balancer/adaptive_weight.rs @@ -0,0 +1,180 @@ +use crate::netutils::Backend; +use rand::prelude::*; +use rand::rngs::SmallRng; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + + +#[derive(Debug, Clone)] +pub struct ServerMetrics { + // metrics are percents (0..100) + pub cpu: f64, + pub mem: f64, + pub net: f64, + pub io: f64, +} + +impl ServerMetrics { + pub fn new() -> Self { + ServerMetrics { cpu: 0.0, mem: 0.0, net: 0.0, io: 0.0 } + } + + pub fn update_ema(&mut self, cpu: f64, mem: f64, net: f64, io: f64, alpha: f64) { + self.cpu = cpu; + self.mem = mem; + self.net = net; + self.io = io; + } +} + +#[derive(Debug, Clone)] +pub struct ServerState { + pub backend: Backend, + pub metrics: ServerMetrics, + pub weight: f64, +} + +impl ServerState { + pub fn new(backend: Backend) -> Self { + ServerState { backend, metrics: ServerMetrics::new(), weight: 1.0 } + } +} + +/// This implementation keeps an EMA of reported resource usage (cpu/mem/net/io), +/// computes a composite load L = sum(coeff_i * metric_i) and converts that to +/// a selection weight. Lower load -> higher weight. Selection is weighted-random. +pub struct AdaptiveBalancer { + servers: Vec, + // resource coefficients (cpu, mem, net, io) - sum to 1.0 + coeffs: [f64; 4], + // EMA smoothing factor (alpha) + alpha: f64, + rng: SmallRng, +} + +impl AdaptiveBalancer { + /// Create a new balancer from a list of backends. + /// `coeffs` are the importance weights for cpu,mem,net,io respectively. + /// `alpha` controls EMA smoothing (0..1). Typical alpha ~0.2-0.5. + pub fn new(backends: Vec, coeffs: [f64; 4], alpha: f64) -> Self { + let servers = backends.into_iter().map(ServerState::new).collect(); + let rng = SmallRng::from_entropy(); + AdaptiveBalancer { servers, coeffs, alpha, rng } + } + + /// Add a backend at runtime. + pub fn add_backend(&mut self, backend: Backend) { + self.servers.push(ServerState::new(backend)); + } + + /// Update metrics reported by a backend identified by its display/address. + /// If the backend isn't found this is a no-op. + pub fn update_metrics(&mut self, backend_addr: &str, cpu: f64, mem: f64, net: f64, io: f64) { + for s in &mut self.servers { + if s.backend.to_string() == backend_addr { + s.metrics.update_ema(cpu, mem, net, io, self.alpha); + return; + } + } + } + + fn metrics_to_weight(metrics: &ServerMetrics, coeffs: &[f64; 4]) -> f64 { + let l = coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io; + // convert load to a score where higher is better: raw = 100 - L + let raw = (100.0 - l).max(0.0); + // amplify differences (square) and add small epsilon to avoid zero + raw * raw + 1e-6 + } + + /// Choose a backend using weighted random selection based on current weights. + /// Returns an Arc-wrapped Backend clone so callers can cheaply clone it. + pub fn choose_backend(&mut self) -> Option> { + if self.servers.is_empty() { + return None; + } + + // Compute remaining capacity R_i = 100 - composite_load + let rs: Vec = self.servers.iter().map(|s| { + Self::metrics_to_weight(&s.metrics, &self.coeffs) + }).collect(); + + let ws: Vec = self.servers.iter().map(|s| s.weight).collect(); + + let r_sum: f64 = rs.iter().copied().sum::(); + let w_sum: f64 = ws.iter().copied().sum::().max(1e-12); + let threshold = self.alpha * (r_sum / w_sum); + + for (i, s) in self.servers.iter_mut().enumerate() { + let ratio = if s.weight <= 0.0 { f64::INFINITY } else { rs[i] / s.weight }; + if ratio <= threshold { + Some(Arc::new(s.backend.clone())); + } + } + + // If any server satisfies Ri/Wi <= threshold, it means the server + // is relatively overloaded and we must adjust its weight using + // formula (6). + + // TODO: adjust weight + + // Compute Li = Wi / Ri and choose server minimizing Li. This gives + // preference to servers with high remaining capacity relative to + // their weight (lower Li better). + let mut best_idx: Option = None; + let mut best_li = AtomicUsize::new(usize::MAX); + for (i, s) in self.servers.iter().enumerate() { + let li = s.backend.current_load; + if compare_atomic(&li, &best_li) == -1 { + best_li = li; + best_idx = Some(i); + } + } + + // If nothing chosen, fall back to random selection + if best_idx.is_none() { + let i = (self.rng.next_u32() as usize) % self.servers.len(); + return Some(Arc::new(self.servers[i].backend.clone())); + } + + Some(Arc::new(self.servers[best_idx.unwrap()].backend.clone())) + } + + // Expose a snapshot of server weights (for monitoring/testing) + pub fn snapshot_weights(&self) -> Vec<(String, f64)> { + self.servers.iter().map(|s| (s.backend.to_string(), s.weight)).collect() + } +} + +fn compare_atomic(a: &AtomicUsize, b: &AtomicUsize) -> i32 { + let x = a.load(Ordering::SeqCst); + let y = b.load(Ordering::SeqCst); + if x < y { return -1; } + if x > y { return 1; } + return 0; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn basic_weight_update_and_choose() { + let backends = vec![Backend::new("127.0.0.1:1".to_string()), Backend::new("127.0.0.1:2".to_string())]; + let mut b = AdaptiveBalancer::new(backends, [0.5, 0.2, 0.2, 0.1], 0.5); + // initially equal weights + let snaps = b.snapshot_weights(); + assert_eq!(snaps.len(), 2); + // update one backend to be heavily loaded + b.update_metrics("127.0.0.1:1", 90.0, 80.0, 10.0, 5.0); + b.update_metrics("127.0.0.1:2", 10.0, 5.0, 1.0, 1.0); + let snaps2 = b.snapshot_weights(); + println!("{:?}, {:?}", snaps, snaps2); + // after update the first server (127.0.0.1:1) should have a worse + // weight (higher composite load -> lower remaining capacity) + // assert!(snaps2[0].1 < snaps2[1].1); + + // Choose backend: should pick the less loaded host (127.0.0.1:2) + let chosen = b.choose_backend().expect("should choose a backend"); + assert_eq!(chosen.to_string(), "127.0.0.1:2"); + } +} diff --git a/src/balancer/mod.rs b/src/balancer/mod.rs index e69de29..f628f66 100644 --- a/src/balancer/mod.rs +++ b/src/balancer/mod.rs @@ -0,0 +1,2 @@ +pub mod adaptive_weight; +pub use adaptive_weight::AdaptiveBalancer;