12 Commits

Author SHA1 Message Date
nnhphong
6d2b8115f3 add some more tests for lb 2025-12-09 22:01:16 -05:00
nnhphong
08cb522f93 the algorithm is working, but will need more test 2025-12-07 23:04:29 -05:00
nnhphong
742827b16f prune some comment 2025-12-07 21:59:43 -05:00
nnhphong
e19efee895 part of the algorithm, waiting for paul s and jeremy to complete refactoring 2025-12-07 21:56:27 -05:00
nnhphong
393c35bdf8 code for docker infra image 2025-12-07 14:09:38 -05:00
Ning Qi (Paul) Sun
cd23bfdf5a Merge pull request #1 from psun256/merge
Merge & Refactor
2025-12-06 16:09:51 -05:00
4cdf2db0c9 feat: improved logging 2025-12-06 02:16:40 -05:00
606880f928 feat: merged repos 2025-12-06 01:31:33 -05:00
19cd5b7f2a feat: modularized proxy 2025-12-06 00:21:53 -05:00
Ning Qi (Paul) Sun
25c3eb9511 gh action
gh action
2025-12-03 22:07:40 -05:00
psun256
e27bd2aaf0 layer 4 load balancing (round robin, hardcoded backends) 2025-11-29 21:46:26 -05:00
Phong Nguyen
1235d3611d Update README with load balancing details
Added a note about load balancing algorithms from a referenced paper.
2025-12-03 12:47:46 -05:00
20 changed files with 314 additions and 1438 deletions

535
Cargo.lock generated
View File

@@ -2,105 +2,12 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 4
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "anstream"
version = "0.6.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "anywho" name = "anywho"
version = "0.1.2" version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2" checksum = "6136f131067f7e821582add37f0823fdba4dbdd8506833c1fd4b0e60a4ddaaf2"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arraydeque"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.10.0" version = "2.10.0"
@@ -119,197 +26,15 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "chacha20"
version = "0.10.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99cbf41c6ec3c4b9eaf7f8f5c11a72cd7d3aa0428125c20d5ef4d09907a0f019"
dependencies = [
"cfg-if",
"cpufeatures",
"rand_core",
]
[[package]]
name = "cidr"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60"
[[package]]
name = "clap"
version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "colorchoice"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if",
]
[[package]]
name = "encoding_rs_io"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cc3c5651fb62ab8aa3103998dade57efdd028544bd300516baa31840c252a83"
dependencies = [
"encoding_rs",
]
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.3.4" version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"r-efi", "wasi",
"wasip2",
]
[[package]]
name = "hashbrown"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"foldhash",
]
[[package]]
name = "hashlink"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [
"hashbrown",
]
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "inotify"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
dependencies = [
"bitflags 2.10.0",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "kqueue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
] ]
[[package]] [[package]]
@@ -317,13 +42,7 @@ name = "l4lb"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anywho", "anywho",
"arc-swap",
"cidr",
"clap",
"notify",
"rand", "rand",
"serde",
"serde-saphyr",
"tokio", "tokio",
] ]
@@ -342,18 +61,6 @@ dependencies = [
"scopeguard", "scopeguard",
] ]
[[package]]
name = "log"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "memchr"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]] [[package]]
name = "mio" name = "mio"
version = "1.1.0" version = "1.1.0"
@@ -361,62 +68,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "nohash-hasher"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
[[package]]
name = "notify"
version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3"
dependencies = [
"bitflags 2.10.0",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"notify-types",
"walkdir",
"windows-sys 0.60.2",
]
[[package]]
name = "notify-types"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "once_cell_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.5" version = "0.12.5"
@@ -436,7 +91,7 @@ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec 1.15.1", "smallvec",
"windows-link", "windows-link",
] ]
@@ -446,6 +101,15 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.103" version = "1.0.103"
@@ -465,27 +129,34 @@ dependencies = [
] ]
[[package]] [[package]]
name = "r-efi" name = "rand"
version = "5.3.0" version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]] [[package]]
name = "rand" name = "rand_chacha"
version = "0.10.0-rc.5" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be866deebbade98028b705499827ad6967c8bb1e21f96a2609913c8c076e9307" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [ dependencies = [
"chacha20", "ppv-lite86",
"getrandom",
"rand_core", "rand_core",
] ]
[[package]] [[package]]
name = "rand_core" name = "rand_core"
version = "0.10.0-rc-2" version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "104a23e4e8b77312a823b6b5613edbac78397e2f34320bc7ac4277013ec4478e" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
@@ -493,32 +164,7 @@ version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [ dependencies = [
"bitflags 2.10.0", "bitflags",
]
[[package]]
name = "ryu"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "saphyr-parser"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb771b59f6b1985d1406325ec28f97cfb14256abcec4fdfb37b36a1766d6af7"
dependencies = [
"arraydeque",
"hashlink",
] ]
[[package]] [[package]]
@@ -527,67 +173,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde-saphyr"
version = "0.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b9e06cddad47cc6214c0c456cf209b99a58b54223e7af2f6d4b88a5a9968499"
dependencies = [
"ahash",
"base64",
"encoding_rs_io",
"nohash-hasher",
"num-traits",
"ryu",
"saphyr-parser",
"serde",
"serde_json",
"smallvec 2.0.0-alpha.12",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
"serde_core",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.6" version = "1.4.6"
@@ -603,12 +188,6 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "smallvec"
version = "2.0.0-alpha.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef784004ca8777809dcdad6ac37629f0a97caee4c685fcea805278d81dd8b857"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.6.1" version = "0.6.1"
@@ -619,12 +198,6 @@ dependencies = [
"windows-sys 0.60.2", "windows-sys 0.60.2",
] ]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.110" version = "2.0.110"
@@ -670,52 +243,12 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.1+wasi-snapshot-preview1" version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
dependencies = [
"wit-bindgen",
]
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "windows-link" name = "windows-link"
version = "0.2.1" version = "0.2.1"
@@ -805,12 +338,6 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "wit-bindgen"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
[[package]] [[package]]
name = "zerocopy" name = "zerocopy"
version = "0.8.31" version = "0.8.31"

