diff --git a/src/main.rs b/src/main.rs index fde3ec7..adff2c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,48 +1,26 @@ -use std::sync::Arc; -use std::fmt; -use tokio::io; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::Mutex; -use anywho::Error; - -#[derive(Clone, Debug)] -struct Backend { - address: String, +macro_rules! info { + ($($arg:tt)*) => {{ + print!("info: "); + println!($($arg)*); + }}; } -impl fmt::Display for Backend { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.address) - } -} - -impl Backend { - fn new(address: String) -> Self { - Backend { address } - } -} - -async fn proxy_connection(client_stream: TcpStream, backend: &Backend) -> Result<(), io::Error> { - let log_error = |e| { eprintln!("error: something went wrong {}", e); e }; - - println!("info: connecting to backend {}", backend); - let backend_stream = TcpStream::connect(&backend.address).await.map_err(log_error)?; - println!("info: the bluetooth device is connected successfully {}", backend); - - let (mut client_read, mut client_write) = client_stream.into_split(); - let (mut backend_read, mut backend_write) = backend_stream.into_split(); - - let client_to_backend = io::copy(&mut client_read, &mut backend_write); - let backend_to_client = io::copy(&mut backend_read, &mut client_write); - - let _ = tokio::select! { - res_a = client_to_backend => res_a.map_err(log_error)?, - res_b = backend_to_client => res_b.map_err(log_error)?, +macro_rules! error { + ($($arg:tt)*) => { + eprint!("error: "); + eprintln!($($arg)*); }; - - Ok(()) } +mod netutils; + +use anywho::Error; +use netutils::{Backend, tunnel}; +use std::sync::Arc; + +use tokio::net::TcpListener; +use tokio::sync::Mutex; + #[tokio::main] async fn main() -> Result<(), Error> { let backends = Arc::new(vec![ @@ -52,32 +30,26 @@ async fn main() -> Result<(), Error> { let current_index = Arc::new(Mutex::new(0)); - println!("lb starting on 0.0.0.0:8080"); - println!("backends: {:?}", backends); + info!("enginewhy starting on 0.0.0.0:8080"); + info!("backends: {:?}", backends); let listener = TcpListener::bind("0.0.0.0:8080").await?; loop { - let (socket, addr) = listener.accept().await?; - println!("info: new connection from {}", addr); + let (client, addr) = listener.accept().await?; + info!("new connection from {}", addr); - let backends_clone = backends.clone(); - let index_clone = current_index.clone(); + let backend = { + let mut index = current_index.lock().await; + let selected_backend = backends[*index].clone(); + *index = (*index + 1) % backends.len(); + selected_backend + }; - tokio::spawn(async move { - // Round Robin - let backend = { - let mut index = index_clone.lock().await; - let selected_backend = backends_clone[*index].clone(); - *index = (*index + 1) % backends_clone.len(); - selected_backend - }; + info!("routing client {} to backend {}", addr, backend); - println!("info: routing client {} to backend {}", addr, backend); - - if let Err(e) = proxy_connection(socket, &backend).await { - eprintln!("error: proxy failed for {}: {}", addr, e); - } - }); + if let Err(e) = tunnel(client, backend).await { + error!("proxy failed for {}: {}", addr, e); + } } -} \ No newline at end of file +} diff --git a/src/netutils.rs b/src/netutils.rs new file mode 100644 index 0000000..bd40aa4 --- /dev/null +++ b/src/netutils.rs @@ -0,0 +1,52 @@ +use std::fmt; +use tokio::io; +use tokio::net::TcpStream; + +use std::error::Error; + +#[derive(Clone, Debug)] +pub struct Backend { + address: String, +} + +impl Backend { + pub fn new(address: String) -> Self { + Backend { address } + } +} + +impl fmt::Display for Backend { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.address) + } +} + +pub async fn tunnel(client_stream: TcpStream, backend: Backend) -> Result<(), Box> { + let backend_address: String = backend.address.clone(); + + tokio::spawn(async move { + let backend_stream: TcpStream = match TcpStream::connect(&backend_address).await { + Ok(s) => { + info!("connected to backend {backend_address}"); + s + } + Err(e) => { + error!("failed connecting to backend {backend_address}: {e}"); + return; + } + }; + + let (mut read_client, mut write_client) = client_stream.into_split(); + let (mut read_backend, mut write_backend) = backend_stream.into_split(); + + let client_to_backend = + tokio::spawn(async move { io::copy(&mut read_client, &mut write_backend).await }); + + let backend_to_client = + tokio::spawn(async move { io::copy(&mut read_backend, &mut write_client).await }); + + let _ = tokio::join!(client_to_backend, backend_to_client); + }); + + Ok(()) +}