19 Commits

Author SHA1 Message Date
psun256
7eae7db1f6 clean up repo 2025-12-18 18:52:28 -05:00
psun256
54978770be added example 2025-12-10 23:54:54 -05:00
nnhphong
6001959cb4 add report 2025-12-10 23:51:08 -05:00
Jeremy Janella
26af533e73 Delete README.md 2025-12-10 23:28:09 -05:00
psun256
2e1446d300 changed shortest prefix match to longest prefix match; update dockerfile 2025-12-10 22:51:57 -05:00
psun256
36026049f9 removed separate port field from config for backends 2025-12-10 20:40:27 -05:00
Ning Qi (Paul) Sun
0496594bfa Merge pull request #3 from psun256/healthcheck
Healthcheck
2025-12-10 19:01:40 -05:00
Ning Qi (Paul) Sun
57b6e6f7a7 Merge branch 'main' into healthcheck 2025-12-10 18:59:20 -05:00
psun256
022c48a041 applied hot reload to health check logic 2025-12-10 18:58:47 -05:00
Phong Nguyen
ca5eba2ea1 Merge pull request #4 from psun256/tests/adaptive_weights
feat: adaptive weight tests
2025-12-10 18:29:37 -05:00
nnhphong
1d75a9c1a1 fix the test 2025-12-10 18:26:52 -05:00
nnhphong
7100614950 fix the test, all pass now 2025-12-10 18:08:54 -05:00
9cae3b767d feat: adaptive weight tests 2025-12-10 17:25:16 -05:00
Ning Qi (Paul) Sun
8212a1a762 Merge branch 'main' into healthcheck 2025-12-10 16:49:16 -05:00
Ning Qi (Paul) Sun
475cd65648 Merge pull request #2 from psun256/auto-reload
added config hot reload
2025-12-10 16:38:38 -05:00
psun256
8d5122ea1d fix server metrics framing issue 2025-12-10 16:28:51 -05:00
psun256
f9e3a08a4e added config hot reload 2025-12-10 15:37:02 -05:00
nnhphong
799adf9877 too much changes, idek anymore 2025-12-10 15:36:51 -05:00
Ning Qi (Paul) Sun
bfb2812402 Add GitHub Actions workflow for Rust project 2025-12-10 03:07:09 -05:00
21 changed files with 468 additions and 175 deletions

8
.idea/.gitignore generated vendored
View File

@@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@@ -2,6 +2,7 @@
<module type="EMPTY_MODULE" version="4"> <module type="EMPTY_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$"> <content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/examples" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" /> <excludeFolder url="file://$MODULE_DIR$/target" />
</content> </content>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DiscordProjectSettings">
<option name="show" value="ASK" />
<option name="description" value="" />
</component>
</project>

2
.idea/modules.xml generated
View File

@@ -2,7 +2,7 @@
<project version="4"> <project version="4">
<component name="ProjectModuleManager"> <component name="ProjectModuleManager">
<modules> <modules>
<module fileurl="file://$PROJECT_DIR$/.idea/l4lb.iml" filepath="$PROJECT_DIR$/.idea/l4lb.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/enginewhy.iml" filepath="$PROJECT_DIR$/.idea/enginewhy.iml" />
</modules> </modules>
</component> </component>
</project> </project>

