diff --git a/Makefile b/Makefile index eb5b73cf..4cd643d9 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ webui-dev: webui-deps # NOTE: on LG TV using hostname is unstable for some reason, use IP address. export RQBIT_UPNP_SERVER_ENABLE ?= true export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev -export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030 +export RQBIT_HTTP_API_LISTEN_ADDR ?= [::]:3030 export RQBIT_FASTRESUME = true CARGO_RUN_FLAGS ?= RQBIT_OUTPUT_FOLDER ?= /tmp/scratch diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index b9e2d0c4..1c0dee28 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1198,17 +1198,13 @@ impl Session { peer_rx, ); - { - let span = managed_torrent.shared.span.clone(); - let _ = span.enter(); - - managed_torrent - .start(peer_rx, opts.paused) - .context("error starting torrent")?; - } + let _e = managed_torrent.shared.span.clone().entered(); + managed_torrent + .start(peer_rx, opts.paused) + .context("error starting torrent")?; if let Some(name) = managed_torrent.shared().info.name.as_ref() { - info!(?name, id, "added torrent"); + info!(?name, "added torrent"); } Ok(AddTorrentResponse::Added(id, managed_torrent)) diff --git a/crates/upnp-serve/src/constants.rs b/crates/upnp-serve/src/constants.rs index ad43f984..c14fe7c2 100644 --- a/crates/upnp-serve/src/constants.rs +++ b/crates/upnp-serve/src/constants.rs @@ -1,5 +1,5 @@ -pub const UPNP_KIND_ROOT_DEVICE: &str = "upnp:rootdevice"; -pub const UPNP_KIND_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1"; +pub const UPNP_DEVICE_ROOT: &str = "upnp:rootdevice"; +pub const UPNP_DEVICE_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1"; pub const SOAP_ACTION_CONTENT_DIRECTORY_BROWSE: &[u8] = b"\"urn:schemas-upnp-org:service:ContentDirectory:1#Browse\""; diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 64b7c72d..7fa3cfbe 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -1,20 +1,23 @@ use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + collections::HashSet, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, time::Duration, }; use anyhow::{bail, Context}; use bstr::BStr; +use librqbit_upnp::ipv6_is_link_local; use network_interface::NetworkInterfaceConfig; -use tokio::net::UdpSocket; +use parking_lot::Mutex; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; -use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE}; +use crate::constants::{UPNP_DEVICE_MEDIASERVER, UPNP_DEVICE_ROOT}; -const UPNP_PORT: u16 = 1900; -const UPNP_BROADCAST_IP: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); -const UPNP_BROADCAST_ADDR: SocketAddrV4 = SocketAddrV4::new(UPNP_BROADCAST_IP, UPNP_PORT); +const SSDP_PORT: u16 = 1900; +const SSDM_MCAST_IPV4: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); +const SSDP_MCAST_IPV6_LINK_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xc); +const SSDP_MCAST_IPV6_SITE_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff05, 0, 0, 0, 0, 0, 0, 0xc); const NTS_ALIVE: &str = "ssdp:alive"; const NTS_BYEBYE: &str = "ssdp:byebye"; @@ -30,6 +33,7 @@ pub enum SsdpMessage<'a, 'h> { #[derive(Debug)] pub struct SsdpMSearchRequest<'a> { + #[allow(dead_code)] pub host: &'a BStr, pub man: &'a BStr, pub st: &'a BStr, @@ -37,13 +41,10 @@ pub struct SsdpMSearchRequest<'a> { impl<'a> SsdpMSearchRequest<'a> { fn matches_media_server(&self) -> bool { - if self.host != "239.255.255.250:1900" { - return false; - } if self.man != "\"ssdp:discover\"" { return false; } - if self.st == UPNP_KIND_ROOT_DEVICE || self.st == UPNP_KIND_MEDIASERVER { + if self.st == UPNP_DEVICE_ROOT || self.st == UPNP_DEVICE_MEDIASERVER { return true; } false @@ -101,69 +102,174 @@ pub struct SsdpRunnerOptions { pub shutdown: CancellationToken, } +struct UdpSocket { + sock2: socket2::Socket, + tokio: tokio::net::UdpSocket, +} + pub struct SsdpRunner { opts: SsdpRunnerOptions, - socket: UdpSocket, + socket_v4: Option, + socket_v6: Option, } -impl SsdpRunner { - pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result { - let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT); - let sock = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None) - .context("error creating socket")?; - #[cfg(not(target_os = "windows"))] - sock.set_reuse_port(true) - .context("error setting SO_REUSEPORT")?; - sock.set_reuse_address(true) - .context("error setting SO_REUSEADDR")?; - - trace!(addr=?bind_addr, "binding UDP"); - sock.bind(&bind_addr.into()) - .context(bind_addr) - .context("error binding")?; - - sock.set_nonblocking(true)?; - let socket = tokio::net::UdpSocket::from_std(sock.into()) - .context("error converting socket2 socket to tokio")?; - - let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED); - let all_multicast_membership_ips = network_interface::NetworkInterface::show() - .into_iter() - .flatten() - .flat_map(|nic| nic.addr.into_iter()) - .filter_map(|addr| { - let ip = addr.ip(); - match ip { - std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => { - Some(addr) - } - _ => None, +fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result { + let domain = if bind_addr.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, None) + .context(bind_addr) + .context("error creating socket")?; + #[cfg(not(target_os = "windows"))] + sock.set_reuse_port(true) + .context("error setting SO_REUSEPORT")?; + sock.set_reuse_address(true) + .context("error setting SO_REUSEADDR")?; + + trace!(addr=?bind_addr, "binding UDP"); + sock.bind(&bind_addr.into()) + .context(bind_addr) + .context("error binding")?; + + sock.set_nonblocking(true)?; + + let sock_clone = sock.try_clone().context("can't clone socket")?; + + let tokio_socket = tokio::net::UdpSocket::from_std(sock_clone.into()) + .context("error converting socket2 socket to tokio")?; + + Ok(UdpSocket { + sock2: sock, + tokio: tokio_socket, + }) +} + +async fn bind_v4_socket() -> anyhow::Result { + let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, SSDP_PORT); + let socket = socket_presetup(bind_addr.into())?; + + let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED); + let all_multicast_membership_ips = network_interface::NetworkInterface::show() + .into_iter() + .flatten() + .flat_map(|nic| nic.addr.into_iter()) + .filter_map(|addr| { + let ip = addr.ip(); + match ip { + std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => { + Some(addr) } - }); + _ => None, + } + }); + + for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) { + trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group"); + if let Err(e) = socket.tokio.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) { + debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}"); + } + } + + Ok(socket) +} - for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) { - trace!(multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "joining multicast v4 group"); - if let Err(e) = socket.join_multicast_v4(UPNP_BROADCAST_IP, ifaddr) { - debug!(error=?e, multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "error joining multicast v4 group"); +async fn bind_v6_socket() -> anyhow::Result { + let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, SSDP_PORT, 0, 0); + let socket = socket_presetup(bind_addr.into())?; + + for nic in network_interface::NetworkInterface::show() + .into_iter() + .flatten() + { + let mut has_link_local = false; + let mut has_site_local = false; + for addr in nic.addr.iter() { + let addr = match addr.ip() { + IpAddr::V4(_) => continue, + IpAddr::V6(v6) => v6, + }; + if addr.is_loopback() { + continue; + } + if ipv6_is_link_local(addr) { + has_link_local = true; + } else { + has_site_local = true; } } + for (present, multiaddr) in [ + (has_site_local, SSDP_MCAST_IPV6_SITE_LOCAL), + (has_link_local, SSDP_MCAST_IPV6_LINK_LOCAL), + ] { + if !present { + continue; + } + trace!(multiaddr=?multiaddr, interface=?nic.index, "joining multicast v6 group"); + if let Err(e) = socket.tokio.join_multicast_v6(&multiaddr, nic.index) { + debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}"); + } + } + } - Ok(Self { opts, socket }) + Ok(socket) +} + +struct MulticastOpts { + interface_addr: IpAddr, + #[allow(dead_code)] + interface_id: u32, + mcast_addr: SocketAddr, +} + +impl MulticastOpts { + fn addr_no_scope(&self) -> SocketAddr { + let mut addr = self.mcast_addr; + if let SocketAddr::V6(v6) = &mut addr { + v6.set_scope_id(0); + } + addr } +} - fn generate_notify_message(&self, kind: &str, nts: &str, location: &url::Url) -> String { +impl SsdpRunner { + pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result { + let socket_v4 = bind_v4_socket() + .await + .map_err(|e| warn!("error creating IPv4 SSDP socket: {e:#}")) + .ok(); + let socket_v6 = bind_v6_socket() + .await + .map_err(|e| warn!("error creating IPv6 SSDP socket: {e:#}")) + .ok(); + Ok(Self { + opts, + socket_v4, + socket_v6, + }) + } + + fn generate_notify_message( + &self, + device_kind: &str, + nts: &str, + opts: &MulticastOpts, + ) -> String { let usn: &str = &self.opts.usn; let server: &str = &self.opts.server_string; - let bcast_addr = UPNP_BROADCAST_ADDR; + let host = opts.addr_no_scope(); + let mut location = self.opts.description_http_location.clone(); + let _ = location.set_ip_host(opts.interface_addr); format!( "NOTIFY * HTTP/1.1\r -Host: {bcast_addr}\r +Host: {host}\r Cache-Control: max-age=75\r Location: {location}\r -NT: {kind}\r +NT: {device_kind}\r NTS: {nts}\r Server: {server}\r -USN: {usn}::{kind}\r +USN: {usn}::{device_kind}\r \r " ) @@ -174,10 +280,10 @@ USN: {usn}::{kind}\r st: &str, addr: SocketAddr, ) -> anyhow::Result { - let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr.ip())?; + let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr)?; let location = { let mut loc = self.opts.description_http_location.clone(); - loc.set_host(Some(&format!("{local_ip}")))?; + let _ = loc.set_ip_host(local_ip); loc }; let usn = &self.opts.usn; @@ -194,7 +300,10 @@ Content-Length: 0\r\n\r\n" )) } - async fn try_send_notifies(&self, nts: &str) { + async fn try_send_mcast_everywhere( + &self, + get_payload: &impl Fn(&MulticastOpts) -> bstr::BString, + ) { use network_interface::NetworkInterfaceConfig; let interfaces = network_interface::NetworkInterface::show(); let interfaces = match interfaces { @@ -205,52 +314,94 @@ Content-Length: 0\r\n\r\n" } }; + let sent = Mutex::new(HashSet::new()); + let sent = &sent; + let futs = interfaces .into_iter() - .flat_map(|ni| ni.addr) - .filter_map(|addr| { - match addr.ip() { - std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => Some(a), - _ => None + .flat_map(|ni| + ni.addr.into_iter().map(move |a| (ni.index, a)) + ) + .filter_map(|(ifidx, addr)| match addr.ip() { + std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => { + Some(MulticastOpts { + interface_addr: addr.ip(), + interface_id: ifidx, + mcast_addr: SocketAddr::V4(SocketAddrV4::new(SSDM_MCAST_IPV4, SSDP_PORT)), + }) } + std::net::IpAddr::V6(a) if !a.is_loopback() => Some(MulticastOpts { + interface_addr: addr.ip(), + interface_id: ifidx, + mcast_addr: { + if ipv6_is_link_local(a) { + SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_LINK_LOCAL, SSDP_PORT, 0, ifidx)) + } else { + SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, 0)) + } + }, + }), + _ => { + trace!(oif_id=ifidx, addr=%addr.ip(), "ignoring address"); + None + }, }) - .map(|ip| async move { - let addr = SocketAddrV4::new(ip, 0); - let sock = match tokio::net::UdpSocket::bind(addr).await { - Ok(sock) => sock, - Err(e) => { - debug!(%addr, error=?e, "error binding UDP to send NOTIFY"); - return; - } - }; + .map(|opts| async move { + let payload = get_payload(&opts); + if !sent + .lock() + .insert((payload.clone(), opts.interface_id, opts.mcast_addr)) + { + trace!(oif_id=opts.interface_id, addr=%opts.mcast_addr, "not sending duplicate payload"); + return; + } - let mut location = self.opts.description_http_location.clone(); - location.set_host(Some(&format!("{ip}"))).unwrap(); - - macro_rules! gen { - ($kind:expr) => { - async { - let msg = self.generate_notify_message($kind, nts, &location); - trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY"); - if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await { - debug!(sock_addr=%addr, error=%e, kind=$kind, nts, "error sending SSDP NOTIFY") - } else { - debug!(kind=$kind, nts, %location, "sent SSDP NOTIFY") - } + let sock = match ( + opts.interface_addr, + self.socket_v4.as_ref(), + self.socket_v6.as_ref(), + ) { + // Call setsockopt(IP_MULTICAST_IF), so that the message + // gets sent out of the interface we want (otherwise it'll get sent through + // default one). + (IpAddr::V4(ip), Some(sock_v4), _) => { + if let Err(e) = sock_v4.sock2.set_multicast_if_v4(&ip) { + debug!(addr=%ip, "error calling set_multicast_if_v4: {e:#}"); } + sock_v4 } - } - - let f1 = gen!(UPNP_KIND_ROOT_DEVICE); - let f2 = gen!(UPNP_KIND_MEDIASERVER) ; + (IpAddr::V6(_), _, Some(sock_v6)) => { + if let Err(e) = sock_v6.sock2.set_multicast_if_v6(opts.interface_id) { + debug!(oif_id=opts.interface_id, "error calling set_multicast_if_v6: {e:#}"); + } + sock_v6 + }, + _ => { + trace!(addr=%opts.interface_addr, "ignoring address, no socket to send to"); + return; + }, + }; - tokio::join!(f1, f2); + match sock.tokio.send_to(payload.as_slice(), opts.mcast_addr).await { + Ok(sz) => trace!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, size=sz, payload=?payload, "sent"), + Err(e) => { + debug!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, payload=?payload, "error sending: {e:#}") + } + }; }); futures::future::join_all(futs).await; } - async fn task_send_alive_notifies_periodically(&self) -> anyhow::Result<()> { + async fn try_send_notifies(&self, nts: &str) { + self.try_send_mcast_everywhere(&|opts| { + self.generate_notify_message(UPNP_DEVICE_MEDIASERVER, nts, opts) + .into() + }) + .await + } + + async fn task_send_alive_notifies_periodically(&self) { let mut interval = tokio::time::interval(self.opts.notify_interval); loop { interval.tick().await; @@ -258,7 +409,12 @@ Content-Length: 0\r\n\r\n" } } - async fn process_incoming_message(&self, msg: &[u8], addr: SocketAddr) -> anyhow::Result<()> { + async fn process_incoming_message( + &self, + msg: &[u8], + sock: &UdpSocket, + addr: SocketAddr, + ) -> anyhow::Result<()> { let mut headers = [httparse::EMPTY_HEADER; 16]; trace!(content = ?BStr::new(msg), ?addr, "received message"); let parsed = try_parse_ssdp(msg, &mut headers); @@ -281,7 +437,7 @@ Content-Length: 0\r\n\r\n" if let Ok(st) = std::str::from_utf8(msg.st) { let response = self.generate_ssdp_discover_response(st, addr)?; trace!(content = response, ?addr, "sending SSDP discover response"); - self.socket + sock.tokio .send_to(response.as_bytes(), addr) .await .context("error sending")?; @@ -290,54 +446,60 @@ Content-Length: 0\r\n\r\n" Ok(()) } - async fn task_respond_on_msearches(&self) -> anyhow::Result<()> { + async fn task_respond_on_msearches(&self, sock: Option<&UdpSocket>) { let mut buf = vec![0u8; 16184]; + let sock = match sock { + Some(sock) => sock, + None => return, + }; loop { - let (sz, addr) = self - .socket - .recv_from(&mut buf) - .await - .context("error receiving")?; + let (sz, addr) = match sock.tokio.recv_from(&mut buf).await { + Ok((sz, addr)) => (sz, addr), + Err(e) => { + warn!(error=?e, "error receving"); + return; + } + }; let msg = &buf[..sz]; - if let Err(e) = self.process_incoming_message(msg, addr).await { + if let Err(e) = self.process_incoming_message(msg, sock, addr).await { warn!(error=?e, ?addr, "error processing incoming SSDP message") } } } - async fn send_msearch(&self) -> anyhow::Result<()> { - let msearch_msg = "M-SEARCH * HTTP/1.1\r -HOST: 239.255.255.250:1900\r + async fn try_send_example_msearch(&self) { + self.try_send_mcast_everywhere(&|opts| { + let dest = opts.addr_no_scope(); + format!( + "M-SEARCH * HTTP/1.1\r +HOST: {dest}\r ST: urn:schemas-upnp-org:device:MediaServer:1\r MAN: \"ssdp:discover\"\r -MX: 2\r\n\r\n"; - - trace!(content = msearch_msg, "multicasting M-SEARCH"); - - self.socket - .send_to(msearch_msg.as_bytes(), UPNP_BROADCAST_ADDR) - .await - .context("error sending msearch")?; - Ok(()) +MX: 2\r\n\r\n" + ) + .into() + }) + .await } pub async fn run_forever(&self) -> anyhow::Result<()> { // This isn't necessary, but would show that it works. - self.send_msearch().await?; - - let t1 = self.task_respond_on_msearches(); - let t2 = self.task_send_alive_notifies_periodically(); - - tokio::pin!(t1); - tokio::pin!(t2); + let t0 = self.try_send_example_msearch(); + let t1 = self.task_respond_on_msearches(self.socket_v4.as_ref()); + let t2 = self.task_respond_on_msearches(self.socket_v6.as_ref()); + let t3 = self.task_send_alive_notifies_periodically(); + + let wait = async move { + tokio::join!(t0, t1, t2, t3); + Ok(()) + }; tokio::select! { - r = &mut t1 => r, - r = &mut t2 => r, + r = wait => r, _ = self.opts.shutdown.cancelled() => { self.try_send_notifies(NTS_BYEBYE).await; - bail!("canceled"); + Ok(()) } } } diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index 0feb3e62..7ff09aa1 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -6,7 +6,7 @@ use reqwest::Client; use serde::Deserialize; use std::{ collections::HashSet, - net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}, time::Duration, }; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -30,21 +30,29 @@ pub fn make_ssdp_search_request(kind: &str) -> String { ) } -pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { - let local_dest = match local_dest { - IpAddr::V4(v4) => v4, - IpAddr::V6(v6) => { - anyhow::bail!("get_local_ip_relative_to not implemented for IPv6; addr={v6}") - } - }; +// .to_bits() isn't yet available on min rust version we support (1.75 at the time of writing this) +const fn ip_bits_v6(addr: Ipv6Addr) -> u128 { + u128::from_be_bytes(addr.octets()) +} + +pub fn ipv6_is_link_local(ip: Ipv6Addr) -> bool { + const LL: Ipv6Addr = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 0); + const MASK: Ipv6Addr = Ipv6Addr::new(0xffff, 0xffff, 0xffff, 0xffff, 0, 0, 0, 0); - // Ipv4Addr.to_bits() is only there in nightly rust, so copying here for now. - fn ip_bits(addr: Ipv4Addr) -> u32 { + ip_bits_v6(ip) & ip_bits_v6(MASK) == ip_bits_v6(LL) & ip_bits_v6(MASK) +} + +pub fn get_local_ip_relative_to(local_dest: SocketAddr) -> anyhow::Result { + fn ip_bits_v4(addr: Ipv4Addr) -> u32 { u32::from_be_bytes(addr.octets()) } - fn masked(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 { - ip_bits(ip) & ip_bits(mask) + fn masked_v4(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 { + ip_bits_v4(ip) & ip_bits_v4(mask) + } + + fn masked_v6(ip: Ipv6Addr, mask: Ipv6Addr) -> u128 { + ip_bits_v6(ip) & ip_bits_v6(mask) } let interfaces = @@ -52,18 +60,33 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result for i in interfaces { for addr in i.addr { - if let network_interface::Addr::V4(v4) = addr { - let ip = v4.ip; - let mask = match v4.netmask { - Some(mask) => mask, - None => continue, - }; - trace!("found local addr {ip}/{mask}"); - // If the masked address is the same, means we are on the same network, return - // the ip address - if masked(ip, mask) == masked(local_dest, mask) { - return Ok(ip); + trace!(%local_dest, nic=i.index, ip=?addr.ip(), nm=?addr.netmask(), "dbg"); + match (local_dest, addr.ip(), addr.netmask()) { + // We are connecting to ourselves, return itself. + (l, a, _) if l.ip() == a => return Ok(addr.ip()), + // IPv4 masks match. + (SocketAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m))) + if masked_v4(*l.ip(), m) == masked_v4(a, m) => + { + return Ok(addr.ip()) + } + // Return IPv6 link-local addresses when source is link-local address and there's a scope_id set. + (SocketAddr::V6(l), IpAddr::V6(a), _) + if ipv6_is_link_local(*l.ip()) && l.scope_id() > 0 => + { + if ipv6_is_link_local(a) && l.scope_id() == i.index { + return Ok(addr.ip()); + } + } + // If V6 masks match, return. + (SocketAddr::V6(l), IpAddr::V6(a), Some(IpAddr::V6(m))) + if masked_v6(*l.ip(), m) == masked_v6(a, m) => + { + return Ok(addr.ip()) } + // For IPv6 fallback to returning a random (first encountered) IPv6 address. + (SocketAddr::V6(_), IpAddr::V6(_), None) => return Ok(addr.ip()), + _ => continue, } } } @@ -72,7 +95,7 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result async fn forward_port( control_url: Url, - local_ip: Ipv4Addr, + local_ip: IpAddr, port: u16, lease_duration: Duration, ) -> anyhow::Result<()> { @@ -229,10 +252,10 @@ impl UpnpEndpoint { .flat_map(move |d| d.iter_services(self_span.clone())) } - fn my_local_ip(&self) -> anyhow::Result { - let dest_ip = self.discover_response.received_from.ip(); - let local_ip = get_local_ip_relative_to(dest_ip) - .with_context(|| format!("can't determine local IP relative to {dest_ip}"))?; + fn my_local_ip(&self) -> anyhow::Result { + let received_from = self.discover_response.received_from; + let local_ip = get_local_ip_relative_to(received_from) + .with_context(|| format!("can't determine local IP relative to {received_from}"))?; Ok(local_ip) } @@ -419,7 +442,7 @@ impl UpnpPortForwarder { } } - async fn manage_port(&self, control_url: Url, local_ip: Ipv4Addr, port: u16) -> ! { + async fn manage_port(&self, control_url: Url, local_ip: IpAddr, port: u16) -> ! { let lease_duration = self.opts.lease_duration; let mut interval = tokio::time::interval(lease_duration / 2); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); @@ -433,7 +456,7 @@ impl UpnpPortForwarder { } } - async fn manage_service(&self, control_url: Url, local_ip: Ipv4Addr) -> anyhow::Result<()> { + async fn manage_service(&self, control_url: Url, local_ip: IpAddr) -> anyhow::Result<()> { futures::future::join_all(self.ports.iter().cloned().map(|port| { self.manage_port(control_url.clone(), local_ip, port) .instrument(error_span!("manage_port", port = port))