added adaptive weight balancing algorithm

This commit is contained in:
psun256
2025-12-09 18:31:22 -05:00
parent a3f50c1f0a
commit 20b51c2562
13 changed files with 274 additions and 26 deletions

0
src/backend/health.rs Normal file
View File

View File

@@ -1,22 +1,53 @@
pub mod health;
use core::fmt;
use std::net::SocketAddr;
use std::sync::RwLock;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
// Physical server information
#[derive(Debug)]
pub struct Server {
pub endpoints: Arc<Vec<Arc<Backend>>>,
pub metrics: Arc<RwLock<ServerHealth>>,
}
// Physical server health statistics, used for certain load balancing algorithms
#[derive(Debug, Default)]
pub struct ServerHealth {
pub cpu: f64,
pub mem: f64,
pub net: f64,
pub io: f64,
}
impl ServerHealth {
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
self.cpu = cpu;
self.mem = mem;
self.net = net;
self.io = io;
}
}
// A possible endpoint for a proxied connection.
// Note that multiple may live on the same server, hence the Arc<RwLock<ServerMetric>>
#[derive(Debug)]
pub struct Backend {
pub id: String,
pub address: SocketAddr,
pub active_connections: AtomicUsize,
pub metrics: Arc<RwLock<ServerHealth>>,
}
impl Backend {
pub fn new(id: String, address: SocketAddr) -> Self {
pub fn new(id: String, address: SocketAddr, server_metrics: Arc<RwLock<ServerHealth>>) -> Self {
Self {
id: id.to_string(),
address,
active_connections: AtomicUsize::new(0),
metrics: server_metrics,
}
}
@@ -40,19 +71,18 @@ impl fmt::Display for Backend {
}
}
// A set of endpoints that can be load balanced around.
// Each Balancer owns one of these. Backend instances may be shared
// with other Balancer instances, hence Arc<Backend>.
#[derive(Clone, Debug)]
pub struct BackendPool {
pub backends: Arc<RwLock<Vec<Arc<Backend>>>>,
pub backends: Arc<Vec<Arc<Backend>>>,
}
impl BackendPool {
pub fn new() -> Self {
pub fn new(backends: Vec<Arc<Backend>>) -> Self {
BackendPool {
backends: Arc::new(RwLock::new(Vec::new())),
backends: Arc::new(backends),
}
}
pub fn add(&self, backend: Backend) {
self.backends.write().unwrap().push(Arc::new(backend));
}
}
}

View File

@@ -0,0 +1,135 @@
use std::sync::{Arc, RwLock};
use std::fmt::Debug;
use std::fs::Metadata;
use crate::backend::{Backend, BackendPool, ServerHealth};
use crate::balancer::Balancer;
use rand::prelude::*;
use rand::rngs::SmallRng;
#[derive(Debug)]
struct AdaptiveNode {
backend: Arc<Backend>,
weight: f64,
}
#[derive(Debug)]
pub struct AdaptiveWeightBalancer {
pool: Vec<AdaptiveNode>,
coefficients: [f64; 4],
alpha: f64,
rng: SmallRng,
}
impl AdaptiveWeightBalancer {
pub fn new(pool: BackendPool, coefficients: [f64; 4], alpha: f64) -> Self {
let nodes = pool.backends
.iter()
.map(|b| AdaptiveNode {
backend: b.clone(),
weight: 0f64,
})
.collect();
AdaptiveWeightBalancer {
pool: nodes,
coefficients,
alpha,
rng: SmallRng::from_rng(&mut rand::rng())
}
}
pub fn metrics_to_weight(&self, metrics: &ServerHealth) -> f64 {
self.coefficients[0] * metrics.cpu +
self.coefficients[1] * metrics.mem +
self.coefficients[2] * metrics.net +
self.coefficients[3] * metrics.io
}
}
impl Balancer for AdaptiveWeightBalancer {
fn choose_backend(&mut self) -> Option<Arc<Backend>> {
if self.pool.is_empty() {
return None;
}
// Compute remaining capacity R_i = 100 - composite_load
let mut r_sum = 0.0;
let mut w_sum = 0.0;
let mut l_sum = 0;
for node in &self.pool {
if let Ok(health) = node.backend.metrics.read() {
r_sum += self.metrics_to_weight(&health);
}
w_sum += node.weight;
l_sum += node.backend.active_connections
.load(std::sync::atomic::Ordering::Relaxed);
}
let safe_w_sum = w_sum.max(1e-12);
let threshold = self.alpha * (r_sum / safe_w_sum);
for idx in 0..self.pool.len() {
let node = &self.pool[idx];
if node.weight <= 0.001 { continue; }
let risk = match node.backend.metrics.read() {
Ok(h) => self.metrics_to_weight(&h),
Err(_) => f64::MAX,
};
let ratio = risk / node.weight;
if ratio <= threshold {
return Some(node.backend.clone());
}
}
// If any server satisfies Ri/Wi <= threshold, it means the server
// is relatively overloaded, and we must adjust its weight using
// formula (6).
let mut total_lwi = 0.0;
let l_sum_f64 = l_sum as f64;
for node in &self.pool {
let load = node.backend.active_connections
.load(std::sync::atomic::Ordering::Relaxed) as f64;
let weight = node.weight.max(1e-12);
let lwi = load * (safe_w_sum / weight) * l_sum_f64;
total_lwi += lwi;
}
let avg_lwi = (total_lwi / self.pool.len() as f64).max(1e-12);
// Compute Li = Wi / Ri and choose server minimizing Li.
let mut best_backend: Option<Arc<Backend>> = None;
let mut min_load = usize::MAX;
for node in &mut self.pool {
let load = node.backend.active_connections
.load(std::sync::atomic::Ordering::Relaxed);
let load_f64 = load as f64;
let weight = node.weight.max(1e-12);
let lwi = load_f64 * (safe_w_sum / weight) * l_sum_f64;
let adj = 1.0 - (lwi / avg_lwi);
node.weight += adj;
node.weight = node.weight.clamp(0.1, 100.0);
if load < min_load {
min_load = load;
best_backend = Some(node.backend.clone());
}
}
match best_backend {
Some(backend) => Some(backend),
None => {
let i = (self.rng.next_u32() as usize) % self.pool.len();
Some(self.pool[i].backend.clone())
}
}
}
}

View File

@@ -0,0 +1,4 @@
use super::*;
pub fn test() {
println!("Hello from RR");
}

View File

@@ -0,0 +1 @@
use super::*;

View File

@@ -1,4 +1,7 @@
pub mod round_robin;
pub mod adaptive_weight;
pub mod least_connections;
pub mod ip_hashing;
use std::fmt::Debug;
use std::sync::Arc;
@@ -6,4 +9,4 @@ use crate::backend::Backend;
pub trait Balancer: Debug + Send + Sync + 'static {
fn choose_backend(&mut self) -> Option<Arc<Backend>>;
}
}