View File

@@ -6,10 +6,4 @@ edition = "2024"
[dependencies] [dependencies]
anywho = "0.1.2" anywho = "0.1.2"
tokio = { version = "1.48.0", features = ["full"] } tokio = { version = "1.48.0", features = ["full"] }
rand = "0.10.0-rc.5" rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1.0.228", features = ["derive"] }
cidr = "0.3.1"
serde-saphyr = "0.0.10"
arc-swap = "1.7.1"
clap = { version = "4.5.53", features = ["derive"] }
notify = "8.2.0"

View File

@@ -5,25 +5,10 @@ Production't graden't load balancer.
## Todo ## Todo
- [ ] architecture astronauting - [ ] architecture astronauting
- balancer module
- just the algorithms i guess
-
- backend module
- manages the backend pool
- deals with health / load check
- BackendPool for all the backends stored together
- Backend for individual backends
- has some methods used by balancer module to pick a suitable backend
- proxy module
- all the different supported protocols to handle
- will create a session / stream context structure (ConnectionContext)
- not globally tracked (this might change for UDP!)
- mainly some metadata
- config module
- set up all the stuff or something
- [ ] stream / session handling (i think wrapper around tokio TcpStream) - [ ] stream / session handling (i think wrapper around tokio TcpStream)
- [ ] basic backend pooling - [ ] basic backend pooling
- [ ] layer 4 load balancing - [ ] layer 4 load balancing
- [ ] load balancing algorithm from the paper (https://www.wcse.org/WCSE_2018/W110.pdf)
## notes ## notes
tcp, for nginx (and haproxy, its similar): tcp, for nginx (and haproxy, its similar):
@@ -36,7 +21,7 @@ struct ngx_connection_s {
ngx_socket_t fd; ngx_socket_t fd;
ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for idfferent platforms / protocol ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for dfferent platforms / protocol
ngx_send_pt send; // ditto ngx_send_pt send; // ditto
ngx_recv_chain_pt recv_chain; ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain; ngx_send_chain_pt send_chain;
@@ -120,9 +105,11 @@ process to load balance:
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually) - proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
- cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions) - cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
## Load balancer algorithm
### UDP - Choose a fixed weight coefficient for the resource parameter
UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the traits required for tokio copy_bidirectional - Spawn a thread on a load balancer to host the iperf server, used for new onboarding server connecting to the load balancer to measure their maximum bandwidth
but async write and read don't work on just regular datagrams, so probably not possible. - Spawn another thread for listening to resource update from connected server
- Update the comprehensive load sum from eq (1), update the formula in eq (2) to (5)
Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections. - Choose alpha for eq (8), and run the algorithm to choose which server
- Extract the server from the server id using ```get_backend()```
- Use ```tunnel()``` to proxy the packet

BIN
W110.pdf Normal file
View File

Binary file not shown.

View File

@@ -1,34 +0,0 @@
backends:
- id: "srv-1"
ip: "127.0.0.1"
port: 8081
- id: "srv-2"
ip: "127.0.0.1"
port: 8082
clusters:
main-api:
- "srv-1"
- "srv-2"
priority-api:
- "srv-1"
rules:
- clients:
- "0.0.0.0/0:8080"
targets:
- "main-api"
strategy:
type: "RoundRobin"
- clients:
- "0.0.0.0/0:6767"
targets: # no issues with duplicate servers or clusters
- "priority-api"
- "priority-api"
- "priority-api"
strategy:
type: "Adaptive"
coefficients: [ 1.5, 1.0, 0.5, 0.1 ]
alpha: 0.75

View File

@@ -1,89 +0,0 @@
services:
# two-arm load balancer
load-balancer:
image: neoslhp/enginewhy-lb
container_name: load-balancer
tty: true
deploy:
resources:
limits:
cpus: "4.0"
memory: 8G
cap_add:
- NET_ADMIN
- SYS_ADMIN
networks:
internal:
ipv4_address: 172.67.67.67
external:
ipv4_address: 192.67.67.67
server1-high-cpu:
image: neoslhp/enginewhy-server
container_name: server1
tty: true
deploy:
resources:
limits:
cpus: "4.0"
memory: 8G
depends_on:
- load-balancer
cap_add:
- NET_ADMIN
networks:
external:
ipv4_address: 192.67.67.2
server2-low-cpu:
image: neoslhp/enginewhy-server
container_name: server2
tty: true
deploy:
resources:
limits:
cpus: "2.0"
memory: 4G
depends_on:
- load-balancer
cap_add:
- NET_ADMIN
networks:
external:
ipv4_address: 192.67.67.3
client:
image: neoslhp/enginewhy-ubuntu22.04
container_name: client
tty: true
deploy:
resources:
limits:
cpus: "4.0"
memory: 4G
depends_on:
- load-balancer
cap_add:
- NET_ADMIN
networks:
internal:
ipv4_address: 172.67.67.2
networks:
internal:
driver: bridge
ipam:
config:
- subnet: 172.67.67.0/24
external:
driver: bridge
ipam:
config:
- subnet: 192.67.67.0/24
# Resources:
# https://networkgeekstuff.com/networking/basic-load-balancer-scenarios-explained/
# https://hub.docker.com/r/linuxserver/wireshark
# https://www.wcse.org/WCSE_2018/W110.pdf
# Deepseek

