diff --git a/src/main.rs b/src/main.rs index 1903762..c2925ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ mod netutils; use anywho::Error; -use netutils::{Backend, proxy, proxy_connection}; +use netutils::{Backend, tunnel}; use std::sync::Arc; use tokio::net::TcpListener; @@ -16,32 +16,26 @@ async fn main() -> Result<(), Error> { let current_index = Arc::new(Mutex::new(0)); - println!("lb starting on 0.0.0.0:8080"); + println!("enginewhy starting on 0.0.0.0:8080"); println!("backends: {:?}", backends); let listener = TcpListener::bind("0.0.0.0:8080").await?; loop { - let (socket, addr) = listener.accept().await?; + let (client, addr) = listener.accept().await?; println!("info: new connection from {}", addr); - let backends_clone = backends.clone(); - let index_clone = current_index.clone(); + let backend = { + let mut index = current_index.lock().await; + let selected_backend = backends[*index].clone(); + *index = (*index + 1) % backends.len(); + selected_backend + }; - tokio::spawn(async move { - // Round Robin - let backend = { - let mut index = index_clone.lock().await; - let selected_backend = backends_clone[*index].clone(); - *index = (*index + 1) % backends_clone.len(); - selected_backend - }; + println!("info: routing client {} to backend {}", addr, backend); - println!("info: routing client {} to backend {}", addr, backend); - - if let Err(e) = proxy_connection(socket, &backend).await { - eprintln!("error: proxy failed for {}: {}", addr, e); - } - }); + if let Err(e) = tunnel(client, backend).await { + eprintln!("error: proxy failed for {}: {}", addr, e); + } } } diff --git a/src/netutils.rs b/src/netutils.rs index 1914e08..87d5f24 100644 --- a/src/netutils.rs +++ b/src/netutils.rs @@ -1,6 +1,6 @@ use std::fmt; use tokio::io; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpStream; use std::error::Error; @@ -21,57 +21,43 @@ impl fmt::Display for Backend { } } -pub async fn proxy(client_addr: String, server_addr: String) -> Result<(), Box> { - let listener = TcpListener::bind(client_addr.clone()).await?; - println!("Opened {client_addr} -> {server_addr}"); - loop { - let server_addr = server_addr.clone(); - let (client, _) = listener.accept().await?; - tokio::spawn(async move { - let server = TcpStream::connect(server_addr).await.unwrap(); +pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box> { + let backend_address: String = backend.address.clone(); + tokio::spawn(async move { + let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await { + Ok(s) => { + println!("Connected to backend {backend_address}"); + s + } + Err(e) => { + eprintln!("Failed connecting to backend {backend_address}: {e}"); + return; + } + }; - let (mut read_client, mut write_client) = client.into_split(); - let (mut read_server, mut write_server) = server.into_split(); + let (mut read_client, mut write_client) = client_stream.into_split(); + let (mut read_backend, mut write_backend) = backend_stream.into_split(); - let forward_to_server = - tokio::spawn(async move { io::copy(&mut read_client, &mut write_server).await }); - - let forward_to_client = - tokio::spawn(async move { io::copy(&mut read_server, &mut write_client).await }); - - let _ = tokio::join!(forward_to_server, forward_to_client); + let client_to_backend = tokio::spawn(async move { + match io::copy(&mut read_client, &mut write_backend) + .await + .unwrap() + { + n => println!("{n}B ==> backend"), + } }); - } -} -pub async fn proxy_connection( - client_stream: TcpStream, - backend: &Backend, -) -> Result<(), io::Error> { - let log_error = |e| { - eprintln!("error: something went wrong {}", e); - e - }; + let backend_to_client = tokio::spawn(async move { + match io::copy(&mut read_backend, &mut write_client) + .await + .unwrap() + { + n => println!("{n}B ==> client"), + } + }); - println!("info: connecting to backend {}", backend); - let backend_stream = TcpStream::connect(&backend.address) - .await - .map_err(log_error)?; - println!( - "info: the bluetooth device is connected successfully {}", - backend - ); - - let (mut client_read, mut client_write) = client_stream.into_split(); - let (mut backend_read, mut backend_write) = backend_stream.into_split(); - - let client_to_backend = io::copy(&mut client_read, &mut backend_write); - let backend_to_client = io::copy(&mut backend_read, &mut client_write); - - let _ = tokio::select! { - res_a = client_to_backend => res_a.map_err(log_error)?, - res_b = backend_to_client => res_b.map_err(log_error)?, - }; + let _ = tokio::join!(client_to_backend, backend_to_client); + }); Ok(()) }