View File

@@ -23,11 +23,11 @@ impl RoundRobinBalancer {
impl Balancer for RoundRobinBalancer {
fn choose_backend(&mut self) -> Option<Arc<Backend>> {
let backends = self.pool.backends.read().unwrap();
let backends = self.pool.backends.clone();
if backends.is_empty() { return None; }
let backend = backends[self.index % backends.len()].clone();
self.index = self.index.wrapping_add(1);
Some(backend)
}
}
}

View File

@@ -3,4 +3,4 @@
// define sets of backends
// allowed set operations for now is just union
// rules are ip + mask and ports, maps to some of the sets
// defined earlier, along with a routing strategy
// defined earlier, along with a routing strategy

View File

@@ -7,9 +7,9 @@ mod proxy;
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::backend::{Backend, BackendPool};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicU64, Ordering};
use crate::backend::{Backend, BackendPool, ServerHealth};
use crate::balancer::Balancer;
use crate::balancer::round_robin::RoundRobinBalancer;
use crate::proxy::tcp::proxy_tcp_connection;
@@ -18,26 +18,29 @@ static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = BackendPool::new();
pool.add(Backend::new(
let mut pool: Vec<Arc<Backend>> = Vec::new();
let server_metric = Arc::new(RwLock::new(ServerHealth::default()));
pool.push(Arc::new(Backend::new(
"backend 1".into(),
"127.0.0.1:8081".parse().unwrap(),
));
server_metric.clone()
)));
pool.add(Backend::new(
pool.push(Arc::new(Backend::new(
"backend 2".into(),
"127.0.0.1:8082".parse().unwrap(),
));
server_metric.clone()
)));
let mut balancer = RoundRobinBalancer::new(pool.clone());
let mut balancer = RoundRobinBalancer::new(BackendPool::new(pool));
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (socket, _) = listener.accept().await?;
let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
if let Some(backend) = balancer.choose_backend() {
tokio::spawn(async move {

View File

@@ -40,4 +40,4 @@ impl Drop for ConnectionContext {
duration.as_secs_f64()
);
}
}
}

View File

@@ -23,4 +23,4 @@ pub async fn proxy_tcp_connection(connection_id: u64, mut client_stream: TcpStre
ctx.bytes_transferred = tx + rx;
Ok(())
}
}