Skip to content

Commit

Permalink
Nothing
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Feb 27, 2024
1 parent 3cecb1a commit 6fafdc1
Showing 1 changed file with 41 additions and 40 deletions.
81 changes: 41 additions & 40 deletions crates/tracker_comms/src/tracker_comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ impl TorrentStatsProvider for () {

type Sender = tokio::sync::mpsc::Sender<SocketAddr>;

enum SupportedTracker {
Udp(Url),
Http(Url),
}

impl TrackerComms {
pub fn start(
info_hash: Id20,
Expand All @@ -70,9 +75,16 @@ impl TrackerComms {
let trackers = trackers
.into_iter()
.filter_map(|t| match Url::parse(&t) {
Ok(parsed) => Some(parsed),
Ok(parsed) => match parsed.scheme() {
"http" | "https" => Some(SupportedTracker::Http(parsed)),
"udp" => Some(SupportedTracker::Udp(parsed)),
_ => {
debug!("unsuppoted tracker URL: {}", t);
None
}
},
Err(e) => {
debug!("error parsing tracker URL: {}", e);
debug!("error parsing tracker URL {}: {}", t, e);
None
}
})
Expand All @@ -85,7 +97,6 @@ impl TrackerComms {

let s = async_stream::stream! {
use futures::StreamExt;
let mut rx_done = false;
let comms = Arc::new(Self {
info_hash,
peer_id,
Expand All @@ -96,19 +107,13 @@ impl TrackerComms {
});
let mut futures = FuturesUnordered::new();
for tracker in trackers {
if let Ok(fut) = comms.add_tracker(tracker) {
futures.push(fut);
}
futures.push(comms.add_tracker(tracker))
}
if futures.is_empty() {
return;
}
while !(futures.is_empty() && rx_done) {
while !(futures.is_empty()) {
tokio::select! {
addr = rx.recv(), if !rx_done => {
match addr {
Some(addr) => yield addr,
None => rx_done = true
addr = rx.recv() => {
if let Some(addr) = addr {
yield addr;
}
}
e = futures.next(), if !futures.is_empty() => {
Expand All @@ -125,34 +130,30 @@ impl TrackerComms {

fn add_tracker(
&self,
url: Url,
) -> anyhow::Result<
Either<
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
>,
url: SupportedTracker,
) -> Either<
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
> {
let info_hash = self.info_hash;
if url.scheme() == "http" || url.scheme() == "https" {
let span = error_span!(
parent: None,
"http_tracker",
tracker = %url,
info_hash = ?info_hash
);
Ok(self
.task_single_tracker_monitor_http(url)
.instrument(span)
.left_future())
} else if url.scheme() == "udp" {
let span =
error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash);
Ok(self
.task_single_tracker_monitor_udp(url)
.instrument(span)
.right_future())
} else {
bail!("unsupported tracker url {}", url)
match url {
SupportedTracker::Udp(url) => {
let span = error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash);
self.task_single_tracker_monitor_udp(url)
.instrument(span)
.right_future()
}
SupportedTracker::Http(url) => {
let span = error_span!(
parent: None,
"http_tracker",
tracker = %url,
info_hash = ?info_hash
);
self.task_single_tracker_monitor_http(url)
.instrument(span)
.left_future()
}
}
}

Expand Down

0 comments on commit 6fafdc1

Please sign in to comment.