16 Commits

Author SHA1 Message Date
psun256
b3cadd334c added example 2025-12-10 23:54:54 -05:00
nnhphong
0faa83f271 add report 2025-12-10 23:51:08 -05:00
Jeremy Janella
a74d7e1b1a Delete README.md 2025-12-10 23:28:09 -05:00
psun256
751e7f209d changed shortest prefix match to longest prefix match; update dockerfile 2025-12-10 22:51:57 -05:00
psun256
74e329b17c removed separate port field from config for backends 2025-12-10 20:40:27 -05:00
Ning Qi (Paul) Sun
b9eca8a56e Merge pull request #3 from psun256/healthcheck
Healthcheck
2025-12-10 19:01:40 -05:00
Ning Qi (Paul) Sun
5e165b0c6a Merge branch 'main' into healthcheck 2025-12-10 18:59:20 -05:00
psun256
09966c1e85 applied hot reload to health check logic 2025-12-10 18:58:47 -05:00
Phong Nguyen
8d3ce72649 Merge pull request #4 from psun256/tests/adaptive_weights
feat: adaptive weight tests
2025-12-10 18:29:37 -05:00
nnhphong
7a68e4b17b fix the test 2025-12-10 18:26:52 -05:00
nnhphong
a9db727bde fix the test, all pass now 2025-12-10 18:08:54 -05:00
3b96043dc6 feat: adaptive weight tests 2025-12-10 17:25:16 -05:00
Ning Qi (Paul) Sun
9c24f172e9 Merge branch 'main' into healthcheck 2025-12-10 16:49:16 -05:00
Ning Qi (Paul) Sun
047753ef70 Merge pull request #2 from psun256/auto-reload
added config hot reload
2025-12-10 16:38:38 -05:00
psun256
b5547acb42 fix server metrics framing issue 2025-12-10 16:28:51 -05:00
nnhphong
99e39e82c1 too much changes, idek anymore 2025-12-10 15:36:51 -05:00
24 changed files with 994 additions and 224 deletions

View File

Binary file not shown.

467
Cargo.lock generated
View File

@@ -9,12 +9,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"getrandom", "getrandom 0.3.4",
"once_cell", "once_cell",
"version_check", "version_check",
"zerocopy", "zerocopy",
] ]
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "anstream" name = "anstream"
version = "0.6.21" version = "0.6.21"
@@ -65,6 +83,12 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "anyhow"
version = "1.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
[[package]] [[package]]
name = "anywho" name = "anywho"
version = "0.1.2" version = "0.1.2"
@@ -107,12 +131,34 @@ version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
[[package]]
name = "bumpalo"
version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.11.0" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
[[package]]
name = "cc"
version = "1.2.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215"
dependencies = [
"find-msvc-tools",
"shlex",
]
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.4" version = "1.0.4"
@@ -127,7 +173,20 @@ checksum = "99cbf41c6ec3c4b9eaf7f8f5c11a72cd7d3aa0428125c20d5ef4d09907a0f019"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"cpufeatures", "cpufeatures",
"rand_core", "rand_core 0.10.0-rc-2",
]
[[package]]
name = "chrono"
version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-link",
] ]
[[package]] [[package]]
@@ -182,6 +241,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.17" version = "0.2.17"
@@ -209,6 +274,35 @@ dependencies = [
"encoding_rs", "encoding_rs",
] ]
[[package]]
name = "env_filter"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
[[package]] [[package]]
name = "foldhash" name = "foldhash"
version = "0.1.5" version = "0.1.5"
@@ -224,6 +318,17 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "getrandom"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.3.4" version = "0.3.4"
@@ -260,6 +365,41 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
dependencies = [
"libc",
"match_cfg",
"winapi",
]
[[package]]
name = "iana-time-zone"
version = "0.1.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "inotify" name = "inotify"
version = "0.11.0" version = "0.11.0"
@@ -292,6 +432,40 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jiff"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde_core",
]
[[package]]
name = "jiff-static"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "js-sys"
version = "0.3.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]] [[package]]
name = "kqueue" name = "kqueue"
version = "1.1.1" version = "1.1.1"
@@ -321,17 +495,19 @@ dependencies = [
"cidr", "cidr",
"clap", "clap",
"notify", "notify",
"rand", "rand 0.10.0-rc.5",
"rperf3-rs",
"serde", "serde",
"serde-saphyr", "serde-saphyr",
"serde_json",
"tokio", "tokio",
] ]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.177" version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@@ -348,6 +524,12 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.6" version = "2.7.6"
@@ -356,9 +538,9 @@ checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]] [[package]]
name = "mio" name = "mio"
version = "1.1.0" version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@@ -446,6 +628,30 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "portable-atomic"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.103" version = "1.0.103"
@@ -470,6 +676,17 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core 0.6.4",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.10.0-rc.5" version = "0.10.0-rc.5"
@@ -477,8 +694,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be866deebbade98028b705499827ad6967c8bb1e21f96a2609913c8c076e9307" checksum = "be866deebbade98028b705499827ad6967c8bb1e21f96a2609913c8c076e9307"
dependencies = [ dependencies = [
"chacha20", "chacha20",
"getrandom", "getrandom 0.3.4",
"rand_core", "rand_core 0.10.0-rc-2",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.16",
] ]
[[package]] [[package]]
@@ -496,6 +732,63 @@ dependencies = [
"bitflags 2.10.0", "bitflags 2.10.0",
] ]
[[package]]
name = "regex"
version = "1.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "rperf3-rs"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e62aa432a9a065421824675d92be33d0efc3180ab71bd6a5322fd0f7fc2f516e"
dependencies = [
"anyhow",
"byteorder",
"chrono",
"clap",
"env_logger",
"hostname",
"libc",
"log",
"parking_lot",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"tokio",
]
[[package]]
name = "rustversion"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.20" version = "1.0.20"
@@ -589,10 +882,16 @@ dependencies = [
] ]
[[package]] [[package]]
name = "signal-hook-registry" name = "shlex"
version = "1.4.6" version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@@ -627,15 +926,35 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.110" version = "2.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "thiserror"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.48.0" version = "1.48.0"
@@ -707,6 +1026,67 @@ dependencies = [
"wit-bindgen", "wit-bindgen",
] ]
[[package]]
name = "wasm-bindgen"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4"
dependencies = [
"unicode-ident",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]] [[package]]
name = "winapi-util" name = "winapi-util"
version = "0.1.11" version = "0.1.11"
@@ -716,12 +1096,71 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "windows-link" name = "windows-link"
version = "0.2.1" version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.60.2" version = "0.60.2"

