From 3c2b531fc2fb50fc1d4bd7d4764d4f8361f25c2f Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 17 Jan 2024 15:35:33 +0000 Subject: [PATCH] feat: [#625] a new UDP tracker client You can use it with: ``` cargo run --bin udp_tracker_client 144.126.245.19:6969 9c38422213e30bff212b30c360d26f9a02136422 ``` --- src/bin/udp_tracker_client.rs | 110 +++++++++++++++++++ src/shared/bit_torrent/tracker/udp/client.rs | 32 +++++- tests/servers/udp/contract.rs | 2 + 3 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 src/bin/udp_tracker_client.rs diff --git a/src/bin/udp_tracker_client.rs b/src/bin/udp_tracker_client.rs new file mode 100644 index 000000000..5f7195887 --- /dev/null +++ b/src/bin/udp_tracker_client.rs @@ -0,0 +1,110 @@ +use std::env; +use std::net::Ipv4Addr; + +use aquatic_udp_protocol::common::InfoHash; +use aquatic_udp_protocol::{ + AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response, + TransactionId, +}; +use log::{debug, LevelFilter}; +use torrust_tracker::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient}; + +#[tokio::main] +async fn main() { + stdout_config(LevelFilter::Info); + + let args: Vec = env::args().collect(); + + if args.len() != 3 { + eprintln!("Error: invalid number of arguments!"); + eprintln!("Usage: cargo run --bin udp_tracker_client "); + eprintln!("Example: cargo run --bin udp_tracker_client 144.126.245.19:6969 9c38422213e30bff212b30c360d26f9a02136422"); + std::process::exit(1); + } + + let port = 0; + let transaction_id = -888_840_697; + + let remote_socket_addr = &args[1]; + + let bind_to = format!("0.0.0.0:{port}"); + + debug!("Binding to: {bind_to}"); + + let udp_client = UdpClient::bind(&bind_to).await; + + let bound_to = udp_client.socket.local_addr().unwrap(); + + debug!("Bound to: {bound_to}"); + + debug!("Connecting to remote: udp://{remote_socket_addr}"); + + udp_client.connect(remote_socket_addr).await; + + let udp_tracker_client = UdpTrackerClient { udp_client }; + + let connection_id = send_connection_request(TransactionId(transaction_id), &udp_tracker_client).await; + + // Send announce request + + let announce_request = AnnounceRequest { + connection_id: ConnectionId(connection_id.0), + transaction_id: TransactionId(transaction_id), + info_hash: InfoHash([0u8; 20]), + peer_id: PeerId([255u8; 20]), + bytes_downloaded: NumberOfBytes(0i64), + bytes_uploaded: NumberOfBytes(0i64), + bytes_left: NumberOfBytes(0i64), + event: AnnounceEvent::Started, + ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)), + key: PeerKey(0u32), + peers_wanted: NumberOfPeers(1i32), + port: Port(bound_to.port()), + }; + + debug!("Sending announce request with connection id: {connection_id:#?}"); + + udp_tracker_client.send(announce_request.into()).await; + + let response = udp_tracker_client.receive().await; + + println!("response: {response:#?}"); +} + +async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId { + debug!("Sending connection request with transaction id: {transaction_id:#?}"); + + let connect_request = ConnectRequest { transaction_id }; + + client.send(connect_request.into()).await; + + let response = client.receive().await; + + debug!("response: {response:#?}"); + + match response { + Response::Connect(connect_response) => connect_response.connection_id, + _ => panic!("error connecting to udp server. Unexpected response"), + } +} + +fn stdout_config(level: LevelFilter) { + if let Err(_err) = fern::Dispatch::new() + .format(|out, message, record| { + out.finish(format_args!( + "{} [{}][{}] {}", + chrono::Local::now().format("%+"), + record.target(), + record.level(), + message + )); + }) + .level(level) + .chain(std::io::stdout()) + .apply() + { + panic!("Failed to initialize logging.") + } + + debug!("logging initialized."); +} diff --git a/src/shared/bit_torrent/tracker/udp/client.rs b/src/shared/bit_torrent/tracker/udp/client.rs index 00f0b8acf..959001e82 100644 --- a/src/shared/bit_torrent/tracker/udp/client.rs +++ b/src/shared/bit_torrent/tracker/udp/client.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId}; +use log::debug; use tokio::net::UdpSocket; use tokio::time; @@ -19,7 +20,12 @@ impl UdpClient { /// /// Will panic if the local address can't be bound. pub async fn bind(local_address: &str) -> Self { - let socket = UdpSocket::bind(local_address).await.unwrap(); + let valid_socket_addr = local_address + .parse::() + .unwrap_or_else(|_| panic!("{local_address} is not a valid socket address")); + + let socket = UdpSocket::bind(valid_socket_addr).await.unwrap(); + Self { socket: Arc::new(socket), } @@ -29,7 +35,14 @@ impl UdpClient { /// /// Will panic if can't connect to the socket. pub async fn connect(&self, remote_address: &str) { - self.socket.connect(remote_address).await.unwrap(); + let valid_socket_addr = remote_address + .parse::() + .unwrap_or_else(|_| panic!("{remote_address} is not a valid socket address")); + + match self.socket.connect(valid_socket_addr).await { + Ok(()) => debug!("Connected successfully"), + Err(e) => panic!("Failed to connect: {e:?}"), + } } /// # Panics @@ -39,6 +52,8 @@ impl UdpClient { /// - Can't write to the socket. /// - Can't send data. pub async fn send(&self, bytes: &[u8]) -> usize { + debug!(target: "UDP client", "send {bytes:?}"); + self.socket.writable().await.unwrap(); self.socket.send(bytes).await.unwrap() } @@ -50,8 +65,15 @@ impl UdpClient { /// - Can't read from the socket. /// - Can't receive data. pub async fn receive(&self, bytes: &mut [u8]) -> usize { + debug!(target: "UDP client", "receiving ..."); + self.socket.readable().await.unwrap(); - self.socket.recv(bytes).await.unwrap() + + let size = self.socket.recv(bytes).await.unwrap(); + + debug!(target: "UDP client", "{size} bytes received {bytes:?}"); + + size } } @@ -73,6 +95,8 @@ impl UdpTrackerClient { /// /// Will panic if can't write request to bytes. pub async fn send(&self, request: Request) -> usize { + debug!(target: "UDP tracker client", "send request {request:?}"); + // Write request into a buffer let request_buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(request_buffer); @@ -99,6 +123,8 @@ impl UdpTrackerClient { let payload_size = self.udp_client.receive(&mut response_buffer).await; + debug!(target: "UDP tracker client", "received {payload_size} bytes. Response {response_buffer:?}"); + Response::from_bytes(&response_buffer[..payload_size], true).unwrap() } } diff --git a/tests/servers/udp/contract.rs b/tests/servers/udp/contract.rs index 9ac585190..0eea650b8 100644 --- a/tests/servers/udp/contract.rs +++ b/tests/servers/udp/contract.rs @@ -118,6 +118,8 @@ mod receiving_an_announce_request { let response = client.receive().await; + println!("test response {response:?}"); + assert!(is_ipv4_announce_response(&response)); } }