From 07cb45fa73710b59715917a70ca5fca5972b5d44 Mon Sep 17 00:00:00 2001
From: psun256
Date: Wed, 3 Dec 2025 21:35:08 -0500
Subject: [PATCH] restructuring stuff

---
 README.md              | 24 +++++++++++++++++++-
 src/backend/mod.rs     | 72 ++++++++++++++++++++++++++++++++++++++++++
 src/balancer/mod.rs    |  0
 src/config.rs          |  0
 src/main.rs            |  6 ++++
 src/proxy/mod.rs       | 27 ++++++++++++++++
 src/proxy/tcp_proxy.rs |  2 ++
 7 files changed, 130 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/mod.rs
 create mode 100644 src/balancer/mod.rs
 create mode 100644 src/config.rs
 create mode 100644 src/proxy/mod.rs
 create mode 100644 src/proxy/tcp_proxy.rs

diff --git a/README.md b/README.md
index ab29caf..4f4dec2 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,21 @@ Production't graden't load balancer.
 
 ## Todo
 - [ ] architecture astronauting
+  - balancer module
+    - just the algorithms i guess
+  - backend module
+    - manages the backend pool
+    - deals with health / load checks
+    - BackendPool for all the backends stored together
+    - Backend for individual backends
+      - has some methods used by the balancer module to pick a suitable backend
+  - proxy module
+    - handlers for all the different supported protocols
+    - will create a session / stream context structure (ConnectionContext)
+      - not globally tracked (this might change for UDP!)
+      - mainly some metadata
+  - config module
+    - set up all the stuff or something
 - [ ] stream / session handling (i think wrapper around tokio TcpStream)
 - [ ] basic backend pooling
 - [ ] layer 4 load balancing
@@ -102,4 +117,11 @@ process to load balance:
 - ask the load balancing algorithm which server in the pool to route to
 - connect to the server
 - proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
-- cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
\ No newline at end of file
+- cleanup when someone leaves or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
+
+
+### UDP
+UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the AsyncRead / AsyncWrite traits required by tokio's copy_bidirectional.
+Async reads and writes don't map onto discrete datagrams anyway, so that's probably not possible.
+
+Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections.
\ No newline at end of file
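On the README's UDP point: the manual bidirectional copy it describes could look roughly like the sketch below. This is not part of the patch; `udp_proxy`, `Session`, and the 60-second idle timeout are all assumptions for illustration. Each new client address gets its own upstream socket, a spawned task relays backend-to-client traffic, and a periodic sweep retires idle sessions (matching the "timeout based, and a periodic sweep" note above):

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::Mutex;

// one relay socket per client "session", stamped for the idle sweep
struct Session {
    upstream: Arc<UdpSocket>,
    last_seen: Instant,
}

pub async fn udp_proxy(listen: SocketAddr, backend: SocketAddr) -> std::io::Result<()> {
    let listener = Arc::new(UdpSocket::bind(listen).await?);
    let sessions: Arc<Mutex<HashMap<SocketAddr, Session>>> =
        Arc::new(Mutex::new(HashMap::new()));

    // periodic sweep: drop sessions idle longer than IDLE (60s is arbitrary);
    // a fuller version would also abort the per-session task here
    const IDLE: Duration = Duration::from_secs(60);
    let sweep = sessions.clone();
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(IDLE).await;
            sweep.lock().await.retain(|_, s| s.last_seen.elapsed() < IDLE);
        }
    });

    let mut buf = vec![0u8; 64 * 1024];
    loop {
        let (n, client) = listener.recv_from(&mut buf).await?;
        let mut guard = sessions.lock().await;
        if !guard.contains_key(&client) {
            // first datagram from this client: dedicated upstream socket,
            // plus a task doing the backend -> client half of the copy
            let upstream = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
            upstream.connect(backend).await?;
            let (up, down) = (upstream.clone(), listener.clone());
            tokio::spawn(async move {
                let mut buf = vec![0u8; 64 * 1024];
                while let Ok(n) = up.recv(&mut buf).await {
                    if down.send_to(&buf[..n], client).await.is_err() {
                        break;
                    }
                }
            });
            guard.insert(client, Session { upstream, last_seen: Instant::now() });
        }
        let session = guard.get_mut(&client).expect("just inserted");
        session.last_seen = Instant::now();
        session.upstream.send(&buf[..n]).await?;
    }
}
```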
diff --git a/src/backend/mod.rs b/src/backend/mod.rs
new file mode 100644
index 0000000..4bfd49c
--- /dev/null
+++ b/src/backend/mod.rs
@@ -0,0 +1,72 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::RwLock;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+pub struct BackendPool {
+    pub backends: Arc<RwLock<HashMap<String, Arc<Backend>>>>,
+}
+
+#[derive(Debug)]
+pub struct Backend {
+    pub id: String,
+    pub address: SocketAddr,
+    pub is_healthy: AtomicBool,    // no clue how this should work, for now
+    pub current_load: AtomicUsize, // no clue how this should work, for now
+}
+
+impl BackendPool {
+    pub fn new(initial_backends: Vec<Arc<Backend>>) -> Self {
+        let mut map = HashMap::new();
+        for backend in initial_backends {
+            map.insert(backend.id.clone(), backend);
+        }
+
+        Self {
+            backends: Arc::new(RwLock::new(map)),
+        }
+    }
+
+    pub fn add_backend(&self, backend: Arc<Backend>) {
+        let mut backends_guard = self.backends
+            .write()
+            .expect("BackendPool lock poisoned");
+        // let mut backends_guard = self.backends.write().unwrap_or_else(|poisoned| poisoned.into_inner());
+        backends_guard.insert(backend.id.clone(), backend);
+    }
+
+    pub fn get_backend(&self, id: &str) -> Option<Arc<Backend>> {
+        let backends_guard = self.backends
+            .read()
+            .expect("BackendPool lock poisoned");
+        // let backends_guard = self.backends.read().unwrap_or_else(|poisoned| poisoned.into_inner());
+        backends_guard.get(id).cloned()
+    }
+
+    pub fn bruh_amogus_sus(&self) {
+        // a single read lock suffices: the loads are atomic, and taking a
+        // write lock while holding the read lock would deadlock a std RwLock
+        for backend in self.backends.read().expect("BackendPool lock poisoned").values() {
+            backend.increment_current_load();
+        }
+    }
+}
+
+impl Backend {
+    pub fn new(id: String, address: SocketAddr) -> Self {
+        Self {
+            id,
+            address,
+            is_healthy: AtomicBool::new(false),
+            current_load: AtomicUsize::new(0),
+        }
+    }
+
+    pub fn increment_current_load(&self) {
+        self.current_load.fetch_add(1, Ordering::SeqCst);
+    }
+
+    pub fn decrement_current_load(&self) {
+        self.current_load.fetch_sub(1, Ordering::SeqCst);
+    }
+}
\ No newline at end of file
diff --git a/src/balancer/mod.rs b/src/balancer/mod.rs
new file mode 100644
index 0000000..e69de29
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..e69de29
diff --git a/src/main.rs b/src/main.rs
index 00e7e65..ca2015f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,11 @@
+mod balancer;
+mod config;
+mod backend;
+mod proxy;
+
 use tokio::net::TcpListener;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use std::sync::Arc;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs
new file mode 100644
index 0000000..9da0885
--- /dev/null
+++ b/src/proxy/mod.rs
@@ -0,0 +1,27 @@
+mod tcp_proxy;
+
+use std::net::SocketAddr;
+use std::time::Instant;
+
+// owned and accessed by only one thread.
+pub struct ConnectionContext {
+    pub connection_id: u64,
+    pub client_addr: SocketAddr,
+    pub start_time: Instant,
+    pub backend_addr: Option<SocketAddr>,
+    pub bytes_transferred: usize,
+    // pub protocol: String,
+    // pub sticky_id: Option<String>,
+}
+
+impl ConnectionContext {
+    pub fn new(connection_id: u64, client_addr: SocketAddr) -> Self {
+        Self {
+            connection_id,
+            client_addr,
+            start_time: Instant::now(),
+            backend_addr: None,
+            bytes_transferred: 0,
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/proxy/tcp_proxy.rs b/src/proxy/tcp_proxy.rs
new file mode 100644
index 0000000..c5d584a
--- /dev/null
+++ b/src/proxy/tcp_proxy.rs
@@ -0,0 +1,2 @@
+use super::*;
+
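Two sketches of where the stubs in this patch seem headed. First, src/proxy/tcp_proxy.rs: following the README's "process to load balance" list, the TCP path can lean on tokio's copy_bidirectional once a backend has been chosen. `handle_connection` and its logging are invented for illustration; only ConnectionContext comes from the patch:

```rust
use tokio::io::copy_bidirectional;
use tokio::net::TcpStream;

use super::ConnectionContext;

// connect to the chosen backend and shuttle bytes both ways until either
// side closes; copy_bidirectional reports bytes moved in each direction
pub async fn handle_connection(
    mut client: TcpStream,
    mut ctx: ConnectionContext,
) -> std::io::Result<()> {
    let backend_addr = ctx.backend_addr.expect("balancer must pick a backend first");
    let mut backend = TcpStream::connect(backend_addr).await?;

    let (to_backend, to_client) = copy_bidirectional(&mut client, &mut backend).await?;
    ctx.bytes_transferred = (to_backend + to_client) as usize;

    println!(
        "conn {}: {} <-> {} done, {} bytes in {:?}",
        ctx.connection_id,
        ctx.client_addr,
        backend_addr,
        ctx.bytes_transferred,
        ctx.start_time.elapsed(),
    );
    Ok(())
}
```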
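Second, src/balancer/mod.rs is still empty, but the "methods used by the balancer module to pick a suitable backend" could reduce to something like a least-connections scan. `pick_backend` is a hypothetical name; Backend and BackendPool are the types from the patch:

```rust
use std::sync::Arc;
use std::sync::atomic::Ordering;

use crate::backend::{Backend, BackendPool};

// least-connections pick: scan the pool for the healthy backend with the
// smallest current_load (the atomics make this safe under a read lock)
pub fn pick_backend(pool: &BackendPool) -> Option<Arc<Backend>> {
    let guard = pool.backends.read().expect("BackendPool lock poisoned");
    guard
        .values()
        .filter(|b| b.is_healthy.load(Ordering::SeqCst))
        .min_by_key(|b| b.current_load.load(Ordering::SeqCst))
        .cloned()
}
```

The caller would presumably call increment_current_load on the winner before proxying and decrement_current_load when the connection closes, which is what keeps the least-connections ordering honest.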