ip hashing + test
This commit is contained in:
89
docker-compose.yml
Normal file
89
docker-compose.yml
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
services:
|
||||||
|
# two-arm load balancer
|
||||||
|
load-balancer:
|
||||||
|
image: neoslhp/enginewhy-lb
|
||||||
|
container_name: load-balancer
|
||||||
|
tty: true
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: "4.0"
|
||||||
|
memory: 8G
|
||||||
|
cap_add:
|
||||||
|
- NET_ADMIN
|
||||||
|
- SYS_ADMIN
|
||||||
|
networks:
|
||||||
|
internal:
|
||||||
|
ipv4_address: 172.67.67.67
|
||||||
|
external:
|
||||||
|
ipv4_address: 192.67.67.67
|
||||||
|
|
||||||
|
|
||||||
|
server1-high-cpu:
|
||||||
|
image: neoslhp/enginewhy-server
|
||||||
|
container_name: server1
|
||||||
|
tty: true
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: "4.0"
|
||||||
|
memory: 8G
|
||||||
|
depends_on:
|
||||||
|
- load-balancer
|
||||||
|
cap_add:
|
||||||
|
- NET_ADMIN
|
||||||
|
networks:
|
||||||
|
external:
|
||||||
|
ipv4_address: 192.67.67.2
|
||||||
|
|
||||||
|
server2-low-cpu:
|
||||||
|
image: neoslhp/enginewhy-server
|
||||||
|
container_name: server2
|
||||||
|
tty: true
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: "2.0"
|
||||||
|
memory: 4G
|
||||||
|
depends_on:
|
||||||
|
- load-balancer
|
||||||
|
cap_add:
|
||||||
|
- NET_ADMIN
|
||||||
|
networks:
|
||||||
|
external:
|
||||||
|
ipv4_address: 192.67.67.3
|
||||||
|
|
||||||
|
client:
|
||||||
|
image: neoslhp/enginewhy-ubuntu22.04
|
||||||
|
container_name: client
|
||||||
|
tty: true
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: "4.0"
|
||||||
|
memory: 4G
|
||||||
|
depends_on:
|
||||||
|
- load-balancer
|
||||||
|
cap_add:
|
||||||
|
- NET_ADMIN
|
||||||
|
networks:
|
||||||
|
internal:
|
||||||
|
ipv4_address: 172.67.67.2
|
||||||
|
|
||||||
|
networks:
|
||||||
|
internal:
|
||||||
|
driver: bridge
|
||||||
|
ipam:
|
||||||
|
config:
|
||||||
|
- subnet: 172.67.67.0/24
|
||||||
|
external:
|
||||||
|
driver: bridge
|
||||||
|
ipam:
|
||||||
|
config:
|
||||||
|
- subnet: 192.67.67.0/24
|
||||||
|
|
||||||
|
# Resources:
|
||||||
|
# https://networkgeekstuff.com/networking/basic-load-balancer-scenarios-explained/
|
||||||
|
# https://hub.docker.com/r/linuxserver/wireshark
|
||||||
|
# https://www.wcse.org/WCSE_2018/W110.pdf
|
||||||
|
# Deepseek
|
||||||
@@ -1,4 +1,193 @@
|
|||||||
use super::*;
|
use crate::backend::{Backend, BackendPool, ServerHealth};
|
||||||
pub fn test() {
|
use crate::balancer::{Balancer, CURRENT_CONNECTION_INFO, ConnectionInfo};
|
||||||
println!("Hello from RR");
|
use std::hash::{Hasher, DefaultHasher, Hash};
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SourceIPHash {
|
||||||
|
pool : BackendPool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl SourceIPHash {
|
||||||
|
pub fn new(pool: BackendPool) -> SourceIPHash {
|
||||||
|
Self { pool }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Balancer for SourceIPHash {
|
||||||
|
fn choose_backend(&mut self) -> Option<Arc<Backend>>{
|
||||||
|
let client_ip = CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
info.borrow().as_ref().map(|c| c.client_ip.clone())
|
||||||
|
});
|
||||||
|
|
||||||
|
let client_ip = match client_ip {
|
||||||
|
Some(ip) => ip,
|
||||||
|
None => return None, // no client info available
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
client_ip.hash(&mut hasher);
|
||||||
|
let hash = hasher.finish();
|
||||||
|
let idx = (hash as usize) % self.pool.backends.len();
|
||||||
|
|
||||||
|
return Some(self.pool.backends[idx].clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_same_ip_always_selects_same_backend() {
|
||||||
|
let backends = vec![
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 1".into(),
|
||||||
|
"127.0.0.1:8081".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 2".into(),
|
||||||
|
"127.0.0.1:8082".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 3".into(),
|
||||||
|
"127.0.0.1:8083".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut balancer = SourceIPHash::new(BackendPool::new(backends));
|
||||||
|
let client_ip = "192.168.1.100:54321".parse().unwrap();
|
||||||
|
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = Some(ConnectionInfo { client_ip });
|
||||||
|
});
|
||||||
|
|
||||||
|
let first_choice = balancer.choose_backend();
|
||||||
|
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = Some(ConnectionInfo { client_ip });
|
||||||
|
});
|
||||||
|
|
||||||
|
let second_choice = balancer.choose_backend();
|
||||||
|
|
||||||
|
assert!(first_choice.is_some());
|
||||||
|
assert!(second_choice.is_some());
|
||||||
|
let first = first_choice.unwrap();
|
||||||
|
let second = second_choice.unwrap();
|
||||||
|
assert_eq!(first.id, second.id);
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = None;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_different_ips_may_select_different_backends() {
|
||||||
|
let backends = vec![
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 1".into(),
|
||||||
|
"127.0.0.1:8081".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 2".into(),
|
||||||
|
"127.0.0.1:8082".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut balancer = SourceIPHash::new(BackendPool::new(backends));
|
||||||
|
|
||||||
|
let ip1 = "192.168.1.100:54321".parse().unwrap();
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = Some(ConnectionInfo { client_ip: ip1 });
|
||||||
|
});
|
||||||
|
let choice1 = balancer.choose_backend();
|
||||||
|
|
||||||
|
let ip2 = "192.168.1.101:54322".parse().unwrap();
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = Some(ConnectionInfo { client_ip: ip2 });
|
||||||
|
});
|
||||||
|
let choice2 = balancer.choose_backend();
|
||||||
|
|
||||||
|
assert!(choice1.is_some());
|
||||||
|
assert!(choice2.is_some());
|
||||||
|
// Note: choice1 and choice2 might be equal by chance, but statistically should differ
|
||||||
|
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = None;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_returns_none_when_no_connection_info() {
|
||||||
|
let backends = vec![Arc::new(Backend::new(
|
||||||
|
"backend 1".into(),
|
||||||
|
"127.0.0.1:8081".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
))];
|
||||||
|
|
||||||
|
let mut balancer = SourceIPHash::new(BackendPool::new(backends));
|
||||||
|
|
||||||
|
// Don't set any connection info
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = None;
|
||||||
|
});
|
||||||
|
|
||||||
|
let choice = balancer.choose_backend();
|
||||||
|
assert!(choice.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_hash_distribution_across_backends() {
|
||||||
|
let backends = vec![
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 1".into(),
|
||||||
|
"127.0.0.1:8081".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 2".into(),
|
||||||
|
"127.0.0.1:8082".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
Arc::new(Backend::new(
|
||||||
|
"backend 3".into(),
|
||||||
|
"127.0.0.1:8083".parse().unwrap(),
|
||||||
|
Arc::new(RwLock::new(ServerHealth::default())),
|
||||||
|
)),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut balancer = SourceIPHash::new(BackendPool::new(backends.clone()));
|
||||||
|
let mut distribution = [0, 0, 0];
|
||||||
|
|
||||||
|
// Test 30 different IPs to see if they distribute across backends
|
||||||
|
for i in 0..30 {
|
||||||
|
let client_ip = format!("192.168.1.{}:54321", 100 + i).parse().unwrap();
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = Some(ConnectionInfo { client_ip });
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Some(backend) = balancer.choose_backend() {
|
||||||
|
for (idx, b) in backends.iter().enumerate() {
|
||||||
|
if backend.id == b.id && backend.address == b.address {
|
||||||
|
distribution[idx] += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(distribution[0] > 0);
|
||||||
|
assert!(distribution[1] > 0);
|
||||||
|
assert!(distribution[2] > 0);
|
||||||
|
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = None;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,7 +6,19 @@ pub mod ip_hashing;
|
|||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use crate::backend::Backend;
|
use crate::backend::Backend;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::net::{SocketAddr};
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
pub static CURRENT_CONNECTION_INFO: RefCell<Option<ConnectionInfo>> = RefCell::new(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct ConnectionInfo {
|
||||||
|
pub client_ip : SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
pub trait Balancer: Debug + Send + Sync + 'static {
|
pub trait Balancer: Debug + Send + Sync + 'static {
|
||||||
fn choose_backend(&mut self) -> Option<Arc<Backend>>;
|
fn choose_backend(&mut self) -> Option<Arc<Backend>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
14
src/main.rs
14
src/main.rs
@@ -6,6 +6,10 @@ mod proxy;
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
use crate::backend::{Backend, BackendPool, ServerHealth};
|
||||||
|
use crate::balancer::{Balancer, CURRENT_CONNECTION_INFO, ConnectionInfo};
|
||||||
|
use crate::balancer::round_robin::RoundRobinBalancer;
|
||||||
|
use crate::balancer::ip_hashing::SourceIPHash;
|
||||||
use crate::proxy::tcp::proxy_tcp_connection;
|
use crate::proxy::tcp::proxy_tcp_connection;
|
||||||
|
|
||||||
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
|
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
|
||||||
@@ -47,6 +51,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let remote_ip = remote_addr.ip();
|
let remote_ip = remote_addr.ip();
|
||||||
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
|
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let client_ip = socket.local_addr()?;
|
||||||
|
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = Some(ConnectionInfo { client_ip : client_ip });
|
||||||
|
});
|
||||||
|
|
||||||
let mut chosen_backend = None;
|
let mut chosen_backend = None;
|
||||||
|
|
||||||
@@ -67,6 +76,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
} else {
|
} else {
|
||||||
println!("error: no matching rule for {} on port {}", remote_ip, port);
|
println!("error: no matching rule for {} on port {}", remote_ip, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clear the slot after use to avoid stale data
|
||||||
|
CURRENT_CONNECTION_INFO.with(|info| {
|
||||||
|
*info.borrow_mut() = None;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user