From 19cd5b7f2a8e276014d5eb89ac8ebcca492a2e0d Mon Sep 17 00:00:00 2001 From: Jeremy Janella Date: Sat, 6 Dec 2025 00:21:53 -0500 Subject: [PATCH 1/3] feat: modularized proxy --- src/main.rs | 50 +++++--------------------------- src/netutils.rs | 77 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 43 deletions(-) create mode 100644 src/netutils.rs diff --git a/src/main.rs b/src/main.rs index fde3ec7..1903762 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,47 +1,11 @@ -use std::sync::Arc; -use std::fmt; -use tokio::io; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::Mutex; +mod netutils; + use anywho::Error; +use netutils::{Backend, proxy, proxy_connection}; +use std::sync::Arc; -#[derive(Clone, Debug)] -struct Backend { - address: String, -} - -impl fmt::Display for Backend { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.address) - } -} - -impl Backend { - fn new(address: String) -> Self { - Backend { address } - } -} - -async fn proxy_connection(client_stream: TcpStream, backend: &Backend) -> Result<(), io::Error> { - let log_error = |e| { eprintln!("error: something went wrong {}", e); e }; - - println!("info: connecting to backend {}", backend); - let backend_stream = TcpStream::connect(&backend.address).await.map_err(log_error)?; - println!("info: the bluetooth device is connected successfully {}", backend); - - let (mut client_read, mut client_write) = client_stream.into_split(); - let (mut backend_read, mut backend_write) = backend_stream.into_split(); - - let client_to_backend = io::copy(&mut client_read, &mut backend_write); - let backend_to_client = io::copy(&mut backend_read, &mut client_write); - - let _ = tokio::select! { - res_a = client_to_backend => res_a.map_err(log_error)?, - res_b = backend_to_client => res_b.map_err(log_error)?, - }; - - Ok(()) -} +use tokio::net::TcpListener; +use tokio::sync::Mutex; #[tokio::main] async fn main() -> Result<(), Error> { @@ -80,4 +44,4 @@ async fn main() -> Result<(), Error> { } }); } -} \ No newline at end of file +} diff --git a/src/netutils.rs b/src/netutils.rs new file mode 100644 index 0000000..1914e08 --- /dev/null +++ b/src/netutils.rs @@ -0,0 +1,77 @@ +use std::fmt; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; + +use std::error::Error; + +#[derive(Clone, Debug)] +pub struct Backend { + address: String, +} + +impl Backend { + pub fn new(address: String) -> Self { + Backend { address } + } +} + +impl fmt::Display for Backend { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.address) + } +} + +pub async fn proxy(client_addr: String, server_addr: String) -> Result<(), Box> { + let listener = TcpListener::bind(client_addr.clone()).await?; + println!("Opened {client_addr} -> {server_addr}"); + loop { + let server_addr = server_addr.clone(); + let (client, _) = listener.accept().await?; + tokio::spawn(async move { + let server = TcpStream::connect(server_addr).await.unwrap(); + + let (mut read_client, mut write_client) = client.into_split(); + let (mut read_server, mut write_server) = server.into_split(); + + let forward_to_server = + tokio::spawn(async move { io::copy(&mut read_client, &mut write_server).await }); + + let forward_to_client = + tokio::spawn(async move { io::copy(&mut read_server, &mut write_client).await }); + + let _ = tokio::join!(forward_to_server, forward_to_client); + }); + } +} + +pub async fn proxy_connection( + client_stream: TcpStream, + backend: &Backend, +) -> Result<(), io::Error> { + let log_error = |e| { + eprintln!("error: something went wrong {}", e); + e + }; + + println!("info: connecting to backend {}", backend); + let backend_stream = TcpStream::connect(&backend.address) + .await + .map_err(log_error)?; + println!( + "info: the bluetooth device is connected successfully {}", + backend + ); + + let (mut client_read, mut client_write) = client_stream.into_split(); + let (mut backend_read, mut backend_write) = backend_stream.into_split(); + + let client_to_backend = io::copy(&mut client_read, &mut backend_write); + let backend_to_client = io::copy(&mut backend_read, &mut client_write); + + let _ = tokio::select! { + res_a = client_to_backend => res_a.map_err(log_error)?, + res_b = backend_to_client => res_b.map_err(log_error)?, + }; + + Ok(()) +} From 606880f92888fea44ba9803d90a91b39fcff6b83 Mon Sep 17 00:00:00 2001 From: Jeremy Janella Date: Sat, 6 Dec 2025 01:29:47 -0500 Subject: [PATCH 2/3] feat: merged repos --- src/main.rs | 32 ++++++++------------ src/netutils.rs | 80 ++++++++++++++++++++----------------------------- 2 files changed, 46 insertions(+), 66 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1903762..c2925ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ mod netutils; use anywho::Error; -use netutils::{Backend, proxy, proxy_connection}; +use netutils::{Backend, tunnel}; use std::sync::Arc; use tokio::net::TcpListener; @@ -16,32 +16,26 @@ async fn main() -> Result<(), Error> { let current_index = Arc::new(Mutex::new(0)); - println!("lb starting on 0.0.0.0:8080"); + println!("enginewhy starting on 0.0.0.0:8080"); println!("backends: {:?}", backends); let listener = TcpListener::bind("0.0.0.0:8080").await?; loop { - let (socket, addr) = listener.accept().await?; + let (client, addr) = listener.accept().await?; println!("info: new connection from {}", addr); - let backends_clone = backends.clone(); - let index_clone = current_index.clone(); + let backend = { + let mut index = current_index.lock().await; + let selected_backend = backends[*index].clone(); + *index = (*index + 1) % backends.len(); + selected_backend + }; - tokio::spawn(async move { - // Round Robin - let backend = { - let mut index = index_clone.lock().await; - let selected_backend = backends_clone[*index].clone(); - *index = (*index + 1) % backends_clone.len(); - selected_backend - }; + println!("info: routing client {} to backend {}", addr, backend); - println!("info: routing client {} to backend {}", addr, backend); - - if let Err(e) = proxy_connection(socket, &backend).await { - eprintln!("error: proxy failed for {}: {}", addr, e); - } - }); + if let Err(e) = tunnel(client, backend).await { + eprintln!("error: proxy failed for {}: {}", addr, e); + } } } diff --git a/src/netutils.rs b/src/netutils.rs index 1914e08..87d5f24 100644 --- a/src/netutils.rs +++ b/src/netutils.rs @@ -1,6 +1,6 @@ use std::fmt; use tokio::io; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpStream; use std::error::Error; @@ -21,57 +21,43 @@ impl fmt::Display for Backend { } } -pub async fn proxy(client_addr: String, server_addr: String) -> Result<(), Box> { - let listener = TcpListener::bind(client_addr.clone()).await?; - println!("Opened {client_addr} -> {server_addr}"); - loop { - let server_addr = server_addr.clone(); - let (client, _) = listener.accept().await?; - tokio::spawn(async move { - let server = TcpStream::connect(server_addr).await.unwrap(); +pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box> { + let backend_address: String = backend.address.clone(); + tokio::spawn(async move { + let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await { + Ok(s) => { + println!("Connected to backend {backend_address}"); + s + } + Err(e) => { + eprintln!("Failed connecting to backend {backend_address}: {e}"); + return; + } + }; - let (mut read_client, mut write_client) = client.into_split(); - let (mut read_server, mut write_server) = server.into_split(); + let (mut read_client, mut write_client) = client_stream.into_split(); + let (mut read_backend, mut write_backend) = backend_stream.into_split(); - let forward_to_server = - tokio::spawn(async move { io::copy(&mut read_client, &mut write_server).await }); - - let forward_to_client = - tokio::spawn(async move { io::copy(&mut read_server, &mut write_client).await }); - - let _ = tokio::join!(forward_to_server, forward_to_client); + let client_to_backend = tokio::spawn(async move { + match io::copy(&mut read_client, &mut write_backend) + .await + .unwrap() + { + n => println!("{n}B ==> backend"), + } }); - } -} -pub async fn proxy_connection( - client_stream: TcpStream, - backend: &Backend, -) -> Result<(), io::Error> { - let log_error = |e| { - eprintln!("error: something went wrong {}", e); - e - }; + let backend_to_client = tokio::spawn(async move { + match io::copy(&mut read_backend, &mut write_client) + .await + .unwrap() + { + n => println!("{n}B ==> client"), + } + }); - println!("info: connecting to backend {}", backend); - let backend_stream = TcpStream::connect(&backend.address) - .await - .map_err(log_error)?; - println!( - "info: the bluetooth device is connected successfully {}", - backend - ); - - let (mut client_read, mut client_write) = client_stream.into_split(); - let (mut backend_read, mut backend_write) = backend_stream.into_split(); - - let client_to_backend = io::copy(&mut client_read, &mut backend_write); - let backend_to_client = io::copy(&mut backend_read, &mut client_write); - - let _ = tokio::select! { - res_a = client_to_backend => res_a.map_err(log_error)?, - res_b = backend_to_client => res_b.map_err(log_error)?, - }; + let _ = tokio::join!(client_to_backend, backend_to_client); + }); Ok(()) } From 4cdf2db0c9068350ad9d4c80d87e4c732c89389e Mon Sep 17 00:00:00 2001 From: Jeremy Janella Date: Sat, 6 Dec 2025 02:12:53 -0500 Subject: [PATCH 3/3] feat: improved logging --- src/main.rs | 24 +++++++++++++++++++----- src/netutils.rs | 25 +++++++------------------ 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/main.rs b/src/main.rs index c2925ae..adff2c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,17 @@ +macro_rules! info { + ($($arg:tt)*) => {{ + print!("info: "); + println!($($arg)*); + }}; +} + +macro_rules! error { + ($($arg:tt)*) => { + eprint!("error: "); + eprintln!($($arg)*); + }; +} + mod netutils; use anywho::Error; @@ -16,14 +30,14 @@ async fn main() -> Result<(), Error> { let current_index = Arc::new(Mutex::new(0)); - println!("enginewhy starting on 0.0.0.0:8080"); - println!("backends: {:?}", backends); + info!("enginewhy starting on 0.0.0.0:8080"); + info!("backends: {:?}", backends); let listener = TcpListener::bind("0.0.0.0:8080").await?; loop { let (client, addr) = listener.accept().await?; - println!("info: new connection from {}", addr); + info!("new connection from {}", addr); let backend = { let mut index = current_index.lock().await; @@ -32,10 +46,10 @@ async fn main() -> Result<(), Error> { selected_backend }; - println!("info: routing client {} to backend {}", addr, backend); + info!("routing client {} to backend {}", addr, backend); if let Err(e) = tunnel(client, backend).await { - eprintln!("error: proxy failed for {}: {}", addr, e); + error!("proxy failed for {}: {}", addr, e); } } } diff --git a/src/netutils.rs b/src/netutils.rs index 87d5f24..bd40aa4 100644 --- a/src/netutils.rs +++ b/src/netutils.rs @@ -23,14 +23,15 @@ impl fmt::Display for Backend { pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box> { let backend_address: String = backend.address.clone(); + tokio::spawn(async move { let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await { Ok(s) => { - println!("Connected to backend {backend_address}"); + info!("connected to backend {backend_address}"); s } Err(e) => { - eprintln!("Failed connecting to backend {backend_address}: {e}"); + error!("failed connecting to backend {backend_address}: {e}"); return; } }; @@ -38,23 +39,11 @@ pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Bo let (mut read_client, mut write_client) = client_stream.into_split(); let (mut read_backend, mut write_backend) = backend_stream.into_split(); - let client_to_backend = tokio::spawn(async move { - match io::copy(&mut read_client, &mut write_backend) - .await - .unwrap() - { - n => println!("{n}B ==> backend"), - } - }); + let client_to_backend = + tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await }); - let backend_to_client = tokio::spawn(async move { - match io::copy(&mut read_backend, &mut write_client) - .await - .unwrap() - { - n => println!("{n}B ==> client"), - } - }); + let backend_to_client = + tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await }); let _ = tokio::join!(client_to_backend, backend_to_client); });