View File

@@ -8,6 +8,8 @@ anywho = "0.1.2"
tokio = { version = "1.48.0", features = ["full"] } tokio = { version = "1.48.0", features = ["full"] }
rand = "0.10.0-rc.5" rand = "0.10.0-rc.5"
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
rperf3-rs = "0.3.9"
cidr = "0.3.1" cidr = "0.3.1"
serde-saphyr = "0.0.10" serde-saphyr = "0.0.10"
arc-swap = "1.7.1" arc-swap = "1.7.1"

View File

@@ -33,7 +33,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
# change to scratch and get comment the apk command for prod, i guess # change to scratch and get comment the apk command for prod, i guess
FROM alpine:latest AS runtime FROM alpine:latest AS runtime
# RUN apk add --no-cache ca-certificates curl netcat-openbsd bind-tools strace RUN apk add --no-cache ca-certificates curl netcat-openbsd bind-tools strace
WORKDIR /enginewhy WORKDIR /enginewhy
COPY --from=builder /enginewhy/target/x86_64-unknown-linux-musl/release/l4lb /usr/bin/l4lb COPY --from=builder /enginewhy/target/x86_64-unknown-linux-musl/release/l4lb /usr/bin/l4lb
ENTRYPOINT ["l4lb"] ENTRYPOINT ["l4lb"]

128
README.md
View File

@@ -1,128 +0,0 @@
# nginy
Production't graden't load balancer.
## Quick links
## Todo
- [ ] architecture astronauting
- balancer module
- just the algorithms i guess
-
- backend module
- manages the backend pool
- deals with health / load check
- BackendPool for all the backends stored together
- Backend for individual backends
- has some methods used by balancer module to pick a suitable backend
- proxy module
- all the different supported protocols to handle
- will create a session / stream context structure (ConnectionContext)
- not globally tracked (this might change for UDP!)
- mainly some metadata
- config module
- set up all the stuff or something
- [ ] stream / session handling (i think wrapper around tokio TcpStream)
- [ ] basic backend pooling
- [ ] layer 4 load balancing
## notes
tcp, for nginx (and haproxy, its similar):
```c
// nginx
struct ngx_connection_s {
void *data;
ngx_event_t *read;
ngx_event_t *write;
ngx_socket_t fd;
ngx_recv_pt recv; // fn pointer to whatever recv fn used (different for idfferent platforms / protocol
ngx_send_pt send; // ditto
ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain;
ngx_listening_t *listening;
off_t sent;
ngx_log_t *log;
ngx_pool_t *pool;
int type;
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t addr_text;
ngx_proxy_protocol_t *proxy_protocol;
#if (NGX_QUIC || NGX_COMPAT)
ngx_quic_stream_t *quic;
#endif
#if (NGX_SSL || NGX_COMPAT)
ngx_ssl_connection_t *ssl;
#endif
ngx_udp_connection_t *udp; // additional stuff for UDP (which is technically connectionless, but they use timeouts and a rbtree to store "sessions")
struct sockaddr *local_sockaddr;
socklen_t local_socklen;
ngx_buf_t *buffer;
ngx_queue_t queue;
ngx_atomic_uint_t number;
ngx_msec_t start_time;
ngx_uint_t requests;
unsigned buffered:8;
unsigned log_error:3; /* ngx_connection_log_error_e */
unsigned timedout:1;
unsigned error:1;
unsigned destroyed:1;
unsigned pipeline:1;
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned shared:1;
unsigned sendfile:1;
unsigned sndlowat:1;
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
unsigned need_last_buf:1;
unsigned need_flush_buf:1;
#if (NGX_HAVE_SENDFILE_NODISKIO || NGX_COMPAT)
unsigned busy_count:2;
#endif
#if (NGX_THREADS || NGX_COMPAT)
ngx_thread_task_t *sendfile_task;
#endif
};
```
process to load balance:
- accept incoming connection
- create some kind of stream / session object
- nginx use this to abstract around tcp and udp layers
- for us we probably don't need as detailed as them, since we have tokio::net, so itll be a wrapper around TcpStream
- ask the load balancing algorithm which server in the pool to route to
- connect to the server
- proxy the data (copy_bidirectional? maybe we want some metrics or logging, so might do manually)
- cleanup when smoeone leavesr or something goes wrong (with TCP, OS / tokio will tell us, with UDP probably just timeout based, and a periodic sweep of all sessions)
### UDP
UDP is connectionless, and i don't think UdpSocket or UdpFramed implement the traits required for tokio copy_bidirectional
but async write and read don't work on just regular datagrams, so probably not possible.
Would require us to implement our own bidirectional copying / proxying, as well as tracking "active" connections.

