Compare commits
11 Commits
balancer...tests/adap

| SHA1 |
|---|
| 7a68e4b17b |
| a9db727bde |
| 3b96043dc6 |
| 5a5106645c |
| 9fb423b949 |
| 90d326ba33 |
| 8170d2a6bf |
| 9046a85d84 |
| 20b51c2562 |
| a3f50c1f0a |
| 07cb45fa73 |
286 Cargo.lock generated
@@ -2,12 +2,43 @@
# It is not intended for manual editing.
version = 4

[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
 "cfg-if",
 "getrandom",
 "once_cell",
 "version_check",
 "zerocopy",
]

[[package]]
name = "anywho"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2"

[[package]]
name = "arraydeque"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"

[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"

[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"

[[package]]
name = "bitflags"
version = "2.10.0"
@@ -27,22 +58,100 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"

[[package]]
name = "getrandom"
version = "0.2.16"
name = "chacha20"
version = "0.10.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
checksum = "99cbf41c6ec3c4b9eaf7f8f5c11a72cd7d3aa0428125c20d5ef4d09907a0f019"
dependencies = [
 "cfg-if",
 "cpufeatures",
 "rand_core",
]

[[package]]
name = "cidr"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60"

[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
 "libc",
]

[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
 "cfg-if",
]

[[package]]
name = "encoding_rs_io"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cc3c5651fb62ab8aa3103998dade57efdd028544bd300516baa31840c252a83"
dependencies = [
 "encoding_rs",
]

[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"

[[package]]
name = "getrandom"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
 "cfg-if",
 "libc",
 "wasi",
 "r-efi",
 "wasip2",
]

[[package]]
name = "hashbrown"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
 "foldhash",
]

[[package]]
name = "hashlink"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [
 "hashbrown",
]

[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"

[[package]]
name = "l4lb"
version = "0.1.0"
dependencies = [
 "anywho",
 "cidr",
 "rand",
 "serde",
 "serde-saphyr",
 "tokio",
]

@@ -61,6 +170,12 @@ dependencies = [
 "scopeguard",
]

[[package]]
name = "memchr"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"

[[package]]
name = "mio"
version = "1.1.0"
@@ -72,6 +187,27 @@ dependencies = [
 "windows-sys 0.61.2",
]

[[package]]
name = "nohash-hasher"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"

[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
 "autocfg",
]

[[package]]
name = "once_cell"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"

[[package]]
name = "parking_lot"
version = "0.12.5"
@@ -91,7 +227,7 @@ dependencies = [
 "cfg-if",
 "libc",
 "redox_syscall",
 "smallvec",
 "smallvec 1.15.1",
 "windows-link",
]

@@ -101,15 +237,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"

[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
 "zerocopy",
]

[[package]]
name = "proc-macro2"
version = "1.0.103"
@@ -129,34 +256,27 @@ dependencies = [
]

[[package]]
name = "rand"
version = "0.8.5"
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
 "libc",
 "rand_chacha",
 "rand_core",
]
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"

[[package]]
name = "rand_chacha"
version = "0.3.1"
name = "rand"
version = "0.10.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
checksum = "be866deebbade98028b705499827ad6967c8bb1e21f96a2609913c8c076e9307"
dependencies = [
 "ppv-lite86",
 "chacha20",
 "getrandom",
 "rand_core",
]

[[package]]
name = "rand_core"
version = "0.6.4"
version = "0.10.0-rc-2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
 "getrandom",
]
checksum = "104a23e4e8b77312a823b6b5613edbac78397e2f34320bc7ac4277013ec4478e"

[[package]]
name = "redox_syscall"
@@ -167,12 +287,89 @@ dependencies = [
 "bitflags",
]

[[package]]
name = "ryu"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"

[[package]]
name = "saphyr-parser"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb771b59f6b1985d1406325ec28f97cfb14256abcec4fdfb37b36a1766d6af7"
dependencies = [
 "arraydeque",
 "hashlink",
]

[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"

[[package]]
name = "serde"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
 "serde_core",
 "serde_derive",
]

[[package]]
name = "serde-saphyr"
version = "0.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b9e06cddad47cc6214c0c456cf209b99a58b54223e7af2f6d4b88a5a9968499"
dependencies = [
 "ahash",
 "base64",
 "encoding_rs_io",
 "nohash-hasher",
 "num-traits",
 "ryu",
 "saphyr-parser",
 "serde",
 "serde_json",
 "smallvec 2.0.0-alpha.12",
]

[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
 "serde_derive",
]

[[package]]
name = "serde_derive"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "serde_json"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
dependencies = [
 "itoa",
 "memchr",
 "ryu",
 "serde",
 "serde_core",
]

[[package]]
name = "signal-hook-registry"
version = "1.4.6"
@@ -188,6 +385,12 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"

[[package]]
name = "smallvec"
version = "2.0.0-alpha.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef784004ca8777809dcdad6ac37629f0a97caee4c685fcea805278d81dd8b857"

[[package]]
name = "socket2"
version = "0.6.1"
@@ -243,12 +446,27 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"

[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"

[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"

[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
dependencies = [
 "wit-bindgen",
]

[[package]]
name = "windows-link"
version = "0.2.1"
@@ -338,6 +556,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"

[[package]]
name = "wit-bindgen"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"

[[package]]
name = "zerocopy"
version = "0.8.31"

@@ -6,4 +6,7 @@ edition = "2024"
[dependencies]
anywho = "0.1.2"
tokio = { version = "1.48.0", features = ["full"] }
rand = { version = "0.8", features = ["small_rng"] }
rand = "0.10.0-rc.5"
serde = { version = "1.0.228", features = ["derive"] }
cidr = "0.3.1"
serde-saphyr = "0.0.10"
33 README.md
@@ -5,10 +5,25 @@ Production't graden't load balancer.

## Todo
- [ ] architecture astronauting
  - balancer module
    - just the algorithms i guess
    -
  - backend module
    - manages the backend pool
    - deals with health / load check
    - BackendPool for all the backends stored together
    - Backend for individual backends
    - has some methods used by balancer module to pick a suitable backend
  - proxy module
    - all the different supported protocols to handle
    - will create a session / stream context structure (ConnectionContext)
    - not globally tracked (this might change for UDP!)
    - mainly some metadata
  - config module
    - set up all the stuff or something
- [ ] stream / session handling (i think wrapper around tokio TcpStream)
- [ ] basic backend pooling
- [ ] layer 4 load balancing
- [ ] load balancing algorithm from the paper (https://www.wcse.org/WCSE_2018/W110.pdf)

## notes
tcp, for nginx (and haproxy, its similar):
@@ -21,7 +36,7 @@ struct ngx_connection_s {

    ngx_socket_t fd;

    ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for dfferent platforms / protocol
    ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for idfferent platforms / protocol
    ngx_send_pt send; // ditto
    ngx_recv_chain_pt recv_chain;
    ngx_send_chain_pt send_chain;
@@ -105,11 +120,9 @@ process to load balance:
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
- cleanup when someone leaves or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)

## Load balancer algorithm
- Choose a fixed weight coefficient for the resource parameter
- Spawn a thread on the load balancer to host the iperf server, used by newly onboarding servers connecting to the load balancer to measure their maximum bandwidth
- Spawn another thread to listen for resource updates from connected servers
- Update the comprehensive load sum from eq (1), update the formula in eq (2) to (5)
- Choose alpha for eq (8), and run the algorithm to choose which server
- Extract the server from the server id using ```get_backend()```
- Use ```tunnel()``` to proxy the packet

|
||||
UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the traits required for tokio copy_bidirectional
|
||||
but async write and read don't work on just regular datagrams, so probably not possible.
|
||||
|
||||
Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections.
|
||||
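A minimal sketch of what that manual UDP relay could look like (not code from this repo; `UdpSession`, `relay_udp`, and `IDLE_TIMEOUT` are illustrative names, and the backend is fixed here rather than chosen by a balancer):

```rust
use std::collections::{hash_map::Entry, HashMap};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;

const IDLE_TIMEOUT: Duration = Duration::from_secs(30);

struct UdpSession {
    upstream: UdpSocket, // socket connected to the chosen backend
    last_seen: Instant,
}

async fn relay_udp(listen: SocketAddr, backend: SocketAddr) -> std::io::Result<()> {
    let downstream = UdpSocket::bind(listen).await?;
    let mut sessions: HashMap<SocketAddr, UdpSession> = HashMap::new();
    let mut buf = [0u8; 65_535];
    loop {
        let (n, client) = downstream.recv_from(&mut buf).await?;
        // One "session" per client address, created lazily on the first datagram.
        let session = match sessions.entry(client) {
            Entry::Occupied(e) => e.into_mut(),
            Entry::Vacant(v) => {
                let upstream = UdpSocket::bind("0.0.0.0:0").await?;
                upstream.connect(backend).await?;
                v.insert(UdpSession { upstream, last_seen: Instant::now() })
            }
        };
        session.last_seen = Instant::now();
        session.upstream.send(&buf[..n]).await?;
        // The reverse (backend -> client) path would live in a separate task;
        // here we only prune idle sessions inline, which is the timeout-based
        // cleanup the note above refers to.
        sessions.retain(|_, s| s.last_seen.elapsed() < IDLE_TIMEOUT);
    }
}
```
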
35 config.yaml Normal file
@@ -0,0 +1,35 @@
backends:
  - id: "srv-1"
    ip: "127.0.0.1"
    port: 8081

  - id: "srv-2"
    ip: "127.0.0.1"
    port: 8082

clusters:
  main-api:
    - "srv-1"
    - "srv-2"
  priority-api:
    - "srv-1"

rules:
  - clients:
      - "0.0.0.0/0:8080"
    targets:
      - "main-api"
    strategy:
      type: "RoundRobin"

  - clients:
      - "0.0.0.0/0:6767"
      - "0.0.0.0/0:6969"
    targets: # no issues with duplicate servers or clusters
      - "priority-api"
      - "priority-api"
      - "priority-api"
    strategy:
      type: "Adaptive"
      coefficients: [ 1.5, 1.0, 0.5, 0.1 ]
      alpha: 0.75
89 docker-compose.yml Normal file
@@ -0,0 +1,89 @@
services:
  # two-arm load balancer
  load-balancer:
    image: neoslhp/enginewhy-lb
    container_name: load-balancer
    tty: true
    deploy:
      resources:
        limits:
          cpus: "4.0"
          memory: 8G
    cap_add:
      - NET_ADMIN
      - SYS_ADMIN
    networks:
      internal:
        ipv4_address: 172.67.67.67
      external:
        ipv4_address: 192.67.67.67

  server1-high-cpu:
    image: neoslhp/enginewhy-server
    container_name: server1
    tty: true
    deploy:
      resources:
        limits:
          cpus: "4.0"
          memory: 8G
    depends_on:
      - load-balancer
    cap_add:
      - NET_ADMIN
    networks:
      external:
        ipv4_address: 192.67.67.2

  server2-low-cpu:
    image: neoslhp/enginewhy-server
    container_name: server2
    tty: true
    deploy:
      resources:
        limits:
          cpus: "2.0"
          memory: 4G
    depends_on:
      - load-balancer
    cap_add:
      - NET_ADMIN
    networks:
      external:
        ipv4_address: 192.67.67.3

  client:
    image: neoslhp/enginewhy-ubuntu22.04
    container_name: client
    tty: true
    deploy:
      resources:
        limits:
          cpus: "4.0"
          memory: 4G
    depends_on:
      - load-balancer
    cap_add:
      - NET_ADMIN
    networks:
      internal:
        ipv4_address: 172.67.67.2

networks:
  internal:
    driver: bridge
    ipam:
      config:
        - subnet: 172.67.67.0/24
  external:
    driver: bridge
    ipam:
      config:
        - subnet: 192.67.67.0/24

# Resources:
# https://networkgeekstuff.com/networking/basic-load-balancer-scenarios-explained/
# https://hub.docker.com/r/linuxserver/wireshark
# https://www.wcse.org/WCSE_2018/W110.pdf
# Deepseek
1 src/backend/health.rs Normal file
@@ -0,0 +1 @@

93 src/backend/mod.rs Normal file
@@ -0,0 +1,93 @@
pub mod health;

use core::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};

// Physical server health statistics, used for certain load balancing algorithms
#[derive(Debug, Default)]
pub struct ServerMetrics {
    pub cpu: f64,
    pub mem: f64,
    pub net: f64,
    pub io: f64,
}

impl ServerMetrics {
    pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
        self.cpu = cpu;
        self.mem = mem;
        self.net = net;
        self.io = io;
    }
}

// A possible endpoint for a proxied connection.
// Note that multiple may live on the same server, hence the Arc<RwLock<ServerMetric>>
#[derive(Debug)]
pub struct Backend {
    pub id: String,
    pub address: SocketAddr,
    pub active_connections: AtomicUsize,
    pub metrics: Arc<RwLock<ServerMetrics>>,
}

impl Backend {
    pub fn new(
        id: String,
        address: SocketAddr,
        server_metrics: Arc<RwLock<ServerMetrics>>,
    ) -> Self {
        Self {
            id: id.to_string(),
            address,
            active_connections: AtomicUsize::new(0),
            metrics: server_metrics,
        }
    }

    // Ordering::Relaxed means the ops could be in any order, but since this
    // is just a metric, and we assume the underlying system is sane
    // enough not to behave poorly, SeqCst is probably overkill.
    pub fn inc_connections(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
        println!(
            "{} has {} connections open",
            self.id,
            self.active_connections.load(Ordering::Relaxed)
        );
    }

    pub fn dec_connections(&self) {
        self.active_connections.fetch_sub(1, Ordering::Relaxed);
        println!(
            "{} has {} connections open",
            self.id,
            self.active_connections.load(Ordering::Relaxed)
        );
    }
}

impl fmt::Display for Backend {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} ({})", self.address, self.id)
    }
}

// A set of endpoints that can be load balanced around.
// Each Balancer owns one of these. Backend instances may be shared
// with other Balancer instances, hence Arc<Backend>.
#[derive(Clone, Debug)]
pub struct BackendPool {
    pub backends: Arc<Vec<Arc<Backend>>>,
}

impl BackendPool {
    pub fn new(backends: Vec<Arc<Backend>>) -> Self {
        BackendPool {
            backends: Arc::new(backends),
        }
    }
}
@@ -1,183 +1,227 @@
use crate::netutils::Backend;
use crate::backend::{Backend, BackendPool, ServerMetrics};
use crate::balancer::{Balancer, ConnectionInfo};
use rand::prelude::*;
use rand::rngs::SmallRng;
use std::sync::Arc;
use std::fmt::Debug;
use std::fs::Metadata;
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone)]
pub struct ServerMetrics {
    // metrics are percents (0..100)
    pub cpu: f64,
    pub mem: f64,
    pub net: f64,
    pub io: f64,
#[derive(Debug)]
struct AdaptiveNode {
    backend: Arc<Backend>,
    weight: f64,
}

impl ServerMetrics {
    pub fn new() -> Self {
        ServerMetrics { cpu: 0.0, mem: 0.0, net: 0.0, io: 0.0 }
    }

    pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
        self.cpu = cpu;
        self.mem = mem;
        self.net = net;
        self.io = io;
    }
}

#[derive(Debug, Clone)]
pub struct ServerState {
    pub backend: Backend,
    pub metrics: ServerMetrics,
    pub weight: f64,
}

impl ServerState {
    pub fn new(backend: Backend) -> Self {
        ServerState { backend, metrics: ServerMetrics::new(), weight: 1.0 }
    }
}

pub struct AdaptiveBalancer {
    servers: Vec<ServerState>,
    // resource coefficients (cpu, mem, net, io) - sum to 1.0
    coeffs: [f64; 4],
#[derive(Debug)]
pub struct AdaptiveWeightBalancer {
    pool: Vec<AdaptiveNode>,
    coefficients: [f64; 4],
    alpha: f64,
    rng: SmallRng,
}

impl AdaptiveBalancer {
    pub fn new(backends: Vec<Backend>, coeffs: [f64; 4], alpha: f64) -> Self {
        let servers = backends.into_iter().map(ServerState::new).collect();
        let rng = SmallRng::from_entropy();
        AdaptiveBalancer { servers, coeffs, alpha, rng }
    }
impl AdaptiveWeightBalancer {
    pub fn new(pool: BackendPool, coefficients: [f64; 4], alpha: f64) -> Self {
        let nodes = pool
            .backends
            .iter()
            .map(|b| AdaptiveNode {
                backend: b.clone(),
                weight: 1f64,
            })
            .collect();

    pub fn add_backend(&mut self, backend: Backend) {
        self.servers.push(ServerState::new(backend));
    }

    /// Update metrics reported by a backend identified by its display/address.
    /// If the backend isn't found this is a no-op.
    pub fn update_metrics(&mut self, backend_addr: &str, cpu: f64, mem: f64, net: f64, io: f64) {
        for s in &mut self.servers {
            if s.backend.to_string() == backend_addr {
                s.metrics.update(cpu, mem, net, io);
                return;
            }
        AdaptiveWeightBalancer {
            pool: nodes,
            coefficients,
            alpha,
            rng: SmallRng::from_rng(&mut rand::rng()),
        }
    }

    fn metrics_to_weight(metrics: &ServerMetrics, coeffs: &[f64; 4]) -> f64 {
        coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io
    pub fn metrics_to_weight(&self, metrics: &ServerMetrics) -> f64 {
        self.coefficients[0] * metrics.cpu
            + self.coefficients[1] * metrics.mem
            + self.coefficients[2] * metrics.net
            + self.coefficients[3] * metrics.io
    }
}

    /// Choose a backend using weighted random selection based on current weights.
    /// Returns an Arc-wrapped Backend clone so callers can cheaply clone it.
    pub fn choose_backend(&mut self) -> Option<Arc<Backend>> {
        if self.servers.is_empty() {
impl Balancer for AdaptiveWeightBalancer {
    fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> {
        if self.pool.is_empty() {
            return None;
        }

        // Compute remaining capacity R_i = 100 - composite_load
        let rs: Vec<f64> = self.servers.iter().map(|s| {
            Self::metrics_to_weight(&s.metrics, &self.coeffs)
        }).collect();
        let ws: Vec<f64> = self.servers.iter().map(|s| s.weight).collect();
        let ls: Vec<u32> = self.servers.iter().map(|s| s.backend.current_load).collect();
        let mut r_sum = 0.0;
        let mut w_sum = 0.0;
        let mut l_sum = 0;

        let r_sum: f64 = rs.iter().copied().sum::<f64>();
        let w_sum: f64 = ws.iter().copied().sum::<f64>().max(1e-12);
        let l_sum: u32 = ls.iter().copied().sum::<u32>();
        let threshold = self.alpha * (r_sum / w_sum);
        for node in &self.pool {
            if let Ok(health) = node.backend.metrics.read() {
                r_sum += self.metrics_to_weight(&health);
            }
            w_sum += node.weight;
            l_sum += node
                .backend
                .active_connections
                .load(std::sync::atomic::Ordering::Relaxed);
        }

        for (i, s) in self.servers.iter_mut().enumerate() {
            let ratio = if s.weight <= 0.0 { f64::INFINITY } else { rs[i] / s.weight };
        let safe_w_sum = w_sum.max(1e-12);
        let threshold = self.alpha * (r_sum / safe_w_sum);

        for idx in 0..self.pool.len() {
            let node = &self.pool[idx];

            if node.weight <= 0.001 {
                continue;
            }

            let risk = match node.backend.metrics.read() {
                Ok(h) => self.metrics_to_weight(&h),
                Err(_) => f64::MAX,
            };

            let ratio = risk / node.weight;
            if ratio <= threshold {
                return Some(Arc::new(s.backend.clone()));
                return Some(node.backend.clone());
            }
        }

        // If any server satisfies Ri/Wi <= threshold, it means the server
        // is relatively overloaded and we must adjust its weight using
        // is relatively overloaded, and we must adjust its weight using
        // formula (6).
        let mut total_lwi = 0.0;
        let l_sum_f64 = l_sum as f64;

        let lwi: Vec<f64> = self.servers.iter().enumerate().map(|(i, s)| {
            s.backend.current_load as f64 * w_sum / ws[i] * l_sum as f64
        }).collect();
        let a_lwi: f64 = lwi.iter().copied().sum::<f64>() / lwi.len() as f64;
        for (i, s) in self.servers.iter_mut().enumerate() {
            s.weight += 1 as f64 - lwi[i] / a_lwi;
        for node in &self.pool {
            let load = node
                .backend
                .active_connections
                .load(std::sync::atomic::Ordering::Relaxed) as f64;
            let weight = node.weight.max(1e-12);
            let lwi = load * (safe_w_sum / weight) * l_sum_f64;
            total_lwi += lwi;
        }

        let avg_lwi = (total_lwi / self.pool.len() as f64).max(1e-12);

        // Compute Li = Wi / Ri and choose server minimizing Li.
        let mut best_idx: Option<usize> = None;
        let mut best_li = u32::MAX;
        for (i, s) in self.servers.iter().enumerate() {
            let li = s.backend.current_load;
            if li < best_li {
                best_li = li;
                best_idx = Some(i);
        let mut best_backend: Option<Arc<Backend>> = None;
        let mut min_load = usize::MAX;

        for node in &mut self.pool {
            let load = node
                .backend
                .active_connections
                .load(std::sync::atomic::Ordering::Relaxed);
            let load_f64 = load as f64;
            let weight = node.weight.max(1e-12);

            let lwi = load_f64 * (safe_w_sum / weight) * l_sum_f64;

            let adj = 1.0 - (lwi / avg_lwi);
            node.weight += adj;

            node.weight = node.weight.clamp(0.1, 100.0);
            if load < min_load {
                min_load = load;
                best_backend = Some(node.backend.clone());
            }
        }

        // If nothing chosen, fall back to random selection
        if best_idx.is_none() {
            let i = (self.rng.next_u32() as usize) % self.servers.len();
            return Some(Arc::new(self.servers[i].backend.clone()));
        match best_backend {
            Some(backend) => Some(backend),
            None => {
                let i = (self.rng.next_u32() as usize) % self.pool.len();
                Some(self.pool[i].backend.clone())
            }
        }

        Some(Arc::new(self.servers[best_idx.unwrap()].backend.clone()))
    }

    // Expose a snapshot of server weights (for monitoring/testing)
    pub fn snapshot_weights(&self) -> Vec<(String, f64)> {
        self.servers.iter().map(|s| (s.backend.to_string(), s.weight)).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::Backend;
    use std::net::SocketAddr;

    fn backend_factory(id: &str, ip: &str, port: u16) -> Arc<Backend> {
        Arc::new(Backend::new(
            id.to_string(),
            SocketAddr::new(ip.parse().unwrap(), port),
            Arc::new(RwLock::new(ServerMetrics::default())),
        ))
    }

    fn unused_ctx() -> ConnectionInfo {
        ConnectionInfo {
            client_ip: ("0.0.0.0".parse().unwrap()),
        }
    }

    #[test]
    fn basic_weight_update_and_choose() {
        let backends = vec![Backend::new("127.0.0.1:1".to_string()), Backend::new("127.0.0.1:2".to_string())];
        let mut b = AdaptiveBalancer::new(backends, [0.5, 0.2, 0.2, 0.1], 0.5);
        let backends = BackendPool::new(vec![
            backend_factory("server-0", "127.0.0.1", 3000),
            backend_factory("server-1", "127.0.0.1", 3001),
        ]);
        let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.5, 0.2, 0.2, 0.1], 0.5);
        // initially equal weights
        let snaps = b.snapshot_weights();
        assert_eq!(snaps.len(), 2);
        // update one backend to be heavily loaded
        b.update_metrics("127.0.0.1:1", 90.0, 80.0, 10.0, 5.0);
        b.update_metrics("127.0.0.1:2", 10.0, 5.0, 1.0, 1.0);
        {
            let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
            sm0_guard.update(90.0, 80.0, 10.0, 5.0);
        }
        {
            let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
            sm1_guard.update(10.0, 5.0, 1.0, 1.0);
        }

        // Choose backend: should pick the less loaded host (127.0.0.1:2)
        let chosen = b.choose_backend().expect("should choose a backend");
        let snaps2 = b.snapshot_weights();
        println!("{:?}, {:?}", snaps, snaps2);
        assert_eq!(chosen.to_string(), "127.0.0.1:2");
        // Choose backend: should pick the less loaded host server1
        let chosen = b
            .choose_backend(unused_ctx())
            .expect("should choose a backend");

        let sm0: &ServerMetrics = &backends.backends.get(0).unwrap().metrics.read().unwrap();
        let sm1: &ServerMetrics = &backends.backends.get(1).unwrap().metrics.read().unwrap();
        println!("{:?}, {:?}", sm0, sm1);
        assert_eq!(chosen.id, "server-1");
    }

    #[test]
    fn choose_none_when_empty() {
        let mut b = AdaptiveBalancer::new(vec![], [0.5, 0.2, 0.2, 0.1], 0.5);
        assert!(b.choose_backend().is_none());
        let mut b =
            AdaptiveWeightBalancer::new(BackendPool::new(vec![]), [0.5, 0.2, 0.2, 0.1], 0.5);
        assert!(b.choose_backend(unused_ctx()).is_none());
    }

    #[test]
    fn ratio_triggers_immediate_selection() {
        // Arrange two servers where server 1 has composite load 0 and server 2 has composite load 100.
        // With alpha = 1.0 and two servers, threshold = 1.0 * (r_sum / w_sum) = 1.0 * (100 / 2) = 50.
        // Server 1 ratio = 0 / 1 = 0 <= 50 so it should be chosen immediately.
        let backends = vec![Backend::new("127.0.0.1:1".to_string()), Backend::new("127.0.0.1:2".to_string())];
        let mut b = AdaptiveBalancer::new(backends, [0.25, 0.25, 0.25, 0.25], 1.0);
        // Server 0 ratio = 0 / 1 = 0 <= 50 so it should be chosen immediately.
        let backends = BackendPool::new(vec![
            backend_factory("server-0", "127.0.0.1", 3000),
            backend_factory("server-1", "127.0.0.1", 3001),
        ]);
        let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.25, 0.25, 0.25, 0.25], 1.0);

        b.update_metrics("127.0.0.1:1", 0.0, 0.0, 0.0, 0.0);
        b.update_metrics("127.0.0.1:2", 100.0, 100.0, 100.0, 100.0);
        {
            let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
            sm0_guard.update(0.0, 0.0, 0.0, 0.0);
        }
        {
            let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
            sm1_guard.update(100.0, 100.0, 100.0, 100.0);
        }

        let chosen = b.choose_backend().expect("should choose a backend");
        assert_eq!(chosen.to_string(), "127.0.0.1:1");
        let chosen = b
            .choose_backend(unused_ctx())
            .expect("should choose a backend");
        assert_eq!(chosen.id, "server-0");
    }

    #[test]
@@ -185,27 +229,38 @@ mod tests {
        // Arrange three servers with identical composite loads so no server satisfies Ri/Wi <= threshold
        // (set alpha < 1 so threshold < ratio). The implementation then falls back to picking the
        // server with minimum current_load
        let mut s1 = Backend::new("127.0.0.1:1".to_string());
        let mut s2 = Backend::new("127.0.0.1:2".to_string());
        let mut s3 = Backend::new("127.0.0.1:3".to_string());
        let backends = BackendPool::new(vec![
            backend_factory("server-0", "127.0.0.1", 3000),
            backend_factory("server-1", "127.0.0.1", 3001),
            backend_factory("server-2", "127.0.0.1", 3002),
        ]);

        // set current_loads (field expected to be public)
        s1.current_load = 10;
        s2.current_load = 5;
        s3.current_load = 20;

        {
            let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
            sm0_guard.update(10.0, 10.0, 10.0, 10.0);
        }
        {
            let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
            sm1_guard.update(5.0, 5.0, 5.0, 5.0);
        }
        {
            let mut sm2_guard = backends.backends.get(2).unwrap().metrics.write().unwrap();
            sm2_guard.update(20.0, 20.0, 20.0, 20.0);
        }

        // Use coeffs that only consider CPU so composite load is easy to reason about.
        let mut bal = AdaptiveBalancer::new(vec![s1, s2, s3], [1.0, 0.0, 0.0, 0.0], 0.5);
        let mut bal = AdaptiveWeightBalancer::new(backends.clone(), [1.0, 0.0, 0.0, 0.0], 0.5);

        // set identical composite loads > 0 for all so ratio = x and threshold = alpha * x < x
        // you will have threshold = 25 for all 3 backend servers and ratio = 50
        // so that forces to choose the smallest current load backend
        bal.update_metrics("127.0.0.1:1", 50.0, 0.0, 0.0, 0.0);
        bal.update_metrics("127.0.0.1:2", 50.0, 0.0, 0.0, 0.0);
        bal.update_metrics("127.0.0.1:3", 50.0, 0.0, 0.0, 0.0);

        let chosen = bal.choose_backend().expect("should choose a backend");
        // expect server with smallest current_load (127.0.0.1:2)
        assert_eq!(chosen.to_string(), "127.0.0.1:2");
        let chosen = bal
            .choose_backend(unused_ctx())
            .expect("should choose a backend");
        // expect server with smallest current_load server-1
        assert_eq!(chosen.id, "server-1");
    }
}
108 src/balancer/ip_hashing.rs Normal file
@@ -0,0 +1,108 @@
use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::{Arc, RwLock};

#[derive(Debug)]
pub struct SourceIPHash {
    pool: BackendPool,
}

impl SourceIPHash {
    pub fn new(pool: BackendPool) -> SourceIPHash {
        Self { pool }
    }
}

impl Balancer for SourceIPHash {
    fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> {
        let client_ip = ctx.client_ip;

        let mut hasher = DefaultHasher::new();
        client_ip.hash(&mut hasher);
        let hash = hasher.finish();
        let idx = (hash as usize) % self.pool.backends.len();

        Some(self.pool.backends[idx].clone())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::IpAddr;
    use crate::backend::ServerMetrics;

    fn create_dummy_backends(count: usize) -> BackendPool {
        let mut backends = Vec::new();
        for i in 1..=count {
            backends.push(Arc::new(Backend::new(
                format!("backend {}", i),
                format!("127.0.0.1:808{}", i).parse().unwrap(),
                Arc::new(RwLock::new(ServerMetrics::default())),
            )));
        }
        BackendPool::new(backends)
    }

    #[test]
    fn test_same_ip_always_selects_same_backend() {
        let backends = create_dummy_backends(3);
        let mut balancer = SourceIPHash::new(backends);

        let client_ip: IpAddr = "192.168.1.100".parse().unwrap();

        let first_choice = balancer.choose_backend(ConnectionInfo { client_ip });
        let second_choice = balancer.choose_backend(ConnectionInfo { client_ip });

        assert!(first_choice.is_some());
        assert!(second_choice.is_some());

        let first = first_choice.unwrap();
        let second = second_choice.unwrap();

        assert_eq!(first.id, second.id);
    }

    #[test]
    fn test_different_ips_may_select_different_backends() {
        let backends = create_dummy_backends(2);
        let mut balancer = SourceIPHash::new(backends);

        let ip1: IpAddr = "192.168.1.100".parse().unwrap();
        let choice1 = balancer.choose_backend(ConnectionInfo { client_ip: ip1 });

        let ip2: IpAddr = "192.168.1.101".parse().unwrap();
        let choice2 = balancer.choose_backend(ConnectionInfo { client_ip: ip2 });

        assert!(choice1.is_some());
        assert!(choice2.is_some());
    }

    #[test]
    fn test_hash_distribution_across_backends() {
        let pool = create_dummy_backends(3);
        let backends_ref = pool.backends.clone();

        let mut balancer = SourceIPHash::new(pool);
        let mut distribution = [0, 0, 0];

        // Test 30 different IPs
        for i in 0..30 {
            let client_ip: IpAddr = format!("192.168.1.{}", 100 + i).parse().unwrap();

            if let Some(backend) = balancer.choose_backend(ConnectionInfo { client_ip }) {
                for (idx, b) in backends_ref.iter().enumerate() {
                    if backend.id == b.id && backend.address == b.address {
                        distribution[idx] += 1;
                        break;
                    }
                }
            }
        }

        assert!(distribution[0] > 0, "Backend 0 received no traffic");
        assert!(distribution[1] > 0, "Backend 1 received no traffic");
        assert!(distribution[2] > 0, "Backend 2 received no traffic");
    }
}
1 src/balancer/least_connections.rs Normal file
@@ -0,0 +1 @@
use super::*;
@@ -1,2 +1,18 @@
pub mod adaptive_weight;
pub use adaptive_weight::AdaptiveBalancer;
pub mod ip_hashing;
pub mod least_connections;
pub mod round_robin;

use crate::backend::Backend;
use std::fmt::Debug;
use std::net::IpAddr;
use std::sync::Arc;

#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    pub client_ip: IpAddr,
}

pub trait Balancer: Debug + Send + Sync + 'static {
    fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>>;
}
32 src/balancer/round_robin.rs Normal file
@@ -0,0 +1,32 @@
use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo};
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

// only the main thread for receiving connections should be
// doing the load balancing. alternatively, each thread
// that handles load balancing should get their own instance.
#[derive(Debug)]
pub struct RoundRobinBalancer {
    pool: BackendPool,
    index: usize,
}

impl RoundRobinBalancer {
    pub fn new(pool: BackendPool) -> RoundRobinBalancer {
        Self { pool, index: 0 }
    }
}

impl Balancer for RoundRobinBalancer {
    fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> {
        let backends = self.pool.backends.clone();
        if backends.is_empty() {
            return None;
        }

        let backend = backends[self.index % backends.len()].clone();
        self.index = self.index.wrapping_add(1);
        Some(backend)
    }
}
120 src/config/loader.rs Normal file
@@ -0,0 +1,120 @@
use cidr::IpCidr;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};

use crate::backend::*;
use crate::balancer::Balancer;
use crate::balancer::adaptive_weight::AdaptiveWeightBalancer;
use crate::balancer::round_robin::RoundRobinBalancer;
use crate::config::*;

pub struct RoutingTable {
    pub balancers: Vec<Box<dyn Balancer + Send>>,
    pub entries: Vec<(IpCidr, usize)>,
}

fn parse_client(s: &str) -> (IpCidr, u16) {
    // just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
    let (ip_part, port_part) = s.rsplit_once(':').expect("badly formatted client");

    let port: u16 = port_part.parse().expect("bad port");
    let cidr: IpCidr = ip_part.parse().expect("bad ip/mask");

    (cidr, port)
}

pub type PortListeners = HashMap<u16, RoutingTable>;

pub fn build_lb(config: AppConfig) -> PortListeners {
    let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new();
    let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();

    for backend_cfg in config.backends {
        let ip: IpAddr = backend_cfg.ip.parse().unwrap();
        let addr = SocketAddr::new(ip, backend_cfg.port);

        let health = healths
            .entry(ip)
            .or_insert_with(|| Arc::new(RwLock::new(ServerMetrics::default())))
            .clone();

        let backend = Arc::new(Backend::new(backend_cfg.id.clone(), addr, health));

        backends.insert(backend_cfg.id, backend);
    }

    let mut listeners: PortListeners = HashMap::new();

    for rule in config.rules {
        let mut target_backends = Vec::new();

        for target_name in &rule.targets {
            if let Some(members) = config.clusters.get(target_name) {
                for member_id in members {
                    if let Some(backend) = backends.get(member_id) {
                        target_backends.push(backend.clone());
                    }
                }
            } else if let Some(backend) = backends.get(target_name) {
                target_backends.push(backend.clone());
            } else {
                eprintln!("warning: target {} not found", target_name);
            }
        }

        // possible for multiple targets of the same rule to have common backends.
        target_backends.sort_by(|a, b| a.id.cmp(&b.id));
        target_backends.dedup_by(|a, b| a.id == b.id);

        if target_backends.is_empty() {
            eprintln!("warning: rule has no valid targets, skipping.");
            continue;
        }

        // for each different client port on this rule, we unfortunately need to make a new
        // Balancer, since Balancer is not thread safe, requires &mut self for the backend
        // selection.
        // a good enough compromise to make a new one for each port, avoids using Mutex, at the
        // cost of minor penalty to load balancing "quality" when you have several client ports.
        let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new();

        for client_def in rule.clients {
            let (cidr, port) = parse_client(&client_def);
            port_groups.entry(port).or_default().push(cidr);
        }

        for (port, cidrs) in port_groups {
            let table = listeners.entry(port).or_insert_with(|| RoutingTable {
                balancers: Vec::new(),
                entries: Vec::new(),
            });

            let pool = BackendPool::new(target_backends.clone());

            let balancer: Box<dyn Balancer + Send> = match &rule.strategy {
                LoadBalancerStrategy::RoundRobin => Box::new(RoundRobinBalancer::new(pool)),
                LoadBalancerStrategy::Adaptive {
                    coefficients,
                    alpha,
                } => Box::new(AdaptiveWeightBalancer::new(pool, *coefficients, *alpha)),
            };

            let balancer_idx = table.balancers.len();
            table.balancers.push(balancer);

            for cidr in cidrs {
                table.entries.push((cidr, balancer_idx));
            }
        }
    }

    // sort to make most specific first, so that first match == longest prefix match
    for table in listeners.values_mut() {
        table
            .entries
            .sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length()));
    }

    listeners
}
46 src/config/mod.rs Normal file
@@ -0,0 +1,46 @@
// config is written as a YAML file, the path will be passed to the program.
//
// the high level structure of the config is that we
// first define the individual backends (ip + port) we are going
// to load balance around.
//
// next we define some clusters, which are really more like a short
// alias for a group of backends.
//
// next we define the rules. these are written as a list of
// "ip/subnet:port" for the clients, and then a list of clusters
// for which backends these are balanced around. and of course
// specify which algorithm to use.
pub mod loader;

use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
pub struct AppConfig {
    pub backends: Vec<BackendConfig>,
    #[serde(default)]
    pub clusters: HashMap<String, Vec<String>>,
    pub rules: Vec<RuleConfig>,
}

#[derive(Debug, Deserialize)]
pub struct BackendConfig {
    pub id: String,
    pub ip: String,
    pub port: u16,
}

#[derive(Debug, Deserialize)]
pub struct RuleConfig {
    pub clients: Vec<String>,
    pub targets: Vec<String>,
    pub strategy: LoadBalancerStrategy,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
pub enum LoadBalancerStrategy {
    RoundRobin,
    Adaptive { coefficients: [f64; 4], alpha: f64 },
}
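For context, a minimal sketch of how a config in this shape could be loaded, mirroring the `serde_saphyr::from_reader` call that main.rs uses; the helper name `load_config` and the path argument are illustrative, not part of the repo:

```rust
use std::fs::File;

// Hypothetical helper: parse a YAML file into AppConfig. The #[serde(tag = "type")]
// attribute above is what lets a `strategy:` block with `type: "Adaptive"`,
// `coefficients`, and `alpha` in config.yaml deserialize into
// LoadBalancerStrategy::Adaptive, while `type: "RoundRobin"` maps to the unit variant.
pub fn load_config(path: &str) -> Result<AppConfig, Box<dyn std::error::Error>> {
    let f = File::open(path)?;
    let cfg: AppConfig = serde_saphyr::from_reader(f)?;
    Ok(cfg)
}
```
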
119 src/main.rs
@@ -1,56 +1,83 @@
macro_rules! info {
    ($($arg:tt)*) => {{
        print!("info: ");
        println!($($arg)*);
    }};
}

macro_rules! error {
    ($($arg:tt)*) => {
        eprint!("error: ");
        eprintln!($($arg)*);
    };
}

mod netutils;
mod backend;
mod balancer;
mod config;
mod proxy;

use anywho::Error;
use netutils::{Backend, tunnel};
use std::sync::Arc;

use crate::balancer::{Balancer, ConnectionInfo};
use crate::proxy::tcp::proxy_tcp_connection;
use std::fs::File;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::TcpListener;
use tokio::sync::Mutex;

static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);

#[tokio::main]
async fn main() -> Result<(), Error> {
    let backends = Arc::new(vec![
        Backend::new("127.0.0.1:8081".to_string()),
        Backend::new("127.0.0.1:8082".to_string()),
    ]);
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let f = File::open("config.yaml").expect("couldn't open config.yaml");
    let app_config: config::AppConfig = serde_saphyr::from_reader(f)?;

    let current_index = Arc::new(Mutex::new(0));
    println!(
        "Loaded {} backends, {} rules.",
        app_config.backends.len(),
        app_config.rules.len()
    );

    info!("enginewhy starting on 0.0.0.0:8080");
    info!("backends: {:?}", backends);
    let listeners = config::loader::build_lb(app_config);

    let listener = TcpListener::bind("0.0.0.0:8080").await?;

    loop {
        let (client, addr) = listener.accept().await?;
        info!("new connection from {}", addr);

        let backend = {
            let mut index = current_index.lock().await;
            let selected_backend = backends[*index].clone();
            *index = (*index + 1) % backends.len();
            selected_backend
        };

        info!("routing client {} to backend {}", addr, backend);

        if let Err(e) = tunnel(client, backend).await {
            error!("proxy failed for {}: {}", addr, e);
        }
    if listeners.is_empty() {
        eprintln!("its a lawless land");
        return Ok(());
    }

    let mut handles = Vec::new();

    for (port, mut routing_table) in listeners {
        handles.push(tokio::spawn(async move {
            let addr = format!("0.0.0.0:{}", port);
            println!("Starting tcp listener on {}", addr);

            let listener = TcpListener::bind(&addr).await.expect("Failed to bind port");

            loop {
                let (socket, remote_addr) = match listener.accept().await {
                    Ok(v) => v,
                    Err(e) => {
                        eprintln!("error: listener port {}: {}", port, e);
                        continue;
                    }
                };

                let remote_ip = remote_addr.ip();
                let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);

                let mut chosen_backend = None;

                for (cidr, balancer_idx) in &mut routing_table.entries {
                    if cidr.contains(&remote_ip) {
                        let balancer = &mut routing_table.balancers[*balancer_idx];
                        chosen_backend = balancer.choose_backend(ConnectionInfo {
                            client_ip: remote_ip,
                        });
                        break;
                    }
                }

                if let Some(backend) = chosen_backend {
                    tokio::spawn(async move {
                        if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await {
                            eprintln!("error: conn_id={} proxy failed: {}", conn_id, e);
                        }
                    });
                } else {
                    println!("error: no matching rule for {} on port {}", remote_ip, port);
                }
            }
        }));
    }

    for h in handles {
        let _ = h.await;
    }

    Ok(())
}
@@ -1,56 +0,0 @@
use std::fmt;
use tokio::io;
use tokio::net::TcpStream;

use std::error::Error;

#[derive(Clone, Debug)]
pub struct Backend {
    address: String,
    pub current_load : u32
}

impl Backend {
    pub fn new(address: String) -> Self {
        Backend {
            address,
            current_load : 0
        }
    }
}

impl fmt::Display for Backend {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.address)
    }
}

pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box<dyn Error>> {
    let backend_address: String = backend.address.clone();

    tokio::spawn(async move {
        let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await {
            Ok(s) => {
                info!("connected to backend {backend_address}");
                s
            }
            Err(e) => {
                error!("failed connecting to backend {backend_address}: {e}");
                return;
            }
        };

        let (mut read_client, mut write_client) = client_stream.into_split();
        let (mut read_backend, mut write_backend) = backend_stream.into_split();

        let client_to_backend =
            tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await });

        let backend_to_client =
            tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await });

        let _ = tokio::join!(client_to_backend, backend_to_client);
    });

    Ok(())
}
44 src/proxy/mod.rs Normal file
@@ -0,0 +1,44 @@
use crate::backend::Backend;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;

pub mod tcp;

pub struct ConnectionContext {
    pub id: u64,
    pub client_addr: SocketAddr,
    pub start_time: Instant,
    pub backend: Arc<Backend>,
    pub bytes_transferred: u64,
}

impl ConnectionContext {
    pub fn new(id: u64, client_addr: SocketAddr, backend: Arc<Backend>) -> Self {
        backend.inc_connections();

        Self {
            id,
            client_addr,
            start_time: Instant::now(),
            backend,
            bytes_transferred: 0,
        }
    }
}

impl Drop for ConnectionContext {
    fn drop(&mut self) {
        self.backend.dec_connections();
        let duration = self.start_time.elapsed();

        println!(
            "info: conn_id={} closed. client={} backend={} bytes={} duration={:.2?}",
            self.id,
            self.client_addr,
            self.backend.address,
            self.bytes_transferred,
            duration.as_secs_f64()
        );
    }
}
30 src/proxy/tcp.rs Normal file
@@ -0,0 +1,30 @@
use crate::backend::Backend;
use crate::proxy::ConnectionContext;
use anywho::Error;
use std::sync::Arc;
use tokio::io;
use tokio::net::TcpStream;

pub async fn proxy_tcp_connection(
    connection_id: u64,
    mut client_stream: TcpStream,
    backend: Arc<Backend>,
) -> Result<(), Error> {
    let client_addr = client_stream.peer_addr()?;

    let mut ctx = ConnectionContext::new(connection_id, client_addr, backend.clone());

    #[cfg(debug_assertions)]
    println!(
        "info: conn_id={} connecting to {}",
        connection_id, ctx.backend.id
    );

    let mut backend_stream = TcpStream::connect(&backend.address).await?;

    let (tx, rx) = io::copy_bidirectional(&mut client_stream, &mut backend_stream).await?;

    ctx.bytes_transferred = tx + rx;

    Ok(())
}