Compare commits
2 Commits: a3f50c1f0a, 07cb45fa73
.github/workflows/rust.yml (vendored, 22 lines deleted)

@@ -1,22 +0,0 @@
-name: Rust
-
-on:
-  push:
-    branches: [ "main" ]
-  pull_request:
-    branches: [ "main" ]
-
-env:
-  CARGO_TERM_COLOR: always
-
-jobs:
-  build:
-
-    runs-on: ubuntu-latest
-
-    steps:
-    - uses: actions/checkout@v4
-    - name: Build
-      run: cargo build --verbose
-    - name: Run tests
-      run: cargo test --verbose
README.md (26 lines changed)

@@ -5,10 +5,25 @@ Production't graden't load balancer.
 
 ## Todo
 - [ ] architecture astronauting
+  - balancer module
+    - just the algorithms i guess
+    -
+  - backend module
+    - manages the backend pool
+    - deals with health / load check
+    - BackendPool for all the backends stored together
+    - Backend for individual backends
+      - has some methods used by balancer module to pick a suitable backend
+  - proxy module
+    - all the different supported protocols to handle
+    - will create a session / stream context structure (ConnectionContext)
+      - not globally tracked (this might change for UDP!)
+      - mainly some metadata
+  - config module
+    - set up all the stuff or something
 - [ ] stream / session handling (i think wrapper around tokio TcpStream)
 - [ ] basic backend pooling
 - [ ] layer 4 load balancing
-- [ ] load balancing algorithm from the paper (https://www.wcse.org/WCSE_2018/W110.pdf)
 
 ## notes
 tcp, for nginx (and haproxy, it's similar):
@@ -21,7 +36,7 @@ struct ngx_connection_s {
 
     ngx_socket_t fd;
 
-    ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for dfferent platforms / protocol
+    ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for different platforms / protocols)
     ngx_send_pt send; // ditto
     ngx_recv_chain_pt recv_chain;
     ngx_send_chain_pt send_chain;
@@ -104,3 +119,10 @@ process to load balance:
 - connect to the server
 - proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
 - cleanup when someone leaves or something goes wrong (with TCP, the OS / tokio will tell us; with UDP, probably just timeout based, plus a periodic sweep of all sessions)
+
+
+### UDP
+UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the traits required for tokio copy_bidirectional,
+and plain async read / write don't work on raw datagrams anyway, so that's probably not possible.
+
+Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections.
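That per-session UDP plan could look roughly like the sketch below. Nothing in it exists in the repo yet; the session map, the 30-second idle timeout, and all names (UdpSession, run_udp_proxy, UDP_TIMEOUT) are assumptions made for illustration only.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::net::UdpSocket;
use tokio::sync::Mutex;

// One upstream socket per client plus a last-seen timestamp, so a periodic
// sweep can expire idle "sessions" (the timeout-based cleanup from the notes).
struct UdpSession {
    upstream: Arc<UdpSocket>,
    last_seen: Instant,
}

const UDP_TIMEOUT: Duration = Duration::from_secs(30);

async fn run_udp_proxy(listen: SocketAddr, backend: SocketAddr) -> std::io::Result<()> {
    let listener = Arc::new(UdpSocket::bind(listen).await?);
    let sessions: Arc<Mutex<HashMap<SocketAddr, UdpSession>>> =
        Arc::new(Mutex::new(HashMap::new()));

    // Periodic sweep of idle sessions.
    let sweep = sessions.clone();
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(UDP_TIMEOUT).await;
            sweep.lock().await.retain(|_, s| s.last_seen.elapsed() < UDP_TIMEOUT);
        }
    });

    let mut buf = [0u8; 65535];
    loop {
        let (n, client) = listener.recv_from(&mut buf).await?;

        let upstream = {
            let mut sessions = sessions.lock().await;
            if let Some(s) = sessions.get_mut(&client) {
                s.last_seen = Instant::now();
                s.upstream.clone()
            } else {
                // First datagram from this client: open a dedicated upstream socket
                // and spawn the backend -> client half of the "bidirectional copy".
                let upstream = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
                upstream.connect(backend).await?;
                sessions.insert(client, UdpSession {
                    upstream: upstream.clone(),
                    last_seen: Instant::now(),
                });

                let reply_sock = upstream.clone();
                let listener = listener.clone();
                tokio::spawn(async move {
                    let mut buf = [0u8; 65535];
                    while let Ok(n) = reply_sock.recv(&mut buf).await {
                        let _ = listener.send_to(&buf[..n], client).await;
                    }
                });
                upstream
            }
        };

        // client -> backend half.
        upstream.send(&buf[..n]).await?;
    }
}
```

The TCP path can keep using copy_bidirectional; only UDP needs this kind of manual session tracking.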
src/backend/mod.rs (new file, 58 lines)

@@ -0,0 +1,58 @@
+use core::fmt;
+use std::net::SocketAddr;
+use std::sync::RwLock;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+#[derive(Debug)]
+pub struct Backend {
+    pub id: String,
+    pub address: SocketAddr,
+    pub active_connections: AtomicUsize,
+}
+
+impl Backend {
+    pub fn new(id: String, address: SocketAddr) -> Self {
+        Self {
+            id: id.to_string(),
+            address,
+            active_connections: AtomicUsize::new(0),
+        }
+    }
+
+    // Ordering::Relaxed means the ops could be observed in any order, but this
+    // is just a metric, and we assume the underlying system is sane
+    // enough not to behave poorly, so SeqCst is probably overkill.
+    pub fn inc_connections(&self) {
+        self.active_connections.fetch_add(1, Ordering::Relaxed);
+        println!("{} has {} connections open", self.id, self.active_connections.load(Ordering::Relaxed));
+    }
+
+    pub fn dec_connections(&self) {
+        self.active_connections.fetch_sub(1, Ordering::Relaxed);
+        println!("{} has {} connections open", self.id, self.active_connections.load(Ordering::Relaxed));
+    }
+}
+
+impl fmt::Display for Backend {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{} ({})", self.address, self.id)
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct BackendPool {
+    pub backends: Arc<RwLock<Vec<Arc<Backend>>>>,
+}
+
+impl BackendPool {
+    pub fn new() -> Self {
+        BackendPool {
+            backends: Arc::new(RwLock::new(Vec::new())),
+        }
+    }
+
+    pub fn add(&self, backend: Backend) {
+        self.backends.write().unwrap().push(Arc::new(backend));
+    }
+}
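A quick usage sketch (the IDs and ports here are just examples): cloning the pool only clones the Arc, and the Relaxed counters can be bumped from any task that holds an Arc<Backend>.

```rust
use crate::backend::{Backend, BackendPool};

fn pool_example() {
    let pool = BackendPool::new();
    pool.add(Backend::new("backend 1".into(), "127.0.0.1:8081".parse().unwrap()));
    pool.add(Backend::new("backend 2".into(), "127.0.0.1:8082".parse().unwrap()));

    // Cloning the pool clones the Arc, not the backend list.
    let view = pool.clone();
    let first = view.backends.read().unwrap()[0].clone();

    // Connection accounting from anywhere that holds an Arc<Backend>.
    first.inc_connections();
    first.dec_connections();
}
```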
src/balancer/mod.rs (new file, 9 lines)

@@ -0,0 +1,9 @@
+pub mod round_robin;
+
+use std::fmt::Debug;
+use std::sync::Arc;
+use crate::backend::Backend;
+
+pub trait Balancer: Debug + Send + Sync + 'static {
+    fn choose_backend(&mut self) -> Option<Arc<Backend>>;
+}
src/balancer/round_robin.rs (new file, 33 lines)

@@ -0,0 +1,33 @@
+use std::sync::{Arc, RwLock};
+use std::fmt::Debug;
+use crate::backend::{Backend, BackendPool};
+use crate::balancer::Balancer;
+
+// only the main thread for receiving connections should be
+// doing the load balancing. alternatively, each thread
+// that handles load balancing should get their own instance.
+#[derive(Debug)]
+pub struct RoundRobinBalancer {
+    pool: BackendPool,
+    index: usize,
+}
+
+impl RoundRobinBalancer {
+    pub fn new(pool: BackendPool) -> RoundRobinBalancer {
+        Self {
+            pool,
+            index: 0,
+        }
+    }
+}
+
+impl Balancer for RoundRobinBalancer {
+    fn choose_backend(&mut self) -> Option<Arc<Backend>> {
+        let backends = self.pool.backends.read().unwrap();
+        if backends.is_empty() { return None; }
+
+        let backend = backends[self.index % backends.len()].clone();
+        self.index = self.index.wrapping_add(1);
+        Some(backend)
+    }
+}
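Usage is straightforward; a small sketch (the IDs and ports are only examples) showing the cycling behaviour, since index is bumped on every pick:

```rust
use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, round_robin::RoundRobinBalancer};

fn round_robin_example() {
    let pool = BackendPool::new();
    pool.add(Backend::new("a".into(), "127.0.0.1:9001".parse().unwrap()));
    pool.add(Backend::new("b".into(), "127.0.0.1:9002".parse().unwrap()));

    let mut balancer = RoundRobinBalancer::new(pool);

    // Prints a, b, a, b (Display shows "address (id)").
    for _ in 0..4 {
        if let Some(backend) = balancer.choose_backend() {
            println!("picked {}", backend);
        }
    }
}
```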
src/config.rs (new file, 6 lines)

@@ -0,0 +1,6 @@
+// TODO: "routing" rules
+// backends defined as ip + port
+// define sets of backends
+// allowed set operations for now is just union
+// rules are ip + mask and ports, maps to some of the sets
+// defined earlier, along with a routing strategy
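One possible shape for those types, purely as a sketch of the TODO above (none of these names exist in the repo yet, and the parsing side is left out):

```rust
use std::net::{IpAddr, SocketAddr};

// Hypothetical config types matching the TODO: named backend sets, and rules
// that map a client prefix + ports onto a union of sets plus a strategy.
pub struct BackendSet {
    pub name: String,
    pub backends: Vec<SocketAddr>, // backends are just ip + port
}

pub enum Strategy {
    RoundRobin,
    // more algorithms later
}

pub struct Rule {
    pub source_ip: IpAddr,     // ip + mask the rule matches on
    pub source_prefix_len: u8,
    pub ports: Vec<u16>,       // ports the rule applies to
    pub sets: Vec<String>,     // union of the named sets above
    pub strategy: Strategy,
}

pub struct Config {
    pub sets: Vec<BackendSet>,
    pub rules: Vec<Rule>,
}
```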
src/main.rs (77 lines changed)

@@ -1,55 +1,52 @@
-macro_rules! info {
-    ($($arg:tt)*) => {{
-        print!("info: ");
-        println!($($arg)*);
-    }};
-}
+extern crate core;
 
-macro_rules! error {
-    ($($arg:tt)*) => {
-        eprint!("error: ");
-        eprintln!($($arg)*);
-    };
-}
-
-mod netutils;
-
-use anywho::Error;
-use netutils::{Backend, tunnel};
-use std::sync::Arc;
+mod balancer;
+mod config;
+mod backend;
+mod proxy;
 
 use tokio::net::TcpListener;
-use tokio::sync::Mutex;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
+use crate::backend::{Backend, BackendPool};
+use crate::balancer::Balancer;
+use crate::balancer::round_robin::RoundRobinBalancer;
+use crate::proxy::tcp::proxy_tcp_connection;
+
+static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
 
 #[tokio::main]
-async fn main() -> Result<(), Error> {
-    let backends = Arc::new(vec![
-        Backend::new("127.0.0.1:8081".to_string()),
-        Backend::new("127.0.0.1:8082".to_string()),
-    ]);
-
-    let current_index = Arc::new(Mutex::new(0));
-
-    info!("enginewhy starting on 0.0.0.0:8080");
-    info!("backends: {:?}", backends);
-
-    let listener = TcpListener::bind("0.0.0.0:8080").await?;
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let pool = BackendPool::new();
+
+    pool.add(Backend::new(
+        "backend 1".into(),
+        "127.0.0.1:8081".parse().unwrap(),
+    ));
+
+    pool.add(Backend::new(
+        "backend 2".into(),
+        "127.0.0.1:8082".parse().unwrap(),
+    ));
+
+    let mut balancer = RoundRobinBalancer::new(pool.clone());
+
+    let listener = TcpListener::bind("127.0.0.1:8080").await?;
 
     loop {
-        let (client, addr) = listener.accept().await?;
-        info!("new connection from {}", addr);
-
-        let backend = {
-            let mut index = current_index.lock().await;
-            let selected_backend = backends[*index].clone();
-            *index = (*index + 1) % backends.len();
-            selected_backend
-        };
-
-        info!("routing client {} to backend {}", addr, backend);
-        if let Err(e) = tunnel(client, backend).await {
-            error!("proxy failed for {}: {}", addr, e);
+        let (socket, _) = listener.accept().await?;
+
+        let conn_id = NEXT_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+
+        if let Some(backend) = balancer.choose_backend() {
+            tokio::spawn(async move {
+                if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await {
+                    eprintln!("error: conn_id={} proxy failed: {}", conn_id, e);
+                }
+            });
+        } else {
+            eprintln!("error: no backends for conn_id={}", conn_id);
        }
     }
 }
Deleted file (the old netutils module, 52 lines)

@@ -1,52 +0,0 @@
-use std::fmt;
-use tokio::io;
-use tokio::net::TcpStream;
-
-use std::error::Error;
-
-#[derive(Clone, Debug)]
-pub struct Backend {
-    address: String,
-}
-
-impl Backend {
-    pub fn new(address: String) -> Self {
-        Backend { address }
-    }
-}
-
-impl fmt::Display for Backend {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", self.address)
-    }
-}
-
-pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box<dyn Error>> {
-    let backend_address: String = backend.address.clone();
-
-    tokio::spawn(async move {
-        let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await {
-            Ok(s) => {
-                info!("connected to backend {backend_address}");
-                s
-            }
-            Err(e) => {
-                error!("failed connecting to backend {backend_address}: {e}");
-                return;
-            }
-        };
-
-        let (mut read_client, mut write_client) = client_stream.into_split();
-        let (mut read_backend, mut write_backend) = backend_stream.into_split();
-
-        let client_to_backend =
-            tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await });
-
-        let backend_to_client =
-            tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await });
-
-        let _ = tokio::join!(client_to_backend, backend_to_client);
-    });
-
-    Ok(())
-}
src/proxy/mod.rs (new file, 43 lines)

@@ -0,0 +1,43 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Instant;
+use crate::backend::Backend;
+
+pub mod tcp;
+
+pub struct ConnectionContext {
+    pub id: u64,
+    pub client_addr: SocketAddr,
+    pub start_time: Instant,
+    pub backend: Arc<Backend>,
+    pub bytes_transferred: u64,
+}
+
+impl ConnectionContext {
+    pub fn new(id: u64, client_addr: SocketAddr, backend: Arc<Backend>) -> Self {
+        backend.inc_connections();
+
+        Self {
+            id,
+            client_addr,
+            start_time: Instant::now(),
+            backend,
+            bytes_transferred: 0,
+        }
+    }
+}
+
+impl Drop for ConnectionContext {
+    fn drop(&mut self) {
+        self.backend.dec_connections();
+        let duration = self.start_time.elapsed();
+
+        println!("info: conn_id={} closed. client={} backend={} bytes={} duration={:.2?}",
+            self.id,
+            self.client_addr,
+            self.backend.address,
+            self.bytes_transferred,
+            duration.as_secs_f64()
+        );
+    }
+}
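The Drop impl is what ties the accounting together: constructing a ConnectionContext bumps the backend's counter, and dropping it (whether the proxy task finished cleanly or bailed with an error) decrements the counter and prints the summary line. A small illustration, with placeholder addresses:

```rust
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::backend::Backend;
use crate::proxy::ConnectionContext;

fn context_lifecycle_example() {
    let backend = Arc::new(Backend::new(
        "backend 1".into(),
        "127.0.0.1:8081".parse().unwrap(),
    ));

    {
        // inc_connections() runs in new() ...
        let mut ctx = ConnectionContext::new(1, "127.0.0.1:55555".parse().unwrap(), backend.clone());
        ctx.bytes_transferred = 42;
    } // ... and dec_connections() plus the summary line run here, when ctx is dropped.

    assert_eq!(backend.active_connections.load(Ordering::Relaxed), 0);
}
```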
src/proxy/tcp.rs (new file, 26 lines)

@@ -0,0 +1,26 @@
+use std::sync::Arc;
+use tokio::io;
+use tokio::net::TcpStream;
+use anywho::Error;
+use crate::backend::Backend;
+use crate::proxy::ConnectionContext;
+
+pub async fn proxy_tcp_connection(connection_id: u64, mut client_stream: TcpStream, backend: Arc<Backend>) -> Result<(), Error> {
+    let client_addr = client_stream.peer_addr()?;
+
+    let mut ctx = ConnectionContext::new(connection_id, client_addr, backend.clone());
+
+    #[cfg(debug_assertions)]
+    println!("info: conn_id={} connecting to {}", connection_id, ctx.backend.id);
+
+    let mut backend_stream = TcpStream::connect(&backend.address).await?;
+
+    let (tx, rx) = io::copy_bidirectional(
+        &mut client_stream,
+        &mut backend_stream,
+    ).await?;
+
+    ctx.bytes_transferred = tx + rx;
+
+    Ok(())
+}