Skip to content

Commit

Permalink
protocols/mdns: Optimise InterfaceState::poll for low latency (#2939)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger authored Oct 4, 2022
1 parent 1b79324 commit a905a36
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 130 deletions.
5 changes: 4 additions & 1 deletion protocols/mdns/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
- Update to `libp2p-core` `v0.37.0`.

- Update to `libp2p-swarm` `v0.40.0`.


- Fix a bug that could cause a delay of ~10s until peers would get discovered when using the tokio runtime. See [PR 2939].

[PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918
[PR 2939]: https://github.com/libp2p/rust-libp2p/pull/2939

# 0.40.0

Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ where
// Emit discovered event.
let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
for iface_state in self.iface_states.values_mut() {
while let Some((peer, addr, expiration)) = iface_state.poll(cx, params) {
while let Poll::Ready((peer, addr, expiration)) = iface_state.poll(cx, params) {
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
Expand Down
165 changes: 80 additions & 85 deletions protocols/mdns/src/behaviour/iface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use self::dns::{build_query, build_query_response, build_service_discovery_respo
use self::query::MdnsPacket;
use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig;
use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::PollParameters;
use socket2::{Domain, Socket, Type};
use std::{
collections::VecDeque,
io, iter,
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -145,106 +145,101 @@ where
self.timeout = T::interval_at(Instant::now(), self.query_interval);
}

fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
log::trace!("received packet on iface {} {:?}", self.addr, packet);
match packet {
MdnsPacket::Query(query) => {
self.reset_timer();
log::trace!("sending response on iface {}", self.addr);
for packet in build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
) {
self.send_buffer.push_back(packet);
}
pub fn poll(
&mut self,
cx: &mut Context,
params: &impl PollParameters,
) -> Poll<(PeerId, Multiaddr, Instant)> {
loop {
// 1st priority: Low latency: Create packet ASAP after timeout.
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
}
MdnsPacket::Response(response) => {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(response.remote_addr().ip());
let obs_port = Protocol::Udp(response.remote_addr().port());
let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect();

for peer in response.discovered_peers() {
if peer.id() == params.local_peer_id() {
// 2nd priority: Keep local buffers small: Send packets to remote.
if let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => {
log::trace!("sent packet on iface {}", self.addr);
continue;
}

let new_expiration = Instant::now() + peer.ttl();

for addr in peer.addresses() {
if let Some(new_addr) = address_translation(addr, &observed) {
self.discovered.push_back((
*peer.id(),
new_addr.clone(),
new_expiration,
));
}

self.discovered
.push_back((*peer.id(), addr.clone(), new_expiration));
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
continue;
}
Poll::Pending => {
self.send_buffer.push_front(packet);
}
}
}
MdnsPacket::ServiceDiscovery(disc) => {
let resp = build_service_discovery_response(disc.query_id(), self.ttl);
self.send_buffer.push_back(resp);

// 3rd priority: Keep local buffers small: Return discovered addresses.
if let Some(discovered) = self.discovered.pop_front() {
return Poll::Ready(discovered);
}
}
}

pub fn poll(
&mut self,
cx: &mut Context,
params: &impl PollParameters,
) -> Option<(PeerId, Multiaddr, Instant)> {
// Poll receive socket.
while let Poll::Ready(data) =
Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer)
{
match data {
Ok((len, from)) => {
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
{
self.inject_mdns_packet(packet, params);
}
// 4th priority: Remote work: Answer incoming requests.
match Pin::new(&mut self.recv_socket)
.poll_read(cx, &mut self.recv_buffer)
.map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
{
Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
self.reset_timer();
log::trace!(
"received query from {} on {}",
query.remote_addr(),
self.addr
);

self.send_buffer.extend(build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
));
continue;
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No more bytes available on the socket to read
break;
Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
log::trace!(
"received response from {} on {}",
response.remote_addr(),
self.addr
);

self.discovered.extend(
response.extract_discovered(Instant::now(), *params.local_peer_id()),
);
continue;
}
Err(err) => {
log::error!("failed reading datagram: {}", err);
Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
log::trace!(
"received service discovery from {} on {}",
disc.remote_addr(),
self.addr
);

self.send_buffer
.push_back(build_service_discovery_response(disc.query_id(), self.ttl));
continue;
}
Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No more bytes available on the socket to read
}
}
}

// Send responses.
while let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr),
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
log::error!("failed reading datagram: {}", err);
}
Poll::Pending => {
self.send_buffer.push_front(packet);
break;
Poll::Ready(Ok(Err(err))) => {
log::debug!("Parsing mdns packet failed: {:?}", err);
}
Poll::Ready(Ok(Ok(None))) | Poll::Pending => {}
}
}

if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
return Poll::Pending;
}

// Emit discovered event.
self.discovered.pop_front()
}
}
111 changes: 68 additions & 43 deletions protocols/mdns/src/behaviour/iface/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ use super::dns;
use crate::{META_QUERY_SERVICE, SERVICE_NAME};
use dns_parser::{Packet, RData};
use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol},
PeerId,
};
use std::time::Instant;
use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration};

/// A valid mDNS packet received by the service.
Expand All @@ -39,44 +41,40 @@ pub enum MdnsPacket {
}

impl MdnsPacket {
pub fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option<MdnsPacket> {
match Packet::parse(buf) {
Ok(packet) => {
if packet.header.query {
if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
let query = MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
});
Some(query)
} else if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
let discovery = MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery {
from,
query_id: packet.header.id,
});
Some(discovery)
} else {
None
}
} else {
let resp = MdnsPacket::Response(MdnsResponse::new(packet, from));
Some(resp)
}
}
Err(err) => {
log::debug!("Parsing mdns packet failed: {:?}", err);
None
}
pub fn new_from_bytes(
buf: &[u8],
from: SocketAddr,
) -> Result<Option<MdnsPacket>, dns_parser::Error> {
let packet = Packet::parse(buf)?;

if !packet.header.query {
return Ok(Some(MdnsPacket::Response(MdnsResponse::new(packet, from))));
}

if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
return Ok(Some(MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
})));
}

if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
return Ok(Some(MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery {
from,
query_id: packet.header.id,
})));
}

Ok(None)
}
}

Expand Down Expand Up @@ -167,18 +165,45 @@ impl MdnsResponse {
MdnsResponse { peers, from }
}

/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
pub fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {
self.peers.iter()
pub fn extract_discovered(
&self,
now: Instant,
local_peer_id: PeerId,
) -> impl Iterator<Item = (PeerId, Multiaddr, Instant)> + '_ {
self.discovered_peers()
.filter(move |peer| peer.id() != &local_peer_id)
.flat_map(move |peer| {
let observed = self.observed_address();
let new_expiration = now + peer.ttl();

peer.addresses().iter().filter_map(move |address| {
let new_addr = address_translation(address, &observed)?;

Some((*peer.id(), new_addr, new_expiration))
})
})
}

/// Source address of the packet.
#[inline]
pub fn remote_addr(&self) -> &SocketAddr {
&self.from
}

fn observed_address(&self) -> Multiaddr {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(self.remote_addr().ip());
let obs_port = Protocol::Udp(self.remote_addr().port());

Multiaddr::empty().with(obs_ip).with(obs_port)
}

/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {
self.peers.iter()
}
}

impl fmt::Debug for MdnsResponse {
Expand Down

0 comments on commit a905a36

Please sign in to comment.