View File

@@ -1 +0,0 @@

View File

@@ -1,93 +0,0 @@
pub mod health;
use core::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};
// Physical server health statistics, used for certain load balancing algorithms
#[derive(Debug, Default)]
pub struct ServerMetrics {
pub cpu: f64,
pub mem: f64,
pub net: f64,
pub io: f64,
}
impl ServerMetrics {
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
self.cpu = cpu;
self.mem = mem;
self.net = net;
self.io = io;
}
}
// A possible endpoint for a proxied connection.
// Note that multiple may live on the same server, hence the Arc<RwLock<ServerMetric>>
#[derive(Debug)]
pub struct Backend {
pub id: String,
pub address: SocketAddr,
pub active_connections: AtomicUsize,
pub metrics: Arc<RwLock<ServerMetrics>>,
}
impl Backend {
pub fn new(
id: String,
address: SocketAddr,
server_metrics: Arc<RwLock<ServerMetrics>>,
) -> Self {
Self {
id: id.to_string(),
address,
active_connections: AtomicUsize::new(0),
metrics: server_metrics,
}
}
// Ordering::Relaxed means the ops could be in any order, but since this
// is just a metric, and we assume the underlying system is sane
// enough not to behave poorly, so SeqCst is probably overkill.
pub fn inc_connections(&self) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
println!(
"{} has {} connections open",
self.id,
self.active_connections.load(Ordering::Relaxed)
);
}
pub fn dec_connections(&self) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
println!(
"{} has {} connections open",
self.id,
self.active_connections.load(Ordering::Relaxed)
);
}
}
impl fmt::Display for Backend {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({})", self.address, self.id)
}
}
// A set of endpoints that can be load balanced around.
// Each Balancer owns one of these. Backend instances may be shared
// with other Balancer instances, hence Arc<Backend>.
#[derive(Clone, Debug)]
pub struct BackendPool {
pub backends: Arc<Vec<Arc<Backend>>>,
}
impl BackendPool {
pub fn new(backends: Vec<Arc<Backend>>) -> Self {
BackendPool {
backends: Arc::new(backends),
}
}
}

View File

