From 8b63fd7d608639e24aea5bfd47e3cf977af4ab76 Mon Sep 17 00:00:00 2001 From: keepsimple1 Date: Sun, 3 Nov 2024 21:27:54 -0800 Subject: [PATCH] feat: support name probing and conflict resolution (#265) * update rustc version * add a new DaemonEvent::Respond * export RR type definitions --- .github/workflows/build.yml | 2 +- Cargo.toml | 3 +- examples/register.rs | 22 +- src/dns_cache.rs | 36 +- src/dns_parser.rs | 433 +++++++++++++--- src/lib.rs | 8 +- src/service_daemon.rs | 954 +++++++++++++++++++++++++++++------- src/service_info.rs | 310 +++++++++++- tests/mdns_test.rs | 346 ++++++++++++- 9 files changed, 1842 insertions(+), 272 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index aa6cae3..ba34c8a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,7 +19,7 @@ jobs: os: [ubuntu-20.04, windows-latest, macos-latest] steps: - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@1.63.0 + - uses: dtolnay/rust-toolchain@1.65.0 with: components: rustfmt, clippy - name: Run rustfmt and fail if any warnings diff --git a/Cargo.toml b/Cargo.toml index b10befa..775d29b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "mdns-sd" version = "0.11.5" authors = ["keepsimple "] edition = "2018" -rust-version = "1.63.0" +rust-version = "1.65.0" license = "Apache-2.0 OR MIT" repository = "https://github.com/keepsimple1/mdns-sd" documentation = "https://docs.rs/mdns-sd" @@ -17,6 +17,7 @@ logging = ["log"] default = ["async", "logging"] [dependencies] +fastrand = "2.1" flume = { version = "0.11", default-features = false } # channel between threads if-addrs = { version = "0.13", features = ["link-local"] } # get local IP addresses log = { version = "0.4", optional = true } # logging diff --git a/examples/register.rs b/examples/register.rs index cc1281f..c2e4f1b 100644 --- a/examples/register.rs +++ b/examples/register.rs @@ -2,11 +2,11 @@ //! //! Run with: //! -//! cargo run --example register +//! cargo run --example register [options] //! //! Example: //! -//! cargo run --example register _my-hello._udp test1 +//! cargo run --example register _my-hello._udp instance1 host1 //! //! Options: //! "--unregister": automatically unregister after 2 seconds. @@ -16,7 +16,9 @@ use mdns_sd::{DaemonEvent, IfKind, ServiceDaemon, ServiceInfo}; use std::{env, thread, time::Duration}; fn main() { - env_logger::init(); + // setup env_logger with more precise timestamp. + let mut builder = env_logger::Builder::from_default_env(); + builder.format_timestamp_millis().init(); // Simple command line options. let args: Vec = env::args().collect(); @@ -52,11 +54,18 @@ fn main() { return; } }; + let hostname = match args.get(3) { + Some(arg) => arg, + None => { + print_usage(); + return; + } + }; // With `enable_addr_auto()`, we can give empty addrs and let the lib find them. // If the caller knows specific addrs to use, then assign the addrs here. let my_addrs = ""; - let service_hostname = format!("{}{}", instance_name, &service_type); + let service_hostname = format!("{}.local.", hostname); let port = 3456; // The key string in TXT properties is case insensitive. Only the first @@ -106,10 +115,11 @@ fn main() { fn print_usage() { println!("Usage:"); - println!("cargo run --example register [--unregister]"); + println!("cargo run --example register [options]"); println!("Options:"); println!("--unregister: automatically unregister after 2 seconds"); + println!("--disable-ipv6: not to use IPv6 interfaces."); println!(); println!("For example:"); - println!("cargo run --example register _my-hello._udp test1"); + println!("cargo run --example register _my-hello._udp instance1 host1"); } diff --git a/src/dns_cache.rs b/src/dns_cache.rs index e2d56fd..a87ba5c 100644 --- a/src/dns_cache.rs +++ b/src/dns_cache.rs @@ -7,7 +7,7 @@ use crate::log::debug; use crate::{ dns_parser::{ current_time_millis, split_sub_domain, DnsAddress, DnsPointer, DnsRecordBox, DnsSrv, - TYPE_A, TYPE_AAAA, TYPE_NSEC, TYPE_PTR, TYPE_SRV, TYPE_TXT, + RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_NSEC, RR_TYPE_PTR, RR_TYPE_SRV, RR_TYPE_TXT, }, service_info::valid_two_addrs_on_intf, }; @@ -124,7 +124,7 @@ impl DnsCache { // If it is PTR with subtype, store a mapping from the instance fullname // to the subtype in this cache. - if incoming.get_type() == TYPE_PTR { + if incoming.get_type() == RR_TYPE_PTR { let (_, subtype_opt) = split_sub_domain(&entry_name); if let Some(subtype) = subtype_opt { if let Some(ptr) = incoming.any().downcast_ref::() { @@ -137,11 +137,11 @@ impl DnsCache { // get the existing records for the type. let record_vec = match incoming.get_type() { - TYPE_PTR => self.ptr.entry(entry_name).or_default(), - TYPE_SRV => self.srv.entry(entry_name).or_default(), - TYPE_TXT => self.txt.entry(entry_name).or_default(), - TYPE_A | TYPE_AAAA => self.addr.entry(entry_name).or_default(), - TYPE_NSEC => self.nsec.entry(entry_name).or_default(), + RR_TYPE_PTR => self.ptr.entry(entry_name).or_default(), + RR_TYPE_SRV => self.srv.entry(entry_name).or_default(), + RR_TYPE_TXT => self.txt.entry(entry_name).or_default(), + RR_TYPE_A | RR_TYPE_AAAA => self.addr.entry(entry_name).or_default(), + RR_TYPE_NSEC => self.nsec.entry(entry_name).or_default(), _ => return None, }; @@ -168,7 +168,7 @@ impl DnsCache { should_flush = true; // additional checks for address records. - if rtype == TYPE_A || rtype == TYPE_AAAA { + if rtype == RR_TYPE_A || rtype == RR_TYPE_AAAA { if let Some(addr) = r.any().downcast_ref::() { if let Some(addr_b) = incoming.any().downcast_ref::() { should_flush = @@ -215,10 +215,10 @@ impl DnsCache { let mut found = false; let record_name = record.get_name(); let record_vec = match record.get_type() { - TYPE_PTR => self.ptr.get_mut(record_name), - TYPE_SRV => self.srv.get_mut(record_name), - TYPE_TXT => self.txt.get_mut(record_name), - TYPE_A | TYPE_AAAA => self.addr.get_mut(record_name), + RR_TYPE_PTR => self.ptr.get_mut(record_name), + RR_TYPE_SRV => self.srv.get_mut(record_name), + RR_TYPE_TXT => self.txt.get_mut(record_name), + RR_TYPE_A | RR_TYPE_AAAA => self.addr.get_mut(record_name), _ => return found, }; if let Some(record_vec) = record_vec { @@ -277,7 +277,7 @@ impl DnsCache { srv_records.retain(|srv| { let expired = srv.get_record().is_expired(now); if expired { - debug!("expired SRV: {}: {}", ty_domain, srv.get_name()); + debug!("expired SRV: {}: {:?}", ty_domain, srv); expired_instances .entry(ty_domain.to_string()) .or_insert_with(HashSet::new) @@ -299,7 +299,7 @@ impl DnsCache { let expired = x.get_record().is_expired(now); if expired { if let Some(dns_ptr) = x.any().downcast_ref::() { - debug!("expired PTR: {:?}", dns_ptr); + debug!("expired PTR: domain:{ty_domain} record: {:?}", dns_ptr); expired_instances .entry(ty_domain.to_string()) .or_insert_with(HashSet::new) @@ -460,10 +460,10 @@ impl DnsCache { now: u64, ) -> Vec<&'a DnsRecordBox> { let records_opt = match qtype { - TYPE_PTR => self.get_ptr(name), - TYPE_SRV => self.get_srv(name), - TYPE_A | TYPE_AAAA => self.get_addr(name), - TYPE_TXT => self.get_txt(name), + RR_TYPE_PTR => self.get_ptr(name), + RR_TYPE_SRV => self.get_srv(name), + RR_TYPE_A | RR_TYPE_AAAA => self.get_addr(name), + RR_TYPE_TXT => self.get_txt(name), _ => None, }; diff --git a/src/dns_parser.rs b/src/dns_parser.rs index 9b5810e..15a1c35 100644 --- a/src/dns_parser.rs +++ b/src/dns_parser.rs @@ -6,7 +6,10 @@ #[cfg(feature = "logging")] use crate::log::debug; -use crate::{service_info::decode_txt, Error, Result, ServiceInfo}; +use crate::{ + service_info::{decode_txt, valid_ip_on_intf, DnsRegistry}, + Error, Result, ServiceInfo, +}; use if_addrs::Interface; use std::{ any::Any, @@ -19,15 +22,48 @@ use std::{ time::SystemTime, }; -pub const TYPE_A: u16 = 1; // IPv4 address -pub const TYPE_CNAME: u16 = 5; -pub const TYPE_PTR: u16 = 12; -pub const TYPE_HINFO: u16 = 13; -pub const TYPE_TXT: u16 = 16; -pub const TYPE_AAAA: u16 = 28; // IPv6 address -pub const TYPE_SRV: u16 = 33; -pub const TYPE_NSEC: u16 = 47; // Negative responses -pub const TYPE_ANY: u16 = 255; +/// DNS record type for IPv4 address +pub const RR_TYPE_A: u16 = 1; + +/// DNS record type for Canonical Name +pub const RR_TYPE_CNAME: u16 = 5; + +/// DNS record type for Pointer +pub const RR_TYPE_PTR: u16 = 12; + +/// DNS record type for Host Info +pub const RR_TYPE_HINFO: u16 = 13; + +/// DNS record type for Text (properties) +pub const RR_TYPE_TXT: u16 = 16; + +/// DNS record type for IPv6 address +pub const RR_TYPE_AAAA: u16 = 28; + +/// DNS record type for Service +pub const RR_TYPE_SRV: u16 = 33; + +/// DNS record type for Negative Responses +pub const RR_TYPE_NSEC: u16 = 47; + +/// DNS record type for any records (wildcard) +pub const RR_TYPE_ANY: u16 = 255; + +/// Returns the name string of a `rr_type`. +pub(crate) const fn rr_type_name(rr_type: u16) -> &'static str { + match rr_type { + RR_TYPE_A => "TYPE_A", + RR_TYPE_CNAME => "TYPE_CNAME", + RR_TYPE_PTR => "TYPE_PTR", + RR_TYPE_HINFO => "TYPE_HINFO", + RR_TYPE_TXT => "TYPE_TXT", + RR_TYPE_AAAA => "TYPE_AAAA", + RR_TYPE_SRV => "TYPE_SRV", + RR_TYPE_NSEC => "TYPE_NSEC", + RR_TYPE_ANY => "TYPE_ANY", + _ => "unknown", + } +} pub const CLASS_IN: u16 = 1; pub const CLASS_MASK: u16 = 0x7FFF; @@ -73,8 +109,8 @@ const U16_SIZE: usize = 2; #[inline] pub const fn ip_address_to_type(address: &IpAddr) -> u16 { match address { - IpAddr::V4(_) => TYPE_A, - IpAddr::V6(_) => TYPE_AAAA, + IpAddr::V4(_) => RR_TYPE_A, + IpAddr::V6(_) => RR_TYPE_AAAA, } } @@ -116,6 +152,9 @@ pub struct DnsRecord { /// Support re-query an instance before its PTR record expires. /// See https://datatracker.ietf.org/doc/html/rfc6762#section-5.2 refresh: u64, // UNIX time in millis + + /// If conflict resolution decides to change the name, this is the new one. + new_name: Option, } impl DnsRecord { @@ -135,6 +174,7 @@ impl DnsRecord { created, expires, refresh, + new_name: None, } } @@ -230,6 +270,27 @@ impl DnsRecord { self.ttl -= (elapsed / 1000) as u32; } } + + pub(crate) fn set_new_name(&mut self, new_name: String) { + if new_name == self.entry.name { + self.new_name = None; + } else { + self.new_name = Some(new_name); + } + } + + pub(crate) fn get_new_name(&self) -> Option<&str> { + self.new_name.as_deref() + } + + /// Return the new name if exists, otherwise the regular name in DnsEntry. + pub(crate) fn get_name(&self) -> &str { + self.new_name.as_deref().unwrap_or(&self.entry.name) + } + + pub(crate) fn get_original_name(&self) -> &str { + &self.entry.name + } } impl PartialEq for DnsRecord { @@ -247,6 +308,41 @@ pub(crate) trait DnsRecordExt: fmt::Debug { /// Returns whether `other` record is considered the same except TTL. fn matches(&self, other: &dyn DnsRecordExt) -> bool; + /// Returns whether `other` record has the same rdata. + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool; + + /// Returns the result based on a byte-level comparison of `rdata`. + /// If `other` is not valid, returns `Greater`. + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering; + + /// Returns the result based on "lexicographically later" defined below. + fn compare(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + /* + RFC 6762: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2 + + ... The determination of "lexicographically later" is performed by first + comparing the record class (excluding the cache-flush bit described + in Section 10.2), then the record type, then raw comparison of the + binary content of the rdata without regard for meaning or structure. + If the record classes differ, then the numerically greater class is + considered "lexicographically later". Otherwise, if the record types + differ, then the numerically greater type is considered + "lexicographically later". If the rrtype and rrclass both match, + then the rdata is compared. ... + */ + match self.get_class().cmp(&other.get_class()) { + cmp::Ordering::Equal => match self.get_type().cmp(&other.get_type()) { + cmp::Ordering::Equal => self.compare_rdata(other), + not_equal => not_equal, + }, + not_equal => not_equal, + } + } + + /// Returns a human-readable string of rdata. + fn rdata_print(&self) -> String; + + /// Returns the class only, excluding class_flush / unique bit. fn get_class(&self) -> u16 { self.get_record().entry.class } @@ -255,9 +351,11 @@ pub(crate) trait DnsRecordExt: fmt::Debug { self.get_record().entry.cache_flush } + /// Return the new name if exists, otherwise the regular name in DnsEntry. fn get_name(&self) -> &str { - self.get_record().entry.name.as_str() + self.get_record().get_name() } + fn get_type(&self) -> u16 { self.get_record().entry.ty } @@ -326,6 +424,11 @@ impl DnsAddress { let record = DnsRecord::new(name, ty, class, ttl); Self { record, address } } + + /// Returns whether this address is in the same subnet of `intf`. + pub(crate) fn in_subnet(&self, intf: &Interface) -> bool { + valid_ip_on_intf(&self.address, intf) + } } impl DnsRecordExt for DnsAddress { @@ -355,6 +458,25 @@ impl DnsRecordExt for DnsAddress { false } + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool { + if let Some(other_a) = other.any().downcast_ref::() { + return self.address == other_a.address; + } + false + } + + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + if let Some(other_a) = other.any().downcast_ref::() { + self.address.cmp(&other_a.address) + } else { + cmp::Ordering::Greater + } + } + + fn rdata_print(&self) -> String { + format!("{}", self.address) + } + fn clone_box(&self) -> Box { Box::new(self.clone()) } @@ -398,6 +520,25 @@ impl DnsRecordExt for DnsPointer { false } + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool { + if let Some(other_ptr) = other.any().downcast_ref::() { + return self.alias == other_ptr.alias; + } + false + } + + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + if let Some(other_ptr) = other.any().downcast_ref::() { + self.alias.cmp(&other_ptr.alias) + } else { + cmp::Ordering::Greater + } + } + + fn rdata_print(&self) -> String { + self.alias.clone() + } + fn clone_box(&self) -> Box { Box::new(self.clone()) } @@ -425,7 +566,7 @@ impl DnsSrv { port: u16, host: String, ) -> Self { - let record = DnsRecord::new(name, TYPE_SRV, class, ttl); + let record = DnsRecord::new(name, RR_TYPE_SRV, class, ttl); Self { record, priority, @@ -467,6 +608,55 @@ impl DnsRecordExt for DnsSrv { false } + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool { + if let Some(other_srv) = other.any().downcast_ref::() { + return self.host == other_srv.host + && self.port == other_srv.port + && self.weight == other_srv.weight + && self.priority == other_srv.priority; + } + false + } + + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + let Some(other_srv) = other.any().downcast_ref::() else { + return cmp::Ordering::Greater; + }; + + // 1. compare `priority` + match self + .priority + .to_be_bytes() + .cmp(&other_srv.priority.to_be_bytes()) + { + cmp::Ordering::Equal => { + // 2. compare `weight` + match self + .weight + .to_be_bytes() + .cmp(&other_srv.weight.to_be_bytes()) + { + cmp::Ordering::Equal => { + // 3. compare `port`. + match self.port.to_be_bytes().cmp(&other_srv.port.to_be_bytes()) { + cmp::Ordering::Equal => self.host.cmp(&other_srv.host), + not_equal => not_equal, + } + } + not_equal => not_equal, + } + } + not_equal => not_equal, + } + } + + fn rdata_print(&self) -> String { + format!( + "priority: {}, weight: {}, port: {}, host: {}", + self.priority, self.weight, self.port, self.host + ) + } + fn clone_box(&self) -> Box { Box::new(self.clone()) } @@ -492,7 +682,7 @@ pub struct DnsTxt { impl DnsTxt { pub(crate) fn new(name: &str, class: u16, ttl: u32, text: Vec) -> Self { - let record = DnsRecord::new(name, TYPE_TXT, class, ttl); + let record = DnsRecord::new(name, RR_TYPE_TXT, class, ttl); Self { record, text } } } @@ -507,7 +697,6 @@ impl DnsRecordExt for DnsTxt { } fn write(&self, packet: &mut DnsOutPacket) { - debug!("writing text length {}", &self.text.len()); packet.write_bytes(&self.text); } @@ -522,6 +711,25 @@ impl DnsRecordExt for DnsTxt { false } + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool { + if let Some(other_txt) = other.any().downcast_ref::() { + return self.text == other_txt.text; + } + false + } + + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + if let Some(other_txt) = other.any().downcast_ref::() { + self.text.cmp(&other_txt.text) + } else { + cmp::Ordering::Greater + } + } + + fn rdata_print(&self) -> String { + format!("{:?}", decode_txt(&self.text)) + } + fn clone_box(&self) -> Box { Box::new(self.clone()) } @@ -581,6 +789,28 @@ impl DnsRecordExt for DnsHostInfo { false } + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool { + if let Some(other_hinfo) = other.any().downcast_ref::() { + return self.cpu == other_hinfo.cpu && self.os == other_hinfo.os; + } + false + } + + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + if let Some(other_hinfo) = other.any().downcast_ref::() { + match self.cpu.cmp(&other_hinfo.cpu) { + cmp::Ordering::Equal => self.os.cmp(&other_hinfo.os), + ordering => ordering, + } + } else { + cmp::Ordering::Greater + } + } + + fn rdata_print(&self) -> String { + format!("cpu: {}, os: {}", self.cpu, self.os) + } + fn clone_box(&self) -> Box { Box::new(self.clone()) } @@ -600,7 +830,7 @@ pub struct DnsNSec { impl DnsNSec { fn new(name: &str, class: u16, ttl: u32, next_domain: String, type_bitmap: Vec) -> Self { - let record = DnsRecord::new(name, TYPE_NSEC, class, ttl); + let record = DnsRecord::new(name, RR_TYPE_NSEC, class, ttl); Self { record, next_domain, @@ -664,6 +894,33 @@ impl DnsRecordExt for DnsNSec { false } + fn rrdata_match(&self, other: &dyn DnsRecordExt) -> bool { + if let Some(other_record) = other.any().downcast_ref::() { + return self.next_domain == other_record.next_domain + && self.type_bitmap == other_record.type_bitmap; + } + false + } + + fn compare_rdata(&self, other: &dyn DnsRecordExt) -> cmp::Ordering { + if let Some(other_nsec) = other.any().downcast_ref::() { + match self.next_domain.cmp(&other_nsec.next_domain) { + cmp::Ordering::Equal => self.type_bitmap.cmp(&other_nsec.type_bitmap), + ordering => ordering, + } + } else { + cmp::Ordering::Greater + } + } + + fn rdata_print(&self) -> String { + format!( + "next_domain: {}, type_bitmap len: {}", + self.next_domain, + self.type_bitmap.len() + ) + } + fn clone_box(&self) -> Box { Box::new(self.clone()) } @@ -714,7 +971,7 @@ impl DnsOutPacket { let start_size = self.size; let record = record_ext.get_record(); - self.write_name(&record.entry.name); + self.write_name(record.get_name()); self.write_short(record.entry.ty); if record.entry.cache_flush { // check "multicast" @@ -901,9 +1158,9 @@ pub(crate) struct DnsOutgoing { multicast: bool, pub(crate) questions: Vec, pub(crate) answers: Vec<(DnsRecordBox, u64)>, - pub(crate) authorities: Vec, + pub(crate) authorities: Vec, pub(crate) additionals: Vec, - pub(crate) known_answer_count: i64, + pub(crate) known_answer_count: i64, // for internal maintenance only } impl DnsOutgoing { @@ -967,8 +1224,12 @@ impl DnsOutgoing { } /// A workaround as Rust doesn't allow us to pass DnsRecordBox in as `impl DnsRecordExt` - pub(crate) fn add_additional_answer_box(&mut self, answer_box: DnsRecordBox) { - self.additionals.push(answer_box); + pub(crate) fn add_answer_box(&mut self, answer_box: DnsRecordBox) { + self.answers.push((answer_box, 0)); + } + + pub(crate) fn add_authority(&mut self, record: DnsRecordBox) { + self.authorities.push(record); } /// Returns true if `answer` is added to the outgoing msg. @@ -996,7 +1257,6 @@ impl DnsOutgoing { answer: impl DnsRecordExt + Send + 'static, now: u64, ) -> bool { - debug!("Check for add_answer_at_time"); if now == 0 || !answer.get_record().is_expired(now) { debug!("add_answer push: {:?}", &answer); self.answers.push((Box::new(answer), now)); @@ -1016,6 +1276,7 @@ impl DnsOutgoing { msg: &DnsIncoming, service: &ServiceInfo, intf: &Interface, + dns_registry: &DnsRegistry, ) { let intf_addrs = service.get_addrs_on_intf(intf); if intf_addrs.is_empty() { @@ -1023,14 +1284,25 @@ impl DnsOutgoing { return; } + // check if we changed our name due to conflicts. + let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) { + Some(new_name) => new_name, + None => service.get_fullname(), + }; + + let hostname = match dns_registry.name_changes.get(service.get_hostname()) { + Some(new_name) => new_name, + None => service.get_hostname(), + }; + let ptr_added = self.add_answer( msg, DnsPointer::new( service.get_type(), - TYPE_PTR, + RR_TYPE_PTR, CLASS_IN, service.get_other_ttl(), - service.get_fullname().to_string(), + service_fullname.to_string(), ), ); @@ -1043,27 +1315,27 @@ impl DnsOutgoing { debug!("Adding subdomain {}", sub); self.add_additional_answer(DnsPointer::new( sub, - TYPE_PTR, + RR_TYPE_PTR, CLASS_IN, service.get_other_ttl(), - service.get_fullname().to_string(), + service_fullname.to_string(), )); } // Add recommended additional answers according to // https://tools.ietf.org/html/rfc6763#section-12.1. self.add_additional_answer(DnsSrv::new( - service.get_fullname(), + service_fullname, CLASS_IN | CLASS_CACHE_FLUSH, service.get_host_ttl(), service.get_priority(), service.get_weight(), service.get_port(), - service.get_hostname().to_string(), + hostname.to_string(), )); self.add_additional_answer(DnsTxt::new( - service.get_fullname(), + service_fullname, CLASS_IN | CLASS_CACHE_FLUSH, service.get_host_ttl(), service.generate_txt(), @@ -1071,7 +1343,7 @@ impl DnsOutgoing { for address in intf_addrs { self.add_additional_answer(DnsAddress::new( - service.get_hostname(), + hostname, ip_address_to_type(&address), CLASS_IN | CLASS_CACHE_FLUSH, service.get_host_ttl(), @@ -1115,7 +1387,7 @@ impl DnsOutgoing { } for auth in self.authorities.iter() { - auth_count += u16::from(packet.write_record(auth, 0)); + auth_count += u16::from(packet.write_record(auth.as_ref(), 0)); } for addi in self.additionals.iter() { @@ -1173,9 +1445,9 @@ pub struct DnsIncoming { offset: usize, data: Vec, pub(crate) questions: Vec, - /// This field includes records in the `answers` section - /// and in the `additionals` section. pub(crate) answers: Vec, + pub(crate) authorities: Vec, + pub(crate) additional: Vec, pub(crate) id: u16, flags: u16, pub(crate) num_questions: u16, @@ -1191,6 +1463,8 @@ impl DnsIncoming { data, questions: Vec::new(), answers: Vec::new(), + authorities: Vec::new(), + additional: Vec::new(), id: 0, flags: 0, num_questions: 0, @@ -1199,9 +1473,31 @@ impl DnsIncoming { num_additionals: 0, }; + /* + RFC 1035 section 4.1: https://datatracker.ietf.org/doc/html/rfc1035#section-4.1 + ... + All communications inside of the domain protocol are carried in a single + format called a message. The top level format of message is divided + into 5 sections (some of which are empty in certain cases) shown below: + + +---------------------+ + | Header | + +---------------------+ + | Question | the question for the name server + +---------------------+ + | Answer | RRs answering the question + +---------------------+ + | Authority | RRs pointing toward an authority + +---------------------+ + | Additional | RRs holding additional information + +---------------------+ + */ incoming.read_header()?; incoming.read_questions()?; - incoming.read_others()?; + incoming.read_answers()?; + incoming.read_authorities()?; + incoming.read_additional()?; + Ok(incoming) } @@ -1266,14 +1562,25 @@ impl DnsIncoming { Ok(()) } - /// Decodes all answers, authorities and additionals. - fn read_others(&mut self) -> Result<()> { - let n = self - .num_answers - .checked_add(self.num_authorities) - .and_then(|x| x.checked_add(self.num_additionals)) - .ok_or_else(|| Error::Msg("read_others: overflow".to_string()))?; - debug!("read_others: {}", n); + fn read_answers(&mut self) -> Result<()> { + self.answers = self.read_rr_records(self.num_answers)?; + Ok(()) + } + + fn read_authorities(&mut self) -> Result<()> { + self.authorities = self.read_rr_records(self.num_authorities)?; + Ok(()) + } + + fn read_additional(&mut self) -> Result<()> { + self.additional = self.read_rr_records(self.num_additionals)?; + Ok(()) + } + + /// Decodes a sequence of RR records (in answers, authorities and additionals). + fn read_rr_records(&mut self, count: u16) -> Result> { + debug!("read_rr_records: {}", count); + let mut rr_records = Vec::new(); // RFC 1035: https://datatracker.ietf.org/doc/html/rfc1035#section-3.2.1 // @@ -1302,7 +1609,7 @@ impl DnsIncoming { // Muse have at least TYPE, CLASS, TTL, RDLENGTH fields: 10 bytes. const RR_HEADER_REMAIN: usize = 10; - for _ in 0..n { + for _ in 0..count { let name = self.read_name()?; let slice = &self.data[self.offset..]; @@ -1340,20 +1647,20 @@ impl DnsIncoming { // decode RDATA based on the record type. let rec: Option = match ty { - TYPE_CNAME | TYPE_PTR => Some(Box::new(DnsPointer::new( + RR_TYPE_CNAME | RR_TYPE_PTR => Some(Box::new(DnsPointer::new( &name, ty, class, ttl, self.read_name()?, ))), - TYPE_TXT => Some(Box::new(DnsTxt::new( + RR_TYPE_TXT => Some(Box::new(DnsTxt::new( &name, class, ttl, self.read_vec(rdata_len), ))), - TYPE_SRV => Some(Box::new(DnsSrv::new( + RR_TYPE_SRV => Some(Box::new(DnsSrv::new( &name, class, ttl, @@ -1362,7 +1669,7 @@ impl DnsIncoming { self.read_u16()?, self.read_name()?, ))), - TYPE_HINFO => Some(Box::new(DnsHostInfo::new( + RR_TYPE_HINFO => Some(Box::new(DnsHostInfo::new( &name, ty, class, @@ -1370,21 +1677,21 @@ impl DnsIncoming { self.read_char_string(), self.read_char_string(), ))), - TYPE_A => Some(Box::new(DnsAddress::new( + RR_TYPE_A => Some(Box::new(DnsAddress::new( &name, ty, class, ttl, self.read_ipv4().into(), ))), - TYPE_AAAA => Some(Box::new(DnsAddress::new( + RR_TYPE_AAAA => Some(Box::new(DnsAddress::new( &name, ty, class, ttl, self.read_ipv6().into(), ))), - TYPE_NSEC => Some(Box::new(DnsNSec::new( + RR_TYPE_NSEC => Some(Box::new(DnsNSec::new( &name, class, ttl, @@ -1401,18 +1708,18 @@ impl DnsIncoming { // sanity check. if self.offset != next_offset { return Err(Error::Msg(format!( - "read_others: decode offset error for RData type {} record: {:?} offset: {} expected offset: {}", + "read_rr_records: decode offset error for RData type {} record: {:?} offset: {} expected offset: {}", ty, &rec, self.offset, next_offset, ))); } if let Some(record) = rec { - debug!("read_others: {:?}", &record); - self.answers.push(record); + debug!("read_rr_records: {:?}", &record); + rr_records.push(record); } } - Ok(()) + Ok(rr_records) } fn read_char_string(&mut self) -> String { @@ -1648,14 +1955,14 @@ mod tests { use super::{ current_time_millis, get_expiration_time, DnsIncoming, DnsNSec, DnsOutgoing, DnsPointer, DnsRecordExt, DnsSrv, DnsTxt, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_QR_QUERY, - FLAGS_QR_RESPONSE, MSG_HEADER_LEN, TYPE_A, TYPE_AAAA, TYPE_PTR, + FLAGS_QR_RESPONSE, MSG_HEADER_LEN, RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_PTR, }; #[test] fn test_read_name_invalid_length() { let name = "test_read"; let mut out = DnsOutgoing::new(FLAGS_QR_QUERY); - out.add_question(name, TYPE_PTR); + out.add_question(name, RR_TYPE_PTR); let data = out.to_data_on_wire().remove(0); // construct invalid data. @@ -1694,7 +2001,7 @@ mod tests { fn test_read_name_compression_loop() { let name = "test_loop"; let mut out = DnsOutgoing::new(FLAGS_QR_QUERY); - out.add_question(name, TYPE_PTR); + out.add_question(name, RR_TYPE_PTR); let mut data = out.to_data_on_wire().remove(0); let name_length_offset = 12; // start of the name in the message. @@ -1843,8 +2150,8 @@ mod tests { ); let absent_types = nsec._types(); assert_eq!(absent_types.len(), 2); - assert_eq!(absent_types[0], TYPE_A); - assert_eq!(absent_types[1], TYPE_AAAA); + assert_eq!(absent_types[0], RR_TYPE_A); + assert_eq!(absent_types[1], RR_TYPE_AAAA); } #[test] @@ -1877,7 +2184,7 @@ mod tests { #[test] fn test_packet_size() { let mut outgoing = DnsOutgoing::new(FLAGS_QR_QUERY); - outgoing.add_question("test_packet_size", TYPE_PTR); + outgoing.add_question("test_packet_size", RR_TYPE_PTR); let packet = outgoing.to_packets().remove(0); println!("packet size: {}", packet.size); @@ -1891,12 +2198,12 @@ mod tests { fn test_querier_known_answer_multi_packet() { let mut query = DnsOutgoing::new(FLAGS_QR_QUERY); let name = "test_multi_packet._udp.local."; - query.add_question(name, TYPE_PTR); + query.add_question(name, RR_TYPE_PTR); let known_answer_count = 400; for i in 0..known_answer_count { let alias = format!("instance{}.{}", i, name); - let answer = DnsPointer::new(name, TYPE_PTR, CLASS_IN, 0, alias); + let answer = DnsPointer::new(name, RR_TYPE_PTR, CLASS_IN, 0, alias); query.add_additional_answer(answer); } diff --git a/src/lib.rs b/src/lib.rs index 9f1af4d..e898ffb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,10 +151,14 @@ mod error; mod service_daemon; mod service_info; +pub use dns_parser::{ + RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_ANY, RR_TYPE_CNAME, RR_TYPE_HINFO, RR_TYPE_NSEC, RR_TYPE_PTR, + RR_TYPE_SRV, RR_TYPE_TXT, +}; pub use error::{Error, Result}; pub use service_daemon::{ - DaemonEvent, DaemonStatus, HostnameResolutionEvent, IfKind, Metrics, ServiceDaemon, - ServiceEvent, UnregisterStatus, SERVICE_NAME_LEN_MAX_DEFAULT, + DaemonEvent, DaemonStatus, DnsNameChange, HostnameResolutionEvent, IfKind, Metrics, + ServiceDaemon, ServiceEvent, UnregisterStatus, SERVICE_NAME_LEN_MAX_DEFAULT, }; pub use service_info::{AsIpAddrs, IntoTxtProperties, ServiceInfo, TxtProperties, TxtProperty}; diff --git a/src/service_daemon.rs b/src/service_daemon.rs index dc6fae8..2f37ee2 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -29,17 +29,17 @@ // in Service Discovery, the basic data structure is "Service Info". One Service Info // corresponds to a set of DNS Resource Records. #[cfg(feature = "logging")] -use crate::log::{debug, error, warn}; +use crate::log::{debug, error, info, warn}; use crate::{ dns_cache::DnsCache, dns_parser::{ - current_time_millis, ip_address_to_type, split_sub_domain, DnsAddress, DnsIncoming, - DnsOutgoing, DnsPointer, DnsRecordExt, DnsSrv, DnsTxt, CLASS_CACHE_FLUSH, CLASS_IN, - FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE, TYPE_A, TYPE_AAAA, TYPE_ANY, - TYPE_PTR, TYPE_SRV, TYPE_TXT, + current_time_millis, ip_address_to_type, rr_type_name, split_sub_domain, DnsAddress, + DnsIncoming, DnsOutgoing, DnsPointer, DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, + CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE, + RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_ANY, RR_TYPE_PTR, RR_TYPE_SRV, RR_TYPE_TXT, }, error::{Error, Result}, - service_info::ServiceInfo, + service_info::{DnsRegistry, Probe, ServiceInfo, ServiceStatus}, Receiver, }; use flume::{bounded, Sender, TrySendError}; @@ -593,6 +593,9 @@ impl ServiceDaemon { zc.resolve_updated_instances(instance_set); } + // Send out probing queries. + zc.probing_handler(); + // check IP changes. if now > next_ip_check { next_ip_check = now + IP_CHECK_INTERVAL_MILLIS; @@ -622,9 +625,9 @@ impl ServiceDaemon { zc.increase_counter(Counter::Register, 1); } - Command::RegisterResend(fullname) => { - debug!("announce service: {}", &fullname); - zc.exec_command_register_resend(fullname); + Command::RegisterResend(fullname, intf) => { + debug!("register-resend service: {fullname} on {:?}", &intf.addr); + zc.exec_command_register_resend(fullname, intf); } Command::Unregister(fullname, resp_s) => { @@ -889,8 +892,12 @@ struct Zeroconf { /// Local registered services, keyed by service full names. my_services: HashMap, + /// Received DNS records. cache: DnsCache, + /// Registered service records. + dns_registry_map: HashMap, + /// Active "Browse" commands. service_queriers: HashMap>, // @@ -941,6 +948,8 @@ impl Zeroconf { // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names, // or the same interface name with different IP addrs. let mut intf_socks = HashMap::new(); + let mut dns_registry_map = HashMap::new(); + for intf in my_ifaddrs { let sock = match new_socket_bind(&intf) { Ok(s) => s, @@ -950,6 +959,8 @@ impl Zeroconf { } }; + dns_registry_map.insert(intf.clone(), DnsRegistry::new()); + intf_socks.insert(intf, sock); } @@ -967,6 +978,7 @@ impl Zeroconf { poll_id_count: 0, my_services: HashMap::new(), cache: DnsCache::new(), + dns_registry_map, hostname_resolvers: HashMap::new(), service_queriers: HashMap::new(), retransmissions: Vec::new(), @@ -1025,15 +1037,6 @@ impl Zeroconf { }); } - /// Add `addr` in my services that enabled `addr_auto`. - fn add_addr_in_my_services(&mut self, addr: IpAddr) { - for (_, service_info) in self.my_services.iter_mut() { - if service_info.is_addr_auto() { - service_info.insert_ipaddr(addr); - } - } - } - /// Remove `addr` in my services that enabled `addr_auto`. fn del_addr_in_my_services(&mut self, addr: &IpAddr) { for (_, service_info) in self.my_services.iter_mut() { @@ -1194,9 +1197,36 @@ impl Zeroconf { return; } - self.intf_socks.insert(intf, sock); + info!("add new interface {}: {new_ip}", intf.name); + let dns_registry = match self.dns_registry_map.get_mut(&intf) { + Some(registry) => registry, + None => self + .dns_registry_map + .entry(intf.clone()) + .or_insert_with(DnsRegistry::new), + }; - self.add_addr_in_my_services(new_ip); + for (_, service_info) in self.my_services.iter_mut() { + if service_info.is_addr_auto() { + service_info.insert_ipaddr(new_ip); + + if announce_service_on_intf(dns_registry, service_info, &intf, &sock) { + info!( + "Announce service {} on {}", + service_info.get_fullname(), + intf.ip() + ); + service_info.set_status(&intf, ServiceStatus::Announced); + } else { + for timer in dns_registry.new_timers.drain(..) { + self.timers.push(Reverse(timer)); + } + service_info.set_status(&intf, ServiceStatus::Probing); + } + } + } + + self.intf_socks.insert(intf, sock); // Notify the monitors. self.notify_monitors(DaemonEvent::IpAdd(new_ip)); @@ -1225,9 +1255,9 @@ impl Zeroconf { } } - debug!("register service {:?}", &info); + info!("register service {:?}", &info); - let outgoing_addrs = self.send_unsolicited_response(&info); + let outgoing_addrs = self.send_unsolicited_response(&mut info); if !outgoing_addrs.is_empty() { self.notify_monitors(DaemonEvent::Announce( info.get_fullname().to_string(), @@ -1235,120 +1265,199 @@ impl Zeroconf { )); } - // RFC 6762 section 8.3. - // ..The Multicast DNS responder MUST send at least two unsolicited - // responses, one second apart. - let next_time = current_time_millis() + 1000; - // The key has to be lower case letter as DNS record name is case insensitive. // The info will have the original name. let service_fullname = info.get_fullname().to_lowercase(); - self.add_retransmission(next_time, Command::RegisterResend(service_fullname.clone())); self.my_services.insert(service_fullname, info); } /// Sends out announcement of `info` on every valid interface. /// Returns the list of interface IPs that sent out the announcement. - fn send_unsolicited_response(&self, info: &ServiceInfo) -> Vec { + fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec { let mut outgoing_addrs = Vec::new(); // Send the announcement on one interface per ip version. let mut multicast_sent_trackers = HashSet::new(); + let mut outgoing_intfs = Vec::new(); + for (intf, sock) in self.intf_socks.iter() { if let Some(tracker) = multicast_send_tracker(intf) { if multicast_sent_trackers.contains(&tracker) { continue; // No need to send again on the same interface with same ip version. } } - if self.broadcast_service_on_intf(info, intf, sock) { + + let dns_registry = match self.dns_registry_map.get_mut(intf) { + Some(registry) => registry, + None => self + .dns_registry_map + .entry(intf.clone()) + .or_insert_with(DnsRegistry::new), + }; + + if announce_service_on_intf(dns_registry, info, intf, sock) { if let Some(tracker) = multicast_send_tracker(intf) { multicast_sent_trackers.insert(tracker); } outgoing_addrs.push(intf.ip()); + outgoing_intfs.push(intf.clone()); + + error!("Announce service {} on {}", info.get_fullname(), intf.ip()); + + info.set_status(intf, ServiceStatus::Announced); + } else { + for timer in dns_registry.new_timers.drain(..) { + self.timers.push(Reverse(timer)); + } + info.set_status(intf, ServiceStatus::Probing); } } + // RFC 6762 section 8.3. + // ..The Multicast DNS responder MUST send at least two unsolicited + // responses, one second apart. + let next_time = current_time_millis() + 1000; + for intf in outgoing_intfs { + self.add_retransmission( + next_time, + Command::RegisterResend(info.get_fullname().to_string(), intf), + ); + } + outgoing_addrs } - /// Send an unsolicited response for owned service via `intf_sock`. - /// Returns true if sent out successfully. - fn broadcast_service_on_intf( - &self, - info: &ServiceInfo, - intf: &Interface, - sock: &Socket, - ) -> bool { - let service_fullname = info.get_fullname(); - debug!("broadcast service {}", service_fullname); - let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA); - out.add_answer_at_time( - DnsPointer::new( - info.get_type(), - TYPE_PTR, - CLASS_IN, - info.get_other_ttl(), - info.get_fullname().to_string(), - ), - 0, - ); + /// Send probings or finish them if expired. Notify waiting services. + fn probing_handler(&mut self) { + let now = current_time_millis(); - if let Some(sub) = info.get_subtype() { - debug!("Adding subdomain {}", sub); - out.add_answer_at_time( - DnsPointer::new( - sub, - TYPE_PTR, - CLASS_IN, - info.get_other_ttl(), - info.get_fullname().to_string(), - ), - 0, - ); - } + for (intf, sock) in self.intf_socks.iter() { + let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else { + continue; + }; - out.add_answer_at_time( - DnsSrv::new( - info.get_fullname(), - CLASS_IN | CLASS_CACHE_FLUSH, - info.get_host_ttl(), - info.get_priority(), - info.get_weight(), - info.get_port(), - info.get_hostname().to_string(), - ), - 0, - ); - out.add_answer_at_time( - DnsTxt::new( - info.get_fullname(), - CLASS_IN | CLASS_CACHE_FLUSH, - info.get_other_ttl(), - info.generate_txt(), - ), - 0, - ); + let mut expired_probe_names = Vec::new(); + let mut out = DnsOutgoing::new(FLAGS_QR_QUERY); - let intf_addrs = info.get_addrs_on_intf(intf); - if intf_addrs.is_empty() { - debug!("No valid addrs to add on intf {:?}", &intf); - return false; - } - for address in intf_addrs { - out.add_answer_at_time( - DnsAddress::new( - info.get_hostname(), - ip_address_to_type(&address), - CLASS_IN | CLASS_CACHE_FLUSH, - info.get_host_ttl(), - address, - ), - 0, - ); - } + for (name, probe) in dns_registry.probing.iter_mut() { + if now >= probe.next_send { + if probe.expired(now) { + // move the record to active + expired_probe_names.push(name.clone()); + } else { + out.add_question(name, RR_TYPE_ANY); + + /* + RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2 + ... + for tiebreaking to work correctly in all + cases, the Authority Section must contain *all* the records and + proposed rdata being probed for uniqueness. + */ + for record in probe.records.iter() { + out.add_authority(record.clone()); + } - send_dns_outgoing(&out, intf, sock); - true + probe.update_next_send(now); + + // add timer + self.timers.push(Reverse(probe.next_send)); + } + } + } + + // send probing. + if !out.questions.is_empty() { + info!("sending out probing of {} questions", out.questions.len()); + send_dns_outgoing(&out, intf, sock); + } + + let mut waiting_services = HashSet::new(); + + for name in expired_probe_names { + let Some(probe) = dns_registry.probing.remove(&name) else { + continue; + }; + + // send notifications about name changes + for record in probe.records.iter() { + if let Some(new_name) = record.get_record().get_new_name() { + dns_registry + .name_changes + .insert(name.clone(), new_name.to_string()); + + let event = DnsNameChange { + original: record.get_record().get_original_name().to_string(), + new_name: new_name.to_string(), + rr_type: record.get_type(), + intf_name: intf.name.to_string(), + }; + notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event)); + } + } + + // move RR from probe to active. + info!( + "probe of '{name}' finished: move {} records to active. ({} waiting services)", + probe.records.len(), + probe.waiting_services.len(), + ); + + // Move records to active and plan to wake up services if records are not empty. + if !probe.records.is_empty() { + match dns_registry.active.get_mut(&name) { + Some(records) => { + records.extend(probe.records); + } + None => { + dns_registry.active.insert(name, probe.records); + } + } + + waiting_services.extend(probe.waiting_services); + } + } + + // wake up services waiting. + for service_name in waiting_services { + info!( + "try to announce service {service_name} on intf {}", + intf.ip() + ); + if let Some(info) = self.my_services.get_mut(&service_name) { + if info.get_status(intf) == ServiceStatus::Announced { + info!("service {} already announced", info.get_fullname()); + continue; + } + + if announce_service_on_intf(dns_registry, info, intf, sock) { + let next_time = now + 1000; + let command = + Command::RegisterResend(info.get_fullname().to_string(), intf.clone()); + self.retransmissions.push(ReRun { next_time, command }); + self.timers.push(Reverse(next_time)); + + let fullname = match dns_registry.name_changes.get(&service_name) { + Some(new_name) => new_name.to_string(), + None => service_name.to_string(), + }; + + let mut hostname = info.get_hostname(); + if let Some(new_name) = dns_registry.name_changes.get(hostname) { + hostname = new_name; + } + + info!("wake up: announce service {} on {}", fullname, intf.ip()); + notify_monitors( + &mut self.monitors, + DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())), + ); + + info.set_status(intf, ServiceStatus::Announced); + } + } + } + } } fn unregister_service(&self, info: &ServiceInfo, intf: &Interface, sock: &Socket) -> Vec { @@ -1356,7 +1465,7 @@ impl Zeroconf { out.add_answer_at_time( DnsPointer::new( info.get_type(), - TYPE_PTR, + RR_TYPE_PTR, CLASS_IN, 0, info.get_fullname().to_string(), @@ -1367,7 +1476,13 @@ impl Zeroconf { if let Some(sub) = info.get_subtype() { debug!("Adding subdomain {}", sub); out.add_answer_at_time( - DnsPointer::new(sub, TYPE_PTR, CLASS_IN, 0, info.get_fullname().to_string()), + DnsPointer::new( + sub, + RR_TYPE_PTR, + CLASS_IN, + 0, + info.get_fullname().to_string(), + ), 0, ); } @@ -1443,10 +1558,17 @@ impl Zeroconf { out.add_question(name, *qtype); for record in self.cache.get_known_answers(name, *qtype, now) { + /* + RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1 + ... + When a Multicast DNS querier sends a query to which it already knows + some answers, it populates the Answer Section of the DNS query + message with those answers. + */ debug!("add known answer: {:?}", record); let mut new_record = record.clone(); new_record.get_record_mut().update_ttl(now); - out.add_additional_answer_box(new_record); + out.add_answer_box(new_record); } } @@ -1490,7 +1612,7 @@ impl Zeroconf { } }; - debug!("received {} bytes from IP: {}", sz, intf.ip()); + debug!("received {} bytes at IP: {}", sz, intf.ip()); // If sz is 0, it means sock reached End-of-File. if sz == 0 { @@ -1539,13 +1661,13 @@ impl Zeroconf { for record in records { if let Some(srv) = record.any().downcast_ref::() { if self.cache.get_addr(&srv.host).is_none() { - self.send_query_vec(&[(&srv.host, TYPE_A), (&srv.host, TYPE_AAAA)]); + self.send_query_vec(&[(&srv.host, RR_TYPE_A), (&srv.host, RR_TYPE_AAAA)]); return true; } } } } else { - self.send_query(instance, TYPE_ANY); + self.send_query(instance, RR_TYPE_ANY); return true; } @@ -1694,18 +1816,18 @@ impl Zeroconf { debug!( "handle_response: {} answers {} authorities {} additionals", &msg.answers.len(), - &msg.num_authorities, - &msg.num_additionals + &msg.authorities.len(), + &msg.additional.len() ); let now = current_time_millis(); // remove records that are expired. - msg.answers.retain(|record| { + let mut record_predicate = |record: &DnsRecordBox| { if !record.get_record().is_expired(now) { return true; } - debug!("record is expired, removing it from cache."); + info!("record is expired, removing it from cache."); if self.cache.remove(record) { // for PTR records, send event to listeners if let Some(dns_ptr) = record.any().downcast_ref::() { @@ -1720,7 +1842,13 @@ impl Zeroconf { } } false - }); + }; + msg.answers.retain(&mut record_predicate); + msg.authorities.retain(&mut record_predicate); + msg.additional.retain(&mut record_predicate); + + // check possible conflicts and handle them. + self.conflict_handler(&msg, intf); /// Represents a DNS record change that involves one service instance. struct InstanceChange { @@ -1737,7 +1865,12 @@ impl Zeroconf { // other. let mut changes = Vec::new(); let mut timers = Vec::new(); - for record in msg.answers { + for record in msg + .answers + .into_iter() + .chain(msg.authorities.into_iter()) + .chain(msg.additional.into_iter()) + { match self.cache.add_or_update(intf, record, &mut timers) { Some((dns_record, true)) => { timers.push(dns_record.get_record().get_expire_time()); @@ -1745,7 +1878,7 @@ impl Zeroconf { let ty = dns_record.get_type(); let name = dns_record.get_name(); - if ty == TYPE_PTR { + if ty == RR_TYPE_PTR { if self.service_queriers.contains_key(name) { timers.push(dns_record.get_record().get_refresh_time()); } @@ -1785,7 +1918,7 @@ impl Zeroconf { // Go through remaining changes to see if any hostname resolutions were found or updated. changes .iter() - .filter(|change| change.ty == TYPE_A || change.ty == TYPE_AAAA) + .filter(|change| change.ty == RR_TYPE_A || change.ty == RR_TYPE_AAAA) .map(|change| change.name.clone()) .collect::>() .iter() @@ -1802,10 +1935,10 @@ impl Zeroconf { let mut updated_instances = HashSet::new(); for update in changes { match update.ty { - TYPE_PTR | TYPE_SRV | TYPE_TXT => { + RR_TYPE_PTR | RR_TYPE_SRV | RR_TYPE_TXT => { updated_instances.insert(update.name); } - TYPE_A | TYPE_AAAA => { + RR_TYPE_A | RR_TYPE_AAAA => { let instances = self.cache.get_instances_on_host(&update.name); updated_instances.extend(instances); } @@ -1816,6 +1949,108 @@ impl Zeroconf { self.resolve_updated_instances(updated_instances); } + fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) { + let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else { + return; + }; + + for answer in msg.answers.iter() { + let mut new_records = Vec::new(); + + let name = answer.get_name(); + let Some(probe) = dns_registry.probing.get_mut(name) else { + continue; + }; + + // check against possible multicast forwarding + if answer.get_type() == RR_TYPE_A || answer.get_type() == RR_TYPE_AAAA { + if let Some(answer_addr) = answer.any().downcast_ref::() { + if !answer_addr.in_subnet(intf) { + info!( + "conflict handler: answer addr {:?} not in the subnet of {:?}", + answer_addr, intf + ); + continue; + } + } + } + + probe.records.retain(|record| { + if record.get_type() == answer.get_type() + && record.get_class() == answer.get_class() + && !record.rrdata_match(answer.as_ref()) + { + info!( + "found conflict name: '{name}' record: {}: {} PEER: {}", + rr_type_name(record.get_type()), + record.rdata_print(), + answer.rdata_print() + ); + + // create a new name for this record + // then remove the old record in probing. + let mut new_record = record.clone(); + let new_name = match record.get_type() { + RR_TYPE_A => hostname_change(name), + RR_TYPE_AAAA => hostname_change(name), + _ => name_change(name), + }; + new_record.get_record_mut().set_new_name(new_name); + new_records.push(new_record); + return false; // old record is dropped from the probe. + } + + true + }); + + // ????? + // if probe.records.is_empty() { + // dns_registry.probing.remove(name); + // } + + // Probing again with the new names. + let create_time = current_time_millis() + fastrand::u64(0..250); + + let waiting_services = probe.waiting_services.clone(); + + for record in new_records { + if dns_registry.update_hostname(name, record.get_name(), create_time) { + self.timers.push(Reverse(create_time)); + } + + // remember the name changes (note: `name` might not be the original, it could be already changed once.) + dns_registry.name_changes.insert( + record.get_record().get_original_name().to_string(), + record.get_name().to_string(), + ); + + let new_probe = match dns_registry.probing.get_mut(record.get_name()) { + Some(p) => p, + None => { + let new_probe = dns_registry + .probing + .entry(record.get_name().to_string()) + .or_insert_with(|| { + info!("conflict handler: new probe of {}", record.get_name()); + Probe::new(create_time) + }); + self.timers.push(Reverse(new_probe.next_send)); + new_probe + } + }; + + info!( + "insert record with new name '{}' {} into probe", + record.get_name(), + rr_type_name(record.get_type()) + ); + new_probe.insert_record(record); + + new_probe.waiting_services.extend(waiting_services.clone()); + } + } + } + /// Resolve the updated (including new) instances. /// /// Note: it is possible that more than 1 PTR pointing to the same @@ -1875,25 +2110,34 @@ impl Zeroconf { // See https://datatracker.ietf.org/doc/html/rfc6763#section-9 const META_QUERY: &str = "_services._dns-sd._udp.local."; + let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else { + error!("missing dns registry for intf {}", intf.ip()); + return; + }; + for question in msg.questions.iter() { debug!("query question: {:?}", &question); let qtype = question.entry.ty; - if qtype == TYPE_PTR { + if qtype == RR_TYPE_PTR { for service in self.my_services.values() { + if service.get_status(intf) != ServiceStatus::Announced { + continue; + } + if question.entry.name == service.get_type() || service .get_subtype() .as_ref() .map_or(false, |v| v == &question.entry.name) { - out.add_answer_with_additionals(&msg, service, intf); + out.add_answer_with_additionals(&msg, service, intf, dns_registry); } else if question.entry.name == META_QUERY { let ptr_added = out.add_answer( &msg, DnsPointer::new( &question.entry.name, - TYPE_PTR, + RR_TYPE_PTR, CLASS_IN, service.get_other_ttl(), service.get_type().to_string(), @@ -1905,16 +2149,69 @@ impl Zeroconf { } } } else { - if qtype == TYPE_A || qtype == TYPE_AAAA || qtype == TYPE_ANY { + // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2) + if qtype == RR_TYPE_ANY && msg.num_authorities > 0 { + let probe_name = &question.entry.name; + + if let Some(probe) = dns_registry.probing.get_mut(probe_name) { + let now = current_time_millis(); + + // Only do tiebreaking if probe already started. + // This check also helps avoid redo tiebreaking if start time + // was postponed. + if probe.start_time < now { + let incoming_records: Vec<_> = msg + .authorities + .iter() + .filter(|r| r.get_name() == probe_name) + .collect(); + + /* + RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2 + ... + if the host finds that its own data is lexicographically later, it + simply ignores the other host's probe. If the host finds that its + own data is lexicographically earlier, then it defers to the winning + host by waiting one second, and then begins probing for this record + again. + */ + match probe.tiebreaking(&incoming_records) { + cmp::Ordering::Less => { + info!( + "tiebreaking '{}': LOST, will wait for one second", + probe_name + ); + probe.start_time = now + 1000; // wait and restart. + probe.next_send = now + 1000; + } + ordering => { + info!("tiebreaking '{}': {:?}", probe_name, ordering); + } + } + } + } + } + + if qtype == RR_TYPE_A || qtype == RR_TYPE_AAAA || qtype == RR_TYPE_ANY { for service in self.my_services.values() { - if service.get_hostname().to_lowercase() - == question.entry.name.to_lowercase() - { + if service.get_status(intf) != ServiceStatus::Announced { + continue; + } + + let service_hostname = + match dns_registry.name_changes.get(service.get_hostname()) { + Some(new_name) => new_name, + None => service.get_hostname(), + }; + + if service_hostname.to_lowercase() == question.entry.name.to_lowercase() { let intf_addrs = service.get_addrs_on_intf(intf); - if intf_addrs.is_empty() && (qtype == TYPE_A || qtype == TYPE_AAAA) { + if intf_addrs.is_empty() + && (qtype == RR_TYPE_A || qtype == RR_TYPE_AAAA) + { let t = match qtype { - TYPE_A => "TYPE_A", - TYPE_AAAA => "TYPE_AAAA", + RR_TYPE_A => "TYPE_A", + RR_TYPE_AAAA => "TYPE_AAAA", _ => "invalid_type", }; debug!( @@ -1939,13 +2236,28 @@ impl Zeroconf { } } - let name_to_find = question.entry.name.to_lowercase(); - let service = match self.my_services.get(&name_to_find) { - Some(s) => s, - None => continue, + let query_name = question.entry.name.to_lowercase(); + let service_opt = self + .my_services + .iter() + .find(|(k, _v)| { + let service_name = match dns_registry.name_changes.get(k.as_str()) { + Some(new_name) => new_name, + None => k, + }; + service_name == &query_name + }) + .map(|(_, v)| v); + + let Some(service) = service_opt else { + continue; }; - if qtype == TYPE_SRV || qtype == TYPE_ANY { + if service.get_status(intf) != ServiceStatus::Announced { + continue; + } + + if qtype == RR_TYPE_SRV || qtype == RR_TYPE_ANY { out.add_answer( &msg, DnsSrv::new( @@ -1960,7 +2272,7 @@ impl Zeroconf { ); } - if qtype == TYPE_TXT || qtype == TYPE_ANY { + if qtype == RR_TYPE_TXT || qtype == RR_TYPE_ANY { out.add_answer( &msg, DnsTxt::new( @@ -1972,7 +2284,7 @@ impl Zeroconf { ); } - if qtype == TYPE_SRV { + if qtype == RR_TYPE_SRV { let intf_addrs = service.get_addrs_on_intf(intf); if intf_addrs.is_empty() { error!( @@ -1999,6 +2311,7 @@ impl Zeroconf { send_dns_outgoing(&out, intf, sock); self.increase_counter(Counter::Respond, 1); + self.notify_monitors(DaemonEvent::Respond(intf.ip())); } self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count); @@ -2042,7 +2355,7 @@ impl Zeroconf { instance_name.to_string(), ); match sender.send(event) { - Ok(()) => debug!("Sent ServiceRemoved to listener successfully"), + Ok(()) => info!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"), Err(e) => error!("Failed to send event: {}", e), } } @@ -2084,7 +2397,7 @@ impl Zeroconf { self.query_cache_for_service(&ty, &listener); } - self.send_query(&ty, TYPE_PTR); + self.send_query(&ty, RR_TYPE_PTR); self.increase_counter(Counter::Browse, 1); let next_time = current_time_millis() + (next_delay * 1000) as u64; @@ -2118,7 +2431,7 @@ impl Zeroconf { self.query_cache_for_hostname(&hostname, listener.clone()); } - self.send_query_vec(&[(&hostname, TYPE_A), (&hostname, TYPE_AAAA)]); + self.send_query_vec(&[(&hostname, RR_TYPE_A), (&hostname, RR_TYPE_AAAA)]); self.increase_counter(Counter::ResolveHostname, 1); let now = current_time_millis(); @@ -2259,20 +2572,42 @@ impl Zeroconf { } } - fn exec_command_register_resend(&mut self, fullname: String) { - match self.my_services.get(&fullname) { - Some(info) => { - let outgoing_addrs = self.send_unsolicited_response(info); - if !outgoing_addrs.is_empty() { - self.notify_monitors(DaemonEvent::Announce( - fullname, - format!("{:?}", &outgoing_addrs), - )); - } - self.increase_counter(Counter::RegisterResend, 1); + fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) { + let Some(info) = self.my_services.get_mut(&fullname) else { + debug!("announce: cannot find such service {}", &fullname); + return; + }; + + let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else { + return; + }; + + let Some(sock) = self.intf_socks.get(&intf) else { + return; + }; + + if announce_service_on_intf(dns_registry, info, &intf, sock) { + let mut hostname = info.get_hostname(); + if let Some(new_name) = dns_registry.name_changes.get(hostname) { + hostname = new_name; } - None => debug!("announce: cannot find such service {}", &fullname), + let service_name = match dns_registry.name_changes.get(&fullname) { + Some(new_name) => new_name.to_string(), + None => fullname, + }; + + info!("resend: announce service {} on {}", service_name, intf.ip()); + + notify_monitors( + &mut self.monitors, + DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())), + ); + info.set_status(&intf, ServiceStatus::Announced); + } else { + error!("register-resend should not fail"); } + + self.increase_counter(Counter::RegisterResend, 1); } /// Refresh cached service records with active queriers @@ -2286,7 +2621,7 @@ impl Zeroconf { let refreshed_timers = self.cache.refresh_due_ptr(ty_domain); if !refreshed_timers.is_empty() { debug!("sending refresh query for PTR: {}", ty_domain); - self.send_query(ty_domain, TYPE_PTR); + self.send_query(ty_domain, RR_TYPE_PTR); query_ptr_count += 1; new_timers.extend(refreshed_timers); } @@ -2294,14 +2629,14 @@ impl Zeroconf { let (instances, timers) = self.cache.refresh_due_srv(ty_domain); for instance in instances.iter() { debug!("sending refresh query for SRV: {}", instance); - self.send_query(instance, TYPE_SRV); + self.send_query(instance, RR_TYPE_SRV); query_srv_count += 1; } new_timers.extend(timers); let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain); for hostname in hostnames.iter() { debug!("sending refresh queries for A and AAAA: {}", hostname); - self.send_query_vec(&[(hostname, TYPE_A), (hostname, TYPE_AAAA)]); + self.send_query_vec(&[(hostname, RR_TYPE_A), (hostname, RR_TYPE_AAAA)]); query_addr_count += 2; } new_timers.extend(timers); @@ -2366,6 +2701,38 @@ pub enum DaemonEvent { /// Daemon detected a IP address removed from the host. IpDel(IpAddr), + + /// Daemon resolved a name conflict by changing one of its names. + /// see [DnsNameChange] for more details. + NameChange(DnsNameChange), + + /// Send out a multicast response via an IP address. + Respond(IpAddr), +} + +/// Represents a name change due to a name conflict resolution. +/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9) +#[derive(Clone, Debug)] +pub struct DnsNameChange { + /// The original name set in `ServiceInfo` by the user. + pub original: String, + + /// A new name is created by appending a suffix after the original name. + /// + /// - for a service instance name, the suffix is `(N)`, where N starts at 2. + /// - for a host name, the suffix is `-N`, where N starts at 2. + /// + /// For example: + /// + /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp` + /// - Host name `foo.local.` becomes `foo-2.local.` + pub new_name: String, + + /// The value is one of `RR_TYPE_` constants. + pub rr_type: u16, + + /// The interface where the name conflict and its change happened. + pub intf_name: String, } /// Commands supported by the daemon @@ -2384,7 +2751,7 @@ enum Command { Unregister(String, Sender), // (fullname) /// Announce again a service to local network - RegisterResend(String), // (fullname) + RegisterResend(String, Interface), // (fullname) /// Resend unregister packet. UnregisterResend(Vec, Interface), // (packet content) @@ -2423,7 +2790,7 @@ impl fmt::Display for Command { Self::GetMetrics(_) => write!(f, "Command GetMetrics"), Self::Monitor(_) => write!(f, "Command Monitor"), Self::Register(_) => write!(f, "Command Register"), - Self::RegisterResend(_) => write!(f, "Command RegisterResend"), + Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"), Self::SetOption(_) => write!(f, "Command SetOption"), Self::StopBrowse(_) => write!(f, "Command StopBrowse"), Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"), @@ -2566,7 +2933,7 @@ fn my_ip_interfaces() -> Vec { fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &Socket) -> Vec> { let qtype = if out.is_query() { "query" } else { "response" }; debug!( - "Multicasting {}: {} questions {} answers {} authorities {} additional", + "send outgoing {}: {} questions {} answers {} authorities {} additional", qtype, out.questions.len(), out.answers.len(), @@ -2615,15 +2982,242 @@ fn valid_instance_name(name: &str) -> bool { name.split('.').count() >= 5 } +fn notify_monitors(monitors: &mut Vec>, event: DaemonEvent) { + monitors.retain(|sender| { + if let Err(e) = sender.try_send(event.clone()) { + error!("notify_monitors: try_send: {}", &e); + if matches!(e, TrySendError::Disconnected(_)) { + return false; // This monitor is dropped. + } + } + true + }); +} + +/// Check if all unique records passed "probing", and if yes, create a packet +/// to announce the service. +fn prepare_announce( + info: &ServiceInfo, + intf: &Interface, + dns_registry: &mut DnsRegistry, +) -> Option { + let intf_addrs = info.get_addrs_on_intf(intf); + if intf_addrs.is_empty() { + debug!("No valid addrs to add on intf {:?}", &intf); + return None; + } + + // check if we changed our name due to conflicts. + let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) { + Some(new_name) => new_name, + None => info.get_fullname(), + }; + + info!( + "prepare to announce service {service_fullname} on {}: {}", + &intf.name, + &intf.ip() + ); + + let mut probing_count = 0; + let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA); + let create_time = current_time_millis() + fastrand::u64(0..250); + + out.add_answer_at_time( + DnsPointer::new( + info.get_type(), + RR_TYPE_PTR, + CLASS_IN, + info.get_other_ttl(), + service_fullname.to_string(), + ), + 0, + ); + + if let Some(sub) = info.get_subtype() { + debug!("Adding subdomain {}", sub); + out.add_answer_at_time( + DnsPointer::new( + sub, + RR_TYPE_PTR, + CLASS_IN, + info.get_other_ttl(), + service_fullname.to_string(), + ), + 0, + ); + } + + // SRV records. + let hostname = match dns_registry.name_changes.get(info.get_hostname()) { + Some(new_name) => new_name.to_string(), + None => info.get_hostname().to_string(), + }; + + let mut srv = DnsSrv::new( + info.get_fullname(), + CLASS_IN | CLASS_CACHE_FLUSH, + info.get_host_ttl(), + info.get_priority(), + info.get_weight(), + info.get_port(), + hostname, + ); + + if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) { + srv.get_record_mut().set_new_name(new_name.to_string()); + } + + if !info.requires_probe() + || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time) + { + out.add_answer_at_time(srv, 0); + } else { + probing_count += 1; + } + + // TXT records. + + let mut txt = DnsTxt::new( + info.get_fullname(), + CLASS_IN | CLASS_CACHE_FLUSH, + info.get_other_ttl(), + info.generate_txt(), + ); + + if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) { + txt.get_record_mut().set_new_name(new_name.to_string()); + } + + if !info.requires_probe() + || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time) + { + out.add_answer_at_time(txt, 0); + } else { + probing_count += 1; + } + + // Address records. (A and AAAA) + + let hostname = info.get_hostname(); + for address in intf_addrs { + let mut dns_addr = DnsAddress::new( + hostname, + ip_address_to_type(&address), + CLASS_IN | CLASS_CACHE_FLUSH, + info.get_host_ttl(), + address, + ); + + if let Some(new_name) = dns_registry.name_changes.get(hostname) { + dns_addr.get_record_mut().set_new_name(new_name.to_string()); + } + + if !info.requires_probe() + || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time) + { + out.add_answer_at_time(dns_addr, 0); + } else { + probing_count += 1; + } + } + + if probing_count > 0 { + return None; + } + + Some(out) +} + +/// Send an unsolicited response for owned service via `intf` and `sock`. +/// Returns true if sent out successfully. +fn announce_service_on_intf( + dns_registry: &mut DnsRegistry, + info: &ServiceInfo, + intf: &Interface, + sock: &Socket, +) -> bool { + if let Some(out) = prepare_announce(info, intf, dns_registry) { + send_dns_outgoing(&out, intf, sock); + return true; + } + false +} + +/// Returns a new name based on the `original` to avoid conflicts. +/// If the name already contains a number in parentheses, increments that number. +/// +/// Examples: +/// - `foo.local.` becomes `foo (2).local.` +/// - `foo (2).local.` becomes `foo (3).local.` +/// - `foo (9)` becomes `foo (10)` +fn name_change(original: &str) -> String { + let mut parts: Vec<_> = original.split('.').collect(); + let Some(first_part) = parts.get_mut(0) else { + return format!("{original} (2)"); + }; + + let mut new_name = format!("{} (2)", first_part); + + // check if there is already has `()` suffix. + if let Some(paren_pos) = first_part.rfind(" (") { + // Check if there's a closing parenthesis + if let Some(end_paren) = first_part[paren_pos..].find(')') { + let absolute_end_pos = paren_pos + end_paren; + // Only process if the closing parenthesis is the last character + if absolute_end_pos == first_part.len() - 1 { + let num_start = paren_pos + 2; // Skip " (" + // Try to parse the number between parentheses + if let Ok(number) = first_part[num_start..absolute_end_pos].parse::() { + let base_name = &first_part[..paren_pos]; + new_name = format!("{} ({})", base_name, number + 1) + } + } + } + } + + *first_part = &new_name; + parts.join(".") +} + +/// Returns a new name based on the `original` to avoid conflicts. +/// If the name already contains a hyphenated number, increments that number. +/// +/// Examples: +/// - `foo.local.` becomes `foo-2.local.` +/// - `foo-2.local.` becomes `foo-3.local.` +/// - `foo` becomes `foo-2` +fn hostname_change(original: &str) -> String { + let mut parts: Vec<_> = original.split('.').collect(); + let Some(first_part) = parts.get_mut(0) else { + return format!("{original}-2"); + }; + + let mut new_name = format!("{}-2", first_part); + + // check if there is already a `-` suffix + if let Some(hyphen_pos) = first_part.rfind('-') { + // Try to parse everything after the hyphen as a number + if let Ok(number) = first_part[hyphen_pos + 1..].parse::() { + let base_name = &first_part[..hyphen_pos]; + new_name = format!("{}-{}", base_name, number + 1); + } + } + + *first_part = &new_name; + parts.join(".") +} + #[cfg(test)] mod tests { use super::{ - check_domain_suffix, check_service_name_length, my_ip_interfaces, new_socket_bind, - send_dns_outgoing, valid_instance_name, HostnameResolutionEvent, ServiceDaemon, - ServiceEvent, ServiceInfo, GROUP_ADDR_V4, MDNS_PORT, + check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces, + name_change, new_socket_bind, send_dns_outgoing, valid_instance_name, + HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4, + MDNS_PORT, }; use crate::{ - dns_parser::{DnsOutgoing, DnsPointer, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE, TYPE_PTR}, + dns_parser::{DnsOutgoing, DnsPointer, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE, RR_TYPE_PTR}, service_daemon::check_hostname, }; use std::{ @@ -2691,7 +3285,7 @@ mod tests { } #[test] - fn service_with_temporarily_invalidated_ptr() { + fn test_service_with_temporarily_invalidated_ptr() { // Create a daemon let d = ServiceDaemon::new().expect("Failed to create daemon"); @@ -2758,7 +3352,7 @@ mod tests { // Invalidate the ptr from the service to the host. let invalidate_ptr_packet = DnsPointer::new( my_service.get_type(), - TYPE_PTR, + RR_TYPE_PTR, CLASS_IN, 0, my_service.get_fullname().to_string(), @@ -2810,7 +3404,7 @@ mod tests { // let fullname = my_service.get_fullname().to_string(); // set SRV to expire soon. - let new_ttl = 2; // for testing only. + let new_ttl = 3; // for testing only. my_service._set_host_ttl(new_ttl); // register my service @@ -2820,7 +3414,7 @@ mod tests { let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client"); let browse_chan = mdns_client.browse(service_type).unwrap(); - let timeout = Duration::from_secs(1); + let timeout = Duration::from_secs(2); let mut resolved = false; while let Ok(event) = browse_chan.recv_timeout(timeout) { @@ -2949,7 +3543,7 @@ mod tests { ) .unwrap(); - let new_ttl = 2; // for testing only. + let new_ttl = 3; // for testing only. my_service._set_other_ttl(new_ttl); // register my service @@ -2977,7 +3571,7 @@ mod tests { assert!(resolved); // wait over 80% of TTL, and refresh PTR should be sent out. - let timeout = Duration::from_millis(1800); + let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100); while let Ok(event) = browse_chan.recv_timeout(timeout) { println!("event: {:?}", &event); } @@ -2992,4 +3586,28 @@ mod tests { mdns_server.shutdown().unwrap(); mdns_client.shutdown().unwrap(); } + + #[test] + fn test_name_change() { + assert_eq!(name_change("foo.local."), "foo (2).local."); + assert_eq!(name_change("foo (2).local."), "foo (3).local."); + assert_eq!(name_change("foo (9).local."), "foo (10).local."); + assert_eq!(name_change("foo"), "foo (2)"); + assert_eq!(name_change("foo (2)"), "foo (3)"); + assert_eq!(name_change(""), " (2)"); + + // Additional edge cases + assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number + assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis + assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number + } + + #[test] + fn test_hostname_change() { + assert_eq!(hostname_change("foo.local."), "foo-2.local."); + assert_eq!(hostname_change("foo"), "foo-2"); + assert_eq!(hostname_change("foo-2.local."), "foo-3.local."); + assert_eq!(hostname_change("foo-9"), "foo-10"); + assert_eq!(hostname_change("test-42.domain."), "test-43.domain."); + } } diff --git a/src/service_info.rs b/src/service_info.rs index aacdd46..7684271 100644 --- a/src/service_info.rs +++ b/src/service_info.rs @@ -1,10 +1,14 @@ //! Define `ServiceInfo` to represent a service and its operations. #[cfg(feature = "logging")] -use crate::log::error; -use crate::{dns_parser::split_sub_domain, Error, Result}; +use crate::log::{error, info}; +use crate::{ + dns_parser::{rr_type_name, split_sub_domain, DnsRecordBox, DnsRecordExt, DnsSrv, RR_TYPE_SRV}, + Error, Result, +}; use if_addrs::{IfAddr, Interface}; use std::{ + cmp, collections::{HashMap, HashSet}, convert::TryInto, fmt, @@ -38,6 +42,18 @@ pub struct ServiceInfo { weight: u16, txt_properties: TxtProperties, addr_auto: bool, // Let the system update addresses automatically. + + status: HashMap, + + /// Whether we need to probe names before announcing this service. + requires_probe: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum ServiceStatus { + Probing, + Announced, + Unknown, } impl ServiceInfo { @@ -121,6 +137,8 @@ impl ServiceInfo { weight: 0, txt_properties, addr_auto: false, + status: HashMap::new(), + requires_probe: true, }; Ok(this) @@ -140,6 +158,21 @@ impl ServiceInfo { self.addr_auto } + /// Set whether this service info requires name probing for potential name conflicts. + /// + /// By default, it is true (i.e. requires probing) for every service info. You + /// set it to `false` only when you are sure there are no conflicts, or for testing purposes. + pub fn set_requires_probe(&mut self, enable: bool) { + self.requires_probe = enable; + } + + /// Returns whether this service info requires name probing for potential name conflicts. + /// + /// By default, it returns true for every service info. + pub const fn requires_probe(&self) -> bool { + self.requires_probe + } + /// Returns the service type including the domain label. /// /// For example: "_my-service._udp.local.". @@ -315,6 +348,24 @@ impl ServiceInfo { pub(crate) fn _set_other_ttl(&mut self, ttl: u32) { self.other_ttl = ttl; } + + pub(crate) fn set_status(&mut self, intf: &Interface, status: ServiceStatus) { + match self.status.get_mut(intf) { + Some(service_status) => { + *service_status = status; + } + None => { + self.status.entry(intf.clone()).or_insert(status); + } + } + } + + pub(crate) fn get_status(&self, intf: &Interface) -> ServiceStatus { + self.status + .get(intf) + .cloned() + .unwrap_or(ServiceStatus::Unknown) + } } /// Removes potentially duplicated ".local." at the end of "hostname". @@ -747,6 +798,261 @@ pub fn valid_two_addrs_on_intf(addr_a: &IpAddr, addr_b: &IpAddr, intf: &Interfac } } +/// A probing for a particular name. +#[derive(Debug)] +pub(crate) struct Probe { + /// All records probing for the same name. + pub(crate) records: Vec, + + /// The fullnames of services that are probing these records. + /// These are the original service names, will not change per conflicts. + pub(crate) waiting_services: HashSet, + + /// The time (T) to send the first query . + pub(crate) start_time: u64, + + /// The time to send the next (including the first) query. + pub(crate) next_send: u64, +} + +impl Probe { + pub(crate) fn new(start_time: u64) -> Self { + // RFC 6762: https://datatracker.ietf.org/doc/html/rfc6762#section-8.1: + // + // "250 ms after the first query, the host should send a second; then, + // 250 ms after that, a third. If, by 250 ms after the third probe, no + // conflicting Multicast DNS responses have been received, the host may + // move to the next step, announcing. " + let next_send = start_time; + + Self { + records: Vec::new(), + waiting_services: HashSet::new(), + start_time, + next_send, + } + } + + /// Add a new record with the same probing name in a sorted order. + pub(crate) fn insert_record(&mut self, record: DnsRecordBox) { + /* + RFC 6762: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2.1 + + " The records are sorted using the same lexicographical order as + described above, that is, if the record classes differ, the record + with the lower class number comes first. If the classes are the same + but the rrtypes differ, the record with the lower rrtype number comes + first." + */ + let insert_position = self + .records + .binary_search_by( + |existing| match existing.get_class().cmp(&record.get_class()) { + std::cmp::Ordering::Equal => existing.get_type().cmp(&record.get_type()), + other => other, + }, + ) + .unwrap_or_else(|pos| pos); + + self.records.insert(insert_position, record); + } + + /// Compares with `incoming` records. Returns `Less` if we yield. + pub(crate) fn tiebreaking(&self, incoming: &[&DnsRecordBox]) -> cmp::Ordering { + /* + RFC 6762: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2 + + " If the host finds that its + own data is lexicographically earlier, then it defers to the winning + host by waiting one second, and then begins probing for this record + again." + */ + let min_len = self.records.len().min(incoming.len()); + + // Compare elements up to the length of the shorter vector + for (i, incoming_record) in incoming.iter().enumerate().take(min_len) { + match self.records[i].compare(incoming_record.as_ref()) { + cmp::Ordering::Equal => continue, + other => return other, + } + } + + self.records.len().cmp(&incoming.len()) + } + + pub(crate) fn update_next_send(&mut self, now: u64) { + self.next_send = now + 250; + } + + /// Returns whether this probe is finished. + pub(crate) fn expired(&self, now: u64) -> bool { + // The 2nd query is T + 250ms, the 3rd query is T + 500ms, + // The expire time is T + 750ms + now >= self.start_time + 750 + } +} + +/// DNS records of all the registered services. +pub(crate) struct DnsRegistry { + /// keyed by the name of all related records. + /* + When a host is probing for a group of related records with the same + name (e.g., the SRV and TXT record describing a DNS-SD service), only + a single question need be placed in the Question Section, since query + type "ANY" (255) is used, which will elicit answers for all records + with that name. However, for tiebreaking to work correctly in all + cases, the Authority Section must contain *all* the records and + proposed rdata being probed for uniqueness. + */ + pub(crate) probing: HashMap, + + /// Already done probing, or no need to probe. + pub(crate) active: HashMap>, + + /// timers of the newly added probes. + pub(crate) new_timers: Vec, + + /// Mapping from original names to new names. + pub(crate) name_changes: HashMap, +} + +impl DnsRegistry { + pub(crate) fn new() -> Self { + Self { + probing: HashMap::new(), + active: HashMap::new(), + new_timers: Vec::new(), + name_changes: HashMap::new(), + } + } + + pub(crate) fn is_probing_done( + &mut self, + answer: &T, + service_name: &str, + start_time: u64, + ) -> bool + where + T: DnsRecordExt + Send + 'static, + { + if let Some(active_records) = self.active.get(answer.get_name()) { + for record in active_records.iter() { + if answer.matches(record.as_ref()) { + info!( + "found active record {} {}", + rr_type_name(answer.get_type()), + answer.get_name(), + ); + return true; + } + } + } + + let probe = self + .probing + .entry(answer.get_name().to_string()) + .or_insert_with(|| { + info!("new probe of {}", answer.get_name()); + Probe::new(start_time) + }); + + self.new_timers.push(probe.next_send); + + for record in probe.records.iter() { + if answer.matches(record.as_ref()) { + info!( + "found existing record {} in probe of '{}'", + rr_type_name(answer.get_type()), + answer.get_name(), + ); + probe.waiting_services.insert(service_name.to_string()); + return false; // Found existing probe for the same record. + } + } + + info!( + "insert record {} into probe of {}", + rr_type_name(answer.get_type()), + answer.get_name(), + ); + probe.insert_record(answer.clone_box()); + probe.waiting_services.insert(service_name.to_string()); + + false + } + + /// check all records in "probing" and "active": + /// if the record is SRV, and hostname is set to original, remove it. + /// and create a new SRV with "host" set to "new_name" and put into "probing". + pub(crate) fn update_hostname( + &mut self, + original: &str, + new_name: &str, + probe_time: u64, + ) -> bool { + let mut found_records = Vec::new(); + let mut new_timer_added = false; + + for (_name, probe) in self.probing.iter_mut() { + probe.records.retain(|record| { + if record.get_type() == RR_TYPE_SRV { + if let Some(srv) = record.any().downcast_ref::() { + if srv.host == original { + let mut new_record = srv.clone(); + new_record.host = new_name.to_string(); + found_records.push(new_record); + return false; + } + } + } + true + }); + } + + for (_name, records) in self.active.iter_mut() { + records.retain(|record| { + if record.get_type() == RR_TYPE_SRV { + if let Some(srv) = record.any().downcast_ref::() { + if srv.host == original { + let mut new_record = srv.clone(); + new_record.host = new_name.to_string(); + found_records.push(new_record); + return false; + } + } + } + true + }); + } + + for record in found_records { + let probe = match self.probing.get_mut(record.get_name()) { + Some(p) => { + p.start_time = probe_time; // restart this probe. + p + } + None => { + let new_probe = self + .probing + .entry(record.get_name().to_string()) + .or_insert_with(|| Probe::new(probe_time)); + new_timer_added = true; + new_probe + } + }; + + info!( + "insert record {} with new hostname {new_name} into probe for: {}", + rr_type_name(record.get_type()), + record.get_name() + ); + probe.insert_record(Box::new(record)); + } + + new_timer_added + } +} + #[cfg(test)] mod tests { use super::{ diff --git a/tests/mdns_test.rs b/tests/mdns_test.rs index 620f53b..e5571f6 100644 --- a/tests/mdns_test.rs +++ b/tests/mdns_test.rs @@ -7,7 +7,7 @@ use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::thread::sleep; use std::time::{Duration, SystemTime}; -// use test_log::test; // commented out for debugging a flaky test in CI. +use test_log::test; /// This test covers: /// register(announce), browse(query), response, unregister, shutdown. @@ -186,7 +186,7 @@ fn integration_success() { println!("metrics: {:?}", &metrics); assert_eq!(metrics["register"], 1); assert_eq!(metrics["unregister"], 1); - assert_eq!(metrics["register-resend"], 1); + assert!(metrics["register-resend"] >= 1); println!("unique interface set: {:?}", unique_intf_idx_ip_ver_set); assert_eq!( @@ -540,7 +540,7 @@ fn service_with_named_interface_only() { // Browse again. let browse_chan = d.browse(my_ty_domain).unwrap(); - let timeout = Duration::from_secs(2); + let timeout = Duration::from_secs(3); let mut resolved = false; while let Ok(event) = browse_chan.recv_timeout(timeout) { @@ -745,14 +745,14 @@ fn subtype() { let d = ServiceDaemon::new().expect("Failed to create daemon"); // Register a service with a subdomain - let subtype_domain = "_directory._sub._test._tcp.local."; - let ty_domain = "_test._tcp.local."; + let subtype_domain = "_directory._sub._test-subtype._tcp.local."; + let ty_domain = "_test-subtype._tcp.local."; let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); let instance_name = now.as_micros().to_string(); // Create a unique name. let host_ipv4 = my_ip_interfaces()[0].ip().to_string(); - let host_name = "my_host.local."; + let host_name = "subtype_host.local."; let port = 5201; let my_service = ServiceInfo::new( subtype_domain, @@ -800,7 +800,7 @@ fn subtype() { /// Verify service name has to be valid. #[test] -fn service_name_check() { +fn test_service_name_check() { // Create a daemon for the server. let server_daemon = ServiceDaemon::new().expect("Failed to create server daemon"); let monitor = server_daemon.monitor().unwrap(); @@ -809,7 +809,7 @@ fn service_name_check() { let host_ipv4 = ""; let host_name = "my_host.local."; let port = 5200; - let my_service = ServiceInfo::new( + let mut my_service = ServiceInfo::new( service_name_too_long, "my_instance", host_name, @@ -819,6 +819,9 @@ fn service_name_check() { ) .expect("valid service info") .enable_addr_auto(); + + my_service.set_requires_probe(false); + let result = server_daemon.register(my_service.clone()); assert!(result.is_ok()); @@ -930,10 +933,13 @@ fn instance_name_two_dots() { assert!(result.is_ok()); // Verify that the service was published successfully. - let event = monitor.recv_timeout(Duration::from_millis(500)).unwrap(); + let publish_timeout = 1000; + let event = monitor + .recv_timeout(Duration::from_millis(publish_timeout)) + .unwrap(); assert!(matches!(event, DaemonEvent::Announce(_, _))); - // Browseing the service. + // Browse the service. let receiver = server_daemon.browse(service_type).unwrap(); let mut resolved = false; let timeout = Duration::from_secs(2); @@ -1163,7 +1169,7 @@ fn hostname_resolution_timeout() { d.shutdown().unwrap(); } -#[test_log::test] +#[test] fn test_cache_flush_record() { // Create a daemon let server = ServiceDaemon::new().expect("Failed to create server"); @@ -1456,6 +1462,324 @@ fn test_domain_suffix_in_browse() { mdns_client.shutdown().unwrap(); } +#[test] +fn test_name_conflict_resolution() { + // This test registers two services using the same names, but different IP addresses. + let ty_domain = "_conflict-test._udp.local."; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + let instance_name = now.as_micros().to_string(); // Create a unique name. + let host_name = "conflict_host.local."; + let port = 5200; + + // Register the first service. + let server1 = ServiceDaemon::new().expect("failed to start server1"); + + // Get a single IPv4 address + let ip_addr1 = my_ip_interfaces() + .iter() + .find(|iface| iface.ip().is_ipv4()) + .map(|iface| iface.ip()) + .unwrap(); + + // Publish the service on server1 + let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None) + .expect("valid service info"); + server1 + .register(service1) + .expect("Failed to register service1"); + + // wait for the service announced. + sleep(Duration::from_secs(1)); + + // Register the second service. + let server2 = ServiceDaemon::new().expect("failed to start server2"); + + // Modify the IPv4 address for the service. + let IpAddr::V4(ipv4) = ip_addr1 else { + assert!(false); + return; + }; + let bytes = ipv4.octets(); + let ip_addr2 = IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3] + 1)); + + let service2 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr2, port, None) + .expect("failed to create ServiceInfo for service2"); + server2 + .register(service2) + .expect("failed to register service2"); + + // Verify name change event for the second service, due to the name conflict. + let server2_monitor = server2.monitor().unwrap(); + let timeout = Duration::from_secs(2); + let mut name_changed = false; + while let Ok(event) = server2_monitor.recv_timeout(timeout) { + match event { + DaemonEvent::NameChange(change) => { + println!("server2 daemon event: {:?}", change); + name_changed = true; + break; + } + other => println!("server2 other event: {:?}", other), + } + } + assert!(name_changed); + + // Verify both services are resolved. + let client = ServiceDaemon::new().expect("failed to create mdns client"); + let receiver = client.browse(ty_domain).unwrap(); + + let timeout = Duration::from_secs(3); + let mut service_names = HashSet::new(); + + while let Ok(event) = receiver.recv_timeout(timeout) { + match event { + ServiceEvent::ServiceResolved(info) => { + println!( + "Resolved a service: {} host {} IP {:?}", + info.get_fullname(), + info.get_hostname(), + info.get_addresses_v4() + ); + + service_names.insert(info.get_fullname().to_string()); + + // Find and verify name conflict resolution. + + if info.get_fullname().contains("(2)") { + assert_eq!(info.get_hostname(), "conflict_host-2.local."); + break; + } + } + _ => {} + } + } + + // Verify that we have resolve two services instead of one. + assert_eq!(service_names.len(), 2); +} + +#[test] +fn test_name_tiebreaking() { + // This test registers two services using the same names, but different IP addresses, + // same as `test_name_conflict_resolution`, the only difference being that two servers + // do the probing at the same time. Hence tiebreaking. Server2 should win. + + let ty_domain = "_tiebreaking._udp.local."; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + let instance_name = now.as_micros().to_string(); // Create a unique name. + let host_name = "tiebreaking_host.local."; + let port = 5200; + + // Register the first service. + let server1 = ServiceDaemon::new().expect("failed to start server1"); + + // Get a single IPv4 address + let ip_addr1 = my_ip_interfaces() + .iter() + .find(|iface| iface.ip().is_ipv4()) + .map(|iface| iface.ip()) + .unwrap(); + + // Publish the service on server1 + let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None) + .expect("valid service info"); + server1 + .register(service1) + .expect("Failed to register service1"); + + // Register the second service immediately to trigger tiebreaking. + let server2 = ServiceDaemon::new().expect("failed to start server2"); + + // Modify the IPv4 address for the service. + let IpAddr::V4(ipv4_2) = ip_addr1 else { + assert!(false); + return; + }; + let bytes = ipv4_2.octets(); + let ip_addr2 = IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3] + 1)); + + let service2 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr2, port, None) + .expect("failed to create ServiceInfo for service2"); + server2 + .register(service2) + .expect("failed to register service2"); + + // Verify name change event for the first service, per tiebreaking rules. + let server1_monitor = server1.monitor().unwrap(); + let timeout = Duration::from_secs(2); + let mut name_changed = false; + + while let Ok(event) = server1_monitor.recv_timeout(timeout) { + match event { + DaemonEvent::NameChange(change) => { + println!("server1 daemon event: {:?}", change); + name_changed = true; + break; + } + other => println!("server1 other event: {:?}", other), + } + } + assert!(name_changed); + + // Verify both services are resolved. + let client = ServiceDaemon::new().expect("failed to create mdns client"); + let receiver = client.browse(ty_domain).unwrap(); + + let timeout = Duration::from_secs(3); + let mut resolved_services = vec![]; + + while let Ok(event) = receiver.recv_timeout(timeout) { + match event { + ServiceEvent::ServiceResolved(info) => { + println!( + "Resolved a service: {} host {} IP {:?}", + info.get_fullname(), + info.get_hostname(), + info.get_addresses_v4() + ); + + resolved_services.push(info); + if resolved_services.len() == 2 { + break; + } + } + _ => {} + } + } + + // Verify that we have resolve two services instead of one. + assert_eq!(resolved_services.len(), 2); + + // Verify that server2 was resolved first, i.e. won the tiebreaking. + let first_addrs = resolved_services[0].get_addresses(); + assert_eq!(first_addrs.iter().next().unwrap(), &ip_addr2); +} + +#[test] +fn test_name_conflict_3() { + // Similar to `test_name_conflict_resolution` but with 3 servers. + let ty_domain = "_conflict-3._udp.local."; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + let instance_name = now.as_micros().to_string(); // Create a unique name. + let host_name = "conflict3_host.local."; + let port = 5200; + + // Register the first service. + let server1 = ServiceDaemon::new().expect("failed to start server1"); + + // Get a single IPv4 address + let ip_addr1 = my_ip_interfaces() + .iter() + .find(|iface| iface.ip().is_ipv4()) + .map(|iface| iface.ip()) + .unwrap(); + + // Publish the service on server1 + let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None) + .expect("valid service info"); + server1 + .register(service1) + .expect("Failed to register service1"); + + // wait for the service announced. + sleep(Duration::from_secs(1)); + + // Register the second service. + let server2 = ServiceDaemon::new().expect("failed to start server2"); + + // Modify the IPv4 address for the service. + let IpAddr::V4(ipv4) = ip_addr1 else { + assert!(false); + return; + }; + let bytes = ipv4.octets(); + let ip_addr2 = IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3] + 1)); + + let info2 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr2, port, None) + .expect("failed to create ServiceInfo for service2"); + server2 + .register(info2) + .expect("failed to register service2"); + + // Verify name change event for the second service, due to the name conflict. + let server2_monitor = server2.monitor().unwrap(); + let timeout = Duration::from_secs(2); + let mut name_changed = false; + while let Ok(event) = server2_monitor.recv_timeout(timeout) { + match event { + DaemonEvent::NameChange(change) => { + println!("server2 daemon event: {:?}", change); + name_changed = true; + } + other => println!("server2 other event: {:?}", other), + } + } + assert!(name_changed); + + // Register the third service + let server3 = ServiceDaemon::new().expect("failed to start server2"); + + // Modify the IPv4 address for the service. + let ip_addr3 = IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3] + 2)); + + let info3 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr3, port, None) + .expect("failed to create ServiceInfo for service2"); + + server3 + .register(info3) + .expect("failed to register service2"); + + let server3_monitor = server3.monitor().unwrap(); + let timeout = Duration::from_secs(3); + name_changed = false; + while let Ok(event) = server3_monitor.recv_timeout(timeout) { + match event { + DaemonEvent::NameChange(change) => { + println!("server3 daemon event: {:?}", change); + name_changed = true; + break; + } + other => println!("server3 other event: {:?}", other), + } + } + assert!(name_changed); + + // Verify all services are resolved. + let client = ServiceDaemon::new().expect("failed to create mdns client"); + let receiver = client.browse(ty_domain).unwrap(); + + let timeout = Duration::from_secs(3); + let mut service_names = HashSet::new(); + + while let Ok(event) = receiver.recv_timeout(timeout) { + match event { + ServiceEvent::ServiceResolved(info) => { + println!( + "Resolved a service: {} host {} IP {:?}", + info.get_fullname(), + info.get_hostname(), + info.get_addresses_v4() + ); + + service_names.insert(info.get_fullname().to_string()); + if service_names.len() >= 3 { + break; + } + } + _ => {} + } + } + + // Verify that we have resolve two services instead of one. + assert_eq!(service_names.len(), 3); +} + /// A helper function to include a timestamp for println. fn timed_println(msg: String) { let now = SystemTime::now();