2 Commits

Author SHA1 Message Date
psun256
a3f50c1f0a should be good to extend functionality now 2025-12-08 14:31:59 -05:00
psun256
07cb45fa73 restructuring stuff 2025-12-03 21:35:08 -05:00
8 changed files with 239 additions and 20 deletions

View File

@@ -5,6 +5,22 @@ Production't graden't load balancer.
## Todo
- [ ] architecture astronauting
- balancer module
- just the algorithms i guess
-
- backend module
- manages the backend pool
- deals with health / load check
- BackendPool for all the backends stored together
- Backend for individual backends
- has some methods used by balancer module to pick a suitable backend
- proxy module
- all the different supported protocols to handle
- will create a session / stream context structure (ConnectionContext)
- not globally tracked (this might change for UDP!)
- mainly some metadata
- config module
- set up all the stuff or something
- [ ] stream / session handling (i think wrapper around tokio TcpStream)
- [ ] basic backend pooling
- [ ] layer 4 load balancing
@@ -103,3 +119,10 @@ process to load balance:
- connect to the server
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
- cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
### UDP
UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the traits required for tokio copy_bidirectional
but async write and read don't work on just regular datagrams, so probably not possible.
Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections.

58
src/backend/mod.rs Normal file
View File

@@ -0,0 +1,58 @@
use core::fmt;
use std::net::SocketAddr;
use std::sync::RwLock;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
pub struct Backend {
pub id: String,
pub address: SocketAddr,
pub active_connections: AtomicUsize,
}
impl Backend {
pub fn new(id: String, address: SocketAddr) -> Self {
Self {
id: id.to_string(),
address,
active_connections: AtomicUsize::new(0),
}
}
// Ordering::Relaxed means the ops could be in any order, but since this
// is just a metric, and we assume the underlying system is sane
// enough not to behave poorly, so SeqCst is probably overkill.
pub fn inc_connections(&self) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
println!("{} has {} connections open", self.id, self.active_connections.load(Ordering::Relaxed));
}
pub fn dec_connections(&self) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
println!("{} has {} connections open", self.id, self.active_connections.load(Ordering::Relaxed));
}
}
impl fmt::Display for Backend {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({})", self.address, self.id)
}
}
#[derive(Clone, Debug)]
pub struct BackendPool {
pub backends: Arc<RwLock<Vec<Arc<Backend>>>>,
}
impl BackendPool {
pub fn new() -> Self {
BackendPool {
backends: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn add(&self, backend: Backend) {
self.backends.write().unwrap().push(Arc::new(backend));
}
}

9
src/balancer/mod.rs Normal file
View File

@@ -0,0 +1,9 @@
pub mod round_robin;
use std::fmt::Debug;
use std::sync::Arc;
use crate::backend::Backend;
pub trait Balancer: Debug + Send + Sync + 'static {
fn choose_backend(&mut self) -> Option<Arc<Backend>>;
}

View File

@@ -0,0 +1,33 @@
use std::sync::{Arc, RwLock};
use std::fmt::Debug;
use crate::backend::{Backend, BackendPool};
use crate::balancer::Balancer;
// only the main thread for receiving connections should be
// doing the load balancing. alternatively, each thread
// that handles load balancing should get their own instance.
#[derive(Debug)]
pub struct RoundRobinBalancer {
pool: BackendPool,
index: usize,
}
impl RoundRobinBalancer {
pub fn new(pool: BackendPool) -> RoundRobinBalancer {
Self {
pool,
index: 0,
}
}
}
impl Balancer for RoundRobinBalancer {
fn choose_backend(&mut self) -> Option<Arc<Backend>> {
let backends = self.pool.backends.read().unwrap();
if backends.is_empty() { return None; }
let backend = backends[self.index % backends.len()].clone();
self.index = self.index.wrapping_add(1);
Some(backend)
}
}

6
src/config.rs Normal file
View File

@@ -0,0 +1,6 @@
// TODO: "routing" rules
// backends defined as ip + port
// define sets of backends
// allowed set operations for now is just union
// rules are ip + mask and ports, maps to some of the sets
// defined earlier, along with a routing strategy

View File

@@ -1,31 +1,52 @@
extern crate core;
mod balancer;
mod config;
mod backend;
mod proxy;
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::backend::{Backend, BackendPool};
use crate::balancer::Balancer;
use crate::balancer::round_robin::RoundRobinBalancer;
use crate::proxy::tcp::proxy_tcp_connection;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("0.0.0.0:8080").await?;
let pool = BackendPool::new();
pool.add(Backend::new(
"backend 1".into(),
"127.0.0.1:8081".parse().unwrap(),
));
pool.add(Backend::new(
"backend 2".into(),
"127.0.0.1:8082".parse().unwrap(),
));
let mut balancer = RoundRobinBalancer::new(pool.clone());
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
let (socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {:?}", e);
return;
if let Some(backend) = balancer.choose_backend() {
tokio::spawn(async move {
if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await {
eprintln!("error: conn_id={} proxy failed: {}", conn_id, e);
}
}
});
});
} else {
eprintln!("error: no backendsd for conn_id={}", conn_id);
}
}
}

43
src/proxy/mod.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use crate::backend::Backend;
pub mod tcp;
pub struct ConnectionContext {
pub id: u64,
pub client_addr: SocketAddr,
pub start_time: Instant,
pub backend: Arc<Backend>,
pub bytes_transferred: u64,
}
impl ConnectionContext {
pub fn new(id: u64, client_addr: SocketAddr, backend: Arc<Backend>) -> Self {
backend.inc_connections();
Self {
id,
client_addr,
start_time: Instant::now(),
backend,
bytes_transferred: 0,
}
}
}
impl Drop for ConnectionContext {
fn drop(&mut self) {
self.backend.dec_connections();
let duration = self.start_time.elapsed();
println!("info: conn_id={} closed. client={} backend={} bytes={} duration={:.2?}",
self.id,
self.client_addr,
self.backend.address,
self.bytes_transferred,
duration.as_secs_f64()
);
}
}

26
src/proxy/tcp.rs Normal file
View File

@@ -0,0 +1,26 @@
use std::sync::Arc;
use tokio::io;
use tokio::net::TcpStream;
use anywho::Error;
use crate::backend::Backend;
use crate::proxy::ConnectionContext;
pub async fn proxy_tcp_connection(connection_id: u64, mut client_stream: TcpStream, backend: Arc<Backend>) -> Result<(), Error> {
let client_addr = client_stream.peer_addr()?;
let mut ctx = ConnectionContext::new(connection_id, client_addr, backend.clone());
#[cfg(debug_assertions)]
println!("info: conn_id={} connecting to {}", connection_id, ctx.backend.id);
let mut backend_stream = TcpStream::connect(&backend.address).await?;
let (tx, rx) = io::copy_bidirectional(
&mut client_stream,
&mut backend_stream,
).await?;
ctx.bytes_transferred = tx + rx;
Ok(())
}