Skip to content

Commit

Permalink
refactor(iroh-net): Store DerpNodes as Arcs inside DerpMap (#1379)
Browse files Browse the repository at this point in the history
## Description

The DerpMap is now immutable, storing the DerpNodes inside it as
Arc'ed structs directly saves a lot of copying and moving them into
Arcs at runtime for a bit more heap allocations up front.

<!-- A summary of what this pull request achieves and a rough list of
changes. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [ ] ~~Documentation updates if relevant.~~
- [ ] ~~Tests if relevant.~~
  • Loading branch information
flub authored Aug 25, 2023
1 parent 38b06b0 commit bcce8a0
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 59 deletions.
2 changes: 1 addition & 1 deletion iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ rcgen = "0.11"
reqwest = { version = "0.11.19", default-features = false, features = ["rustls-tls"] }
ring = "0.16.20"
rustls = { version = "0.21", default-features = false, features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
serde = { version = "1", features = ["derive", "rc"] }
ssh-key = { version = "0.6.0", features = ["ed25519", "std", "rand_core"] }
serdect = "0.2.0"
socket2 = "0.5.3"
Expand Down
4 changes: 2 additions & 2 deletions iroh-net/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn default_na_derp_region() -> DerpRegion {
};
DerpRegion {
region_id: NA_REGION_ID,
nodes: vec![default_n0_derp],
nodes: vec![default_n0_derp.into()],
avoid: false,
region_code: "default-1".into(),
}
Expand All @@ -63,7 +63,7 @@ pub fn default_eu_derp_region() -> DerpRegion {
};
DerpRegion {
region_id: EU_REGION_ID,
nodes: vec![default_n0_derp],
nodes: vec![default_n0_derp.into()],
avoid: false,
region_code: "default-2".into(),
}
Expand Down
6 changes: 4 additions & 2 deletions iroh-net/src/derp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ mod tests {
stun_test_ip: None,
ipv4: UseIpv4::Some(addr),
ipv6: UseIpv6::Disabled,
}],
}
.into()],
region_code: "test_region".to_string(),
};

Expand Down Expand Up @@ -237,7 +238,8 @@ mod tests {
stun_test_ip: None,
ipv4: UseIpv4::Some(addr),
ipv6: UseIpv6::Disabled,
}],
}
.into()],
region_code: "test_region".to_string(),
};