View File

@@ -1,11 +1,12 @@
healthcheck_addr: "0.0.0.0:8080"
iperf_addr: "0.0.0.0:5001"
backends: backends:
- id: "srv-1" - id: "srv-1"
ip: "127.0.0.1" ip: "192.67.67.2:8080"
port: 8081
- id: "srv-2" - id: "srv-2"
ip: "127.0.0.1" ip: "192.67.67.3:8080"
port: 8082
clusters: clusters:
main-api: main-api:
@@ -16,17 +17,9 @@ clusters:
rules: rules:
- clients: - clients:
- "0.0.0.0/0:8080" - "172.67.67.2/24:80"
targets: targets:
- "main-api" - "main-api"
strategy:
type: "RoundRobin"
- clients:
- "0.0.0.0/0:6767"
targets: # no issues with duplicate servers or clusters
- "priority-api"
- "priority-api"
- "priority-api" - "priority-api"
strategy: strategy:
type: "Adaptive" type: "Adaptive"

View File

@@ -1,5 +1,4 @@
services: services:
# two-arm load balancer
load-balancer: load-balancer:
image: neoslhp/enginewhy-lb image: neoslhp/enginewhy-lb
container_name: load-balancer container_name: load-balancer

View File

@@ -0,0 +1,38 @@
healthcheck_addr: "10.0.1.10:9000"
iperf_addr: "10.0.1.10:5201"
backends:
- id: "srv-1"
ip: "10.0.1.11:8081"
- id: "srv-2"
ip: "10.0.1.12:8082"
- id: "srv-3"
ip: "10.0.1.13:8083"
- id: "srv-4"
ip: "10.0.1.14:8084"
clusters:
main-api:
- "srv-1"
- "srv-2"
priority-api:
- "srv-3"
- "srv-4"
rules:
- clients:
- "0.0.0.0/0:8080"
targets:
- "main-api"
strategy:
type: "RoundRobin"
- clients:
- "10.0.0.0/24:8080"
- "10.0.0.0/24:25565"
targets:
- "main-api"
- "priority-api"
strategy:
type: "RoundRobin"

View File

