Skip to content

Commit

Permalink
JVMCBC-1541 TopologyParser should get node identifiers from default n…
Browse files Browse the repository at this point in the history
…etwork

Motivation
----------
In some private link deployments, the server advertises the same
manager host:port address for all nodes on the `external` network.

The newly-refactored cluster topology parser resolves the network
before assigning node IDs, so the node ids were sourced from the
external network -- where each node has the same host and manager port.

This resulted in all nodes being assigned the same node identifier,
and havoc ensued.

Modifications
-------------
Modify the cluster topology parser to get the manager host:port
from the default (internal) network instead.

Add a bit of logging and sanity checking that would have caught this
sooner, as well as regression tests for parsing the the private link
topology.

Change-Id: I3d313a77042aa5cd495b3a118480de1ced068c04
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/212758
Reviewed-by: Michael Reiche <[email protected]>
Tested-by: Build Bot <[email protected]>
Reviewed-by: David Nault <[email protected]>
  • Loading branch information
dnault committed Jul 16, 2024
1 parent c1784e7 commit 188881f
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ static List<PortInfo> getPortInfos(ClusterTopology topology) {
nonTlsPorts(topology, it),
tlsPorts(topology, it),
emptyMap(), // The host is always accurate -- there is never an alternate.
it.host()
it.host(),
it.id().toLegacy()
)
);
}
Expand Down Expand Up @@ -107,7 +108,8 @@ static List<NodeInfo> getNodeInfosForBucket(ClusterTopologyWithBucket topology)
it.host(),
nonTlsPorts(topology, it),
tlsPorts(topology, it),
it.ketamaAuthority()
it.ketamaAuthority(),
it.id().toLegacy()
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,15 @@ public NodeInfo(
String hostname,
Map<ServiceType, Integer> direct,
Map<ServiceType, Integer> ssl,
@Nullable HostAndPort ketamaAuthority
@Nullable HostAndPort ketamaAuthority,
NodeIdentifier nodeIdentifier
) {
this.hostname = requireNonNull(hostname);
this.directServices = requireNonNull(direct);
this.sslServices = requireNonNull(ssl);
this.alternateAddresses = Collections.emptyMap();

this.nodeIdentifier = initNodeIdentifier(hostname, directServices, sslServices);
this.nodeIdentifier = requireNonNull(nodeIdentifier);
this.ketamaAuthority = ketamaAuthority;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ public PortInfo(
this.nodeIdentifier = initNodeIdentifier(hostname, ports, sslPorts);
}

PortInfo(
final Map<ServiceType, Integer> ports,
final Map<ServiceType, Integer> sslPorts,
final Map<String, AlternateAddress> alternateAddresses,
final String hostname,
final NodeIdentifier nodeIdentifier
) {
this.ports = requireNonNull(ports);
this.sslPorts = requireNonNull(sslPorts);
this.alternateAddresses = requireNonNull(alternateAddresses);
this.hostname = requireNonNull(hostname);
this.nodeIdentifier = requireNonNull(nodeIdentifier);
}

public NodeIdentifier identifier() {
return nodeIdentifier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,25 @@
import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.node.MemcachedHashingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.couchbase.client.core.logging.RedactableArgument.redactSystem;
import static com.couchbase.client.core.util.CbCollections.transform;
import static java.util.Collections.emptySet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

@Stability.Internal
class ClusterTopologyParser {
private static final Logger log = LoggerFactory.getLogger(ClusterTopologyParser.class);

private ClusterTopologyParser() {
throw new AssertionError("not instantiable");
}
Expand Down Expand Up @@ -74,7 +81,18 @@ public static ClusterTopology parse(
// currently enforce this. If a node doesn't have an alternate address
// for the selected network, use an "inaccessible" placeholder to preserve
// the node indexes required by the KV partition map.
List<HostAndServicePorts> resolvedNodes = transform(nodes, it -> it.getOrDefault(resolvedNetwork, HostAndServicePorts.INACCESSIBLE));
List<HostAndServicePorts> resolvedNodes = transform(nodes, it -> {
HostAndServicePorts resolved = it.getOrDefault(resolvedNetwork, HostAndServicePorts.INACCESSIBLE);
if (resolved.inaccessible()) {
log.error(
"Cluster topology has at least one node that is inaccessible on the selected network ({}) : {}",
resolvedNetwork, redactSystem(it)
);
}
return resolved;
});

sanityCheck(resolvedNodes);

// RELATIONSHIP BETWEEN "nodes" and "nodesEXT":
//
Expand Down Expand Up @@ -115,6 +133,20 @@ public static ClusterTopology parse(
);
}

private static void sanityCheck(List<HostAndServicePorts> resolvedNodes) {
List<NodeIdentifier> idsOfAccessibleNodes = resolvedNodes.stream()
.filter(it -> !it.inaccessible())
.map(HostAndServicePorts::id)
.collect(toList());

Set<NodeIdentifier> distinct = new HashSet<>(idsOfAccessibleNodes);
if (distinct.size() != idsOfAccessibleNodes.size()) {
throw new CouchbaseException(
"Cluster topology has nodes with non-unique IDs (host and manager port on default network: " + redactSystem(resolvedNodes)
);
}
}

/**
* A single-node cluster can omit the hostname, so patch it in!
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,27 @@
public class HostAndServicePorts implements KetamaRingNode {
// Placeholder for a node that can't be reached because it doesn't have an alternate address
// for the requested network. (Can't just ignore it, because bucket config refers to nodes by index.)
public static final HostAndServicePorts INACCESSIBLE = new HostAndServicePorts("<inaccessible>", emptyMap(), null);
public static final HostAndServicePorts INACCESSIBLE = new HostAndServicePorts(
"<inaccessible>",
emptyMap(),
new NodeIdentifier("<inaccessible>", 0),
null
);

private final String host;
private final Map<ServiceType, Integer> ports;
private final NodeIdentifier id;
@Nullable private final HostAndPort ketamaAuthority;

public HostAndServicePorts(
String host,
Map<ServiceType, Integer> ports,
NodeIdentifier id,
@Nullable HostAndPort ketamaAuthority
) {
this.host = requireNonNull(host);
this.ports = unmodifiableMap(newEnumMap(ServiceType.class, ports));
this.id = requireNonNull(id);
this.ketamaAuthority = ketamaAuthority;
}

Expand All @@ -67,7 +75,7 @@ public boolean inaccessible() {
}

public NodeIdentifier id() {
return new NodeIdentifier(host, port(ServiceType.MANAGER).orElse(0));
return id;
}

public String host() {
Expand Down Expand Up @@ -113,15 +121,15 @@ public HostAndServicePorts without(ServiceType service, ServiceType... moreServi
temp.remove(t);
}

return new HostAndServicePorts(this.host, temp, this.ketamaAuthority);
return new HostAndServicePorts(this.host, temp, this.id, this.ketamaAuthority);
}

@Stability.Internal
public HostAndServicePorts withKetamaAuthority(@Nullable HostAndPort ketamaAuthority) {
if (Objects.equals(this.ketamaAuthority, ketamaAuthority)) {
return this;
}
return new HostAndServicePorts(this.host, this.ports, ketamaAuthority);
return new HostAndServicePorts(this.host, this.ports, this.id, ketamaAuthority);
}

boolean matches(SeedNode seedNode) {
Expand Down Expand Up @@ -153,6 +161,7 @@ public String toString() {
return "HostAndServicePorts{" +
"host='" + redactSystem(host) + '\'' +
", ports=" + redactSystem(ports) +
", id=" + redactSystem(id) +
", ketamaAuthority=" + redactSystem(ketamaAuthority) +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.HashMap;
import java.util.Map;

import static com.couchbase.client.core.logging.RedactableArgument.redactSystem;
import static com.couchbase.client.core.util.CbCollections.transformValues;
import static com.couchbase.client.core.util.CbObjects.defaultIfNull;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;

Expand All @@ -52,11 +54,13 @@ public static Map<NetworkResolution, HostAndServicePorts> parse(
) {
Map<NetworkResolution, HostAndRawServicePorts> raw = parseIntermediate(json);
HostAndPort ketamaAuthority = getKetamaAuthority(raw);
NodeIdentifier id = getId(raw);

return transformValues(raw, value ->
new HostAndServicePorts(
value.host,
portSelector.selectPorts(value.rawServicePorts),
id,
ketamaAuthority
)
);
Expand All @@ -73,15 +77,52 @@ private static HostAndPort getKetamaAuthority(Map<NetworkResolution, HostAndRawS
return null;
}

Map<ServiceType, Integer> nonTlsPorts = PortSelector.NON_TLS.selectPorts(defaultNodeMap.rawServicePorts);
Integer nonTlsKvPort = nonTlsPorts.get(ServiceType.KV);
Integer nonTlsKvPort = getPort(defaultNodeMap, PortSelector.NON_TLS, ServiceType.KV);
if (nonTlsKvPort == null) {
return null;
}

return new HostAndPort(defaultNodeMap.host, nonTlsKvPort);
}

/**
* Returns an ID consisting of the host and manager port on the default network.
* <p>
* Depending on which ports the server advertises, it might be a TLS or non-TLS port.
* This must not matter though, since this is just for uniquely identifying nodes,
* and not for making network connections.
*
* @throws CouchbaseException If the default network has no manager ports for the node
*/
private static NodeIdentifier getId(
Map<NetworkResolution, HostAndRawServicePorts> networkToNodeInfo
) {
HostAndRawServicePorts defaultNodeMap = networkToNodeInfo.get(NetworkResolution.DEFAULT);
if (defaultNodeMap == null) {
throw new CouchbaseException("Network map is missing entry for default network.");
}

Integer idPort = defaultIfNull(
getPort(defaultNodeMap, PortSelector.NON_TLS, ServiceType.MANAGER),
() -> getPort(defaultNodeMap, PortSelector.TLS, ServiceType.MANAGER)
);

if (idPort == null) {
throw new CouchbaseException(
"Cluster topology has no manager port on the default network for node: " +
redactSystem(networkToNodeInfo)
);
}

return new NodeIdentifier(defaultNodeMap.host, idPort);
}

@Nullable
private static Integer getPort(HostAndRawServicePorts nodeMap, PortSelector portSelector, ServiceType serviceType) {
Map<ServiceType, Integer> ports = portSelector.selectPorts(nodeMap.rawServicePorts);
return ports.get(serviceType);
}

private static Map<NetworkResolution, HostAndRawServicePorts> parseIntermediate(ObjectNode json) {
Map<NetworkResolution, HostAndRawServicePorts> result = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public NodeIdentifier(String host, int port) {
this.port = port;
}

@Deprecated
public com.couchbase.client.core.node.NodeIdentifier toLegacy() {
return new com.couchbase.client.core.node.NodeIdentifier(host, port);
}

@Override
public String toString() {
return host + ":" + port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.couchbase.client.core.config;

import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.node.StandardMemcachedHashingStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
Expand Down Expand Up @@ -359,6 +362,41 @@ void shouldParseElixirConfig() {
assertEquals(11207, config.nodeAtIndex(2).sslServices().get(KV));
}

@Test
void nodeIdsComeFromInternalNetwork() {
String originHost = "private-endpoint.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com";

ClusterTopologyWithBucket topology = readTopology(
"config_7.6_external_manager_ports_not_unique_with_bucket.json",
NetworkSelector.autoDetect(setOf(SeedNode.create(originHost).withKvPort(11208))),
PortSelector.TLS,
originHost
);

assertEquals(NetworkResolution.EXTERNAL, topology.network());

CouchbaseBucketConfig config = new CouchbaseBucketConfig(topology);

List<NodeIdentifier> expectedIds = listOf(
new NodeIdentifier("svc-dqisea-node-001.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-002.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-004.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-005.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
// Node 003 comes last because it's not servicing the bucket.
new NodeIdentifier("svc-dqisea-node-003.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091)
);

assertEquals(
expectedIds,
transform(config.portInfos(), PortInfo::identifier)
);

assertEquals(
expectedIds,
transform(config.nodes(), NodeInfo::identifier)
);
}

private static CouchbaseBucketConfig readConfig(final String path) {
return readConfig(path, "origin.example.com");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.couchbase.client.core.config;

import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.node.StandardMemcachedHashingStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
Expand All @@ -26,6 +28,9 @@

import java.util.Collections;

import static com.couchbase.client.core.util.CbCollections.listOf;
import static com.couchbase.client.core.util.CbCollections.setOf;
import static com.couchbase.client.core.util.CbCollections.transform;
import static com.couchbase.client.test.Util.readResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -70,6 +75,30 @@ void parseMultiNodeGlobalConfig() {
assertTrue(config.clusterCapabilities().get(ServiceType.KV).isEmpty());
}

@Test
void nodeIdsComeFromInternalNetwork() {
String originHost = "private-endpoint.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com";

ClusterTopology topology = readTopology(
"config_7.6_external_manager_ports_not_unique.json",
NetworkSelector.autoDetect(setOf(SeedNode.create(originHost).withKvPort(11208))),
PortSelector.TLS,
originHost
);

GlobalConfig config = new GlobalConfig(topology);

assertEquals(
listOf(
new NodeIdentifier("svc-dqisea-node-001.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-002.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-003.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-004.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091),
new NodeIdentifier("svc-dqisea-node-005.nyarjaj-crhge67o.sandbox.nonprod-project-avengers.com", 8091)
),
transform(config.portInfos(), PortInfo::identifier)
);
}

/**
* Helper method to load the config.
Expand Down
Loading

0 comments on commit 188881f

Please sign in to comment.