diff --git a/crates/tracker_comms/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs index d8b984ad..3816131e 100644 --- a/crates/tracker_comms/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -58,6 +58,11 @@ impl TorrentStatsProvider for () { type Sender = tokio::sync::mpsc::Sender; +enum SupportedTracker { + Udp(Url), + Http(Url), +} + impl TrackerComms { pub fn start( info_hash: Id20, @@ -70,9 +75,16 @@ impl TrackerComms { let trackers = trackers .into_iter() .filter_map(|t| match Url::parse(&t) { - Ok(parsed) => Some(parsed), + Ok(parsed) => match parsed.scheme() { + "http" | "https" => Some(SupportedTracker::Http(parsed)), + "udp" => Some(SupportedTracker::Udp(parsed)), + _ => { + debug!("unsuppoted tracker URL: {}", t); + None + } + }, Err(e) => { - debug!("error parsing tracker URL: {}", e); + debug!("error parsing tracker URL {}: {}", t, e); None } }) @@ -85,7 +97,6 @@ impl TrackerComms { let s = async_stream::stream! { use futures::StreamExt; - let mut rx_done = false; let comms = Arc::new(Self { info_hash, peer_id, @@ -96,19 +107,13 @@ impl TrackerComms { }); let mut futures = FuturesUnordered::new(); for tracker in trackers { - if let Ok(fut) = comms.add_tracker(tracker) { - futures.push(fut); - } + futures.push(comms.add_tracker(tracker)) } - if futures.is_empty() { - return; - } - while !(futures.is_empty() && rx_done) { + while !(futures.is_empty()) { tokio::select! { - addr = rx.recv(), if !rx_done => { - match addr { - Some(addr) => yield addr, - None => rx_done = true + addr = rx.recv() => { + if let Some(addr) = addr { + yield addr; } } e = futures.next(), if !futures.is_empty() => { @@ -125,34 +130,30 @@ impl TrackerComms { fn add_tracker( &self, - url: Url, - ) -> anyhow::Result< - Either< - impl std::future::Future> + '_ + Send, - impl std::future::Future> + '_ + Send, - >, + url: SupportedTracker, + ) -> Either< + impl std::future::Future> + '_ + Send, + impl std::future::Future> + '_ + Send, > { let info_hash = self.info_hash; - if url.scheme() == "http" || url.scheme() == "https" { - let span = error_span!( - parent: None, - "http_tracker", - tracker = %url, - info_hash = ?info_hash - ); - Ok(self - .task_single_tracker_monitor_http(url) - .instrument(span) - .left_future()) - } else if url.scheme() == "udp" { - let span = - error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); - Ok(self - .task_single_tracker_monitor_udp(url) - .instrument(span) - .right_future()) - } else { - bail!("unsupported tracker url {}", url) + match url { + SupportedTracker::Udp(url) => { + let span = error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); + self.task_single_tracker_monitor_udp(url) + .instrument(span) + .right_future() + } + SupportedTracker::Http(url) => { + let span = error_span!( + parent: None, + "http_tracker", + tracker = %url, + info_hash = ?info_hash + ); + self.task_single_tracker_monitor_http(url) + .instrument(span) + .left_future() + } } }