Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query for unresolved instances only when needed #157

Merged
merged 8 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
os: [ubuntu-20.04, windows-latest, macos-latest]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.60.0
- uses: dtolnay/rust-toolchain@1.61.0
with:
components: rustfmt, clippy
- name: Run rustfmt and fail if any warnings
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "mdns-sd"
version = "0.10.1"
authors = ["keepsimple <[email protected]>"]
edition = "2018"
rust-version = "1.60.0"
rust-version = "1.61.0"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/keepsimple1/mdns-sd"
documentation = "https://docs.rs/mdns-sd"
Expand Down
4 changes: 0 additions & 4 deletions src/dns_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ impl DnsRecord {
}
}

pub(crate) fn get_created(&self) -> u64 {
self.created
}

pub(crate) fn get_expire_time(&self) -> u64 {
self.expires
}
Expand Down
103 changes: 78 additions & 25 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Service daemon for mDNS Service Discovery.

// What DNS-based Service Discovery works in a nutshell:
// How DNS-based Service Discovery works in a nutshell:
//
// (excerpt from RFC 6763)
// .... that a particular service instance can be
Expand Down Expand Up @@ -73,6 +73,8 @@ const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

const RESOLVE_WAIT_IN_MILLIS: u64 = 500;

/// Response status code for the service `unregister` call.
#[derive(Debug)]
pub enum UnregisterStatus {
Expand Down Expand Up @@ -475,10 +477,6 @@ impl ServiceDaemon {
}
}

// Send out additional queries for unresolved instances, where
// the early responses did not have SRV records.
zc.query_missing_srv();

// process commands from the command channel
while let Ok(command) = receiver.try_recv() {
if matches!(command, Command::Exit(_)) {
Expand Down Expand Up @@ -649,6 +647,17 @@ impl ServiceDaemon {
}
},

Command::Resolve(instance, try_count) => {
let pending_query = zc.query_missing_srv(&instance);
let max_try = 3;
if pending_query && try_count < max_try {
// Note that if the current try already succeeds, the next retransmission
// will be no-op as the cache has been updated.
let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
zc.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
}
}

Command::GetMetrics(resp_s) => match resp_s.send(zc.counters.clone()) {
Ok(()) => debug!("Sent metrics to the client"),
Err(e) => error!("Failed to send metrics: {}", e),
Expand Down Expand Up @@ -749,7 +758,9 @@ fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
Ok(fd)
}

/// Specify a UNIX timestamp in millis to run `command` for the next time.
struct ReRun {
/// UNIX timestamp in millis.
next_time: u64,
command: Command,
}
Expand Down Expand Up @@ -895,6 +906,9 @@ struct Zeroconf {
timers: Vec<u64>,

status: DaemonStatus,

/// Service instances that are pending for resolving SRV and TXT.
pending_resolves: HashSet<String>,
}

impl Zeroconf {
Expand Down Expand Up @@ -943,6 +957,7 @@ impl Zeroconf {
signal_sock,
timers,
status,
pending_resolves: HashSet::new(),
})
}

Expand Down Expand Up @@ -1415,28 +1430,28 @@ impl Zeroconf {
true
}

/// Sends TYPE_ANY query for instances that're missing SRV records.
fn query_missing_srv(&mut self) {
let now = current_time_millis();
let wait_in_millis = 800; // The threshold for deeming SRV missing.
/// Sends TYPE_ANY query for instances who are missing SRV records.
/// Returns true, if sent query. Returns false if SRV already exists.
fn query_missing_srv(&mut self, instance: &str) -> bool {
if self.cache.srv.contains_key(instance) {
return false;
}

for records in self.cache.ptr.values() {
for record in records.iter() {
if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
if !self.cache.srv.contains_key(&ptr.alias)
&& valid_instance_name(&ptr.alias)
&& now > ptr.get_record().get_created() + wait_in_millis
{
self.send_query(&ptr.alias, TYPE_ANY);
}
}
}
if !valid_instance_name(instance) {
debug!("instance name {} not valid", instance);
return false;
}

self.send_query(instance, TYPE_ANY);
true
}

/// Checks if `ty_domain` has records in the cache. If yes, sends the
/// cached records via `sender`.
fn query_cache(&mut self, ty_domain: &str, sender: Sender<ServiceEvent>) {
let mut resolved: HashSet<String> = HashSet::new();
let mut unresolved: HashSet<String> = HashSet::new();

if let Some(records) = self.cache.ptr.get(ty_domain) {
for record in records.iter() {
if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
Expand All @@ -1460,14 +1475,33 @@ impl Zeroconf {
}

if info.is_ready() {
resolved.insert(ptr.alias.clone());
match sender.send(ServiceEvent::ServiceResolved(info)) {
Ok(()) => debug!("sent service resolved"),
Err(e) => error!("failed to send service resolved: {}", e),
}
} else {
unresolved.insert(ptr.alias.clone());
}
}
}
}

for instance in resolved.drain() {
self.pending_resolves.remove(&instance);
}

for instance in unresolved.drain() {
self.add_pending_resolve(instance);
}
}

fn add_pending_resolve(&mut self, instance: String) {
if !self.pending_resolves.contains(&instance) {
let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
self.pending_resolves.insert(instance);
}
}

fn create_service_info_from_cache(
Expand Down Expand Up @@ -1625,30 +1659,44 @@ impl Zeroconf {
// instance. For example, a regular service type PTR and a sub-type
// service type PTR can both point to the same service instance.
// This loop automatically handles the sub-type PTRs.
let mut resolved: HashSet<String> = HashSet::new();
let mut unresolved: HashSet<String> = HashSet::new();

for (ty_domain, records) in self.cache.ptr.iter() {
if !self.queriers.contains_key(ty_domain) {
// No need to resolve if not in our queries.
continue;
}

for record in records.iter() {
if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
if updated_instances.contains(&dns_ptr.alias) {
if let Ok(info) =
self.create_service_info_from_cache(ty_domain, &dns_ptr.alias)
{
if info.is_ready() {
resolved.insert(dns_ptr.alias.clone());
call_listener(
&self.queriers,
ty_domain,
ServiceEvent::ServiceResolved(info),
);
} else {
unresolved.insert(dns_ptr.alias.clone());
}
}
} else {
// SRV record is missing, might need to send query again.
if !self.cache.srv.contains_key(&dns_ptr.alias) {
self.timers.push(now + 1_000);
}
}
}
}
}

for instance in resolved.drain() {
self.pending_resolves.remove(&instance);
}

for instance in unresolved.drain() {
self.add_pending_resolve(instance);
}
}

fn handle_query(&mut self, msg: DnsIncoming, ip: &IpAddr) {
Expand Down Expand Up @@ -1880,6 +1928,10 @@ enum Command {
/// Stop browsing a service type
StopBrowse(String), // (ty_domain)

/// Send query to resolve a service instance.
/// This is used when a PTR record exists but SRV & TXT records are missing.
Resolve(String, u16), // (service_instance_fullname, try_count)

/// Read the current values of the counters
GetMetrics(Sender<Metrics>),

Expand Down Expand Up @@ -1908,6 +1960,7 @@ impl fmt::Display for Command {
Command::StopBrowse(_) => write!(f, "Command StopBrowse"),
Command::Unregister(_, _) => write!(f, "Command Unregister"),
Command::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
Command::Resolve(_, _) => write!(f, "Command Resolve"),
}
}
}
Expand Down