@@ -1,144 +1,211 @@
use crate::backend::{Backend, BackendPool, ServerMetrics}; use crate::netutils::Backend;
use crate::balancer::{Balancer, ConnectionInfo};
use rand::prelude::*; use rand::prelude::*;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use std::fmt::Debug; use std::sync::Arc;
use std::fs::Metadata;
use std::sync::{Arc, RwLock};
#[derive(Debug)]
struct AdaptiveNode { #[derive(Debug, Clone)]
backend: Arc<Backend>, pub struct ServerMetrics {
weight: f64, // metrics are percents (0..100)
pub cpu: f64,
pub mem: f64,
pub net: f64,
pub io: f64,
} }
#[derive(Debug)] impl ServerMetrics {
pub struct AdaptiveWeightBalancer { pub fn new() -> Self {
pool: Vec<AdaptiveNode>, ServerMetrics { cpu: 0.0, mem: 0.0, net: 0.0, io: 0.0 }
coefficients: [f64; 4], }
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
self.cpu = cpu;
self.mem = mem;
self.net = net;
self.io = io;
}
}
#[derive(Debug, Clone)]
pub struct ServerState {
pub backend: Backend,
pub metrics: ServerMetrics,
pub weight: f64,
}
impl ServerState {
pub fn new(backend: Backend) -> Self {
ServerState { backend, metrics: ServerMetrics::new(), weight: 1.0 }
}
}
pub struct AdaptiveBalancer {
servers: Vec<ServerState>,
// resource coefficients (cpu, mem, net, io) - sum to 1.0
coeffs: [f64; 4],
alpha: f64, alpha: f64,
rng: SmallRng, rng: SmallRng,
} }
impl AdaptiveWeightBalancer { impl AdaptiveBalancer {
pub fn new(pool: BackendPool, coefficients: [f64; 4], alpha: f64) -> Self { pub fn new(backends: Vec<Backend>, coeffs: [f64; 4], alpha: f64) -> Self {
let nodes = pool let servers = backends.into_iter().map(ServerState::new).collect();
.backends let rng = SmallRng::from_entropy();
.iter() AdaptiveBalancer { servers, coeffs, alpha, rng }
.map(|b| AdaptiveNode { }
backend: b.clone(),
weight: 0f64,
})
.collect();
AdaptiveWeightBalancer { pub fn add_backend(&mut self, backend: Backend) {
pool: nodes, self.servers.push(ServerState::new(backend));
coefficients, }
alpha,
rng: SmallRng::from_rng(&mut rand::rng()), /// Update metrics reported by a backend identified by its display/address.
/// If the backend isn't found this is a no-op.
pub fn update_metrics(&mut self, backend_addr: &str, cpu: f64, mem: f64, net: f64, io: f64) {
for s in &mut self.servers {
if s.backend.to_string() == backend_addr {
s.metrics.update(cpu, mem, net, io);
return;
}
} }
} }
pub fn metrics_to_weight(&self, metrics: &ServerMetrics) -> f64 { fn metrics_to_weight(metrics: &ServerMetrics, coeffs: &[f64; 4]) -> f64 {
self.coefficients[0] * metrics.cpu coeffs[0] * metrics.cpu + coeffs[1] * metrics.mem + coeffs[2] * metrics.net + coeffs[3] * metrics.io
+ self.coefficients[1] * metrics.mem
+ self.coefficients[2] * metrics.net
+ self.coefficients[3] * metrics.io
} }
}
impl Balancer for AdaptiveWeightBalancer { /// Choose a backend using weighted random selection based on current weights.
fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> { /// Returns an Arc-wrapped Backend clone so callers can cheaply clone it.
if self.pool.is_empty() { pub fn choose_backend(&mut self) -> Option<Arc<Backend>> {
if self.servers.is_empty() {
return None; return None;
} }
// Compute remaining capacity R_i = 100 - composite_load // Compute remaining capacity R_i = 100 - composite_load
let mut r_sum = 0.0; let rs: Vec<f64> = self.servers.iter().map(|s| {
let mut w_sum = 0.0; Self::metrics_to_weight(&s.metrics, &self.coeffs)
let mut l_sum = 0; }).collect();
let ws: Vec<f64> = self.servers.iter().map(|s| s.weight).collect();
let ls: Vec<u32> = self.servers.iter().map(|s| s.backend.current_load).collect();
for node in &self.pool { let r_sum: f64 = rs.iter().copied().sum::<f64>();
if let Ok(health) = node.backend.metrics.read() { let w_sum: f64 = ws.iter().copied().sum::<f64>().max(1e-12);
r_sum += self.metrics_to_weight(&health); let l_sum: u32 = ls.iter().copied().sum::<u32>();
} let threshold = self.alpha * (r_sum / w_sum);
w_sum += node.weight;
l_sum += node
.backend
.active_connections
.load(std::sync::atomic::Ordering::Relaxed);
}
let safe_w_sum = w_sum.max(1e-12);
let threshold = self.alpha * (r_sum / safe_w_sum);
for idx in 0..self.pool.len() {
let node = &self.pool[idx];
if node.weight <= 0.001 {
continue;
}
let risk = match node.backend.metrics.read() {
Ok(h) => self.metrics_to_weight(&h),
Err(_) => f64::MAX,
};
let ratio = risk / node.weight;
for (i, s) in self.servers.iter_mut().enumerate() {
let ratio = if s.weight <= 0.0 { f64::INFINITY } else { rs[i] / s.weight };
if ratio <= threshold { if ratio <= threshold {
return Some(node.backend.clone()); return Some(Arc::new(s.backend.clone()));
} }
} }
// If any server satisfies Ri/Wi <= threshold, it means the server // If any server satisfies Ri/Wi <= threshold, it means the server
// is relatively overloaded, and we must adjust its weight using // is relatively overloaded and we must adjust its weight using
// formula (6). // formula (6).
let mut total_lwi = 0.0;
let l_sum_f64 = l_sum as f64;
for node in &self.pool { let lwi: Vec<f64> = self.servers.iter().enumerate().map(|(i, s)| {
let load = node s.backend.current_load as f64 * w_sum / ws[i] * l_sum as f64
.backend }).collect();
.active_connections let a_lwi: f64 = lwi.iter().copied().sum::<f64>() / lwi.len() as f64;
.load(std::sync::atomic::Ordering::Relaxed) as f64; for (i, s) in self.servers.iter_mut().enumerate() {
let weight = node.weight.max(1e-12); s.weight += 1 as f64 - lwi[i] / a_lwi;
let lwi = load * (safe_w_sum / weight) * l_sum_f64;
total_lwi += lwi;
} }
let avg_lwi = (total_lwi / self.pool.len() as f64).max(1e-12);
// Compute Li = Wi / Ri and choose server minimizing Li. // Compute Li = Wi / Ri and choose server minimizing Li.
let mut best_backend: Option<Arc<Backend>> = None; let mut best_idx: Option<usize> = None;
let mut min_load = usize::MAX; let mut best_li = u32::MAX;
for (i, s) in self.servers.iter().enumerate() {
for node in &mut self.pool { let li = s.backend.current_load;
let load = node if li < best_li {
.backend best_li = li;
.active_connections best_idx = Some(i);
.load(std::sync::atomic::Ordering::Relaxed);
let load_f64 = load as f64;
let weight = node.weight.max(1e-12);
let lwi = load_f64 * (safe_w_sum / weight) * l_sum_f64;
let adj = 1.0 - (lwi / avg_lwi);
node.weight += adj;
node.weight = node.weight.clamp(0.1, 100.0);
if load < min_load {
min_load = load;
best_backend = Some(node.backend.clone());
} }
} }
match best_backend { // If nothing chosen, fall back to random selection
Some(backend) => Some(backend), if best_idx.is_none() {
None => { let i = (self.rng.next_u32() as usize) % self.servers.len();
let i = (self.rng.next_u32() as usize) % self.pool.len(); return Some(Arc::new(self.servers[i].backend.clone()));
Some(self.pool[i].backend.clone())
} }
Some(Arc::new(self.servers[best_idx.unwrap()].backend.clone()))
} }
// Expose a snapshot of server weights (for monitoring/testing)
pub fn snapshot_weights(&self) -> Vec<(String, f64)> {
self.servers.iter().map(|s| (s.backend.to_string(), s.weight)).collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_weight_update_and_choose() {
let backends = vec![Backend::new("127.0.0.1:1".to_string()), Backend::new("127.0.0.1:2".to_string())];
let mut b = AdaptiveBalancer::new(backends, [0.5, 0.2, 0.2, 0.1], 0.5);
// initially equal weights
let snaps = b.snapshot_weights();
assert_eq!(snaps.len(), 2);
// update one backend to be heavily loaded
b.update_metrics("127.0.0.1:1", 90.0, 80.0, 10.0, 5.0);
b.update_metrics("127.0.0.1:2", 10.0, 5.0, 1.0, 1.0);
// Choose backend: should pick the less loaded host (127.0.0.1:2)
let chosen = b.choose_backend().expect("should choose a backend");
let snaps2 = b.snapshot_weights();
println!("{:?}, {:?}", snaps, snaps2);
assert_eq!(chosen.to_string(), "127.0.0.1:2");
}
#[test]
fn choose_none_when_empty() {
let mut b = AdaptiveBalancer::new(vec![], [0.5, 0.2, 0.2, 0.1], 0.5);
assert!(b.choose_backend().is_none());
}
#[test]
fn ratio_triggers_immediate_selection() {
// Arrange two servers where server 1 has composite load 0 and server 2 has composite load 100.
// With alpha = 1.0 and two servers, threshold = 1.0 * (r_sum / w_sum) = 1.0 * (100 / 2) = 50.
// Server 1 ratio = 0 / 1 = 0 <= 50 so it should be chosen immediately.
let backends = vec![Backend::new("127.0.0.1:1".to_string()), Backend::new("127.0.0.1:2".to_string())];
let mut b = AdaptiveBalancer::new(backends, [0.25, 0.25, 0.25, 0.25], 1.0);
b.update_metrics("127.0.0.1:1", 0.0, 0.0, 0.0, 0.0);
b.update_metrics("127.0.0.1:2", 100.0, 100.0, 100.0, 100.0);
let chosen = b.choose_backend().expect("should choose a backend");
assert_eq!(chosen.to_string(), "127.0.0.1:1");
}
#[test]
fn choose_min_current_load_when_no_ratio() {
// Arrange three servers with identical composite loads so no server satisfies Ri/Wi <= threshold
// (set alpha < 1 so threshold < ratio). The implementation then falls back to picking the
// server with minimum current_load
let mut s1 = Backend::new("127.0.0.1:1".to_string());
let mut s2 = Backend::new("127.0.0.1:2".to_string());
let mut s3 = Backend::new("127.0.0.1:3".to_string());
// set current_loads (field expected to be public)
s1.current_load = 10;
s2.current_load = 5;
s3.current_load = 20;
// Use coeffs that only consider CPU so composite load is easy to reason about.
let mut bal = AdaptiveBalancer::new(vec![s1, s2, s3], [1.0, 0.0, 0.0, 0.0], 0.5);
// set identical composite loads > 0 for all so ratio = x and threshold = alpha * x < x
// you will have threshold = 25 for all 3 backend servers and ratio = 50
// so that forces to choose the smallest current load backend
bal.update_metrics("127.0.0.1:1", 50.0, 0.0, 0.0, 0.0);
bal.update_metrics("127.0.0.1:2", 50.0, 0.0, 0.0, 0.0);
bal.update_metrics("127.0.0.1:3", 50.0, 0.0, 0.0, 0.0);
let chosen = bal.choose_backend().expect("should choose a backend");
// expect server with smallest current_load (127.0.0.1:2)
assert_eq!(chosen.to_string(), "127.0.0.1:2");
} }
} }

View File

@@ -1,108 +0,0 @@
use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::{Arc, RwLock};
#[derive(Debug)]
pub struct SourceIPHash {
pool: BackendPool,
}
impl SourceIPHash {
pub fn new(pool: BackendPool) -> SourceIPHash {
Self { pool }
}
}
impl Balancer for SourceIPHash {
fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> {
let client_ip = ctx.client_ip;
let mut hasher = DefaultHasher::new();
client_ip.hash(&mut hasher);
let hash = hasher.finish();
let idx = (hash as usize) % self.pool.backends.len();
Some(self.pool.backends[idx].clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::IpAddr;
use crate::backend::ServerMetrics;
fn create_dummy_backends(count: usize) -> BackendPool {
let mut backends = Vec::new();
for i in 1..=count {
backends.push(Arc::new(Backend::new(
format!("backend {}", i),
format!("127.0.0.1:808{}", i).parse().unwrap(),
Arc::new(RwLock::new(ServerMetrics::default())),
)));
}
BackendPool::new(backends)
}
#[test]
fn test_same_ip_always_selects_same_backend() {
let backends = create_dummy_backends(3);
let mut balancer = SourceIPHash::new(backends);
let client_ip: IpAddr = "192.168.1.100".parse().unwrap();
let first_choice = balancer.choose_backend(ConnectionInfo { client_ip });
let second_choice = balancer.choose_backend(ConnectionInfo { client_ip });
assert!(first_choice.is_some());
assert!(second_choice.is_some());
let first = first_choice.unwrap();
let second = second_choice.unwrap();
assert_eq!(first.id, second.id);
}
#[test]
fn test_different_ips_may_select_different_backends() {
let backends = create_dummy_backends(2);
let mut balancer = SourceIPHash::new(backends);
let ip1: IpAddr = "192.168.1.100".parse().unwrap();
let choice1 = balancer.choose_backend(ConnectionInfo { client_ip: ip1 });
let ip2: IpAddr = "192.168.1.101".parse().unwrap();
let choice2 = balancer.choose_backend(ConnectionInfo { client_ip: ip2 });
assert!(choice1.is_some());
assert!(choice2.is_some());
}
#[test]
fn test_hash_distribution_across_backends() {
let pool = create_dummy_backends(3);
let backends_ref = pool.backends.clone();
let mut balancer = SourceIPHash::new(pool);
let mut distribution = [0, 0, 0];
// Test 30 different IPs
for i in 0..30 {
let client_ip: IpAddr = format!("192.168.1.{}", 100 + i).parse().unwrap();
if let Some(backend) = balancer.choose_backend(ConnectionInfo { client_ip }) {
for (idx, b) in backends_ref.iter().enumerate() {
if backend.id == b.id && backend.address == b.address {
distribution[idx] += 1;
break;
}
}
}
}
assert!(distribution[0] > 0, "Backend 0 received no traffic");
assert!(distribution[1] > 0, "Backend 1 received no traffic");
assert!(distribution[2] > 0, "Backend 2 received no traffic");
}
}

View File

@@ -1 +0,0 @@
use super::*;

View File

@@ -1,18 +1,2 @@
pub mod adaptive_weight; pub mod adaptive_weight;
pub mod ip_hashing; pub use adaptive_weight::AdaptiveBalancer;
pub mod least_connections;
pub mod round_robin;
use crate::backend::Backend;
use std::fmt::Debug;
use std::net::IpAddr;
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
pub client_ip: IpAddr,
}
pub trait Balancer: Debug + Send + Sync + 'static {
fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>>;
}

0
src/balancer/random.rs Normal file
View File

View File

@@ -1,32 +0,0 @@
use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo};
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
// only the main thread for receiving connections should be
// doing the load balancing. alternatively, each thread
// that handles load balancing should get their own instance.
#[derive(Debug)]
pub struct RoundRobinBalancer {
pool: BackendPool,
index: usize,
}
impl RoundRobinBalancer {
pub fn new(pool: BackendPool) -> RoundRobinBalancer {
Self { pool, index: 0 }
}
}
impl Balancer for RoundRobinBalancer {
fn choose_backend(&mut self, ctx: ConnectionInfo) -> Option<Arc<Backend>> {
let backends = self.pool.backends.clone();
if backends.is_empty() {
return None;
}
let backend = backends[self.index % backends.len()].clone();
self.index = self.index.wrapping_add(1);
Some(backend)
}
}

