the algorithm is working, but will need more test

This commit is contained in:
nnhphong
2025-12-07 23:04:29 -05:00
parent 742827b16f
commit 08cb522f93
3 changed files with 25 additions and 31 deletions

View File

@@ -2,7 +2,6 @@ use crate::netutils::Backend;
use rand::prelude::*; use rand::prelude::*;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -19,7 +18,7 @@ impl ServerMetrics {
ServerMetrics { cpu: 0.0, mem: 0.0, net: 0.0, io: 0.0 } ServerMetrics { cpu: 0.0, mem: 0.0, net: 0.0, io: 0.0 }
} }
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64, alpha: f64) { pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
self.cpu = cpu; self.cpu = cpu;
self.mem = mem; self.mem = mem;
self.net = net; self.net = net;
@@ -64,18 +63,14 @@ impl AdaptiveBalancer {
pub fn update_metrics(&mut self, backend_addr: &str, cpu: f64, mem: f64, net: f64, io: f64) { pub fn update_metrics(&mut self, backend_addr: &str, cpu: f64, mem: f64, net: f64, io: f64) {
for s in &mut self.servers { for s in &mut self.servers {
if s.backend.to_string() == backend_addr { if s.backend.to_string() == backend_addr {
s.metrics.update(cpu, mem, net, io, self.alpha); s.metrics.update(cpu, mem, net, io);
return; return;
} }
} }
} }
fn metrics_to_weight(metrics: &ServerMetrics, coeffs: &[f64; 4]) -> f64 { fn metrics_to_weight(metrics: &ServerMetrics, coeffs: &[f64; 4]) -> f64 {
let l = coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io; coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io
// convert load to a score where higher is better: raw = 100 - L
let raw = (100.0 - l).max(0.0);
// amplify differences (square) and add small epsilon to avoid zero
raw * raw + 1e-6
} }
/// Choose a backend using weighted random selection based on current weights. /// Choose a backend using weighted random selection based on current weights.
@@ -89,17 +84,18 @@ impl AdaptiveBalancer {
let rs: Vec<f64> = self.servers.iter().map(|s| { let rs: Vec<f64> = self.servers.iter().map(|s| {
Self::metrics_to_weight(&s.metrics, &self.coeffs) Self::metrics_to_weight(&s.metrics, &self.coeffs)
}).collect(); }).collect();
let ws: Vec<f64> = self.servers.iter().map(|s| s.weight).collect(); let ws: Vec<f64> = self.servers.iter().map(|s| s.weight).collect();
let ls: Vec<u32> = self.servers.iter().map(|s| s.backend.current_load).collect();
let r_sum: f64 = rs.iter().copied().sum::<f64>(); let r_sum: f64 = rs.iter().copied().sum::<f64>();
let w_sum: f64 = ws.iter().copied().sum::<f64>().max(1e-12); let w_sum: f64 = ws.iter().copied().sum::<f64>().max(1e-12);
let l_sum: u32 = ls.iter().copied().sum::<u32>();
let threshold = self.alpha * (r_sum / w_sum); let threshold = self.alpha * (r_sum / w_sum);
for (i, s) in self.servers.iter_mut().enumerate() { for (i, s) in self.servers.iter_mut().enumerate() {
let ratio = if s.weight <= 0.0 { f64::INFINITY } else { rs[i] / s.weight }; let ratio = if s.weight <= 0.0 { f64::INFINITY } else { rs[i] / s.weight };
if ratio <= threshold { if ratio <= threshold {
Some(Arc::new(s.backend.clone())); return Some(Arc::new(s.backend.clone()));
} }
} }
@@ -107,16 +103,20 @@ impl AdaptiveBalancer {
// is relatively overloaded and we must adjust its weight using // is relatively overloaded and we must adjust its weight using
// formula (6). // formula (6).
// TODO: adjust weight let lwi: Vec<f64> = self.servers.iter().enumerate().map(|(i, s)| {
s.backend.current_load as f64 * w_sum / ws[i] * l_sum as f64
}).collect();
let a_lwi: f64 = lwi.iter().copied().sum::<f64>() / lwi.len() as f64;
for (i, s) in self.servers.iter_mut().enumerate() {
s.weight += 1 as f64 - lwi[i] / a_lwi;
}
// Compute Li = Wi / Ri and choose server minimizing Li. This gives // Compute Li = Wi / Ri and choose server minimizing Li.
// preference to servers with high remaining capacity relative to
// their weight (lower Li better).
let mut best_idx: Option<usize> = None; let mut best_idx: Option<usize> = None;
let mut best_li = AtomicUsize::new(usize::MAX); let mut best_li = u32::MAX;
for (i, s) in self.servers.iter().enumerate() { for (i, s) in self.servers.iter().enumerate() {
let li = s.backend.current_load; let li = s.backend.current_load;
if compare_atomic(&li, &best_li) == -1 { if li < best_li {
best_li = li; best_li = li;
best_idx = Some(i); best_idx = Some(i);
} }
@@ -137,14 +137,6 @@ impl AdaptiveBalancer {
} }
} }
fn compare_atomic(a: &AtomicUsize, b: &AtomicUsize) -> i32 {
let x = a.load(Ordering::SeqCst);
let y = b.load(Ordering::SeqCst);
if x < y { return -1; }
if x > y { return 1; }
return 0;
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -159,14 +151,11 @@ mod tests {
// update one backend to be heavily loaded // update one backend to be heavily loaded
b.update_metrics("127.0.0.1:1", 90.0, 80.0, 10.0, 5.0); b.update_metrics("127.0.0.1:1", 90.0, 80.0, 10.0, 5.0);
b.update_metrics("127.0.0.1:2", 10.0, 5.0, 1.0, 1.0); b.update_metrics("127.0.0.1:2", 10.0, 5.0, 1.0, 1.0);
let snaps2 = b.snapshot_weights();
println!("{:?}, {:?}", snaps, snaps2);
// after update the first server (127.0.0.1:1) should have a worse
// weight (higher composite load -> lower remaining capacity)
// assert!(snaps2[0].1 < snaps2[1].1);
// Choose backend: should pick the less loaded host (127.0.0.1:2) // Choose backend: should pick the less loaded host (127.0.0.1:2)
let chosen = b.choose_backend().expect("should choose a backend"); let chosen = b.choose_backend().expect("should choose a backend");
let snaps2 = b.snapshot_weights();
println!("{:?}, {:?}", snaps, snaps2);
assert_eq!(chosen.to_string(), "127.0.0.1:2"); assert_eq!(chosen.to_string(), "127.0.0.1:2");
} }
} }

View File

@@ -13,6 +13,7 @@ macro_rules! error {
} }
mod netutils; mod netutils;
mod balancer;
use anywho::Error; use anywho::Error;
use netutils::{Backend, tunnel}; use netutils::{Backend, tunnel};

View File

@@ -7,11 +7,15 @@ use std::error::Error;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Backend { pub struct Backend {
address: String, address: String,
pub current_load : u32
} }
impl Backend { impl Backend {
pub fn new(address: String) -> Self { pub fn new(address: String) -> Self {
Backend { address } Backend {
address,
current_load : 0
}
} }
} }