From 90d326ba3387df44ca38b6db269c5568342588ff Mon Sep 17 00:00:00 2001 From: nnhphong Date: Wed, 10 Dec 2025 02:28:09 -0500 Subject: [PATCH] ip hashing + test --- docker-compose.yml | 89 +++++++++++++++++ src/balancer/ip_hashing.rs | 195 ++++++++++++++++++++++++++++++++++++- src/balancer/mod.rs | 12 +++ src/main.rs | 14 +++ 4 files changed, 307 insertions(+), 3 deletions(-) create mode 100644 docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2a7ed42 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,89 @@ +services: + # two-arm load balancer + load-balancer: + image: neoslhp/enginewhy-lb + container_name: load-balancer + tty: true + deploy: + resources: + limits: + cpus: "4.0" + memory: 8G + cap_add: + - NET_ADMIN + - SYS_ADMIN + networks: + internal: + ipv4_address: 172.67.67.67 + external: + ipv4_address: 192.67.67.67 + + + server1-high-cpu: + image: neoslhp/enginewhy-server + container_name: server1 + tty: true + deploy: + resources: + limits: + cpus: "4.0" + memory: 8G + depends_on: + - load-balancer + cap_add: + - NET_ADMIN + networks: + external: + ipv4_address: 192.67.67.2 + + server2-low-cpu: + image: neoslhp/enginewhy-server + container_name: server2 + tty: true + deploy: + resources: + limits: + cpus: "2.0" + memory: 4G + depends_on: + - load-balancer + cap_add: + - NET_ADMIN + networks: + external: + ipv4_address: 192.67.67.3 + + client: + image: neoslhp/enginewhy-ubuntu22.04 + container_name: client + tty: true + deploy: + resources: + limits: + cpus: "4.0" + memory: 4G + depends_on: + - load-balancer + cap_add: + - NET_ADMIN + networks: + internal: + ipv4_address: 172.67.67.2 + +networks: + internal: + driver: bridge + ipam: + config: + - subnet: 172.67.67.0/24 + external: + driver: bridge + ipam: + config: + - subnet: 192.67.67.0/24 + +# Resources: +# https://networkgeekstuff.com/networking/basic-load-balancer-scenarios-explained/ +# https://hub.docker.com/r/linuxserver/wireshark +# 
https://www.wcse.org/WCSE_2018/W110.pdf +# Deepseek diff --git a/src/balancer/ip_hashing.rs b/src/balancer/ip_hashing.rs index 7edf986..91d63b6 100644 --- a/src/balancer/ip_hashing.rs +++ b/src/balancer/ip_hashing.rs @@ -1,4 +1,193 @@ -use super::*; -pub fn test() { - println!("Hello from RR"); +use crate::backend::{Backend, BackendPool, ServerHealth}; +use crate::balancer::{Balancer, CURRENT_CONNECTION_INFO, ConnectionInfo}; +use std::hash::{Hasher, DefaultHasher, Hash}; +use std::sync::{Arc, RwLock}; + +#[derive(Debug)] +pub struct SourceIPHash { + pool : BackendPool, } + +impl SourceIPHash { + pub fn new(pool: BackendPool) -> SourceIPHash { + Self { pool } + } +} + +impl Balancer for SourceIPHash { + fn choose_backend(&mut self) -> Option>{ + let client_ip = CURRENT_CONNECTION_INFO.with(|info| { + info.borrow().as_ref().map(|c| c.client_ip.clone()) + }); + + let client_ip = match client_ip { + Some(ip) => ip, + None => return None, // no client info available + }; + + let mut hasher = DefaultHasher::new(); + client_ip.hash(&mut hasher); + let hash = hasher.finish(); + let idx = (hash as usize) % self.pool.backends.len(); + + return Some(self.pool.backends[idx].clone()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_same_ip_always_selects_same_backend() { + let backends = vec![ + Arc::new(Backend::new( + "backend 1".into(), + "127.0.0.1:8081".parse().unwrap(), + Arc::new(RwLock::new(ServerHealth::default())), + )), + Arc::new(Backend::new( + "backend 2".into(), + "127.0.0.1:8082".parse().unwrap(), + Arc::new(RwLock::new(ServerHealth::default())), + )), + Arc::new(Backend::new( + "backend 3".into(), + "127.0.0.1:8083".parse().unwrap(), + Arc::new(RwLock::new(ServerHealth::default())), + )), + ]; + + let mut balancer = SourceIPHash::new(BackendPool::new(backends)); + let client_ip = "192.168.1.100:54321".parse().unwrap(); + + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = Some(ConnectionInfo { client_ip }); + }); + + let 
first_choice = balancer.choose_backend(); + + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = Some(ConnectionInfo { client_ip }); + }); + + let second_choice = balancer.choose_backend(); + + assert!(first_choice.is_some()); + assert!(second_choice.is_some()); + let first = first_choice.unwrap(); + let second = second_choice.unwrap(); + assert_eq!(first.id, second.id); + + // Cleanup + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = None; + }); + } + + #[test] + fn test_different_ips_may_select_different_backends() { + let backends = vec![ + Arc::new(Backend::new( + "backend 1".into(), + "127.0.0.1:8081".parse().unwrap(), + Arc::new(RwLock::new(ServerHealth::default())), + )), + Arc::new(Backend::new( + "backend 2".into(), + "127.0.0.1:8082".parse().unwrap(), + Arc::new(RwLock::new(ServerHealth::default())), + )), + ]; + + let mut balancer = SourceIPHash::new(BackendPool::new(backends)); + + let ip1 = "192.168.1.100:54321".parse().unwrap(); + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = Some(ConnectionInfo { client_ip: ip1 }); + }); + let choice1 = balancer.choose_backend(); + + let ip2 = "192.168.1.101:54322".parse().unwrap(); + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = Some(ConnectionInfo { client_ip: ip2 }); + }); + let choice2 = balancer.choose_backend(); + + assert!(choice1.is_some()); + assert!(choice2.is_some()); + // Note: choice1 and choice2 might be equal by chance, but statistically should differ + + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = None; + }); + } + + #[test] + fn test_returns_none_when_no_connection_info() { + let backends = vec![Arc::new(Backend::new( + "backend 1".into(), + "127.0.0.1:8081".parse().unwrap(), + Arc::new(RwLock::new(ServerHealth::default())), + ))]; + + let mut balancer = SourceIPHash::new(BackendPool::new(backends)); + + // Don't set any connection info + CURRENT_CONNECTION_INFO.with(|info| { + *info.borrow_mut() = None; + }); + + let choice = 
/// Details about the client connection that is currently being dispatched.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    /// Socket address (IP and port) recorded for the client.
    ///
    /// NOTE(review): `main.rs` fills this from `socket.local_addr()`, while the
    /// peer address is available there as `remote_addr` — confirm which one
    /// is intended to identify the client.
    pub client_ip: SocketAddr,
}

thread_local! {
    /// Per-thread slot holding the [`ConnectionInfo`] of the connection being
    /// handled, or `None` when nothing is in flight.
    ///
    /// `main.rs` stores a value here before a backend is chosen and resets it
    /// to `None` afterwards; balancers read it from inside `choose_backend`.
    ///
    /// NOTE(review): the slot is thread-local but is written from inside an
    /// async task. If that task awaits between the write and the read, a
    /// multi-threaded executor may resume it on another thread, or interleave
    /// another task on this one — confirm that cannot happen here.
    pub static CURRENT_CONNECTION_INFO: RefCell<Option<ConnectionInfo>> =
        const { RefCell::new(None) };
}