View File

@@ -1,125 +0,0 @@
use cidr::IpCidr;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
use crate::backend::*;
use crate::balancer::Balancer;
use crate::balancer::adaptive_weight::AdaptiveWeightBalancer;
use crate::balancer::round_robin::RoundRobinBalancer;
use crate::config::*;
pub struct RoutingTable {
pub balancers: Vec<Box<dyn Balancer + Send>>,
pub entries: Vec<(IpCidr, usize)>,
}
pub type PortListeners = HashMap<u16, RoutingTable>;
fn parse_client(s: &str) -> Result<(IpCidr, u16), String> {
// just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
let (ip_part, port_part) = s.rsplit_once(':')
.ok_or_else(|| format!("badly formatted client: {}", s))?;
let port = port_part.parse()
.map_err(|_| format!("bad port: {}", s))?;
let cidr = ip_part.parse()
.map_err(|_| format!("bad ip/mask: {}", s))?;
Ok((cidr, port))
}
pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>), String> {
let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new();
let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();
for backend_cfg in config.backends {
let ip: IpAddr = backend_cfg.ip.parse()
.map_err(|_| format!("bad ip: {}", backend_cfg.ip))?;
let addr = SocketAddr::new(ip, backend_cfg.port);
let health = healths
.entry(ip)
.or_insert_with(|| Arc::new(RwLock::new(ServerMetrics::default())))
.clone();
let backend = Arc::new(Backend::new(backend_cfg.id.clone(), addr, health));
backends.insert(backend_cfg.id, backend);
}
let mut listeners: PortListeners = HashMap::new();
for rule in config.rules {
let mut target_backends = Vec::new();
for target_name in &rule.targets {
if let Some(members) = config.clusters.get(target_name) {
for member_id in members {
if let Some(backend) = backends.get(member_id) {
target_backends.push(backend.clone());
}
}
} else if let Some(backend) = backends.get(target_name) {
target_backends.push(backend.clone());
} else {
eprintln!("warning: target {} not found", target_name);
}
}
// possible for multiple targets of the same rule to have common backends.
target_backends.sort_by(|a, b| a.id.cmp(&b.id));
target_backends.dedup_by(|a, b| a.id == b.id);
if target_backends.is_empty() {
eprintln!("warning: rule has no valid targets, skipping.");
continue;
}
// for each different client port on this rule, we unfortunately need to make a new
// Balancer, since Balancer is not thread safe, requires &mut self for the backend
// selection.
// a good enough compromise to make a new one for each port, avoids using Mutex, at the
// cost of minor penalty to load balancing "quality" when you have several client ports.
let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new();
for client_def in rule.clients {
let (cidr, port) = parse_client(&client_def)?;
port_groups.entry(port).or_default().push(cidr);
}
for (port, cidrs) in port_groups {
let table = listeners.entry(port).or_insert_with(|| RoutingTable {
balancers: Vec::new(),
entries: Vec::new(),
});
let pool = BackendPool::new(target_backends.clone());
let balancer: Box<dyn Balancer + Send> = match &rule.strategy {
LoadBalancerStrategy::RoundRobin => Box::new(RoundRobinBalancer::new(pool)),
LoadBalancerStrategy::Adaptive {
coefficients,
alpha,
} => Box::new(AdaptiveWeightBalancer::new(pool, *coefficients, *alpha)),
};
let balancer_idx = table.balancers.len();
table.balancers.push(balancer);
for cidr in cidrs {
table.entries.push((cidr, balancer_idx));
}
}
}
// sort to make most specific first, so that first match == longest prefix match
for table in listeners.values_mut() {
table
.entries
.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length()));
}
Ok((listeners, healths))
}

