Skip to content

Commit

Permalink
feat(mdns): adaptive initial interval peer discovery
Browse files Browse the repository at this point in the history
Peer discovery with mDNS can be very slow, particularly if the first mDNS query is lost. This patch resolves it by adjusting the timer with an adaptive initial interval. We start with a very short timer (500 ms). If a peer is discovered before the end of the timer, then the timer is reset to the normal query interval value (300s), otherwise the timer's value is multiplied by 2 until it reaches the normal query interval value.

Related: #3323.
Resolves: #3319.

Pull-Request: #3975.
  • Loading branch information
stormshield-pj50 authored May 25, 2023
1 parent 137443b commit 67b26cc
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
2 changes: 2 additions & 0 deletions protocols/mdns/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
- Raise MSRV to 1.65.
See [PR 3715].
- Remove deprecated `Mdns` prefixed items. See [PR 3699].
- Faster peer discovery with adaptive initial interval. See [PR 3975].

[PR 3975]: https://github.com/libp2p/rust-libp2p/pull/3975
[PR 3621]: https://github.com/libp2p/rust-libp2p/pull/3621
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3699]: https://github.com/libp2p/rust-libp2p/pull/3699
Expand Down
55 changes: 50 additions & 5 deletions protocols/mdns/src/behaviour/iface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,30 @@ use std::{
time::{Duration, Instant},
};

/// Initial interval for starting probe
const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500);

#[derive(Debug, Clone)]
enum ProbeState {
Probing(Duration),
Finished(Duration),
}

impl Default for ProbeState {
fn default() -> Self {
ProbeState::Probing(INITIAL_TIMEOUT_INTERVAL)
}
}

impl ProbeState {
fn interval(&self) -> &Duration {
match self {
ProbeState::Probing(query_interval) => query_interval,
ProbeState::Finished(query_interval) => query_interval,
}
}
}

/// An mDNS instance for a networking interface. To discover all peers when having multiple
/// interfaces an [`InterfaceState`] is required for each interface.
#[derive(Debug)]
Expand Down Expand Up @@ -67,7 +91,7 @@ pub(crate) struct InterfaceState<U, T> {
discovered: VecDeque<(PeerId, Multiaddr, Instant)>,
/// TTL
ttl: Duration,

probe_state: ProbeState,
local_peer_id: PeerId,
}

Expand Down Expand Up @@ -134,19 +158,22 @@ where
send_buffer: Default::default(),
discovered: Default::default(),
query_interval,
timeout: T::interval_at(Instant::now(), query_interval),
timeout: T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL),
multicast_addr,
ttl: config.ttl,
probe_state: Default::default(),
local_peer_id,
})
}

pub(crate) fn reset_timer(&mut self) {
self.timeout = T::interval(self.query_interval);
log::trace!("reset timer on {:#?} {:#?}", self.addr, self.probe_state);
let interval = *self.probe_state.interval();
self.timeout = T::interval(interval);
}

pub(crate) fn fire_timer(&mut self) {
self.timeout = T::interval_at(Instant::now(), self.query_interval);
self.timeout = T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL);
}

pub(crate) fn poll(
Expand All @@ -159,6 +186,19 @@ where
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
log::trace!("tick on {:#?} {:#?}", self.addr, self.probe_state);

// Stop to probe when the initial interval reach the query interval
if let ProbeState::Probing(interval) = self.probe_state {
let interval = interval * 2;
self.probe_state = if interval >= self.query_interval {
ProbeState::Finished(self.query_interval)
} else {
ProbeState::Probing(interval)
};
}

self.reset_timer();
}

// 2nd priority: Keep local buffers small: Send packets to remote.
Expand Down Expand Up @@ -193,7 +233,6 @@ where
.map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
{
Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
self.reset_timer();
log::trace!(
"received query from {} on {}",
query.remote_addr(),
Expand All @@ -217,6 +256,12 @@ where

self.discovered
.extend(response.extract_discovered(Instant::now(), self.local_peer_id));

// Stop probing when we have a valid response
if !self.discovered.is_empty() {
self.probe_state = ProbeState::Finished(self.query_interval);
self.reset_timer();
}
continue;
}
Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
Expand Down

0 comments on commit 67b26cc

Please sign in to comment.