Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-net): Store DerpNodes as Arcs inside DerpMap #1379

Merged
merged 3 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ rcgen = "0.11"
reqwest = { version = "0.11.14", default-features = false, features = ["rustls-tls"] }
ring = "0.16.20"
rustls = { version = "0.21", default-features = false, features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
serde = { version = "1", features = ["derive", "rc"] }
ssh-key = { version = "0.6.0-rc.0", features = ["ed25519", "std", "rand_core"] }
serdect = "0.2.0"
socket2 = "0.5.3"
Expand Down
4 changes: 2 additions & 2 deletions iroh-net/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn default_na_derp_region() -> DerpRegion {
};
DerpRegion {
region_id: NA_REGION_ID,
nodes: vec![default_n0_derp],
nodes: vec![default_n0_derp.into()],
avoid: false,
region_code: "default-1".into(),
}
Expand All @@ -63,7 +63,7 @@ pub fn default_eu_derp_region() -> DerpRegion {
};
DerpRegion {
region_id: EU_REGION_ID,
nodes: vec![default_n0_derp],
nodes: vec![default_n0_derp.into()],
avoid: false,
region_code: "default-2".into(),
}
Expand Down
6 changes: 4 additions & 2 deletions iroh-net/src/derp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ mod tests {
stun_test_ip: None,
ipv4: UseIpv4::Some(addr),
ipv6: UseIpv6::Disabled,
}],
}
.into()],
region_code: "test_region".to_string(),
};

Expand Down Expand Up @@ -237,7 +238,8 @@ mod tests {
stun_test_ip: None,
ipv4: UseIpv4::Some(addr),
ipv6: UseIpv6::Disabled,
}],
}
.into()],
region_code: "test_region".to_string(),
};