@@ -0,0 +1,110 @@
services:
load-balancer:
image: enginewhy
container_name: load-balancer
tty: true
cap_add:
- NET_ADMIN
- SYS_ADMIN
volumes:
- ./config.yaml:/enginewhy/config.yaml
networks:
net_1:
ipv4_address: 10.0.1.10
net_2:
ipv4_address: 10.0.0.10
net_3:
ipv4_address: 10.0.2.10
srv-1:
image: nicolaka/netshoot
container_name: srv-1
tty: true
command: ["python3", "-m", "http.server", "8081", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.11
ports:
- "8081:8081"
volumes:
- ./srv1:/root/www
cap_add: [ NET_ADMIN ]
srv-2:
image: nicolaka/netshoot
container_name: srv-2
tty: true
command: ["python3", "-m", "http.server", "8082", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.12
ports:
- "8082:8082"
volumes:
- ./srv2:/root/www
cap_add: [ NET_ADMIN ]
srv-3:
image: nicolaka/netshoot
container_name: srv-3
tty: true
command: ["python3", "-m", "http.server", "8083", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.13
ports:
- "8083:8083"
volumes:
- ./srv3:/root/www
cap_add: [ NET_ADMIN ]
srv-4:
image: nicolaka/netshoot
container_name: srv-4
tty: true
command: ["python3", "-m", "http.server", "8084", "--directory", "/root/www"]
networks:
net_1:
ipv4_address: 10.0.1.14
ports:
- "8084:8084"
volumes:
- ./srv4:/root/www
cap_add: [ NET_ADMIN ]
client-net2:
image: nicolaka/netshoot
container_name: client-net2
tty: true
networks:
net_2:
ipv4_address: 10.0.0.11
cap_add: [ NET_ADMIN ]
client-net3:
image: nicolaka/netshoot
container_name: client-net3
tty: true
networks:
net_3:
ipv4_address: 10.0.2.11
cap_add: [ NET_ADMIN ]
networks:
net_1:
driver: bridge
ipam:
config:
- subnet: 10.0.1.0/24
net_2:
driver: bridge
ipam:
config:
- subnet: 10.0.0.0/24
net_3:
driver: bridge
ipam:
config:
- subnet: 10.0.2.0/24

View File

@@ -0,0 +1 @@
Hello from server 1!

View File

@@ -0,0 +1 @@
Hello from server 2!

View File

@@ -0,0 +1 @@
Hello from server 3!

View File

@@ -0,0 +1 @@
Hello from server 4!

View File

@@ -146,20 +146,14 @@ async fn main() -> std::io::Result<()> {
} }
println!(); println!();
// Identify this process (client) by the local socket address used to connect
let server_identifier = match stream.local_addr() {
Ok(addr) => addr.to_string(),
Err(_) => format!("localhost:{}", PORT),
};
let mut packet: HashMap<String, Value> = HashMap::new(); let mut packet: HashMap<String, Value> = HashMap::new();
packet.insert("server_ip".to_string(), Value::String(server_identifier));
packet.insert("cpu".to_string(), Value::from(cpu_usage)); // % packet.insert("cpu".to_string(), Value::from(cpu_usage)); // %
packet.insert("mem".to_string(), Value::from(mem_usage)); // % packet.insert("mem".to_string(), Value::from(mem_usage)); // %
packet.insert("net".to_string(), Value::from(net_usage_pct)); packet.insert("net".to_string(), Value::from(net_usage_pct));
packet.insert("io".to_string(), Value::from(io_usage)); packet.insert("io".to_string(), Value::from(io_usage));
let serialized_packet = serde_json::to_string(&packet)?; let serialized_packet = serde_json::to_string(&packet)?;
serialized_packet.push('\n');
let _ = stream.write(serialized_packet.as_bytes()); let _ = stream.write(serialized_packet.as_bytes());
thread::sleep(Duration::from_secs(10)); thread::sleep(Duration::from_secs(10));

0
report.md Normal file
View File

View File

@@ -1 +1,134 @@
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
use serde_json::Value;
use rperf3::{Config, Server};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::io::AsyncBufReadExt;
// Physical server health statistics, used for certain load balancing algorithms
#[derive(Debug, Default)]
pub struct ServerMetrics {
pub cpu: f64,
pub mem: f64,
pub net: f64,
pub io: f64,
}
impl ServerMetrics {
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
self.cpu = cpu;
self.mem = mem;
self.net = net;
self.io = io;
}
}
pub async fn start_healthcheck_listener(
addr: &str,
healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
) -> std::io::Result<()> {
let addrs = tokio::net::lookup_host(addr).await?;
let mut listener = None;
for a in addrs {
let socket = match a {
SocketAddr::V4(_) => TcpSocket::new_v4()?,
SocketAddr::V6(_) => TcpSocket::new_v6()?,
};
socket.set_reuseaddr(true)?;
if socket.bind(a).is_ok() {
listener = Some(socket.listen(1024)?);
break;
}
}
let listener = listener.ok_or_else(|| {
eprintln!("health listener could not bind to port");
std::io::Error::new(std::io::ErrorKind::Other, "health listener failed")
})?;
println!("healthcheck server listening on {}", addr);
loop {
let (stream, remote_addr) = match listener.accept().await {
Ok(v) => v,
Err(e) => {
continue;
}
};
if let Err(e) = handle_metrics_stream(stream, &healths).await {
eprintln!("connection handler error: {}", e);
}
}
}
pub async fn start_iperf_server(addr: &str) -> Result<(), Box<dyn std::error::Error>> {
let sock = addr.parse::<SocketAddr>()?;
let mut config = Config::server(sock.port());
config.bind_addr = Some(sock.ip());
let server = Server::new(config);
println!("iperf server listening on {}", addr);
server.run().await?;
Ok(())
}
async fn handle_metrics_stream(
stream: TcpStream,
healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
) -> std::io::Result<()> {
let server_ip = stream.peer_addr()?.ip();
let mut reader = tokio::io::BufReader::new(stream);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
if let Err(e) = process_metrics(server_ip, &line, healths) {
eprintln!("skipping invalid packet: {}", e);
}
}
Err(e) => {
eprintln!("connection error: {}", e);
break;
}
}
}
Ok(())
}
fn process_metrics(
server_ip: IpAddr,
json_str: &str,
healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
) -> Result<(), String> {
let parsed: Value =
serde_json::from_str(json_str).map_err(|e| format!("parse error: {}", e))?;
let metrics_lock = healths
.get(&server_ip)
.ok_or_else(|| format!("unknown server: {}", server_ip))?;
let get_f64 = |key: &str| -> Result<f64, String> {
parsed
.get(key)
.and_then(|v| v.as_f64())
.ok_or_else(|| format!("invalid '{}'", key))
};
if let Ok(mut guard) = metrics_lock.write() {
guard.update(
get_f64("cpu")?,
get_f64("mem")?,
get_f64("net")?,
get_f64("io")?,
);
}
Ok(())
}