151
.idea/workspace.xml generated Normal file
View File

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="ALL" />
</component>
<component name="CargoProjects">
<cargoProject FILE="$PROJECT_DIR$/Cargo.toml">
<package file="$PROJECT_DIR$">
<feature name="default" enabled="true" />
</package>
</cargoProject>
</component>
<component name="ChangeListManager">
<list default="true" id="5bc80de0-5ea4-4d85-9c70-3d4dd48ecd6e" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/.gitignore" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/l4lb.iml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/modules.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/modules.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/config.yaml" beforeDir="false" afterPath="$PROJECT_DIR$/config.yaml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/report.md" beforeDir="false" afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/backend/health.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/backend/health.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/balancer/adaptive_weight.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/balancer/adaptive_weight.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/balancer/round_robin.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/balancer/round_robin.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/config/loader.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/config/loader.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/main.rs" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="ExecutionTargetManager" SELECTED_TARGET="RsBuildProfile:dev" />
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MacroExpansionManager">
<option name="directoryName" value="mBGI6hzN" />
</component>
<component name="ProjectColorInfo"><![CDATA[{
"customColor": "",
"associatedIndex": 2
}]]></component>
<component name="ProjectId" id="372Va98jrjZ8PHjIMvsLlQC60VZ" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"Cargo.clippy.executor": "Run",
"ModuleVcsDetector.initialDetectionPerformed": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"RunOnceActivity.git.unshallow": "true",
"RunOnceActivity.rust.reset.selective.auto.import": "true",
"com.intellij.ml.llm.matterhorn.ej.ui.settings.DefaultModelSelectionForGA.v1": "true",
"git-widget-placeholder": "main",
"junie.onboarding.icon.badge.shown": "true",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"node.js.selected.package.tslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
"org.rust.cargo.project.model.PROJECT_DISCOVERY": "true",
"org.rust.first.attach.projects": "true",
"settings.editor.selected.configurable": "language.rust.build.tool.cargo",
"to.speed.mode.migration.done": "true",
"vue.rearranger.settings.migration": "true"
}
}]]></component>
<component name="RunAnythingCache">
<option name="myCommands">
<command value="cargo clippy" />
</option>
</component>
<component name="RunManager" selected="Cargo.clippy">
<configuration name="Run l4lb" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package l4lb --bin l4lb" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
<configuration name="Test l4lb" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="command" value="test --workspace" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
<configuration name="clippy" type="CargoCommandRunConfiguration" factoryName="Cargo Command" temporary="true">
<option name="command" value="clippy" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
<recent_temporary>
<list>
<item itemvalue="Cargo.clippy" />
<item itemvalue="Cargo.clippy" />
</list>
</recent_temporary>
</component>
<component name="RustProjectSettings">
<option name="toolchainHomeDirectory" value="$USER_HOME$/.cargo/bin" />
</component>
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="5bc80de0-5ea4-4d85-9c70-3d4dd48ecd6e" name="Changes" comment="" />
<created>1766101214189</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1766101214189</updated>
<workItem from="1766101215227" duration="703000" />
</task>
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
</project>

View File

@@ -33,8 +33,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
# change to scratch and get comment the apk command for prod, i guess # change to scratch and get comment the apk command for prod, i guess
FROM alpine:latest AS runtime FROM alpine:latest AS runtime
# RUN apk add --no-cache ca-certificates curl netcat-openbsd bind-tools strace RUN apk add --no-cache ca-certificates curl netcat-openbsd bind-tools strace
WORKDIR /enginewhy WORKDIR /enginewhy
COPY --from=builder /enginewhy/target/x86_64-unknown-linux-musl/release/l4lb /usr/bin/l4lb COPY --from=builder /enginewhy/target/x86_64-unknown-linux-musl/release/l4lb /usr/bin/l4lb
COPY config.yaml .
ENTRYPOINT ["l4lb"] ENTRYPOINT ["l4lb"]

226
README.md
View File

