too much changes, idek anymore
This commit is contained in:
@@ -1 +1,17 @@
|
||||
// Physical server health statistics, used for certain load balancing algorithms
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ServerMetrics {
|
||||
pub cpu: f64,
|
||||
pub mem: f64,
|
||||
pub net: f64,
|
||||
pub io: f64,
|
||||
}
|
||||
|
||||
impl ServerMetrics {
|
||||
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
|
||||
self.cpu = cpu;
|
||||
self.mem = mem;
|
||||
self.net = net;
|
||||
self.io = io;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,24 +5,7 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
// Physical server health statistics, used for certain load balancing algorithms
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ServerMetrics {
|
||||
pub cpu: f64,
|
||||
pub mem: f64,
|
||||
pub net: f64,
|
||||
pub io: f64,
|
||||
}
|
||||
|
||||
impl ServerMetrics {
|
||||
pub fn update(&mut self, cpu: f64, mem: f64, net: f64, io: f64) {
|
||||
self.cpu = cpu;
|
||||
self.mem = mem;
|
||||
self.net = net;
|
||||
self.io = io;
|
||||
}
|
||||
}
|
||||
use crate::backend::health::ServerMetrics;
|
||||
|
||||
// A possible endpoint for a proxied connection.
|
||||
// Note that multiple may live on the same server, hence the Arc<RwLock<ServerMetric>>
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use crate::backend::{Backend, BackendPool, ServerMetrics};
|
||||
use crate::backend::{Backend, BackendPool};
|
||||
use crate::backend::health::ServerMetrics;
|
||||
use crate::balancer::{Balancer, ConnectionInfo};
|
||||
use rand::prelude::*;
|
||||
use rand::rngs::SmallRng;
|
||||
use std::fmt::Debug;
|
||||
use std::fs::Metadata;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AdaptiveNode {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::backend::{Backend, BackendPool};
|
||||
use crate::balancer::{Balancer, ConnectionInfo};
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SourceIPHash {
|
||||
|
||||
@@ -1 +1 @@
|
||||
use super::*;
|
||||
// use super::*;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::backend::{Backend, BackendPool};
|
||||
use crate::balancer::{Balancer, ConnectionInfo};
|
||||
use std::fmt::Debug;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc};
|
||||
|
||||
// only the main thread for receiving connections should be
|
||||
// doing the load balancing. alternatively, each thread
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use crate::backend::*;
|
||||
use crate::backend::health::*;
|
||||
use crate::balancer::Balancer;
|
||||
use crate::balancer::adaptive_weight::AdaptiveWeightBalancer;
|
||||
use crate::balancer::round_robin::RoundRobinBalancer;
|
||||
@@ -26,7 +27,7 @@ fn parse_client(s: &str) -> (IpCidr, u16) {
|
||||
|
||||
pub type PortListeners = HashMap<u16, RoutingTable>;
|
||||
|
||||
pub fn build_lb(config: AppConfig) -> PortListeners {
|
||||
pub fn build_lb(config: AppConfig) -> (PortListeners, HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) {
|
||||
let mut healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>> = HashMap::new();
|
||||
let mut backends: HashMap<String, Arc<Backend>> = HashMap::new();
|
||||
|
||||
@@ -116,5 +117,5 @@ pub fn build_lb(config: AppConfig) -> PortListeners {
|
||||
.sort_by(|(a, _), (b, _)| a.network_length().cmp(&b.network_length()));
|
||||
}
|
||||
|
||||
listeners
|
||||
(listeners, healths)
|
||||
}
|
||||
|
||||
80
src/main.rs
80
src/main.rs
@@ -7,10 +7,74 @@ use crate::balancer::{Balancer, ConnectionInfo};
|
||||
use crate::proxy::tcp::proxy_tcp_connection;
|
||||
use std::fs::File;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use crate::backend::health::ServerMetrics;
|
||||
use rperf3::{Server, Config};
|
||||
use std::io::Read;
|
||||
|
||||
|
||||
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
async fn start_iperf_server() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config = Config::server(5001);
|
||||
let server = Server::new(config);
|
||||
server.run().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_connection(mut stream: TcpStream, healths: &HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> std::io::Result<()> {
|
||||
loop {
|
||||
let mut buffer = [0u8; 512];
|
||||
let bytes_read = stream.read(&mut buffer).await?;
|
||||
let receiving_data = String::from_utf8((buffer[..bytes_read]).to_vec())
|
||||
.unwrap();
|
||||
|
||||
let parsed_recdata : Value = serde_json::from_str(&receiving_data).unwrap();
|
||||
let server_ip: IpAddr = parsed_recdata["server_ip"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
healths.get(&server_ip)
|
||||
.unwrap()
|
||||
.write()
|
||||
.unwrap()
|
||||
.update(
|
||||
parsed_recdata["cpu"].as_f64().unwrap(),
|
||||
parsed_recdata["mem"].as_f64().unwrap(),
|
||||
parsed_recdata["net"].as_f64().unwrap(),
|
||||
parsed_recdata["io"].as_f64().unwrap(),
|
||||
);
|
||||
}
|
||||
#[warn(unreachable_code)]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_healthcheck_listener(addr: &str, healths: HashMap<IpAddr, Arc<RwLock<ServerMetrics>>>) -> std::io::Result<()> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
println!("TCP server listening on {}", addr);
|
||||
loop {
|
||||
let (stream, remote_addr) = match listener.accept().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = handle_connection(stream, &healths).await {
|
||||
eprintln!("connection handler error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let f = File::open("config.yaml").expect("couldn't open config.yaml");
|
||||
@@ -22,7 +86,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
app_config.rules.len()
|
||||
);
|
||||
|
||||
let listeners = config::loader::build_lb(app_config);
|
||||
let (listeners, healths) = config::loader::build_lb(app_config);
|
||||
|
||||
if listeners.is_empty() {
|
||||
eprintln!("its a lawless land");
|
||||
@@ -31,6 +95,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let mut handles = Vec::new();
|
||||
|
||||
handles.push(
|
||||
tokio::spawn(async {
|
||||
start_healthcheck_listener("127.0.0.1:8080", healths).await.unwrap();
|
||||
})
|
||||
);
|
||||
|
||||
handles.push(
|
||||
tokio::spawn(async {
|
||||
start_iperf_server().await;
|
||||
})
|
||||
);
|
||||
|
||||
for (port, mut routing_table) in listeners {
|
||||
handles.push(tokio::spawn(async move {
|
||||
let addr = format!("0.0.0.0:{}", port);
|
||||
|
||||
Reference in New Issue
Block a user