View File

@@ -1,29 +1,12 @@
pub mod health; pub mod health;
use crate::backend::health::ServerMetrics;
use core::fmt; use core::fmt;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
// Physical server health statistics, used for certain load balancing algorithms
#[derive(Debug, Default)]
pub struct ServerMetrics {
pub cpu: f64,
pub mem: f64,
pub net: f64,
pub io: f64,
}
impl ServerMetrics {
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
self.cpu = cpu;
self.mem = mem;
self.net = net;
self.io = io;
}
}
// A possible endpoint for a proxied connection. // A possible endpoint for a proxied connection.
// Note that multiple may live on the same server, hence the Arc<RwLock<ServerMetric>> // Note that multiple may live on the same server, hence the Arc<RwLock<ServerMetric>>
#[derive(Debug)] #[derive(Debug)]

View File

@@ -1,4 +1,5 @@
use crate::backend::{Backend, BackendPool, ServerMetrics}; use crate::backend::{Backend, BackendPool};
use crate::backend::health::ServerMetrics;
use crate::balancer::{Balancer, ConnectionInfo}; use crate::balancer::{Balancer, ConnectionInfo};
use rand::prelude::*; use rand::prelude::*;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
@@ -27,7 +28,7 @@ impl AdaptiveWeightBalancer {
.iter() .iter()
.map(|b| AdaptiveNode { .map(|b| AdaptiveNode {
backend: b.clone(), backend: b.clone(),
weight: 0f64, weight: 1f64,
}) })
.collect(); .collect();
@@ -85,7 +86,6 @@ impl Balancer for AdaptiveWeightBalancer {
}; };
let ratio = risk / node.weight; let ratio = risk / node.weight;
if ratio <= threshold { if ratio <= threshold {
return Some(node.backend.clone()); return Some(node.backend.clone());
} }
@@ -142,3 +142,126 @@ impl Balancer for AdaptiveWeightBalancer {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::Backend;
use std::net::SocketAddr;
fn backend_factory(id: &str, ip: &str, port: u16) -> Arc<Backend> {
Arc::new(Backend::new(
id.to_string(),
SocketAddr::new(ip.parse().unwrap(), port),
Arc::new(RwLock::new(ServerMetrics::default())),
))
}
fn unused_ctx() -> ConnectionInfo {
ConnectionInfo {
client_ip: ("0.0.0.0".parse().unwrap()),
}
}
#[test]
fn basic_weight_update_and_choose() {
let backends = BackendPool::new(vec![
backend_factory("server-0", "127.0.0.1", 3000),
backend_factory("server-1", "127.0.0.1", 3001),
]);
let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.5, 0.2, 0.2, 0.1], 0.5);
// initially equal weights
// update one backend to be heavily loaded
{
let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
sm0_guard.update(90.0, 80.0, 10.0, 5.0);
}
{
let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
sm1_guard.update(10.0, 5.0, 1.0, 1.0);
}
// Choose backend: should pick the less loaded host server1
let chosen = b
.choose_backend(unused_ctx())
.expect("should choose a backend");
let sm0: &ServerMetrics = &backends.backends.get(0).unwrap().metrics.read().unwrap();
let sm1: &ServerMetrics = &backends.backends.get(1).unwrap().metrics.read().unwrap();
println!("{:?}, {:?}", sm0, sm1);
assert_eq!(chosen.id, "server-1");
}
#[test]
fn choose_none_when_empty() {
let mut b =
AdaptiveWeightBalancer::new(BackendPool::new(vec![]), [0.5, 0.2, 0.2, 0.1], 0.5);
assert!(b.choose_backend(unused_ctx()).is_none());
}
#[test]
fn ratio_triggers_immediate_selection() {
// Arrange two servers where server 1 has composite load 0 and server 2 has composite load 100.
// With alpha = 1.0 and two servers, threshold = 1.0 * (r_sum / w_sum) = 1.0 * (100 / 2) = 50.
// Server 0 ratio = 0 / 1 = 0 <= 50 so it should be chosen immediately.
let backends = BackendPool::new(vec![
backend_factory("server-0", "127.0.0.1", 3000),
backend_factory("server-1", "127.0.0.1", 3001),
]);
let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.25, 0.25, 0.25, 0.25], 1.0);
{
let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
sm0_guard.update(0.0, 0.0, 0.0, 0.0);
}
{
let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
sm1_guard.update(100.0, 100.0, 100.0, 100.0);
}
let chosen = b
.choose_backend(unused_ctx())
.expect("should choose a backend");
assert_eq!(chosen.id, "server-0");
}
#[test]
fn choose_min_current_load_when_no_ratio() {
// Arrange three servers with identical composite loads so no server satisfies Ri/Wi <= threshold
// (set alpha < 1 so threshold < ratio). The implementation then falls back to picking the
// server with minimum current_load
let backends = BackendPool::new(vec![
backend_factory("server-0", "127.0.0.1", 3000),
backend_factory("server-1", "127.0.0.1", 3001),
backend_factory("server-2", "127.0.0.1", 3002),
]);
// set current_loads (field expected to be public)
{
let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap();
sm0_guard.update(10.0, 10.0, 10.0, 10.0);
}
{
let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap();
sm1_guard.update(5.0, 5.0, 5.0, 5.0);
}
{
let mut sm2_guard = backends.backends.get(2).unwrap().metrics.write().unwrap();
sm2_guard.update(20.0, 20.0, 20.0, 20.0);
}
// Use coeffs that only consider CPU so composite load is easy to reason about.
let mut bal = AdaptiveWeightBalancer::new(backends.clone(), [1.0, 0.0, 0.0, 0.0], 0.5);
// set identical composite loads > 0 for all so ratio = x and threshold = alpha * x < x
// you will have threshold = 25 for all 3 backend servers and ratio = 50
// so that forces to choose the smallest current load backend
let chosen = bal
.choose_backend(unused_ctx())
.expect("should choose a backend");
// expect server with smallest current_load server-1
assert_eq!(chosen.id, "server-1");
}
}