@@ -1,128 +1,122 @@
# nginy # nginy
Production't graden't load balancer.
## Quick links EngineWhy, also known as nginy, is a simple but flexible TCP load balancer.
## Todo ## Contents
- [ ] architecture astronauting - [Features](#features)
- balancer module - [Getting started](#getting-started)
- just the algorithms i guess - [Running as a standalone binary](#running-as-a-standalone-binary)
- - [Running as a dockerized application](#running-as-a-dockerized-application)
- backend module - [Configuration](#configuration)
- manages the backend pool - [Examples](#examples)
- deals with health / load check
- BackendPool for all the backends stored together
- Backend for individual backends
- has some methods used by balancer module to pick a suitable backend
- proxy module
- all the different supported protocols to handle
- will create a session / stream context structure (ConnectionContext)
- not globally tracked (this might change for UDP!)
- mainly some metadata
- config module
- set up all the stuff or something
- [ ] stream / session handling (i think wrapper around tokio TcpStream)
- [ ] basic backend pooling
- [ ] layer 4 load balancing
## notes ## Features
tcp, for nginx (and haproxy, its similar):
```c
// nginx
struct ngx_connection_s {
void *data;
ngx_event_t *read;
ngx_event_t *write;
ngx_socket_t fd; - Multiple different load balancing algorithms
- Round Robin
- IP Hashing
- Adaptive algorithm (based on [this paper](https://www.wcse.org/WCSE_2018/W110.pdf))
- Cluster based backend grouping, and CIDR based client matching for fine-grained control.
- YAML based config for readability and simplicity.
- Automatic hot-reload of configuration without interruptions to service.
ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for idfferent platforms / protocol ## Getting started
ngx_send_pt send; // ditto
ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain;
ngx_listening_t *listening; ### Running as a standalone binary
You must have the latest stable Rust release (1.91 at the time of writing), and a valid configuration file (see [Configuration](#configuration) for details).
off_t sent; 1. Clone the repository:
```sh
ngx_log_t *log; git clone https://github.com/psun256/enginewhy.git
cd enginewhy
ngx_pool_t *pool; ```
2. Build the application:
int type; ```sh
cargo build --release
struct sockaddr *sockaddr; ```
socklen_t socklen; 3. Run the application:
ngx_str_t addr_text; ```sh
./target/release/l4lb -c path/to/config.yaml
ngx_proxy_protocol_t *proxy_protocol;
#if (NGX_QUIC || NGX_COMPAT)
ngx_quic_stream_t *quic;
#endif
#if (NGX_SSL || NGX_COMPAT)
ngx_ssl_connection_t *ssl;
#endif
ngx_udp_connection_t *udp; // additional stuff for UDP (which is technically connectionless, but they use timeouts and a rbtree to store "sessions")
struct sockaddr *local_sockaddr;
socklen_t local_socklen;
ngx_buf_t *buffer;
ngx_queue_t queue;
ngx_atomic_uint_t number;
ngx_msec_t start_time;
ngx_uint_t requests;
unsigned buffered:8;
unsigned log_error:3; /* ngx_connection_log_error_e */
unsigned timedout:1;
unsigned error:1;
unsigned destroyed:1;
unsigned pipeline:1;
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned shared:1;
unsigned sendfile:1;
unsigned sndlowat:1;
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
unsigned need_last_buf:1;
unsigned need_flush_buf:1;
#if (NGX_HAVE_SENDFILE_NODISKIO || NGX_COMPAT)
unsigned busy_count:2;
#endif
#if (NGX_THREADS || NGX_COMPAT)
ngx_thread_task_t *sendfile_task;
#endif
};
``` ```
process to load balance:
- accept incoming connection
- create some kind of stream / session object
- nginx use this to abstract around tcp and udp layers
- for us we probably don't need as detailed as them, since we have tokio::net, so itll be a wrapper around TcpStream
- ask the load balancing algorithm which server in the pool to route to
- connect to the server
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
- cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
### Running as a dockerized application
You may also consider running the load balancer as a dockerized application.
### UDP 1. Clone the repository:
UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the traits required for tokio copy_bidirectional ```sh
but async write and read don't work on just regular datagrams, so probably not possible. git clone https://github.com/psun256/enginewhy.git
cd enginewhy
```
2. Build the application:
```sh
docker build -t enginewhy .
```
3. Run the application:
```sh
docker run -v "path/to/config.yaml:/enginewhy/config.yaml" enginewhy
```
Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections. ## Configuration
Configuration file is written in YAML.
By default, the program will look for a file named `config.yaml` in the working directory.
You can change this by specifying the path with `-c` or `--config` when running the program.
The file consists of:
- Defining the health response and iperf port (IP + port).
- A set of backends (IP + Port for each).
- A set of clusters, which act as group aliases for a set of backends.
- A list of rules, which consist of:
- Clients: One or more CIDR + port number, used to match incoming client connection
- Targets: One or more clusters / backend names.
- Strategy: A load balancing algorithm to use.
- Some algorithms have additional configuration, like the Adaptive algorithm.
Sample configuration:
```yml
healthcheck_addr: "10.0.1.10:9000"
iperf_addr: "10.0.1.10:5201"
backends:
- id: "srv-1"
ip: "10.0.1.11:8081"
- id: "srv-2"
ip: "10.0.1.12:8082"
- id: "srv-3"
ip: "10.0.1.13:8083"
- id: "srv-4"
ip: "10.0.1.14:8084"
clusters:
main-api:
- "srv-1"
- "srv-2"
priority-api:
- "srv-3"
- "srv-4"
rules:
- clients:
- "0.0.0.0/0:8080"
targets:
- "main-api"
strategy:
type: "RoundRobin"
- clients:
- "10.0.0.0/24:8080"
- "10.0.0.0/24:25565"
targets:
- "main-api"
- "priority-api"
strategy:
type: "RoundRobin"
```
An incoming client will be matched with whatever rule has the longest matching prefix on the correct port.
## Examples
You can find some examples in the `examples` directory of the project.
To run these, just run `docker compose up`.

View File

@@ -1,39 +1,38 @@
healthcheck_addr: "127.0.0.1:9000" healthcheck_addr: "10.0.1.10:9000"
iperf_addr: "0.0.0.0:5200" iperf_addr: "10.0.1.10:5201"
backends: backends:
- id: "srv-1" - id: "srv-1"
ip: "127.0.0.1" ip: "10.0.1.11:8081"
port: 8081
- id: "srv-2" - id: "srv-2"
ip: "127.0.0.1" ip: "10.0.1.12:8082"
port: 8082 - id: "srv-3"
ip: "10.0.1.13:8083"
- id: "srv-4"
ip: "10.0.1.14:8084"
clusters: clusters:
main-api: main-api:
- "srv-1" - "srv-1"
- "srv-2" - "srv-2"
priority-api: priority-api:
- "srv-1" - "srv-3"
- "srv-4"
rules: rules:
- clients: - clients:
- "0.0.0.0/0:8888" - "0.0.0.0/0:8080"
targets: targets:
- "main-api" - "main-api"
strategy: strategy:
type: "RoundRobin" type: "RoundRobin"
- clients: - clients:
- "0.0.0.0/0:6767" - "10.0.0.0/24:8080"
- "0.0.0.0/0:6969" - "10.0.0.0/24:25565"
targets: # no issues with duplicate servers or clusters targets:
- "priority-api" - "main-api"
- "priority-api"
- "priority-api" - "priority-api"
strategy: strategy:
type: "Adaptive" type: "RoundRobin"
coefficients: [ 1.5, 1.0, 0.5, 0.1 ]
alpha: 0.75

View File

@@ -1,5 +1,4 @@
services: services:
# two-arm load balancer
load-balancer: load-balancer:
image: neoslhp/enginewhy-lb image: neoslhp/enginewhy-lb
container_name: load-balancer container_name: load-balancer

View File

@@ -0,0 +1,38 @@
healthcheck_addr: "10.0.1.10:9000"
iperf_addr: "10.0.1.10:5201"
backends:
- id: "srv-1"
ip: "10.0.1.11:8081"
- id: "srv-2"
ip: "10.0.1.12:8082"
- id: "srv-3"
ip: "10.0.1.13:8083"
- id: "srv-4"
ip: "10.0.1.14:8084"
clusters:
main-api:
- "srv-1"
- "srv-2"
priority-api:
- "srv-3"
- "srv-4"
rules:
- clients:
- "0.0.0.0/0:8080"
targets:
- "main-api"
strategy:
type: "RoundRobin"
- clients:
- "10.0.0.0/24:8080"
- "10.0.0.0/24:25565"
targets:
- "main-api"
- "priority-api"
strategy:
type: "RoundRobin"

View File

@@ -0,0 +1,110 @@
services:
load-balancer:
image: enginewhy
container_name: load-balancer
tty: true
cap_add:
- NET_ADMIN
- SYS_ADMIN
volumes:
- ./config.yaml:/enginewhy/config.yaml
networks:
net_1:
ipv4_address: 10.0.1.10
net_2:
ipv4_address: 10.0.0.10
net_3:
ipv4_address: 10.0.2.10
srv-1:
image: nicolaka/netshoot
container_name: srv-1
tty: true
command: ["python3", "-m", "http.server", "8081", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.11
ports:
- "8081:8081"
volumes:
- ./srv1:/root/www
cap_add: [ NET_ADMIN ]
srv-2:
image: nicolaka/netshoot
container_name: srv-2
tty: true
command: ["python3", "-m", "http.server", "8082", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.12
ports:
- "8082:8082"
volumes:
- ./srv2:/root/www
cap_add: [ NET_ADMIN ]
srv-3:
image: nicolaka/netshoot
container_name: srv-3
tty: true
command: ["python3", "-m", "http.server", "8083", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.13
ports:
- "8083:8083"
volumes:
- ./srv3:/root/www
cap_add: [ NET_ADMIN ]
srv-4:
image: nicolaka/netshoot
container_name: srv-4
tty: true
command: ["python3", "-m", "http.server", "8084", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.14
ports:
- "8084:8084"
volumes:
- ./srv4:/root/www
cap_add: [ NET_ADMIN ]
client-net2:
image: nicolaka/netshoot
container_name: client-net2
tty: true
networks:
net_2:
ipv4_address: 10.0.0.11
cap_add: [ NET_ADMIN ]
client-net3:
image: nicolaka/netshoot
container_name: client-net3
tty: true
networks:
net_3:
ipv4_address: 10.0.2.11
cap_add: [ NET_ADMIN ]
networks:
net_1:
driver: bridge
ipam:
config:
- subnet: 10.0.1.0/24
net_2:
driver: bridge
ipam:
config:
- subnet: 10.0.0.0/24
net_3:
driver: bridge
ipam:
config:
- subnet: 10.0.2.0/24

View File

@@ -0,0 +1 @@
Hello from server 1!

View File

@@ -0,0 +1 @@
Hello from server 2!

View File

@@ -0,0 +1 @@
Hello from server 3!

View File

@@ -0,0 +1 @@
Hello from server 4!

View File

@@ -1,10 +1,10 @@
use rperf3::{Config, Server};
use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use serde_json::Value;
use rperf3::{Config, Server};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::net::{TcpSocket, TcpStream};
// Physical server health statistics, used for certain load balancing algorithms // Physical server health statistics, used for certain load balancing algorithms
#[derive(Debug, Default)] #[derive(Debug, Default)]
@@ -47,14 +47,14 @@ pub async fn start_healthcheck_listener(
let listener = listener.ok_or_else(|| { let listener = listener.ok_or_else(|| {
eprintln!("health listener could not bind to port"); eprintln!("health listener could not bind to port");
std::io::Error::new(std::io::ErrorKind::Other, "health listener failed") std::io::Error::other("health listener failed")
})?; })?;
println!("healthcheck server listening on {}", addr); println!("healthcheck server listening on {}", addr);
loop { loop {
let (stream, remote_addr) = match listener.accept().await { let (stream, _remote_addr) = match listener.accept().await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(_e) => {
continue; continue;
} }
}; };

View File

@@ -1,11 +1,10 @@
use crate::backend::{Backend, BackendPool};
use crate::backend::health::ServerMetrics; use crate::backend::health::ServerMetrics;
use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo}; use crate::balancer::{Balancer, ConnectionInfo};
use rand::prelude::*; use rand::prelude::*;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use std::fmt::Debug; use std::fmt::Debug;
use std::fs::Metadata; use std::sync::Arc;
use std::sync::{Arc, RwLock};
#[derive(Debug)] #[derive(Debug)]
struct AdaptiveNode { struct AdaptiveNode {
@@ -49,7 +48,7 @@ impl AdaptiveWeightBalancer {
} }
impl Balancer for AdaptiveWeightBalancer { impl Balancer for AdaptiveWeightBalancer {
fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> { fn choose_backend(&mut self, _ctx: ConnectionInfo) -> Option<Arc<Backend>> {
if self.pool.is_empty() { if self.pool.is_empty() {
return None; return None;
} }
@@ -148,6 +147,7 @@ mod tests {
use super::*; use super::*;
use crate::backend::Backend; use crate::backend::Backend;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::RwLock;
fn backend_factory(id: &str, ip: &str, port: u16) -> Arc<Backend> { fn backend_factory(id: &str, ip: &str, port: u16) -> Arc<Backend> {
Arc::new(Backend::new( Arc::new(Backend::new(

View File

@@ -19,7 +19,7 @@ impl RoundRobinBalancer {
} }
impl Balancer for RoundRobinBalancer { impl Balancer for RoundRobinBalancer {
fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> { fn choose_backend(&mut self, _ctx: ConnectionInfo) -> Option<Arc<Backend>> {
let backends = self.pool.backends.clone(); let backends = self.pool.backends.clone();
if backends.is_empty() { if backends.is_empty() {
return None; return None;

View File

@@ -7,6 +7,7 @@ use crate::backend::health::*;
use crate::backend::*; use crate::backend::*;
use crate::balancer::Balancer; use crate::balancer::Balancer;
use crate::balancer::adaptive_weight::AdaptiveWeightBalancer; use crate::balancer::adaptive_weight::AdaptiveWeightBalancer;
use crate::balancer::ip_hashing::SourceIPHash;
use crate::balancer::round_robin::RoundRobinBalancer; use crate::balancer::round_robin::RoundRobinBalancer;
use crate::config::*; use crate::config::*;
@@ -36,11 +37,11 @@ pub fn build_lb(
let mut backends: HashMap<String, Arc<Backend>> = HashMap::new(); let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();
for backend_cfg in &config.backends { for backend_cfg in &config.backends {
let ip: IpAddr = backend_cfg let addr: SocketAddr = backend_cfg
.ip .ip
.parse() .parse()
.map_err(|_| format!("bad ip: {}", backend_cfg.ip))?; .map_err(|_| format!("bad ip: {}", backend_cfg.ip))?;
let addr = SocketAddr::new(ip, backend_cfg.port); let ip = addr.ip();
let health = healths let health = healths
.entry(ip) .entry(ip)
@@ -88,7 +89,7 @@ pub fn build_lb(
let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new(); let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new();
for client_def in &rule.clients { for client_def in &rule.clients {
let (cidr, port) = parse_client(&client_def)?; let (cidr, port) = parse_client(client_def)?;
port_groups.entry(port).or_default().push(cidr); port_groups.entry(port).or_default().push(cidr);
} }
@@ -102,6 +103,7 @@ pub fn build_lb(
let balancer: Box<dyn Balancer + Send> = match &rule.strategy { let balancer: Box<dyn Balancer + Send> = match &rule.strategy {
LoadBalancerStrategy::RoundRobin => Box::new(RoundRobinBalancer::new(pool)), LoadBalancerStrategy::RoundRobin => Box::new(RoundRobinBalancer::new(pool)),
LoadBalancerStrategy::SourceIPHash => Box::new(SourceIPHash::new(pool)),
LoadBalancerStrategy::Adaptive { LoadBalancerStrategy::Adaptive {
coefficients, coefficients,
alpha, alpha,
@@ -121,7 +123,7 @@ pub fn build_lb(
for table in listeners.values_mut() { for table in listeners.values_mut() {
table table
.entries .entries
.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length())); .sort_by(|(a, _), (b, _)| b.network_length().cmp(&a.network_length()));
} }
Ok((listeners, healths)) Ok((listeners, healths))

View File

@@ -44,7 +44,6 @@ pub struct AppConfig {
pub struct BackendConfig { pub struct BackendConfig {
pub id: String, pub id: String,
pub ip: String, pub ip: String,
pub port: u16,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@@ -58,5 +57,6 @@ pub struct RuleConfig {
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum LoadBalancerStrategy { pub enum LoadBalancerStrategy {
RoundRobin, RoundRobin,
SourceIPHash,
Adaptive { coefficients: [f64; 4], alpha: f64 }, Adaptive { coefficients: [f64; 4], alpha: f64 },
} }

View File

@@ -3,25 +3,22 @@ mod balancer;
mod config; mod config;
mod proxy; mod proxy;
use crate::backend::health::{start_healthcheck_listener, start_iperf_server, ServerMetrics}; use crate::backend::health::{ServerMetrics, start_healthcheck_listener, start_iperf_server};
use crate::balancer::ConnectionInfo; use crate::balancer::ConnectionInfo;
use crate::config::loader::{build_lb, RoutingTable}; use crate::config::loader::{RoutingTable, build_lb};
use crate::proxy::tcp::proxy_tcp_connection; use crate::proxy::tcp::proxy_tcp_connection;
use anywho::Error; use anywho::Error;
use clap::Parser;
use notify::{Event, RecursiveMode, Watcher};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::hash::Hash; use std::net::IpAddr;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration; use std::time::Duration;
use tokio::io::AsyncBufReadExt;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use clap::Parser;
use notify::{Event, RecursiveMode, Watcher};
use std::cmp;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
@@ -72,10 +69,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel(1); let (tx, mut rx) = mpsc::channel(1);
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| { let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res { if let Ok(event) = res
if event.kind.is_modify() { && event.kind.is_modify()
let _ = tx.blocking_send(()); {
} let _ = tx.blocking_send(());
} }
}) })
.unwrap(); .unwrap();