7 Commits

Author SHA1 Message Date
Ning Qi (Paul) Sun
cd23bfdf5a Merge pull request #1 from psun256/merge
Merge & Refactor
2025-12-06 16:09:51 -05:00
4cdf2db0c9 feat: improved logging 2025-12-06 02:16:40 -05:00
606880f928 feat: merged repos 2025-12-06 01:31:33 -05:00
19cd5b7f2a feat: modularized proxy 2025-12-06 00:21:53 -05:00
Ning Qi (Paul) Sun
25c3eb9511 gh action
gh action
2025-12-03 22:07:40 -05:00
psun256
e27bd2aaf0 layer 4 load balancing (round robin, hardcoded backends) 2025-11-29 21:46:26 -05:00
Phong Nguyen
1235d3611d Update README with load balancing details
Added a note about load balancing algorithms from a referenced paper.
2025-12-03 12:47:46 -05:00
4 changed files with 122 additions and 23 deletions

22
.github/workflows/rust.yml vendored Normal file
View File

@@ -0,0 +1,22 @@
name: Rust
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

View File

@@ -8,6 +8,7 @@ Production't graden't load balancer.
- [ ] stream / session handling (i think wrapper around tokio TcpStream) - [ ] stream / session handling (i think wrapper around tokio TcpStream)
- [ ] basic backend pooling - [ ] basic backend pooling
- [ ] layer 4 load balancing - [ ] layer 4 load balancing
- [ ] load balancing algorithm from the paper (https://www.wcse.org/WCSE_2018/W110.pdf)
## notes ## notes
tcp, for nginx (and haproxy, its similar): tcp, for nginx (and haproxy, its similar):
@@ -20,7 +21,7 @@ struct ngx_connection_s {
ngx_socket_t fd; ngx_socket_t fd;
ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for idfferent platforms / protocol ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for dfferent platforms / protocol
ngx_send_pt send; // ditto ngx_send_pt send; // ditto
ngx_recv_chain_pt recv_chain; ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain; ngx_send_chain_pt send_chain;

View File

@@ -1,31 +1,55 @@
macro_rules! info {
($($arg:tt)*) => {{
print!("info: ");
println!($($arg)*);
}};
}
macro_rules! error {
($($arg:tt)*) => {
eprint!("error: ");
eprintln!($($arg)*);
};
}
mod netutils;
use anywho::Error;
use netutils::{Backend, tunnel};
use std::sync::Arc;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::Mutex;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Error> {
let backends = Arc::new(vec![
Backend::new("127.0.0.1:8081".to_string()),
Backend::new("127.0.0.1:8082".to_string()),
]);
let current_index = Arc::new(Mutex::new(0));
info!("enginewhy starting on 0.0.0.0:8080");
info!("backends: {:?}", backends);
let listener = TcpListener::bind("0.0.0.0:8080").await?; let listener = TcpListener::bind("0.0.0.0:8080").await?;
loop { loop {
let (mut socket, _) = listener.accept().await?; let (client, addr) = listener.accept().await?;
info!("new connection from {}", addr);
tokio::spawn(async move { let backend = {
let mut buf = [0; 1024]; let mut index = current_index.lock().await;
let selected_backend = backends[*index].clone();
*index = (*index + 1) % backends.len();
selected_backend
};
loop { info!("routing client {} to backend {}", addr, backend);
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await { if let Err(e) = tunnel(client, backend).await {
eprintln!("failed to write to socket; err = {:?}", e); error!("proxy failed for {}: {}", addr, e);
return; }
}
}
});
} }
} }

52
src/netutils.rs Normal file
View File

@@ -0,0 +1,52 @@
use std::fmt;
use tokio::io;
use tokio::net::TcpStream;
use std::error::Error;
#[derive(Clone, Debug)]
pub struct Backend {
address: String,
}
impl Backend {
pub fn new(address: String) -> Self {
Backend { address }
}
}
impl fmt::Display for Backend {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.address)
}
}
pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box<dyn Error>> {
let backend_address: String = backend.address.clone();
tokio::spawn(async move {
let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await {
Ok(s) => {
info!("connected to backend {backend_address}");
s
}
Err(e) => {
error!("failed connecting to backend {backend_address}: {e}");
return;
}
};
let (mut read_client, mut write_client) = client_stream.into_split();
let (mut read_backend, mut write_backend) = backend_stream.into_split();
let client_to_backend =
tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await });
let backend_to_client =
tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await });
let _ = tokio::join!(client_to_backend, backend_to_client);
});
Ok(())
}