Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cd23bfdf5a | ||
| 4cdf2db0c9 | |||
| 606880f928 | |||
| 19cd5b7f2a | |||
|
|
25c3eb9511 | ||
|
|
e27bd2aaf0 | ||
|
|
1235d3611d |
22
.github/workflows/rust.yml
vendored
Normal file
22
.github/workflows/rust.yml
vendored
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# Build-and-test CI for the Rust crate, run on pushes and PRs to main.
name: Rust

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

env:
  # Force colored cargo output in CI logs.
  CARGO_TERM_COLOR: always

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      - name: Build
        run: cargo build --verbose
      - name: Run tests
        run: cargo test --verbose
|
||||||
@@ -7,7 +7,8 @@ Production't graden't load balancer.
|
|||||||
- [ ] architecture astronauting
|
- [ ] architecture astronauting
|
||||||
- [ ] stream / session handling (i think wrapper around tokio TcpStream)
|
- [ ] stream / session handling (i think wrapper around tokio TcpStream)
|
||||||
- [ ] basic backend pooling
|
- [ ] basic backend pooling
|
||||||
- [ ] layer 4 load balancing
|
- [ ] layer 4 load balancing
|
||||||
|
- [ ] load balancing algorithm from the paper (https://www.wcse.org/WCSE_2018/W110.pdf)
|
||||||
|
|
||||||
## notes
|
## notes
|
||||||
tcp, for nginx (and haproxy, its similar):
|
tcp, for nginx (and haproxy, its similar):
|
||||||
@@ -20,7 +21,7 @@ struct ngx_connection_s {
|
|||||||
|
|
||||||
ngx_socket_t fd;
|
ngx_socket_t fd;
|
||||||
|
|
||||||
ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for different platforms / protocols)
|
ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for different platforms / protocols)
|
||||||
ngx_send_pt send; // ditto
|
ngx_send_pt send; // ditto
|
||||||
ngx_recv_chain_pt recv_chain;
|
ngx_recv_chain_pt recv_chain;
|
||||||
ngx_send_chain_pt send_chain;
|
ngx_send_chain_pt send_chain;
|
||||||
@@ -102,4 +103,4 @@ process to load balance:
|
|||||||
- ask the load balancing algorithm which server in the pool to route to
|
- ask the load balancing algorithm which server in the pool to route to
|
||||||
- connect to the server
|
- connect to the server
|
||||||
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
|
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
|
||||||
- cleanup when someone leaves or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
|
- cleanup when someone leaves or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
|
||||||
|
|||||||
64
src/main.rs
64
src/main.rs
@@ -1,31 +1,55 @@
|
|||||||
|
/// Logs an informational message to stdout, prefixed with "info: ".
///
/// Expands to a single `println!` (routing the caller's arguments through
/// `format_args!`) instead of a separate `print!("info: ")` followed by
/// `println!`: with two writes, output from concurrently running tasks can
/// interleave between the prefix and the message. One write keeps each log
/// line intact.
macro_rules! info {
    ($($arg:tt)*) => {{
        println!("info: {}", format_args!($($arg)*));
    }};
}
|
||||||
|
|
||||||
|
/// Logs an error message to stderr, prefixed with "error: ".
///
/// Two fixes versus the original:
/// - the transcriber now expands to a block (`{{ … }}`), matching `info!`;
///   the old single-brace body contained two statements, which is not valid
///   when the macro is used in expression position.
/// - a single `eprintln!` (via `format_args!`) replaces the separate
///   `eprint!("error: ")` + `eprintln!` pair, so the prefix and message
///   cannot interleave with output from concurrent tasks.
macro_rules! error {
    ($($arg:tt)*) => {{
        eprintln!("error: {}", format_args!($($arg)*));
    }};
}
|
||||||
|
|
||||||
|
mod netutils;
|
||||||
|
|
||||||
|
use anywho::Error;
|
||||||
|
use netutils::{Backend, tunnel};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Error> {
|
||||||
|
let backends = Arc::new(vec![
|
||||||
|
Backend::new("127.0.0.1:8081".to_string()),
|
||||||
|
Backend::new("127.0.0.1:8082".to_string()),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let current_index = Arc::new(Mutex::new(0));
|
||||||
|
|
||||||
|
info!("enginewhy starting on 0.0.0.0:8080");
|
||||||
|
info!("backends: {:?}", backends);
|
||||||
|
|
||||||
let listener = TcpListener::bind("0.0.0.0:8080").await?;
|
let listener = TcpListener::bind("0.0.0.0:8080").await?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (mut socket, _) = listener.accept().await?;
|
let (client, addr) = listener.accept().await?;
|
||||||
|
info!("new connection from {}", addr);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
let backend = {
|
||||||
let mut buf = [0; 1024];
|
let mut index = current_index.lock().await;
|
||||||
|
let selected_backend = backends[*index].clone();
|
||||||
|
*index = (*index + 1) % backends.len();
|
||||||
|
selected_backend
|
||||||
|
};
|
||||||
|
|
||||||
loop {
|
info!("routing client {} to backend {}", addr, backend);
|
||||||
let n = match socket.read(&mut buf).await {
|
|
||||||
Ok(0) => return,
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("failed to read from socket; err = {:?}", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = socket.write_all(&buf[0..n]).await {
|
if let Err(e) = tunnel(client, backend).await {
|
||||||
eprintln!("failed to write to socket; err = {:?}", e);
|
error!("proxy failed for {}: {}", addr, e);
|
||||||
return;
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
52
src/netutils.rs
Normal file
52
src/netutils.rs
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
use std::fmt;
|
||||||
|
use tokio::io;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
/// A proxy upstream, identified by its socket address string.
#[derive(Clone, Debug)]
pub struct Backend {
    // "host:port" string handed to TcpStream::connect.
    address: String,
}

impl Backend {
    /// Creates a backend pointing at `address` ("host:port").
    pub fn new(address: String) -> Self {
        Self { address }
    }
}

/// User-facing rendering: just the address, used by the routing log lines.
impl fmt::Display for Backend {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.address)
    }
}
|
||||||
|
|
||||||
|
pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box<dyn Error>> {
|
||||||
|
let backend_address: String = backend.address.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await {
|
||||||
|
Ok(s) => {
|
||||||
|
info!("connected to backend {backend_address}");
|
||||||
|
s
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("failed connecting to backend {backend_address}: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (mut read_client, mut write_client) = client_stream.into_split();
|
||||||
|
let (mut read_backend, mut write_backend) = backend_stream.into_split();
|
||||||
|
|
||||||
|
let client_to_backend =
|
||||||
|
tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await });
|
||||||
|
|
||||||
|
let backend_to_client =
|
||||||
|
tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await });
|
||||||
|
|
||||||
|
let _ = tokio::join!(client_to_backend, backend_to_client);
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user