diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 01fce5d..2bec2f6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,7 +19,7 @@ jobs: os: [ubuntu-20.04, windows-latest, macos-latest] steps: - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@1.60.0 + - uses: dtolnay/rust-toolchain@1.61.0 with: components: rustfmt, clippy - name: Run rustfmt and fail if any warnings diff --git a/Cargo.toml b/Cargo.toml index f40eea9..359bc2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "mdns-sd" version = "0.10.1" authors = ["keepsimple "] edition = "2018" -rust-version = "1.60.0" +rust-version = "1.61.0" license = "Apache-2.0 OR MIT" repository = "https://github.com/keepsimple1/mdns-sd" documentation = "https://docs.rs/mdns-sd" diff --git a/src/dns_parser.rs b/src/dns_parser.rs index a945a6f..6d40dd3 100644 --- a/src/dns_parser.rs +++ b/src/dns_parser.rs @@ -103,10 +103,6 @@ impl DnsRecord { } } - pub(crate) fn get_created(&self) -> u64 { - self.created - } - pub(crate) fn get_expire_time(&self) -> u64 { self.expires } diff --git a/src/service_daemon.rs b/src/service_daemon.rs index e604f70..169bd76 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -1,6 +1,6 @@ //! Service daemon for mDNS Service Discovery. -// What DNS-based Service Discovery works in a nutshell: +// How DNS-based Service Discovery works in a nutshell: // // (excerpt from RFC 6763) // .... that a particular service instance can be @@ -73,6 +73,8 @@ const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251); const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb); const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); +const RESOLVE_WAIT_IN_MILLIS: u64 = 500; + /// Response status code for the service `unregister` call. #[derive(Debug)] pub enum UnregisterStatus { @@ -475,10 +477,6 @@ impl ServiceDaemon { } } - // Send out additional queries for unresolved instances, where - // the early responses did not have SRV records. - zc.query_missing_srv(); - // process commands from the command channel while let Ok(command) = receiver.try_recv() { if matches!(command, Command::Exit(_)) { @@ -649,6 +647,17 @@ impl ServiceDaemon { } }, + Command::Resolve(instance, try_count) => { + let pending_query = zc.query_missing_srv(&instance); + let max_try = 3; + if pending_query && try_count < max_try { + // Note that if the current try already succeeds, the next retransmission + // will be no-op as the cache has been updated. + let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS; + zc.add_retransmission(next_time, Command::Resolve(instance, try_count + 1)); + } + } + Command::GetMetrics(resp_s) => match resp_s.send(zc.counters.clone()) { Ok(()) => debug!("Sent metrics to the client"), Err(e) => error!("Failed to send metrics: {}", e), @@ -749,7 +758,9 @@ fn new_socket(addr: SocketAddr, non_block: bool) -> Result { Ok(fd) } +/// Specify a UNIX timestamp in millis to run `command` for the next time. struct ReRun { + /// UNIX timestamp in millis. next_time: u64, command: Command, } @@ -895,6 +906,9 @@ struct Zeroconf { timers: Vec, status: DaemonStatus, + + /// Service instances that are pending for resolving SRV and TXT. + pending_resolves: HashSet, } impl Zeroconf { @@ -943,6 +957,7 @@ impl Zeroconf { signal_sock, timers, status, + pending_resolves: HashSet::new(), }) } @@ -1415,28 +1430,28 @@ impl Zeroconf { true } - /// Sends TYPE_ANY query for instances that're missing SRV records. - fn query_missing_srv(&mut self) { - let now = current_time_millis(); - let wait_in_millis = 800; // The threshold for deeming SRV missing. + /// Sends TYPE_ANY query for instances who are missing SRV records. + /// Returns true, if sent query. Returns false if SRV already exists. + fn query_missing_srv(&mut self, instance: &str) -> bool { + if self.cache.srv.contains_key(instance) { + return false; + } - for records in self.cache.ptr.values() { - for record in records.iter() { - if let Some(ptr) = record.any().downcast_ref::() { - if !self.cache.srv.contains_key(&ptr.alias) - && valid_instance_name(&ptr.alias) - && now > ptr.get_record().get_created() + wait_in_millis - { - self.send_query(&ptr.alias, TYPE_ANY); - } - } - } + if !valid_instance_name(instance) { + debug!("instance name {} not valid", instance); + return false; } + + self.send_query(instance, TYPE_ANY); + true } /// Checks if `ty_domain` has records in the cache. If yes, sends the /// cached records via `sender`. fn query_cache(&mut self, ty_domain: &str, sender: Sender) { + let mut resolved: HashSet = HashSet::new(); + let mut unresolved: HashSet = HashSet::new(); + if let Some(records) = self.cache.ptr.get(ty_domain) { for record in records.iter() { if let Some(ptr) = record.any().downcast_ref::() { @@ -1460,14 +1475,33 @@ impl Zeroconf { } if info.is_ready() { + resolved.insert(ptr.alias.clone()); match sender.send(ServiceEvent::ServiceResolved(info)) { Ok(()) => debug!("sent service resolved"), Err(e) => error!("failed to send service resolved: {}", e), } + } else { + unresolved.insert(ptr.alias.clone()); } } } } + + for instance in resolved.drain() { + self.pending_resolves.remove(&instance); + } + + for instance in unresolved.drain() { + self.add_pending_resolve(instance); + } + } + + fn add_pending_resolve(&mut self, instance: String) { + if !self.pending_resolves.contains(&instance) { + let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS; + self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1)); + self.pending_resolves.insert(instance); + } } fn create_service_info_from_cache( @@ -1625,7 +1659,15 @@ impl Zeroconf { // instance. For example, a regular service type PTR and a sub-type // service type PTR can both point to the same service instance. // This loop automatically handles the sub-type PTRs. + let mut resolved: HashSet = HashSet::new(); + let mut unresolved: HashSet = HashSet::new(); + for (ty_domain, records) in self.cache.ptr.iter() { + if !self.queriers.contains_key(ty_domain) { + // No need to resolve if not in our queries. + continue; + } + for record in records.iter() { if let Some(dns_ptr) = record.any().downcast_ref::() { if updated_instances.contains(&dns_ptr.alias) { @@ -1633,22 +1675,28 @@ impl Zeroconf { self.create_service_info_from_cache(ty_domain, &dns_ptr.alias) { if info.is_ready() { + resolved.insert(dns_ptr.alias.clone()); call_listener( &self.queriers, ty_domain, ServiceEvent::ServiceResolved(info), ); + } else { + unresolved.insert(dns_ptr.alias.clone()); } } - } else { - // SRV record is missing, might need to send query again. - if !self.cache.srv.contains_key(&dns_ptr.alias) { - self.timers.push(now + 1_000); - } } } } } + + for instance in resolved.drain() { + self.pending_resolves.remove(&instance); + } + + for instance in unresolved.drain() { + self.add_pending_resolve(instance); + } } fn handle_query(&mut self, msg: DnsIncoming, ip: &IpAddr) { @@ -1880,6 +1928,10 @@ enum Command { /// Stop browsing a service type StopBrowse(String), // (ty_domain) + /// Send query to resolve a service instance. + /// This is used when a PTR record exists but SRV & TXT records are missing. + Resolve(String, u16), // (service_instance_fullname, try_count) + /// Read the current values of the counters GetMetrics(Sender), @@ -1908,6 +1960,7 @@ impl fmt::Display for Command { Command::StopBrowse(_) => write!(f, "Command StopBrowse"), Command::Unregister(_, _) => write!(f, "Command Unregister"), Command::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"), + Command::Resolve(_, _) => write!(f, "Command Resolve"), } } }