View File

@@ -1,46 +0,0 @@
// config is written as a YAML file, the path will be passed to the program.
//
// the high level structure of the config is that we
// first define the individual backends (ip + port) we are going
// to load balance around.
//
// next we define some clusters, which are really more like a short
// alias for a group of backends.
//
// next we define the rules. these are written as a list of
// "ip/subnet:port" for the clients, and then a list of clusters
// for which backends these are balanced around. and of course
// specify which algorithm to use.
pub mod loader;
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Debug, Deserialize)]
pub struct AppConfig {
pub backends: Vec<BackendConfig>,
#[serde(default)]
pub clusters: HashMap<String, Vec<String>>,
pub rules: Vec<RuleConfig>,
}
#[derive(Debug, Deserialize)]
pub struct BackendConfig {
pub id: String,
pub ip: String,
pub port: u16,
}
#[derive(Debug, Deserialize)]
pub struct RuleConfig {
pub clients: Vec<String>,
pub targets: Vec<String>,
pub strategy: LoadBalancerStrategy,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
pub enum LoadBalancerStrategy {
RoundRobin,
Adaptive { coefficients: [f64; 4], alpha: f64 },
}

View File

@@ -1,192 +1,56 @@
mod backend; macro_rules! info {
($($arg:tt)*) => {{
print!("info: ");
println!($($arg)*);
}};
}
macro_rules! error {
($($arg:tt)*) => {
eprint!("error: ");
eprintln!($($arg)*);
};
}
mod netutils;
mod balancer; mod balancer;
mod config;
mod proxy;
use std::collections::HashMap;
use crate::balancer::{ConnectionInfo};
use crate::proxy::tcp::proxy_tcp_connection;
use std::fs::File;
use std::path::PathBuf;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use anywho::Error; use anywho::Error;
use netutils::{Backend, tunnel};
use std::sync::Arc;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc; use tokio::sync::Mutex;
use crate::backend::ServerMetrics;
use crate::config::loader::{build_lb, RoutingTable};
use notify::{Watcher, RecursiveMode, Event};
use clap::Parser;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
struct ProgramState {
tx_rt_map: HashMap<u16, mpsc::UnboundedSender<RoutingTable>>,
healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
}
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(short, long, default_value = "config.yaml")]
config: PathBuf,
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Error> {
let args = Args::parse(); let backends = Arc::new(vec![
Backend::new("127.0.0.1:8081".to_string()),
Backend::new("127.0.0.1:8082".to_string()),
]);
if !args.config.is_file() { let current_index = Arc::new(Mutex::new(0));
eprintln!("config file not found or not accessible");
std::process::exit(1);
}
println!("reading config from {:?}", args.config); info!("enginewhy starting on 0.0.0.0:8080");
info!("backends: {:?}", backends);
let state = Arc::new(Mutex::new(ProgramState { let listener = TcpListener::bind("0.0.0.0:8080").await?;
tx_rt_map: HashMap::new(),
healths: HashMap::new(),
}));
if let Err(e) = load_config(&args.config, state.clone()).await {
eprintln!("config file loading failed: {}", e);
}
let config_path = args.config.clone();
let state_clone = state.clone();
tokio::spawn(async move {
let (tx, mut rx) = mpsc::channel(1);
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res {
if event.kind.is_modify() {
let _ = tx.blocking_send(());
}
}
}).unwrap();
watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap();
println!("watching for changes to {:?}", config_path);
while rx.recv().await.is_some() {
if let Err(e) = load_config(&config_path, state_clone.clone()).await {
eprintln!("loading config failed: {}", e);
}
}
});
loop { tokio::time::sleep(Duration::from_hours(1)).await; }
}
async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<(), Error> {
let f = File::open(path)?;
let app_config: config::AppConfig = match serde_saphyr::from_reader(f) {
Ok(app_config) => app_config,
Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); }
};
println!(
"Loaded config, with {} backends, {} rules.",
app_config.backends.len(),
app_config.rules.len()
);
let (mut listeners, health_monitors) = match build_lb(app_config) {
Ok(v) => v,
Err(e) => {
eprintln!("config has logical errors: {}", e);
return Ok(());
}
};
let mut prog_state = state.lock().unwrap();
let ports_to_remove: Vec<u16> = prog_state.tx_rt_map
.keys()
.cloned()
.filter(|port| !listeners.contains_key(port))
.collect();
for port in ports_to_remove {
prog_state.tx_rt_map.remove(&port);
}
prog_state.healths = health_monitors;
for (port, routing_table) in listeners.drain() {
if let Some(x) = prog_state.tx_rt_map.get_mut(&port) {
x.send(routing_table)?;
println!("updated rules on port {}", port);
} else {
let (tx_rt, rx_rt) = mpsc::unbounded_channel();
prog_state.tx_rt_map.insert(port, tx_rt);
tokio::spawn(run_listener(port, rx_rt, routing_table));
}
}
println!("reload complete");
Ok(())
}
async fn run_listener(
port: u16,
mut rx_rt: mpsc::UnboundedReceiver<RoutingTable>,
mut current_table: RoutingTable
) {
let addr = format!("0.0.0.0:{}", port);
println!("Starting tcp listener on {}", addr);
let listener = TcpListener::bind(&addr).await.expect("Failed to bind port");
loop { loop {
tokio::select! { let (client, addr) = listener.accept().await?;
msg = rx_rt.recv() => { info!("new connection from {}", addr);
match msg {
Some(new_table) => {
current_table = new_table;
}
None => {
println!("Unbinding listener on port {}", port);
break;
}
}
}
accept_result = listener.accept() => {
match accept_result {
Ok((socket, remote_addr)) => {
let remote_ip = remote_addr.ip();
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
let mut chosen_backend = None; let backend = {
let mut index = current_index.lock().await;
let selected_backend = backends[*index].clone();
*index = (*index + 1) % backends.len();
selected_backend
};
for (cidr, balancer_idx) in &mut current_table.entries { info!("routing client {} to backend {}", addr, backend);
if cidr.contains(&remote_ip) {
let balancer = &mut current_table.balancers[*balancer_idx];
chosen_backend = balancer.choose_backend(ConnectionInfo {
client_ip: remote_ip,
});
break;
}
}
if let Some(backend) = chosen_backend { if let Err(e) = tunnel(client, backend).await {
tokio::spawn(async move { error!("proxy failed for {}: {}", addr, e);
if let Err(e) = proxy_tcp_connection(conn_id, socket, backend).await {
eprintln!("error: conn_id={} proxy failed: {}", conn_id, e);
}
});
} else {
println!("error: no matching rule for {} on port {}", remote_ip, port);
}
}
Err(e) => {
eprintln!("error: listener port {}: {}", port, e);
continue;
}
}
}
} }
} }
} }