Expand Down
18 changes: 11 additions & 7 deletions iroh-net/src/derp/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ impl Client {
.body(Body::empty())
.unwrap();

let res = if self.use_https(derp_node.as_ref()) {
let res = if self.use_https(derp_node.as_deref()) {
debug!("Starting TLS handshake");
// TODO: review TLS config
let mut roots = rustls::RootCertStore::empty();
Expand All @@ -481,7 +481,7 @@ impl Client {

let tls_connector: tokio_rustls::TlsConnector = Arc::new(config).into();
let hostname = self
.tls_servername(derp_node.as_ref())
.tls_servername(derp_node.as_deref())
.ok_or_else(|| ClientError::InvalidUrl("no tls servername".into()))?;
let tls_stream = tls_connector.connect(hostname, tcp_stream).await?;
debug!("tls_connector connect success");
Expand Down Expand Up @@ -615,7 +615,10 @@ impl Client {
///
/// Return a TCP stream to the provided region, trying each node in order
/// (using [`Client::dial_node`]) until one connects
async fn dial_region(&self, reg: DerpRegion) -> Result<(TcpStream, DerpNode), ClientError> {
async fn dial_region(
&self,
reg: DerpRegion,
) -> Result<(TcpStream, Arc<DerpNode>), ClientError> {
debug!("dial region: {:?}", reg);
let target = self.target_string(&reg);
if reg.nodes.is_empty() {
Expand All @@ -624,16 +627,16 @@ impl Client {
let mut first_err: Option<ClientError> = None;
// TODO (ramfox): these dials should probably happen in parallel, and we should return the
// first one to respond.
for node in reg.nodes {
for node in reg.nodes.iter() {
if node.stun_only {
if first_err.is_none() {
first_err = Some(ClientError::StunOnlyNodesFound(target.clone()));
}
continue;
}
let conn = self.dial_node(&node).await;
let conn = self.dial_node(node).await;
match conn {
Ok(conn) => return Ok((conn, node)),
Ok(conn) => return Ok((conn, node.clone())),
Err(e) => first_err = Some(e),
}
}
Expand Down Expand Up @@ -1224,7 +1227,8 @@ mod tests {
stun_test_ip: None,
ipv4: UseIpv4::Some("35.175.99.112".parse().unwrap()),
ipv6: UseIpv6::Disabled,
}],
}
.into()],
region_code: "test_region".to_string(),
};

Expand Down
16 changes: 13 additions & 3 deletions iroh-net/src/derp/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ impl DerpMap {
Arc::get_mut(&mut self.regions).and_then(|r| r.get_mut(&region_id))
}

#[cfg(test)]
pub fn get_node_mut(&mut self, region_id: u16, node_idx: usize) -> Option<&mut DerpNode> {
Arc::get_mut(&mut self.regions)
.and_then(|regions| regions.get_mut(&region_id))
.map(|region| region.nodes.as_mut_slice())
.and_then(|slice| slice.get_mut(node_idx))
.map(Arc::make_mut)
}

/// How many regions are known?
pub fn len(&self) -> usize {
self.regions.len()
Expand Down Expand Up @@ -83,7 +92,8 @@ impl DerpMap {
ipv4: derp_ipv4,
ipv6: derp_ipv6,
stun_test_ip: None,
}],
}
.into()],
avoid: false,
region_code: "default".into(),
},
Expand All @@ -95,7 +105,7 @@ impl DerpMap {
}

/// Returns the [`DerpNode`] by name.
pub fn find_by_name(&self, node_name: &str) -> Option<&DerpNode> {
pub fn find_by_name(&self, node_name: &str) -> Option<&Arc<DerpNode>> {
self.regions
.values()
.flat_map(|r| r.nodes.iter())
Expand Down Expand Up @@ -137,7 +147,7 @@ pub struct DerpRegion {
/// A unique integer for a geographic region
pub region_id: u16,
/// A list of [`DerpNode`]s in this region
pub nodes: Vec<DerpNode>,
pub nodes: Vec<Arc<DerpNode>>,
/// Whether or not to avoid this region
pub avoid: bool,
/// The region-specific string identifier
Expand Down
3 changes: 2 additions & 1 deletion iroh-net/src/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ mod tests {
ipv6: UseIpv6::TryDns,
stun_test_ip: None,
})
.map(Arc::new)
.collect(),
avoid: false,
region_code: "default".into(),
Expand Down Expand Up @@ -956,7 +957,7 @@ mod tests {
let blackhole = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
let stun_addr = blackhole.local_addr()?;
let mut dm = stun::test::derp_map_of([stun_addr].into_iter());
dm.get_region_mut(1).unwrap().nodes[0].stun_only = true;
dm.get_node_mut(1, 0).unwrap().stun_only = true;

let mut client = Client::new(None).await?;

Expand Down
46 changes: 5 additions & 41 deletions iroh-net/src/netcheck/reportgen/probes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ pub(super) enum Probe {
/// are for retries on UDP loss or timeout.
delay: Duration,

/// The name of the node name. DERP node names are globally
/// unique so there's no region ID.
/// The DERP server to send this probe to.
node: Arc<DerpNode>,
},
#[display("Ipv6 after {delay:?} to {node}")]
Expand Down Expand Up @@ -200,7 +199,6 @@ impl ProbePlan {
/// Creates an initial probe plan.
pub(super) fn initial(derp_map: &DerpMap, if_state: &interfaces::State) -> Self {
let mut plan = Self(BTreeSet::new());
let mut derp_nodes_cache = DerpNodeCache::new();

let mut sorted_regions: Vec<_> = derp_map.regions().map(|r| (r.region_id, r)).collect();
sorted_regions.sort_by_key(|(id, _)| *id);
Expand All @@ -211,7 +209,6 @@ impl ProbePlan {

for attempt in 0..3 {
let derp_node = &region.nodes[attempt % region.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let delay = DEFAULT_INITIAL_RETRANSMIT * attempt as u32;

if if_state.have_v4 && derp_node.ipv4.is_enabled() {
Expand Down Expand Up @@ -239,7 +236,6 @@ impl ProbePlan {
let mut icmp_probes = ProbeSet::new(region.region_id, ProbeProto::Icmp);
for attempt in 0..3 {
let derp_node = &region.nodes[attempt % region.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let start = plan.max_delay() + DEFAULT_INITIAL_RETRANSMIT;
let delay = start + DEFAULT_INITIAL_RETRANSMIT * attempt as u32;

Expand Down Expand Up @@ -275,7 +271,6 @@ impl ProbePlan {
return Self::initial(derp_map, if_state);
}
let mut plan = Self(Default::default());
let mut derp_nodes_cache = DerpNodeCache::new();

let had_stun_ipv4 = !last_report.region_v4_latency.is_empty();
let had_stun_ipv6 = !last_report.region_v6_latency.is_empty();
Expand Down Expand Up @@ -326,7 +321,6 @@ impl ProbePlan {

for attempt in 0..attempts {
let derp_node = &reg.nodes[attempt % reg.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let delay = (retransmit_delay * attempt as u32)
+ (ACTIVE_RETRANSMIT_EXTRA_DELAY * attempt as u32);
if do4 {
Expand Down Expand Up @@ -355,7 +349,6 @@ impl ProbePlan {
let start = plan.max_delay();
for attempt in 0..attempts {
let derp_node = &reg.nodes[attempt % reg.nodes.len()];
let derp_node = derp_nodes_cache.get(derp_node);
let delay = start
+ (retransmit_delay * attempt as u32)
+ (ACTIVE_RETRANSMIT_EXTRA_DELAY * (attempt as u32 + 1));
Expand Down Expand Up @@ -433,35 +426,6 @@ impl FromIterator<ProbeSet> for ProbePlan {
}
}

/// A cache to create [`DerpNode`]s on the heap and share them.
///
/// The probe code needs the [`DerpNode`] a lot and they need to be sent around. It is
/// better to allocate those on the heap and share them using pointers.
#[derive(Debug, Default)]
struct DerpNodeCache {
inner: BTreeSet<Arc<DerpNode>>,
}

impl DerpNodeCache {
fn new() -> Self {
Default::default()
}
/// Returns a [`DerpNode`] from the cache, inserting it if needed.
///
/// This allows you to exchange a [`DerpNode`] retrieved from the [`DerpMap`] for one
/// from the cache. Eventually the [`DerpMap`] should just do this directly.
fn get(&mut self, node: &DerpNode) -> Arc<DerpNode> {
match self.inner.get(node) {
Some(node) => node.clone(),
None => {
let node = Arc::new(node.clone());
self.inner.insert(node.clone());
node
}
}
}
}

/// Sorts the regions in the [`DerpMap`] from fastest to slowest.
///
/// This uses the latencies from the last report to determine the order. Regions with no
Expand Down Expand Up @@ -507,8 +471,8 @@ mod tests {
#[tokio::test]
async fn test_initial_probeplan() {
let derp_map = default_derp_map();
let derp_node_1 = Arc::new(derp_map.get_region(1).unwrap().nodes[0].clone());
let derp_node_2 = Arc::new(derp_map.get_region(2).unwrap().nodes[0].clone());
let derp_node_1 = derp_map.get_region(1).unwrap().nodes[0].clone();
let derp_node_2 = derp_map.get_region(2).unwrap().nodes[0].clone();
let if_state = interfaces::State::fake();
let plan = ProbePlan::initial(&derp_map, &if_state);

Expand Down Expand Up @@ -646,8 +610,8 @@ mod tests {
for i in 0..10 {
println!("round {}", i);
let derp_map = default_derp_map();
let derp_node_1 = Arc::new(derp_map.get_region(1).unwrap().nodes[0].clone());
let derp_node_2 = Arc::new(derp_map.get_region(2).unwrap().nodes[0].clone());
let derp_node_1 = derp_map.get_region(1).unwrap().nodes[0].clone();
let derp_node_2 = derp_map.get_region(2).unwrap().nodes[0].clone();
let if_state = interfaces::State::fake();
let mut latencies = RegionLatencies::new();
latencies.update_region(1, Duration::from_millis(2));
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/stun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub mod test {
region_id,
region_code: "".to_string(),
avoid: false,
nodes: vec![node],
nodes: vec![node.into()],
}
})
.into()
Expand Down
3 changes: 2 additions & 1 deletion iroh-net/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ pub(crate) async fn run_derp_and_stun(
ipv4: UseIpv4::Some("127.0.0.1".parse().unwrap()),
ipv6: UseIpv6::Disabled,
stun_test_ip: Some(stun_addr.ip()),
}],
}
.into()],
avoid: false,
}]
.into();
Expand Down

0 comments on commit bcce8a0

Please sign in to comment.