diff --git a/src/balancer/adaptive_weight.rs b/src/balancer/adaptive_weight.rs index 46b0e47..1a30463 100644 --- a/src/balancer/adaptive_weight.rs +++ b/src/balancer/adaptive_weight.rs @@ -2,7 +2,6 @@ use crate::netutils::Backend; use rand::prelude::*; use rand::rngs::SmallRng; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; #[derive(Debug, Clone)] @@ -19,7 +18,7 @@ impl ServerMetrics { ServerMetrics { cpu: 0.0, mem: 0.0, net: 0.0, io: 0.0 } } - pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64, alpha: f64) { + pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) { self.cpu = cpu; self.mem = mem; self.net = net; @@ -64,18 +63,14 @@ impl AdaptiveBalancer { pub fn update_metrics(&mut self, backend_addr: &str, cpu: f64, mem: f64, net: f64, io: f64) { for s in &mut self.servers { if s.backend.to_string() == backend_addr { - s.metrics.update(cpu, mem, net, io, self.alpha); + s.metrics.update(cpu, mem, net, io); return; } } } fn metrics_to_weight(metrics: &ServerMetrics, coeffs: &[f64; 4]) -> f64 { - let l = coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io; - // convert load to a score where higher is better: raw = 100 - L - let raw = (100.0 - l).max(0.0); - // amplify differences (square) and add small epsilon to avoid zero - raw * raw + 1e-6 + coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io } /// Choose a backend using weighted random selection based on current weights. @@ -89,34 +84,39 @@ impl AdaptiveBalancer { let rs: Vec = self.servers.iter().map(|s| { Self::metrics_to_weight(&s.metrics, &self.coeffs) }).collect(); - let ws: Vec = self.servers.iter().map(|s| s.weight).collect(); + let ls: Vec = self.servers.iter().map(|s| s.backend.current_load).collect(); let r_sum: f64 = rs.iter().copied().sum::(); let w_sum: f64 = ws.iter().copied().sum::().max(1e-12); + let l_sum: u32 = ls.iter().copied().sum::(); let threshold = self.alpha * (r_sum / w_sum); for (i, s) in self.servers.iter_mut().enumerate() { let ratio = if s.weight <= 0.0 { f64::INFINITY } else { rs[i] / s.weight }; if ratio <= threshold { - Some(Arc::new(s.backend.clone())); + return Some(Arc::new(s.backend.clone())); } } // If any server satisfies Ri/Wi <= threshold, it means the server // is relatively overloaded and we must adjust its weight using // formula (6). - - // TODO: adjust weight + + let lwi: Vec = self.servers.iter().enumerate().map(|(i, s)| { + s.backend.current_load as f64 * w_sum / ws[i] * l_sum as f64 + }).collect(); + let a_lwi: f64 = lwi.iter().copied().sum::() / lwi.len() as f64; + for (i, s) in self.servers.iter_mut().enumerate() { + s.weight += 1 as f64 - lwi[i] / a_lwi; + } - // Compute Li = Wi / Ri and choose server minimizing Li. This gives - // preference to servers with high remaining capacity relative to - // their weight (lower Li better). + // Compute Li = Wi / Ri and choose server minimizing Li. let mut best_idx: Option = None; - let mut best_li = AtomicUsize::new(usize::MAX); + let mut best_li = u32::MAX; for (i, s) in self.servers.iter().enumerate() { let li = s.backend.current_load; - if compare_atomic(&li, &best_li) == -1 { + if li < best_li { best_li = li; best_idx = Some(i); } @@ -137,14 +137,6 @@ impl AdaptiveBalancer { } } -fn compare_atomic(a: &AtomicUsize, b: &AtomicUsize) -> i32 { - let x = a.load(Ordering::SeqCst); - let y = b.load(Ordering::SeqCst); - if x < y { return -1; } - if x > y { return 1; } - return 0; -} - #[cfg(test)] mod tests { use super::*; @@ -159,14 +151,11 @@ mod tests { // update one backend to be heavily loaded b.update_metrics("127.0.0.1:1", 90.0, 80.0, 10.0, 5.0); b.update_metrics("127.0.0.1:2", 10.0, 5.0, 1.0, 1.0); - let snaps2 = b.snapshot_weights(); - println!("{:?}, {:?}", snaps, snaps2); - // after update the first server (127.0.0.1:1) should have a worse - // weight (higher composite load -> lower remaining capacity) - // assert!(snaps2[0].1 < snaps2[1].1); // Choose backend: should pick the less loaded host (127.0.0.1:2) let chosen = b.choose_backend().expect("should choose a backend"); + let snaps2 = b.snapshot_weights(); + println!("{:?}, {:?}", snaps, snaps2); assert_eq!(chosen.to_string(), "127.0.0.1:2"); } } diff --git a/src/main.rs b/src/main.rs index adff2c3..4c650d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ macro_rules! error { } mod netutils; +mod balancer; use anywho::Error; use netutils::{Backend, tunnel}; diff --git a/src/netutils.rs b/src/netutils.rs index bd40aa4..004f963 100644 --- a/src/netutils.rs +++ b/src/netutils.rs @@ -7,11 +7,15 @@ use std::error::Error; #[derive(Clone, Debug)] pub struct Backend { address: String, + pub current_load : u32 } impl Backend { pub fn new(address: String) -> Self { - Backend { address } + Backend { + address, + current_load : 0 + } } }