56
src/netutils.rs Normal file
View File

@@ -0,0 +1,56 @@
use std::fmt;
use tokio::io;
use tokio::net::TcpStream;
use std::error::Error;
#[derive(Clone, Debug)]
pub struct Backend {
address: String,
pub current_load : u32
}
impl Backend {
pub fn new(address: String) -> Self {
Backend {
address,
current_load : 0
}
}
}
impl fmt::Display for Backend {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.address)
}
}
pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box<dyn Error>> {
let backend_address: String = backend.address.clone();
tokio::spawn(async move {
let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await {
Ok(s) => {
info!("connected to backend {backend_address}");
s
}
Err(e) => {
error!("failed connecting to backend {backend_address}: {e}");
return;
}
};
let (mut read_client, mut write_client) = client_stream.into_split();
let (mut read_backend, mut write_backend) = backend_stream.into_split();
let client_to_backend =
tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await });
let backend_to_client =
tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await });
let _ = tokio::join!(client_to_backend, backend_to_client);
});
Ok(())
}

View File

@@ -1,44 +0,0 @@
use crate::backend::Backend;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
pub mod tcp;
pub struct ConnectionContext {
pub id: u64,
pub client_addr: SocketAddr,
pub start_time: Instant,
pub backend: Arc<Backend>,
pub bytes_transferred: u64,
}
impl ConnectionContext {
pub fn new(id: u64, client_addr: SocketAddr, backend: Arc<Backend>) -> Self {
backend.inc_connections();
Self {
id,
client_addr,
start_time: Instant::now(),
backend,
bytes_transferred: 0,
}
}
}
impl Drop for ConnectionContext {
fn drop(&mut self) {
self.backend.dec_connections();
let duration = self.start_time.elapsed();
println!(
"info: conn_id={} closed. client={} backend={} bytes={} duration={:.2?}",
self.id,
self.client_addr,
self.backend.address,
self.bytes_transferred,
duration.as_secs_f64()
);
}
}

View File

@@ -1,30 +0,0 @@
use crate::backend::Backend;
use crate::proxy::ConnectionContext;
use anywho::Error;
use std::sync::Arc;
use tokio::io;
use tokio::net::TcpStream;
pub async fn proxy_tcp_connection(
connection_id: u64,
mut client_stream: TcpStream,
backend: Arc<Backend>,
) -> Result<(), Error> {
let client_addr = client_stream.peer_addr()?;
let mut ctx = ConnectionContext::new(connection_id, client_addr, backend.clone());
#[cfg(debug_assertions)]
println!(
"info: conn_id={} connecting to {}",
connection_id, ctx.backend.id
);
let mut backend_stream = TcpStream::connect(&backend.address).await?;
let (tx, rx) = io::copy_bidirectional(&mut client_stream, &mut backend_stream).await?;
ctx.bytes_transferred = tx + rx;
Ok(())
}