Expand Down
18 changes: 11 additions & 7 deletions iroh-net/src/derp/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl Client {
.body(Body::empty())
.unwrap();

let res = if self.use_https(derp_node.as_ref()) {
let res = if self.use_https(derp_node.as_deref()) {
debug!("Starting TLS handshake");
// TODO: review TLS config
let mut roots = rustls::RootCertStore::empty();
Expand All @@ -466,7 +466,7 @@ impl Client {

let tls_connector: tokio_rustls::TlsConnector = Arc::new(config).into();
let hostname = self
.tls_servername(derp_node.as_ref())
.tls_servername(derp_node.as_deref())
.ok_or_else(|| ClientError::InvalidUrl("no tls servername".into()))?;
let tls_stream = tls_connector.connect(hostname, tcp_stream).await?;
debug!("tls_connector connect success");
Expand Down Expand Up @@ -603,7 +603,10 @@ impl Client {
///
/// Return a TCP stream to the provided region, trying each node in order
/// (using [`Client::dial_node`]) until one connects
async fn dial_region(&self, reg: DerpRegion) -> Result<(TcpStream, DerpNode), ClientError> {
async fn dial_region(
&self,
reg: DerpRegion,
) -> Result<(TcpStream, Arc<DerpNode>), ClientError> {
debug!("dial region: {:?}", reg);
let target = self.target_string(&reg);
if reg.nodes.is_empty() {
Expand All @@ -612,16 +615,16 @@ impl Client {
let mut first_err: Option<ClientError> = None;
// TODO (ramfox): these dials should probably happen in parallel, and we should return the
// first one to respond.
for node in reg.nodes {
for node in reg.nodes.iter() {
if node.stun_only {
if first_err.is_none() {
first_err = Some(ClientError::StunOnlyNodesFound(target.clone()));
}
continue;
}
let conn = self.dial_node(&node).await;
let conn = self.dial_node(node).await;
match conn {
Ok(conn) => return Ok((conn, node)),
Ok(conn) => return Ok((conn, node.clone())),
Err(e) => first_err = Some(e),
}
}
Expand Down Expand Up @@ -1197,7 +1200,8 @@ mod tests {
stun_test_ip: None,
ipv4: UseIpv4::Some("35.175.99.112".parse().unwrap()),
ipv6: UseIpv6::Disabled,
}],
}
.into()],
region_code: "test_region".to_string(),
};

Expand Down
16 changes: 13 additions & 3 deletions iroh-net/src/derp/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ impl DerpMap {
Arc::get_mut(&mut self.regions).and_then(|r| r.get_mut(&region_id))
}

#[cfg(test)]
pub fn get_node_mut(&mut self, region_id: u16, node_idx: usize) -> Option<&mut DerpNode> {
Arc::get_mut(&mut self.regions)
.and_then(|regions| regions.get_mut(&region_id))
.map(|region| region.nodes.as_mut_slice())
.and_then(|slice| slice.get_mut(node_idx))
.map(Arc::make_mut)
}

/// How many regions are known?
pub fn len(&self) -> usize {
self.regions.len()
Expand Down Expand Up @@ -83,7 +92,8 @@ impl DerpMap {
ipv4: derp_ipv4,
ipv6: derp_ipv6,
stun_test_ip: None,
}],
}
.into()],
avoid: false,
region_code: "default".into(),
},
Expand All @@ -95,7 +105,7 @@ impl DerpMap {
}

/// Returns the [`DerpNode`] by name.
pub fn find_by_name(&self, node_name: &str) -> Option<&DerpNode> {
pub fn find_by_name(&self, node_name: &str) -> Option<&Arc<DerpNode>> {
self.regions
.values()
.flat_map(|r| r.nodes.iter())
Expand Down Expand Up @@ -137,7 +147,7 @@ pub struct DerpRegion {
/// A unique integer for a geographic region
pub region_id: u16,
/// A list of [`DerpNode`]s in this region
pub nodes: Vec<DerpNode>,
pub nodes: Vec<Arc<DerpNode>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to have DerpNode be a standalone type in this case? Can we just move the Arc into DerpNode instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean wrapping the DerpNode field inside an "inner"? Currently all the DerpNode fields (and DerpPregion and DerpMap) and pub and accessed directly everywhere. Making this change here means you only have to touch the creation of this but not the access. Doing it the other way around involves touching all the access places, probably changing the fields to methods.

I'm not strongly in favour of either, though I also don't think this version is so bad. The explicit Arc is pretty reasonable. Though if you have a strong preference I can change it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good enough for now

/// Whether or not to avoid this region
pub avoid: bool,
/// The region-specific string identifier
Expand Down
3 changes: 2 additions & 1 deletion iroh-net/src/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ mod tests {
ipv6: UseIpv6::TryDns,
stun_test_ip: None,
})
.map(Arc::new)
.collect(),
avoid: false,
region_code: "default".into(),
Expand Down Expand Up @@ -957,7 +958,7 @@ mod tests {
let blackhole = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
let stun_addr = blackhole.local_addr()?;
let mut dm = stun::test::derp_map_of([stun_addr].into_iter());
dm.get_region_mut(1).unwrap().nodes[0].stun_only = true;
dm.get_node_mut(1, 0).unwrap().stun_only = true;

let mut client = Client::new(None).await?;

Expand Down
46 changes: 5 additions & 41 deletions iroh-net/src/netcheck/reportgen/probes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ pub(super) enum Probe {
/// are for retries on UDP loss or timeout.
delay: Duration,

/// The name of the node name. DERP node names are globally
/// unique so there's no region ID.
/// The DERP server to send this probe to.
node: Arc<DerpNode>,
},
#[display("Ipv6 after {delay:?} to {node}")]
Expand Down Expand Up @@ -200,7 +199,6 @@ impl ProbePlan {
/// Creates an initial probe plan.
pub(super) fn initial(derp_map: &DerpMap, if_state: &interfaces::State) -> Self {
let mut plan = Self(BTreeSet::new());
let mut derp_nodes_cache = DerpNodeCache::new();

let mut sorted_regions: Vec<_> = derp_map.regions().map(|r| (r.region_id, r)).collect();
sorted_regions.sort_by_key(|(id, _)| *id);
Expand All @@ -211,7 +209,6 @@ impl ProbePlan {

for attempt in 0..3 {
let derp_node = &region.nodes[attempt % region.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let delay = DEFAULT_INITIAL_RETRANSMIT * attempt as u32;

if if_state.have_v4 && derp_node.ipv4.is_enabled() {
Expand Down Expand Up @@ -239,7 +236,6 @@ impl ProbePlan {
let mut icmp_probes = ProbeSet::new(region.region_id, ProbeProto::Icmp);
for attempt in 0..3 {
let derp_node = &region.nodes[attempt % region.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let start = plan.max_delay() + DEFAULT_INITIAL_RETRANSMIT;
let delay = start + DEFAULT_INITIAL_RETRANSMIT * attempt as u32;

Expand Down Expand Up @@ -275,7 +271,6 @@ impl ProbePlan {
return Self::initial(derp_map, if_state);
}
let mut plan = Self(Default::default());
let mut derp_nodes_cache = DerpNodeCache::new();

let had_stun_ipv4 = !last_report.region_v4_latency.is_empty();
let had_stun_ipv6 = !last_report.region_v6_latency.is_empty();
Expand Down Expand Up @@ -326,7 +321,6 @@ impl ProbePlan {

for attempt in 0..attempts {
let derp_node = &reg.nodes[attempt % reg.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let delay = (retransmit_delay * attempt as u32)
+ (ACTIVE_RETRANSMIT_EXTRA_DELAY * attempt as u32);
if do4 {
Expand Down Expand Up @@ -355,7 +349,6 @@ impl ProbePlan {
let start = plan.max_delay();
for attempt in 0..attempts {
let derp_node = &reg.nodes[attempt % reg.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let delay = start
+ (retransmit_delay * attempt as u32)
+ (ACTIVE_RETRANSMIT_EXTRA_DELAY * (attempt as u32 + 1));
Expand Down Expand Up @@ -433,35 +426,6 @@ impl FromIterator<ProbeSet> for ProbePlan {
}
}

/// A cache to create [`DerpNode`]s on the heap and share them.
///
/// The probe code needs the [`DerpNode`] a lot and they need to be sent around. It is
/// better to allocate those on the heap and share them using pointers.
#[derive(Debug, Default)]
struct DerpNodeCache {
inner: BTreeSet<Arc<DerpNode>>,
}

impl DerpNodeCache {
fn new() -> Self {
Default::default()
}
/// Returns a [`DerpNode`] from the cache, inserting it if needed.
///
/// This allows you to exchange a [`DerpNode`] retrieved from the [`DerpMap`] for one
/// from the cache. Eventually the [`DerpMap`] should just do this directly.
fn get(&mut self, node: &DerpNode) -> Arc<DerpNode> {
match self.inner.get(node) {
Some(node) => node.clone(),
None => {
let node = Arc::new(node.clone());
self.inner.insert(node.clone());
node
}
}
}
}

/// Sorts the regions in the [`DerpMap`] from fastest to slowest.
///
/// This uses the latencies from the last report to determine the order. Regions with no
Expand Down Expand Up @@ -507,8 +471,8 @@ mod tests {
#[tokio::test]
async fn test_initial_probeplan() {
let derp_map = default_derp_map();
let derp_node_1 = Arc::new(derp_map.get_region(1).unwrap().nodes[0].clone());
let derp_node_2 = Arc::new(derp_map.get_region(2).unwrap().nodes[0].clone());
let derp_node_1 = derp_map.get_region(1).unwrap().nodes[0].clone();
let derp_node_2 = derp_map.get_region(2).unwrap().nodes[0].clone();
let if_state = interfaces::State::fake();
let plan = ProbePlan::initial(&derp_map, &if_state);

Expand Down Expand Up @@ -646,8 +610,8 @@ mod tests {
for i in 0..10 {
println!("round {}", i);
let derp_map = default_derp_map();
let derp_node_1 = Arc::new(derp_map.get_region(1).unwrap().nodes[0].clone());
let derp_node_2 = Arc::new(derp_map.get_region(2).unwrap().nodes[0].clone());
let derp_node_1 = derp_map.get_region(1).unwrap().nodes[0].clone();
let derp_node_2 = derp_map.get_region(2).unwrap().nodes[0].clone();
let if_state = interfaces::State::fake();
let mut latencies = RegionLatencies::new();
latencies.update_region(1, Duration::from_millis(2));
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/stun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub mod test {
region_id,
region_code: "".to_string(),
avoid: false,
nodes: vec![node],
nodes: vec![node.into()],
}
})
.into()
Expand Down
3 changes: 2 additions & 1 deletion iroh-net/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ pub(crate) async fn run_derp_and_stun(
ipv4: UseIpv4::Some("127.0.0.1".parse().unwrap()),
ipv6: UseIpv6::Disabled,
stun_test_ip: Some(stun_addr.ip()),
}],
}
.into()],
avoid: false,
}]
.into();
Expand Down