View File

@@ -1,7 +1,7 @@
use crate::backend::{Backend, BackendPool}; use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo}; use crate::balancer::{Balancer, ConnectionInfo};
use std::hash::{DefaultHasher, Hash, Hasher}; use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::{Arc, RwLock}; use std::sync::Arc;
#[derive(Debug)] #[derive(Debug)]
pub struct SourceIPHash { pub struct SourceIPHash {
@@ -30,8 +30,9 @@ impl Balancer for SourceIPHash {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::backend::health::ServerMetrics;
use std::net::IpAddr; use std::net::IpAddr;
use crate::backend::ServerMetrics; use std::sync::RwLock;
fn create_dummy_backends(count: usize) -> BackendPool { fn create_dummy_backends(count: usize) -> BackendPool {
let mut backends = Vec::new(); let mut backends = Vec::new();

View File

@@ -1 +1 @@
use super::*; // use super::*;

View File

@@ -1,7 +1,7 @@
use crate::backend::{Backend, BackendPool}; use crate::backend::{Backend, BackendPool};
use crate::balancer::{Balancer, ConnectionInfo}; use crate::balancer::{Balancer, ConnectionInfo};
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
// only the main thread for receiving connections should be // only the main thread for receiving connections should be
// doing the load balancing. alternatively, each thread // doing the load balancing. alternatively, each thread

View File

@@ -3,10 +3,12 @@ use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use crate::backend::health::*;
use crate::backend::*; use crate::backend::*;
use crate::balancer::Balancer; use crate::balancer::Balancer;
use crate::balancer::adaptive_weight::AdaptiveWeightBalancer; use crate::balancer::adaptive_weight::AdaptiveWeightBalancer;
use crate::balancer::round_robin::RoundRobinBalancer; use crate::balancer::round_robin::RoundRobinBalancer;
use crate::balancer::ip_hashing::SourceIPHash;
use crate::config::*; use crate::config::*;
pub struct RoutingTable { pub struct RoutingTable {
@@ -18,26 +20,26 @@ pub type PortListeners = HashMap<u16, RoutingTable>;
fn parse_client(s: &str) -> Result<(IpCidr, u16), String> { fn parse_client(s: &str) -> Result<(IpCidr, u16), String> {
// just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80) // just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80)
let (ip_part, port_part) = s.rsplit_once(':') let (ip_part, port_part) = s
.rsplit_once(':')
.ok_or_else(|| format!("badly formatted client: {}", s))?; .ok_or_else(|| format!("badly formatted client: {}", s))?;
let port = port_part.parse() let port = port_part.parse().map_err(|_| format!("bad port: {}", s))?;
.map_err(|_| format!("bad port: {}", s))?; let cidr = ip_part.parse().map_err(|_| format!("bad ip/mask: {}", s))?;
let cidr = ip_part.parse()
.map_err(|_| format!("bad ip/mask: {}", s))?;
Ok((cidr, port)) Ok((cidr, port))
} }
pub fn build_lb(
pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>), String> { config: &AppConfig,
) -> Result<(PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>), String> {
let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new(); let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new();
let mut backends: HashMap<String, Arc<Backend>> = HashMap::new(); let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();
for backend_cfg in config.backends { for backend_cfg in &config.backends {
let ip: IpAddr = backend_cfg.ip.parse() let addr: SocketAddr = backend_cfg.ip.parse()
.map_err(|_| format!("bad ip: {}", backend_cfg.ip))?; .map_err(|_| format!("bad ip: {}", backend_cfg.ip))?;
let addr = SocketAddr::new(ip, backend_cfg.port); let ip = addr.ip();
let health = healths let health = healths
.entry(ip) .entry(ip)
@@ -46,12 +48,12 @@ pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc
let backend = Arc::new(Backend::new(backend_cfg.id.clone(), addr, health)); let backend = Arc::new(Backend::new(backend_cfg.id.clone(), addr, health));
backends.insert(backend_cfg.id, backend); backends.insert(backend_cfg.id.clone(), backend);
} }
let mut listeners: PortListeners = HashMap::new(); let mut listeners: PortListeners = HashMap::new();
for rule in config.rules { for rule in &config.rules {
let mut target_backends = Vec::new(); let mut target_backends = Vec::new();
for target_name in &rule.targets { for target_name in &rule.targets {
@@ -84,7 +86,7 @@ pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc
// cost of minor penalty to load balancing "quality" when you have several client ports. // cost of minor penalty to load balancing "quality" when you have several client ports.
let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new(); let mut port_groups: HashMap<u16, Vec<IpCidr>> = HashMap::new();
for client_def in rule.clients { for client_def in &rule.clients {
let (cidr, port) = parse_client(&client_def)?; let (cidr, port) = parse_client(&client_def)?;
port_groups.entry(port).or_default().push(cidr); port_groups.entry(port).or_default().push(cidr);
} }
@@ -99,6 +101,7 @@ pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc
let balancer: Box<dyn Balancer + Send> = match &rule.strategy { let balancer: Box<dyn Balancer + Send> = match &rule.strategy {
LoadBalancerStrategy::RoundRobin => Box::new(RoundRobinBalancer::new(pool)), LoadBalancerStrategy::RoundRobin => Box::new(RoundRobinBalancer::new(pool)),
LoadBalancerStrategy::SourceIPHash => Box::new(SourceIPHash::new(pool)),
LoadBalancerStrategy::Adaptive { LoadBalancerStrategy::Adaptive {
coefficients, coefficients,
alpha, alpha,
@@ -118,7 +121,7 @@ pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap<IpAddr, Arc
for table in listeners.values_mut() { for table in listeners.values_mut() {
table table
.entries .entries
.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length())); .sort_by(|(a, _), (b, _)| b.network_length().cmp(&a.network_length()));
} }
Ok((listeners, healths)) Ok((listeners, healths))

View File

@@ -16,11 +16,27 @@ pub mod loader;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
fn default_healthcheck_addr() -> String {
"0.0.0.0:8080".to_string()
}
fn default_iperf_addr() -> String {
"0.0.0.0:5201".to_string()
}
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct AppConfig { pub struct AppConfig {
#[serde(default = "default_healthcheck_addr")]
pub healthcheck_addr: String,
#[serde(default = "default_iperf_addr")]
pub iperf_addr: String,
pub backends: Vec<BackendConfig>, pub backends: Vec<BackendConfig>,
#[serde(default)] #[serde(default)]
pub clusters: HashMap<String, Vec<String>>, pub clusters: HashMap<String, Vec<String>>,
pub rules: Vec<RuleConfig>, pub rules: Vec<RuleConfig>,
} }
@@ -28,7 +44,6 @@ pub struct AppConfig {
pub struct BackendConfig { pub struct BackendConfig {
pub id: String, pub id: String,
pub ip: String, pub ip: String,
pub port: u16,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@@ -42,5 +57,6 @@ pub struct RuleConfig {
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum LoadBalancerStrategy { pub enum LoadBalancerStrategy {
RoundRobin, RoundRobin,
SourceIPHash,
Adaptive { coefficients: [f64; 4], alpha: f64 }, Adaptive { coefficients: [f64; 4], alpha: f64 },
} }

View File

@@ -3,29 +3,35 @@ mod balancer;
mod config; mod config;
mod proxy; mod proxy;
use std::collections::HashMap; use crate::backend::health::{start_healthcheck_listener, start_iperf_server, ServerMetrics};
use crate::balancer::{ConnectionInfo}; use crate::balancer::ConnectionInfo;
use crate::config::loader::{build_lb, RoutingTable};
use crate::proxy::tcp::proxy_tcp_connection; use crate::proxy::tcp::proxy_tcp_connection;
use anywho::Error;
use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::hash::Hash;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf; use std::path::PathBuf;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration; use std::time::Duration;
use anywho::Error; use tokio::io::AsyncBufReadExt;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::backend::ServerMetrics;
use crate::config::loader::{build_lb, RoutingTable};
use notify::{Watcher, RecursiveMode, Event};
use clap::Parser; use clap::Parser;
use notify::{Event, RecursiveMode, Watcher};
use std::cmp;
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
struct ProgramState { struct ProgramState {
tx_rt_map: HashMap<u16, mpsc::UnboundedSender<RoutingTable>>, tx_rt_map: HashMap<u16, mpsc::UnboundedSender<RoutingTable>>,
healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>, healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>,
health_listener: Option<tokio::task::JoinHandle<()>>,
iperf_server: Option<tokio::task::JoinHandle<()>>,
health_listener_addr: Option<String>,
iperf_server_addr: Option<String>,
} }
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@@ -49,6 +55,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(Mutex::new(ProgramState { let state = Arc::new(Mutex::new(ProgramState {
tx_rt_map: HashMap::new(), tx_rt_map: HashMap::new(),
healths: HashMap::new(), healths: HashMap::new(),
health_listener: None,
iperf_server: None,
health_listener_addr: None,
iperf_server_addr: None,
})); }));
if let Err(e) = load_config(&args.config, state.clone()).await { if let Err(e) = load_config(&args.config, state.clone()).await {
@@ -57,6 +67,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config_path = args.config.clone(); let config_path = args.config.clone();
let state_clone = state.clone(); let state_clone = state.clone();
tokio::spawn(async move { tokio::spawn(async move {
let (tx, mut rx) = mpsc::channel(1); let (tx, mut rx) = mpsc::channel(1);
@@ -66,26 +77,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = tx.blocking_send(()); let _ = tx.blocking_send(());
} }
} }
}).unwrap(); })
.unwrap();
watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap(); watcher
.watch(&config_path, RecursiveMode::NonRecursive)
.unwrap();
println!("watching for changes to {:?}", config_path); println!("watching for changes to {:?}", config_path);
while rx.recv().await.is_some() { while rx.recv().await.is_some() {
// for some reason, saving on certain text editors fires several events,
// and this causes us to reload a lot. try to flush some events, add a tiny delay
// to mitigate this
while rx.try_recv().is_ok() {}
tokio::time::sleep(Duration::from_millis(50)).await;
while rx.try_recv().is_ok() {}
if let Err(e) = load_config(&config_path, state_clone.clone()).await { if let Err(e) = load_config(&config_path, state_clone.clone()).await {
eprintln!("loading config failed: {}", e); eprintln!("loading config failed: {}", e);
} }
} }
}); });
loop { tokio::time::sleep(Duration::from_hours(1)).await; } loop {
tokio::time::sleep(Duration::from_hours(1)).await;
}
} }
async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<(), Error> { async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<(), Error> {
let f = File::open(path)?; let f = File::open(path)?;
let app_config: config::AppConfig = match serde_saphyr::from_reader(f) { let app_config: config::AppConfig = match serde_saphyr::from_reader(f) {
Ok(app_config) => app_config, Ok(app_config) => app_config,
Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); } Err(e) => {
eprintln!("error parsing config {}", e);
return Ok(());
}
}; };
println!( println!(
@@ -94,7 +121,7 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
app_config.rules.len() app_config.rules.len()
); );
let (mut listeners, health_monitors) = match build_lb(app_config) { let (mut listeners, health_monitors) = match build_lb(&app_config) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
eprintln!("config has logical errors: {}", e); eprintln!("config has logical errors: {}", e);
@@ -103,7 +130,8 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
}; };
let mut prog_state = state.lock().unwrap(); let mut prog_state = state.lock().unwrap();
let ports_to_remove: Vec<u16> = prog_state.tx_rt_map let ports_to_remove: Vec<u16> = prog_state
.tx_rt_map
.keys() .keys()
.cloned() .cloned()
.filter(|port| !listeners.contains_key(port)) .filter(|port| !listeners.contains_key(port))
@@ -113,6 +141,38 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
prog_state.tx_rt_map.remove(&port); prog_state.tx_rt_map.remove(&port);
} }
if let Some(handle) = prog_state.health_listener.take() {
handle.abort();
}
let health_map: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = health_monitors.clone();
let health_addr = app_config.healthcheck_addr.clone();
let health_addr_c = health_addr.clone();
let health_handle = tokio::spawn(async move {
if let Err(e) = start_healthcheck_listener(&health_addr, health_map).await {
eprintln!("health check listener failed: {}", e);
}
});
prog_state.health_listener = Some(health_handle);
prog_state.health_listener_addr = Some(health_addr_c);
// maybe restart iperf server
let iperf_addr = app_config.iperf_addr.clone();
if prog_state.iperf_server_addr.as_ref() != Some(&iperf_addr) {
if let Some(handle) = prog_state.iperf_server.take() {
handle.abort();
}
let iperf_addr_c = iperf_addr.clone();
let iperf_handle = tokio::spawn(async move {
if let Err(e) = start_iperf_server(iperf_addr.as_str()).await {
eprintln!("iperf server failed: {}", e);
}
});
prog_state.iperf_server = Some(iperf_handle);
prog_state.iperf_server_addr = Some(iperf_addr_c);
}
prog_state.healths = health_monitors; prog_state.healths = health_monitors;
for (port, routing_table) in listeners.drain() { for (port, routing_table) in listeners.drain() {
if let Some(x) = prog_state.tx_rt_map.get_mut(&port) { if let Some(x) = prog_state.tx_rt_map.get_mut(&port) {
@@ -133,7 +193,7 @@ async fn load_config(path: &PathBuf, state: Arc<Mutex<ProgramState>>) -> Result<
async fn run_listener( async fn run_listener(
port: u16, port: u16,
mut rx_rt: mpsc::UnboundedReceiver<RoutingTable>, mut rx_rt: mpsc::UnboundedReceiver<RoutingTable>,
mut current_table: RoutingTable mut current_table: RoutingTable,
) { ) {
let addr = format!("0.0.0.0:{}", port); let addr = format!("0.0.0.0:{}", port);
println!("Starting tcp listener on {}", addr); println!("Starting tcp listener on {}", addr);