From 799adf987799a69ee3cf7376c6032d5fc0baa28b Mon Sep 17 00:00:00 2001 From: nnhphong Date: Wed, 10 Dec 2025 15:36:51 -0500 Subject: [PATCH 1/3] too much changes, idek anymore --- Cargo.lock | 581 +++++++++++++++++++++++++++++- Cargo.toml | 2 + Dockerfile | 3 +- config.yaml | 2 +- src/backend/health.rs | 16 + src/backend/mod.rs | 19 +- src/balancer/adaptive_weight.rs | 6 +- src/balancer/ip_hashing.rs | 2 +- src/balancer/least_connections.rs | 2 +- src/balancer/round_robin.rs | 2 +- src/config/loader.rs | 5 +- src/main.rs | 80 +++- 12 files changed, 685 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25cc39d..b10fe27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,12 +9,86 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" dependencies = [ "cfg-if", - "getrandom", + "getrandom 0.3.4", "once_cell", "version_check", "zerocopy", ] +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + +[[package]] +name = "anyhow" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" + [[package]] name = "anywho" version = "0.1.2" @@ -45,12 +119,34 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +[[package]] +name = "cc" +version = "1.2.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" +dependencies = [ + "find-msvc-tools", + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -65,7 +161,20 @@ checksum = "99cbf41c6ec3c4b9eaf7f8f5c11a72cd7d3aa0428125c20d5ef4d09907a0f019" dependencies = [ "cfg-if", "cpufeatures", - "rand_core", + "rand_core 0.10.0-rc-2", +] + +[[package]] +name = "chrono" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", ] [[package]] @@ -74,6 +183,58 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60" +[[package]] +name = "clap" +version = "4.5.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -101,12 +262,52 @@ dependencies = [ "encoding_rs", ] +[[package]] +name = "env_filter" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" + [[package]] name = "foldhash" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -137,21 +338,104 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itoa" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jiff" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "js-sys" +version = "0.3.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "l4lb" version = "0.1.0" dependencies = [ "anywho", "cidr", - "rand", + "rand 0.10.0-rc.5", + "rperf3-rs", "serde", "serde-saphyr", + "serde_json", "tokio", ] @@ -170,6 +454,18 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "memchr" version = "2.7.6" @@ -208,6 +504,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "parking_lot" version = "0.12.5" @@ -237,6 +539,30 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -261,6 +587,17 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.10.0-rc.5" @@ -268,8 +605,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be866deebbade98028b705499827ad6967c8bb1e21f96a2609913c8c076e9307" dependencies = [ "chacha20", - "getrandom", - "rand_core", + "getrandom 0.3.4", + "rand_core 0.10.0-rc-2", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -287,6 +643,63 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" + +[[package]] +name = "rperf3-rs" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62aa432a9a065421824675d92be33d0efc3180ab71bd6a5322fd0f7fc2f516e" +dependencies = [ + "anyhow", + "byteorder", + "chrono", + "clap", + "env_logger", + "hostname", + "libc", + "log", + "parking_lot", + "rand 0.8.5", + "serde", + "serde_json", + "thiserror", + "tokio", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "ryu" version = "1.0.20" @@ -370,6 +783,12 @@ dependencies = [ "serde_core", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.6" @@ -401,6 +820,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.110" @@ -412,6 +837,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio" version = "1.48.0" @@ -446,6 +891,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "version_check" version = "0.9.5" @@ -467,12 +918,132 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/Cargo.toml b/Cargo.toml index f55a2b5..d4f1082 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,7 @@ anywho = "0.1.2" tokio = { version = "1.48.0", features = ["full"] } rand = "0.10.0-rc.5" serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" +rperf3-rs = "0.3.9" cidr = "0.3.1" serde-saphyr = "0.0.10" diff --git a/Dockerfile b/Dockerfile index 47cc789..4917c8f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,4 +36,5 @@ FROM alpine:latest AS runtime # RUN apk add --no-cache ca-certificates curl netcat-openbsd bind-tools strace WORKDIR /enginewhy COPY --from=builder /enginewhy/target/x86_64-unknown-linux-musl/release/l4lb /usr/bin/l4lb -ENTRYPOINT ["l4lb"] \ No newline at end of file +COPY config.yaml . +ENTRYPOINT ["l4lb"] diff --git a/config.yaml b/config.yaml index 214f970..ce4013a 100644 --- a/config.yaml +++ b/config.yaml @@ -16,7 +16,7 @@ clusters: rules: - clients: - - "0.0.0.0/0:8080" + - "0.0.0.0/0:8888" targets: - "main-api" strategy: diff --git a/src/backend/health.rs b/src/backend/health.rs index 8b13789..0591438 100644 --- a/src/backend/health.rs +++ b/src/backend/health.rs @@ -1 +1,17 @@ +// Physical server health statistics, used for certain load balancing algorithms +#[derive(Debug, Default)] +pub struct ServerMetrics { + pub cpu: f64, + pub mem: f64, + pub net: f64, + pub io: f64, +} +impl ServerMetrics { + pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) { + self.cpu = cpu; + self.mem = mem; + self.net = net; + self.io = io; + } +} diff --git a/src/backend/mod.rs b/src/backend/mod.rs index f6547ed..d84aa76 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -5,24 +5,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::{AtomicUsize, Ordering}; - -// Physical server health statistics, used for certain load balancing algorithms -#[derive(Debug, Default)] -pub struct ServerMetrics { - pub cpu: f64, - pub mem: f64, - pub net: f64, - pub io: f64, -} - -impl ServerMetrics { - pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) { - self.cpu = cpu; - self.mem = mem; - self.net = net; - self.io = io; - } -} +use crate::backend::health::ServerMetrics; // A possible endpoint for a proxied connection. // Note that multiple may live on the same server, hence the Arc> diff --git a/src/balancer/adaptive_weight.rs b/src/balancer/adaptive_weight.rs index abcd486..c23f904 100644 --- a/src/balancer/adaptive_weight.rs +++ b/src/balancer/adaptive_weight.rs @@ -1,10 +1,10 @@ -use crate::backend::{Backend, BackendPool, ServerMetrics}; +use crate::backend::{Backend, BackendPool}; +use crate::backend::health::ServerMetrics; use crate::balancer::{Balancer, ConnectionInfo}; use rand::prelude::*; use rand::rngs::SmallRng; use std::fmt::Debug; -use std::fs::Metadata; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc}; #[derive(Debug)] struct AdaptiveNode { diff --git a/src/balancer/ip_hashing.rs b/src/balancer/ip_hashing.rs index 25e3c63..2cc5318 100644 --- a/src/balancer/ip_hashing.rs +++ b/src/balancer/ip_hashing.rs @@ -1,7 +1,7 @@ use crate::backend::{Backend, BackendPool}; use crate::balancer::{Balancer, ConnectionInfo}; use std::hash::{DefaultHasher, Hash, Hasher}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc}; #[derive(Debug)] pub struct SourceIPHash { diff --git a/src/balancer/least_connections.rs b/src/balancer/least_connections.rs index 4563e55..35ba390 100644 --- a/src/balancer/least_connections.rs +++ b/src/balancer/least_connections.rs @@ -1 +1 @@ -use super::*; +// use super::*; diff --git a/src/balancer/round_robin.rs b/src/balancer/round_robin.rs index 7c9972d..e0d60d1 100644 --- a/src/balancer/round_robin.rs +++ b/src/balancer/round_robin.rs @@ -1,7 +1,7 @@ use crate::backend::{Backend, BackendPool}; use crate::balancer::{Balancer, ConnectionInfo}; use std::fmt::Debug; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc}; // only the main thread for receiving connections should be // doing the load balancing. alternatively, each thread diff --git a/src/config/loader.rs b/src/config/loader.rs index 81c7314..43a2406 100644 --- a/src/config/loader.rs +++ b/src/config/loader.rs @@ -4,6 +4,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; use crate::backend::*; +use crate::backend::health::*; use crate::balancer::Balancer; use crate::balancer::adaptive_weight::AdaptiveWeightBalancer; use crate::balancer::round_robin::RoundRobinBalancer; @@ -26,7 +27,7 @@ fn parse_client(s: &str) -> (IpCidr, u16) { pub type PortListeners = HashMap; -pub fn build_lb(config: AppConfig) -> PortListeners { +pub fn build_lb(config: AppConfig) -> (PortListeners, HashMap>>) { let mut healths: HashMap>> = HashMap::new(); let mut backends: HashMap> = HashMap::new(); @@ -116,5 +117,5 @@ pub fn build_lb(config: AppConfig) -> PortListeners { .sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length())); } - listeners + (listeners, healths) } diff --git a/src/main.rs b/src/main.rs index b459cde..aaefa85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,10 +7,74 @@ use crate::balancer::{Balancer, ConnectionInfo}; use crate::proxy::tcp::proxy_tcp_connection; use std::fs::File; use std::sync::atomic::{AtomicU64, Ordering}; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncReadExt; +use serde_json::Value; +use std::collections::HashMap; +use std::net::{IpAddr}; +use std::sync::{Arc, RwLock}; +use crate::backend::health::ServerMetrics; +use rperf3::{Server, Config}; +use std::io::Read; + static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); +async fn start_iperf_server() -> Result<(), Box> { + let config = Config::server(5001); + let server = Server::new(config); + server.run().await?; + Ok(()) +} + +async fn handle_connection(mut stream: TcpStream, healths: &HashMap>>) -> std::io::Result<()> { + loop { + let mut buffer = [0u8; 512]; + let bytes_read = stream.read(&mut buffer).await?; + let receiving_data = String::from_utf8((buffer[..bytes_read]).to_vec()) + .unwrap(); + + let parsed_recdata : Value = serde_json::from_str(&receiving_data).unwrap(); + let server_ip: IpAddr = parsed_recdata["server_ip"] + .as_str() + .unwrap() + .parse() + .unwrap(); + + healths.get(&server_ip) + .unwrap() + .write() + .unwrap() + .update( + parsed_recdata["cpu"].as_f64().unwrap(), + parsed_recdata["mem"].as_f64().unwrap(), + parsed_recdata["net"].as_f64().unwrap(), + parsed_recdata["io"].as_f64().unwrap(), + ); + } + #[warn(unreachable_code)] + Ok(()) +} + +async fn start_healthcheck_listener(addr: &str, healths: HashMap>>) -> std::io::Result<()> { + let listener = TcpListener::bind(addr).await?; + println!("TCP server listening on {}", addr); + loop { + let (stream, remote_addr) = match listener.accept().await { + Ok(v) => v, + Err(e) => { + continue; + } + }; + + if let Err(e) = handle_connection(stream, &healths).await { + eprintln!("connection handler error: {}", e); + } + } + + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), Box> { let f = File::open("config.yaml").expect("couldn't open config.yaml"); @@ -22,7 +86,7 @@ async fn main() -> Result<(), Box> { app_config.rules.len() ); - let listeners = config::loader::build_lb(app_config); + let (listeners, healths) = config::loader::build_lb(app_config); if listeners.is_empty() { eprintln!("its a lawless land"); @@ -31,6 +95,18 @@ async fn main() -> Result<(), Box> { let mut handles = Vec::new(); + handles.push( + tokio::spawn(async { + start_healthcheck_listener("127.0.0.1:8080", healths).await.unwrap(); + }) + ); + + handles.push( + tokio::spawn(async { + start_iperf_server().await; + }) + ); + for (port, mut routing_table) in listeners { handles.push(tokio::spawn(async move { let addr = format!("0.0.0.0:{}", port); From 8d5122ea1d309c3e6a1609e4b555ac81cb0f2fbe Mon Sep 17 00:00:00 2001 From: psun256 Date: Wed, 10 Dec 2025 16:04:14 -0500 Subject: [PATCH 2/3] fix server metrics framing issue --- infra/enginewhy-server.rs | 8 +---- src/main.rs | 76 ++++++++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/infra/enginewhy-server.rs b/infra/enginewhy-server.rs index 8786484..4614649 100644 --- a/infra/enginewhy-server.rs +++ b/infra/enginewhy-server.rs @@ -146,20 +146,14 @@ async fn main() -> std::io::Result<()> { } println!(); - // Identify this process (client) by the local socket address used to connect - let server_identifier = match stream.local_addr() { - Ok(addr) => addr.to_string(), - Err(_) => format!("localhost:{}", PORT), - }; - let mut packet: HashMap = HashMap::new(); - packet.insert("server_ip".to_string(), Value::String(server_identifier)); packet.insert("cpu".to_string(), Value::from(cpu_usage)); // % packet.insert("mem".to_string(), Value::from(mem_usage)); // % packet.insert("net".to_string(), Value::from(net_usage_pct)); packet.insert("io".to_string(), Value::from(io_usage)); let serialized_packet = serde_json::to_string(&packet)?; + serialized_packet.push('\n'); let _ = stream.write(serialized_packet.as_bytes()); thread::sleep(Duration::from_secs(10)); diff --git a/src/main.rs b/src/main.rs index aaefa85..2395f2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use crate::proxy::tcp::proxy_tcp_connection; use std::fs::File; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncBufReadExt, AsyncReadExt}; use serde_json::Value; use std::collections::HashMap; use std::net::{IpAddr}; @@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock}; use crate::backend::health::ServerMetrics; use rperf3::{Server, Config}; use std::io::Read; - +use std::io::{BufRead, BufReader}; static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); @@ -27,32 +27,52 @@ async fn start_iperf_server() -> Result<(), Box> { Ok(()) } -async fn handle_connection(mut stream: TcpStream, healths: &HashMap>>) -> std::io::Result<()> { - loop { - let mut buffer = [0u8; 512]; - let bytes_read = stream.read(&mut buffer).await?; - let receiving_data = String::from_utf8((buffer[..bytes_read]).to_vec()) - .unwrap(); - - let parsed_recdata : Value = serde_json::from_str(&receiving_data).unwrap(); - let server_ip: IpAddr = parsed_recdata["server_ip"] - .as_str() - .unwrap() - .parse() - .unwrap(); +async fn handle_metrics_stream(stream: TcpStream, healths: &HashMap>>) -> std::io::Result<()> { + let server_ip = stream.peer_addr()?.ip(); + let mut reader = tokio::io::BufReader::new(stream); + let mut line = String::new(); - healths.get(&server_ip) - .unwrap() - .write() - .unwrap() - .update( - parsed_recdata["cpu"].as_f64().unwrap(), - parsed_recdata["mem"].as_f64().unwrap(), - parsed_recdata["net"].as_f64().unwrap(), - parsed_recdata["io"].as_f64().unwrap(), - ); + loop { + line.clear(); + + match reader.read_line(&mut line).await { + Ok(0) => break, + Ok(_) => { + if let Err(e) = process_metrics(server_ip, &line, healths) { + eprintln!("skipping invalid packet: {}", e); + } + } + Err(e) => { + eprintln!("connection error: {}", e); + break; + } + } } - #[warn(unreachable_code)] + Ok(()) +} + +fn process_metrics(server_ip: IpAddr, json_str: &str, healths: &HashMap>>) -> Result<(), String> { + let parsed: Value = serde_json::from_str(json_str) + .map_err(|e| format!("parse error: {}", e))?; + + let metrics_lock = healths.get(&server_ip) + .ok_or_else(|| format!("unknown server: {}", server_ip))?; + + let get_f64 = |key: &str| -> Result { + parsed.get(key) + .and_then(|v| v.as_f64()) + .ok_or_else(|| format!("invalid '{}'", key)) + }; + + if let Ok(mut guard) = metrics_lock.write() { + guard.update( + get_f64("cpu")?, + get_f64("mem")?, + get_f64("net")?, + get_f64("io")?, + ); + } + Ok(()) } @@ -67,7 +87,7 @@ async fn start_healthcheck_listener(addr: &str, healths: HashMap Result<(), Box> { handles.push( tokio::spawn(async { - start_healthcheck_listener("127.0.0.1:8080", healths).await.unwrap(); + start_healthcheck_listener("0.0.0.0:8080", healths).await.unwrap(); }) ); From 022c48a041cf1ea4dbd0dc0f4c41a5611bd2b940 Mon Sep 17 00:00:00 2001 From: psun256 Date: Wed, 10 Dec 2025 18:52:40 -0500 Subject: [PATCH 3/3] applied hot reload to health check logic --- Cargo.lock | 67 +++++++++-- config.yaml | 5 + src/backend/health.rs | 117 +++++++++++++++++++ src/backend/mod.rs | 2 +- src/balancer/adaptive_weight.rs | 129 ++++++++++++++++++++- src/balancer/ip_hashing.rs | 7 +- src/balancer/round_robin.rs | 2 +- src/config/loader.rs | 27 +++-- src/config/mod.rs | 16 +++ src/main.rs | 193 +++++++++++++------------------- 10 files changed, 421 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5886925..4811635 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,6 +400,26 @@ dependencies = [ "cc", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.10.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -446,6 +466,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "l4lb" version = "0.1.0" @@ -453,6 +493,8 @@ dependencies = [ "anywho", "arc-swap", "cidr", + "clap", + "notify", "rand 0.10.0-rc.5", "rperf3-rs", "serde", @@ -463,9 +505,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.177" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "lock_api" @@ -496,9 +538,9 @@ checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" [[package]] name = "mio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "log", @@ -847,9 +889,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.6" +version = "1.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" +checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad" dependencies = [ "libc", ] @@ -884,9 +926,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.110" +version = "2.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" +checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" dependencies = [ "proc-macro2", "quote", @@ -1045,6 +1087,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/config.yaml b/config.yaml index e2e1ce1..47fe30d 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,7 @@ +healthcheck_addr: "127.0.0.1:9000" + +iperf_addr: "0.0.0.0:5200" + backends: - id: "srv-1" ip: "127.0.0.1" @@ -24,6 +28,7 @@ rules: - clients: - "0.0.0.0/0:6767" + - "0.0.0.0/0:6969" targets: # no issues with duplicate servers or clusters - "priority-api" - "priority-api" diff --git a/src/backend/health.rs b/src/backend/health.rs index 0591438..3801020 100644 --- a/src/backend/health.rs +++ b/src/backend/health.rs @@ -1,3 +1,11 @@ +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{Arc, RwLock}; +use serde_json::Value; +use rperf3::{Config, Server}; +use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use tokio::io::AsyncBufReadExt; + // Physical server health statistics, used for certain load balancing algorithms #[derive(Debug, Default)] pub struct ServerMetrics { @@ -15,3 +23,112 @@ impl ServerMetrics { self.io = io; } } + +pub async fn start_healthcheck_listener( + addr: &str, + healths: HashMap>>, +) -> std::io::Result<()> { + let addrs = tokio::net::lookup_host(addr).await?; + let mut listener = None; + + for a in addrs { + let socket = match a { + SocketAddr::V4(_) => TcpSocket::new_v4()?, + SocketAddr::V6(_) => TcpSocket::new_v6()?, + }; + + socket.set_reuseaddr(true)?; + + if socket.bind(a).is_ok() { + listener = Some(socket.listen(1024)?); + break; + } + } + + let listener = listener.ok_or_else(|| { + eprintln!("health listener could not bind to port"); + std::io::Error::new(std::io::ErrorKind::Other, "health listener failed") + })?; + + println!("healthcheck server listening on {}", addr); + loop { + let (stream, remote_addr) = match listener.accept().await { + Ok(v) => v, + Err(e) => { + continue; + } + }; + + if let Err(e) = handle_metrics_stream(stream, &healths).await { + eprintln!("connection handler error: {}", e); + } + } +} + +pub async fn start_iperf_server(addr: &str) -> Result<(), Box> { + let sock = addr.parse::()?; + let mut config = Config::server(sock.port()); + config.bind_addr = Some(sock.ip()); + let server = Server::new(config); + println!("iperf server listening on {}", addr); + server.run().await?; + Ok(()) +} + +async fn handle_metrics_stream( + stream: TcpStream, + healths: &HashMap>>, +) -> std::io::Result<()> { + let server_ip = stream.peer_addr()?.ip(); + let mut reader = tokio::io::BufReader::new(stream); + let mut line = String::new(); + + loop { + line.clear(); + + match reader.read_line(&mut line).await { + Ok(0) => break, + Ok(_) => { + if let Err(e) = process_metrics(server_ip, &line, healths) { + eprintln!("skipping invalid packet: {}", e); + } + } + Err(e) => { + eprintln!("connection error: {}", e); + break; + } + } + } + Ok(()) +} + +fn process_metrics( + server_ip: IpAddr, + json_str: &str, + healths: &HashMap>>, +) -> Result<(), String> { + let parsed: Value = + serde_json::from_str(json_str).map_err(|e| format!("parse error: {}", e))?; + + let metrics_lock = healths + .get(&server_ip) + .ok_or_else(|| format!("unknown server: {}", server_ip))?; + + let get_f64 = |key: &str| -> Result { + parsed + .get(key) + .and_then(|v| v.as_f64()) + .ok_or_else(|| format!("invalid '{}'", key)) + }; + + if let Ok(mut guard) = metrics_lock.write() { + guard.update( + get_f64("cpu")?, + get_f64("mem")?, + get_f64("net")?, + get_f64("io")?, + ); + } + + Ok(()) +} \ No newline at end of file diff --git a/src/backend/mod.rs b/src/backend/mod.rs index d84aa76..445a292 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -1,11 +1,11 @@ pub mod health; +use crate::backend::health::ServerMetrics; use core::fmt; use std::net::SocketAddr; use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::{AtomicUsize, Ordering}; -use crate::backend::health::ServerMetrics; // A possible endpoint for a proxied connection. // Note that multiple may live on the same server, hence the Arc> diff --git a/src/balancer/adaptive_weight.rs b/src/balancer/adaptive_weight.rs index c23f904..dcef3dc 100644 --- a/src/balancer/adaptive_weight.rs +++ b/src/balancer/adaptive_weight.rs @@ -4,7 +4,8 @@ use crate::balancer::{Balancer, ConnectionInfo}; use rand::prelude::*; use rand::rngs::SmallRng; use std::fmt::Debug; -use std::sync::{Arc}; +use std::fs::Metadata; +use std::sync::{Arc, RwLock}; #[derive(Debug)] struct AdaptiveNode { @@ -27,7 +28,7 @@ impl AdaptiveWeightBalancer { .iter() .map(|b| AdaptiveNode { backend: b.clone(), - weight: 0f64, + weight: 1f64, }) .collect(); @@ -85,7 +86,6 @@ impl Balancer for AdaptiveWeightBalancer { }; let ratio = risk / node.weight; - if ratio <= threshold { return Some(node.backend.clone()); } @@ -142,3 +142,126 @@ impl Balancer for AdaptiveWeightBalancer { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::Backend; + use std::net::SocketAddr; + + fn backend_factory(id: &str, ip: &str, port: u16) -> Arc { + Arc::new(Backend::new( + id.to_string(), + SocketAddr::new(ip.parse().unwrap(), port), + Arc::new(RwLock::new(ServerMetrics::default())), + )) + } + + fn unused_ctx() -> ConnectionInfo { + ConnectionInfo { + client_ip: ("0.0.0.0".parse().unwrap()), + } + } + + #[test] + fn basic_weight_update_and_choose() { + let backends = BackendPool::new(vec![ + backend_factory("server-0", "127.0.0.1", 3000), + backend_factory("server-1", "127.0.0.1", 3001), + ]); + let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.5, 0.2, 0.2, 0.1], 0.5); + // initially equal weights + // update one backend to be heavily loaded + { + let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap(); + sm0_guard.update(90.0, 80.0, 10.0, 5.0); + } + { + let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap(); + sm1_guard.update(10.0, 5.0, 1.0, 1.0); + } + + // Choose backend: should pick the less loaded host server1 + let chosen = b + .choose_backend(unused_ctx()) + .expect("should choose a backend"); + + let sm0: &ServerMetrics = &backends.backends.get(0).unwrap().metrics.read().unwrap(); + let sm1: &ServerMetrics = &backends.backends.get(1).unwrap().metrics.read().unwrap(); + println!("{:?}, {:?}", sm0, sm1); + assert_eq!(chosen.id, "server-1"); + } + + #[test] + fn choose_none_when_empty() { + let mut b = + AdaptiveWeightBalancer::new(BackendPool::new(vec![]), [0.5, 0.2, 0.2, 0.1], 0.5); + assert!(b.choose_backend(unused_ctx()).is_none()); + } + + #[test] + fn ratio_triggers_immediate_selection() { + // Arrange two servers where server 1 has composite load 0 and server 2 has composite load 100. + // With alpha = 1.0 and two servers, threshold = 1.0 * (r_sum / w_sum) = 1.0 * (100 / 2) = 50. + // Server 0 ratio = 0 / 1 = 0 <= 50 so it should be chosen immediately. + let backends = BackendPool::new(vec![ + backend_factory("server-0", "127.0.0.1", 3000), + backend_factory("server-1", "127.0.0.1", 3001), + ]); + let mut b = AdaptiveWeightBalancer::new(backends.clone(), [0.25, 0.25, 0.25, 0.25], 1.0); + + { + let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap(); + sm0_guard.update(0.0, 0.0, 0.0, 0.0); + } + { + let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap(); + sm1_guard.update(100.0, 100.0, 100.0, 100.0); + } + + let chosen = b + .choose_backend(unused_ctx()) + .expect("should choose a backend"); + assert_eq!(chosen.id, "server-0"); + } + + #[test] + fn choose_min_current_load_when_no_ratio() { + // Arrange three servers with identical composite loads so no server satisfies Ri/Wi <= threshold + // (set alpha < 1 so threshold < ratio). The implementation then falls back to picking the + // server with minimum current_load + let backends = BackendPool::new(vec![ + backend_factory("server-0", "127.0.0.1", 3000), + backend_factory("server-1", "127.0.0.1", 3001), + backend_factory("server-2", "127.0.0.1", 3002), + ]); + + // set current_loads (field expected to be public) + + { + let mut sm0_guard = backends.backends.get(0).unwrap().metrics.write().unwrap(); + sm0_guard.update(10.0, 10.0, 10.0, 10.0); + } + { + let mut sm1_guard = backends.backends.get(1).unwrap().metrics.write().unwrap(); + sm1_guard.update(5.0, 5.0, 5.0, 5.0); + } + { + let mut sm2_guard = backends.backends.get(2).unwrap().metrics.write().unwrap(); + sm2_guard.update(20.0, 20.0, 20.0, 20.0); + } + + // Use coeffs that only consider CPU so composite load is easy to reason about. + let mut bal = AdaptiveWeightBalancer::new(backends.clone(), [1.0, 0.0, 0.0, 0.0], 0.5); + + // set identical composite loads > 0 for all so ratio = x and threshold = alpha * x < x + // you will have threshold = 25 for all 3 backend servers and ratio = 50 + // so that forces to choose the smallest current load backend + + let chosen = bal + .choose_backend(unused_ctx()) + .expect("should choose a backend"); + // expect server with smallest current_load server-1 + assert_eq!(chosen.id, "server-1"); + } +} \ No newline at end of file diff --git a/src/balancer/ip_hashing.rs b/src/balancer/ip_hashing.rs index 2cc5318..f48a776 100644 --- a/src/balancer/ip_hashing.rs +++ b/src/balancer/ip_hashing.rs @@ -1,7 +1,7 @@ use crate::backend::{Backend, BackendPool}; use crate::balancer::{Balancer, ConnectionInfo}; use std::hash::{DefaultHasher, Hash, Hasher}; -use std::sync::{Arc}; +use std::sync::Arc; #[derive(Debug)] pub struct SourceIPHash { @@ -30,8 +30,9 @@ impl Balancer for SourceIPHash { #[cfg(test)] mod tests { use super::*; + use crate::backend::health::ServerMetrics; use std::net::IpAddr; - use crate::backend::ServerMetrics; + use std::sync::RwLock; fn create_dummy_backends(count: usize) -> BackendPool { let mut backends = Vec::new(); @@ -105,4 +106,4 @@ mod tests { assert!(distribution[1] > 0, "Backend 1 received no traffic"); assert!(distribution[2] > 0, "Backend 2 received no traffic"); } -} \ No newline at end of file +} diff --git a/src/balancer/round_robin.rs b/src/balancer/round_robin.rs index e0d60d1..83680ea 100644 --- a/src/balancer/round_robin.rs +++ b/src/balancer/round_robin.rs @@ -1,7 +1,7 @@ use crate::backend::{Backend, BackendPool}; use crate::balancer::{Balancer, ConnectionInfo}; use std::fmt::Debug; -use std::sync::{Arc}; +use std::sync::Arc; // only the main thread for receiving connections should be // doing the load balancing. alternatively, each thread diff --git a/src/config/loader.rs b/src/config/loader.rs index 1a6f458..f597d00 100644 --- a/src/config/loader.rs +++ b/src/config/loader.rs @@ -3,8 +3,8 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; -use crate::backend::*; use crate::backend::health::*; +use crate::backend::*; use crate::balancer::Balancer; use crate::balancer::adaptive_weight::AdaptiveWeightBalancer; use crate::balancer::round_robin::RoundRobinBalancer; @@ -19,23 +19,26 @@ pub type PortListeners = HashMap; fn parse_client(s: &str) -> Result<(IpCidr, u16), String> { // just splits "0.0.0.0/0:80" into ("0.0.0.0/0", 80) - let (ip_part, port_part) = s.rsplit_once(':') + let (ip_part, port_part) = s + .rsplit_once(':') .ok_or_else(|| format!("badly formatted client: {}", s))?; - let port = port_part.parse() - .map_err(|_| format!("bad port: {}", s))?; - let cidr = ip_part.parse() - .map_err(|_| format!("bad ip/mask: {}", s))?; + let port = port_part.parse().map_err(|_| format!("bad port: {}", s))?; + let cidr = ip_part.parse().map_err(|_| format!("bad ip/mask: {}", s))?; Ok((cidr, port)) } -pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap>>), String> { +pub fn build_lb( + config: &AppConfig, +) -> Result<(PortListeners, HashMap>>), String> { let mut healths: HashMap>> = HashMap::new(); let mut backends: HashMap> = HashMap::new(); - for backend_cfg in config.backends { - let ip: IpAddr = backend_cfg.ip.parse() + for backend_cfg in &config.backends { + let ip: IpAddr = backend_cfg + .ip + .parse() .map_err(|_| format!("bad ip: {}", backend_cfg.ip))?; let addr = SocketAddr::new(ip, backend_cfg.port); @@ -46,12 +49,12 @@ pub fn build_lb(config: AppConfig) -> Result<(PortListeners, HashMap Result<(PortListeners, HashMap> = HashMap::new(); - for client_def in rule.clients { + for client_def in &rule.clients { let (cidr, port) = parse_client(&client_def)?; port_groups.entry(port).or_default().push(cidr); } diff --git a/src/config/mod.rs b/src/config/mod.rs index e65f437..4b63927 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -16,11 +16,27 @@ pub mod loader; use serde::Deserialize; use std::collections::HashMap; +fn default_healthcheck_addr() -> String { + "0.0.0.0:8080".to_string() +} + +fn default_iperf_addr() -> String { + "0.0.0.0:5201".to_string() +} + #[derive(Debug, Deserialize)] pub struct AppConfig { + #[serde(default = "default_healthcheck_addr")] + pub healthcheck_addr: String, + + #[serde(default = "default_iperf_addr")] + pub iperf_addr: String, + pub backends: Vec, + #[serde(default)] pub clusters: HashMap>, + pub rules: Vec, } diff --git a/src/main.rs b/src/main.rs index 7c90f86..9f6bc8b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,115 +3,35 @@ mod balancer; mod config; mod proxy; -use std::collections::HashMap; -use crate::balancer::{ConnectionInfo}; +use crate::backend::health::{start_healthcheck_listener, start_iperf_server, ServerMetrics}; +use crate::balancer::ConnectionInfo; +use crate::config::loader::{build_lb, RoutingTable}; use crate::proxy::tcp::proxy_tcp_connection; -use std::fs::File; -use std::path::PathBuf; -use std::net::IpAddr; -use std::sync::atomic::{AtomicU64, Ordering}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::{AsyncBufReadExt, AsyncReadExt}; -use serde_json::Value; +use anywho::Error; use std::collections::HashMap; -use std::net::{IpAddr}; -use std::sync::{Arc, RwLock}; -use crate::backend::health::ServerMetrics; -use rperf3::{Server, Config}; -use std::io::Read; -use std::io::{BufRead, BufReader}; - -static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); - -async fn start_iperf_server() -> Result<(), Box> { - let config = Config::server(5001); - let server = Server::new(config); - server.run().await?; - Ok(()) -} - -async fn handle_metrics_stream(stream: TcpStream, healths: &HashMap>>) -> std::io::Result<()> { - let server_ip = stream.peer_addr()?.ip(); - let mut reader = tokio::io::BufReader::new(stream); - let mut line = String::new(); - - loop { - line.clear(); - - match reader.read_line(&mut line).await { - Ok(0) => break, - Ok(_) => { - if let Err(e) = process_metrics(server_ip, &line, healths) { - eprintln!("skipping invalid packet: {}", e); - } - } - Err(e) => { - eprintln!("connection error: {}", e); - break; - } - } - } - Ok(()) -} - -fn process_metrics(server_ip: IpAddr, json_str: &str, healths: &HashMap>>) -> Result<(), String> { - let parsed: Value = serde_json::from_str(json_str) - .map_err(|e| format!("parse error: {}", e))?; - - let metrics_lock = healths.get(&server_ip) - .ok_or_else(|| format!("unknown server: {}", server_ip))?; - - let get_f64 = |key: &str| -> Result { - parsed.get(key) - .and_then(|v| v.as_f64()) - .ok_or_else(|| format!("invalid '{}'", key)) - }; - - if let Ok(mut guard) = metrics_lock.write() { - guard.update( - get_f64("cpu")?, - get_f64("mem")?, - get_f64("net")?, - get_f64("io")?, - ); - } - - Ok(()) -} - -async fn start_healthcheck_listener(addr: &str, healths: HashMap>>) -> std::io::Result<()> { - let listener = TcpListener::bind(addr).await?; - println!("TCP server listening on {}", addr); - loop { - let (stream, remote_addr) = match listener.accept().await { - Ok(v) => v, - Err(e) => { - continue; - } - }; - - if let Err(e) = handle_metrics_stream(stream, &healths).await { - eprintln!("connection handler error: {}", e); - } - } - - Ok(()) +use std::fs::File; +use std::hash::Hash; +use std::net::{IpAddr, SocketAddr}; +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; -use anywho::Error; +use tokio::io::AsyncBufReadExt; use tokio::net::TcpListener; use tokio::sync::mpsc; -use crate::backend::ServerMetrics; -use crate::config::loader::{build_lb, RoutingTable}; - -use notify::{Watcher, RecursiveMode, Event}; use clap::Parser; +use notify::{Event, RecursiveMode, Watcher}; +use std::cmp; static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1); struct ProgramState { tx_rt_map: HashMap>, healths: HashMap>>, + health_listener: Option>, + iperf_server: Option>, + health_listener_addr: Option, + iperf_server_addr: Option, } #[derive(Parser, Debug)] @@ -135,6 +55,10 @@ async fn main() -> Result<(), Box> { let state = Arc::new(Mutex::new(ProgramState { tx_rt_map: HashMap::new(), healths: HashMap::new(), + health_listener: None, + iperf_server: None, + health_listener_addr: None, + iperf_server_addr: None, })); if let Err(e) = load_config(&args.config, state.clone()).await { @@ -143,48 +67,52 @@ async fn main() -> Result<(), Box> { let config_path = args.config.clone(); let state_clone = state.clone(); - - handles.push( - tokio::spawn(async { - start_healthcheck_listener("0.0.0.0:8080", healths).await.unwrap(); - }) - ); - - handles.push( - tokio::spawn(async { - start_iperf_server().await; - }) - ); tokio::spawn(async move { let (tx, mut rx) = mpsc::channel(1); - + let mut watcher = notify::recommended_watcher(move |res: Result| { if let Ok(event) = res { if event.kind.is_modify() { let _ = tx.blocking_send(()); } } - }).unwrap(); + }) + .unwrap(); - watcher.watch(&config_path, RecursiveMode::NonRecursive).unwrap(); + watcher + .watch(&config_path, RecursiveMode::NonRecursive) + .unwrap(); println!("watching for changes to {:?}", config_path); while rx.recv().await.is_some() { + // for some reason, saving on certain text editors fires several events, + // and this causes us to reload a lot. try to flush some events, add a tiny delay + // to mitigate this + + while rx.try_recv().is_ok() {} + tokio::time::sleep(Duration::from_millis(50)).await; + while rx.try_recv().is_ok() {} + if let Err(e) = load_config(&config_path, state_clone.clone()).await { eprintln!("loading config failed: {}", e); } } }); - loop { tokio::time::sleep(Duration::from_hours(1)).await; } + loop { + tokio::time::sleep(Duration::from_hours(1)).await; + } } async fn load_config(path: &PathBuf, state: Arc>) -> Result<(), Error> { let f = File::open(path)?; let app_config: config::AppConfig = match serde_saphyr::from_reader(f) { Ok(app_config) => app_config, - Err(e) => { eprintln!("error parsing config {}", e); return Ok(()); } + Err(e) => { + eprintln!("error parsing config {}", e); + return Ok(()); + } }; println!( @@ -193,7 +121,7 @@ async fn load_config(path: &PathBuf, state: Arc>) -> Result< app_config.rules.len() ); - let (mut listeners, health_monitors) = match build_lb(app_config) { + let (mut listeners, health_monitors) = match build_lb(&app_config) { Ok(v) => v, Err(e) => { eprintln!("config has logical errors: {}", e); @@ -202,7 +130,8 @@ async fn load_config(path: &PathBuf, state: Arc>) -> Result< }; let mut prog_state = state.lock().unwrap(); - let ports_to_remove: Vec = prog_state.tx_rt_map + let ports_to_remove: Vec = prog_state + .tx_rt_map .keys() .cloned() .filter(|port| !listeners.contains_key(port)) @@ -212,6 +141,38 @@ async fn load_config(path: &PathBuf, state: Arc>) -> Result< prog_state.tx_rt_map.remove(&port); } + if let Some(handle) = prog_state.health_listener.take() { + handle.abort(); + } + let health_map: HashMap>> = health_monitors.clone(); + let health_addr = app_config.healthcheck_addr.clone(); + let health_addr_c = health_addr.clone(); + let health_handle = tokio::spawn(async move { + if let Err(e) = start_healthcheck_listener(&health_addr, health_map).await { + eprintln!("health check listener failed: {}", e); + } + }); + prog_state.health_listener = Some(health_handle); + prog_state.health_listener_addr = Some(health_addr_c); + + // maybe restart iperf server + let iperf_addr = app_config.iperf_addr.clone(); + if prog_state.iperf_server_addr.as_ref() != Some(&iperf_addr) { + if let Some(handle) = prog_state.iperf_server.take() { + handle.abort(); + } + + let iperf_addr_c = iperf_addr.clone(); + let iperf_handle = tokio::spawn(async move { + if let Err(e) = start_iperf_server(iperf_addr.as_str()).await { + eprintln!("iperf server failed: {}", e); + } + }); + + prog_state.iperf_server = Some(iperf_handle); + prog_state.iperf_server_addr = Some(iperf_addr_c); + } + prog_state.healths = health_monitors; for (port, routing_table) in listeners.drain() { if let Some(x) = prog_state.tx_rt_map.get_mut(&port) { @@ -232,7 +193,7 @@ async fn load_config(path: &PathBuf, state: Arc>) -> Result< async fn run_listener( port: u16, mut rx_rt: mpsc::UnboundedReceiver, - mut current_table: RoutingTable + mut current_table: RoutingTable, ) { let addr = format!("0.0.0.0:{}", port); println!("Starting tcp listener on {}", addr);