Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add AzAffinityReplicasAndPrimary Read Strategy #3083

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#### Changes


* Java: Add support to AzAffinityReplicasAndPrimary read strategy ([#3083](https://github.com/valkey-io/valkey-glide/pull/3083))
* Python: Add support to AzAffinityReplicasAndPrimary read strategy ([#3071](https://github.com/valkey-io/valkey-glide/pull/3071))
* Node: Add support to AzAffinityReplicasAndPrimary read strategy ([#3063](https://github.com/valkey-io/valkey-glide/pull/3063))
* Core: Add support to AzAffinityReplicasAndPrimary read strategy ([#2986](https://github.com/valkey-io/valkey-glide/pull/2986))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ public abstract class BaseClientConfiguration {
private final Integer inflightRequestsLimit;

/**
* Availability Zone of the client. If ReadFrom strategy is AZAffinity, this setting ensures that
* readonly commands are directed to replicas within the specified AZ if exits.
* Availability Zone of the client. If ReadFrom strategy is AZAffinity or
* AZAffinityReplicasAndPrimary, this setting ensures that readonly commands are directed to nodes
* within the specified AZ if exits.
*/
private final String clientAZ;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@ public enum ReadFrom {
* round-robin manner, falling back to other replicas or the primary if needed.
*/
AZ_AFFINITY,
/**
* Spread the read requests among nodes within the client's Availability Zone (AZ) in a round
* robin manner, prioritizing local replicas, then the local primary, and falling back to any
* replica or the primary if needed.
*/
AZ_AFFINITY_REPLICAS_AND_PRIMARY,
}
10 changes: 10 additions & 0 deletions java/client/src/main/java/glide/managers/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderBaseConfiguration
connectionRequestBuilder.setClientAz(configuration.getClientAZ());
}

if (configuration.getReadFrom() == ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY) {
if (configuration.getClientAZ() == null) {
throw new ConfigurationError(
"`clientAZ` must be set when read_from is set to `AZ_AFFINITY_REPLICAS_AND_PRIMARY`");
}
connectionRequestBuilder.setClientAz(configuration.getClientAZ());
}

if (configuration.getProtocol() != null) {
connectionRequestBuilder.setProtocolValue(configuration.getProtocol().ordinal());
}
Expand Down Expand Up @@ -256,6 +264,8 @@ private ConnectionRequestOuterClass.ReadFrom mapReadFromEnum(ReadFrom readFrom)
return ConnectionRequestOuterClass.ReadFrom.PreferReplica;
case AZ_AFFINITY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinity;
case AZ_AFFINITY_REPLICAS_AND_PRIMARY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinityReplicasAndPrimary;
default:
return ConnectionRequestOuterClass.ReadFrom.Primary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,23 @@ public void connection_on_resp_pointer_throws_ClosingException() {
@SneakyThrows
@Test
public void test_convert_config_with_azaffinity_to_protobuf() {
testConvertConfigWithAzAffinity(ReadFrom.AZ_AFFINITY);
}

@SneakyThrows
@Test
public void test_convert_config_with_azaffinity_replicas_and_primary_to_protobuf() {
testConvertConfigWithAzAffinity(ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY);
}

private void testConvertConfigWithAzAffinity(ReadFrom readFrom) throws Exception {
// setup
String az = "us-east-1a";
GlideClientConfiguration config =
GlideClientConfiguration.builder()
.address(NodeAddress.builder().host(DEFAULT_HOST).port(DEFAULT_PORT).build())
.useTLS(true)
.readFrom(ReadFrom.AZ_AFFINITY)
.readFrom(readFrom)
.clientAZ(az)
.build();

Expand All @@ -294,7 +304,7 @@ public void test_convert_config_with_azaffinity_to_protobuf() {
.setPort(DEFAULT_PORT)
.build())
.setTlsMode(TlsMode.SecureTls)
.setReadFrom(ConnectionRequestOuterClass.ReadFrom.AZAffinity)
.setReadFrom(mapReadFrom(readFrom))
.setClientAz(az)
.build();

Expand All @@ -314,16 +324,38 @@ public void test_convert_config_with_azaffinity_to_protobuf() {
@SneakyThrows
@Test
public void test_az_affinity_without_client_az_throws_ConfigurationError() {
testAzAffinityWithoutClientAzThrowsConfigurationError(ReadFrom.AZ_AFFINITY);
}

@SneakyThrows
@Test
public void test_az_affinity_replicas_and_primary_without_client_az_throws_ConfigurationError() {
testAzAffinityWithoutClientAzThrowsConfigurationError(
ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY);
}

private void testAzAffinityWithoutClientAzThrowsConfigurationError(ReadFrom readFrom) {
// setup
String az = "us-east-1a";
GlideClientConfiguration config =
GlideClientConfiguration.builder()
.address(NodeAddress.builder().host(DEFAULT_HOST).port(DEFAULT_PORT).build())
.useTLS(true)
.readFrom(ReadFrom.AZ_AFFINITY)
.readFrom(readFrom)
.build();

// verify
assertThrows(ConfigurationError.class, () -> connectionManager.connectToValkey(config));
}

private ConnectionRequestOuterClass.ReadFrom mapReadFrom(ReadFrom readFrom) {
switch (readFrom) {
case AZ_AFFINITY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinity;
case AZ_AFFINITY_REPLICAS_AND_PRIMARY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinityReplicasAndPrimary;
default:
throw new IllegalArgumentException("Unsupported ReadFrom value: " + readFrom);
}
}
}
86 changes: 86 additions & 0 deletions java/integTest/src/test/java/glide/ConnectionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,90 @@ public void test_connection_timeout(boolean clusterMode) {
}
}
}

@SneakyThrows
@Test
public void test_az_affinity_replicas_and_primary_routes_to_primary() {
assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("8.0.0"), "Skip for versions below 8");

String az = "us-east-1a";
String otherAz = "us-east-1b";
int nGetCalls = 4;
String getCmdstat = String.format("cmdstat_get:calls=%d", nGetCalls);

// Create client for setting the configs
GlideClusterClient configSetClient =
GlideClusterClient.createClient(azClusterClientConfig().requestTimeout(2000).build()).get();

// Reset stats and set all nodes to other_az
assertEquals(configSetClient.configResetStat().get(), OK);
configSetClient.configSet(Map.of("availability-zone", otherAz), ALL_NODES).get();

// Set primary for slot 12182 to az
configSetClient
.configSet(
Map.of("availability-zone", az),
new RequestRoutingConfiguration.SlotIdRoute(12182, PRIMARY))
.get();

// Verify primary AZ
ClusterValue<Map<String, String>> primaryAzResult =
configSetClient
.configGet(
new String[] {"availability-zone"},
new RequestRoutingConfiguration.SlotIdRoute(12182, PRIMARY))
.get();
assertEquals(
az,
primaryAzResult.getSingleValue().get("availability-zone"),
"Primary for slot 12182 is not in the expected AZ " + az);

configSetClient.close();

// Create test client with AZ_AFFINITY_REPLICAS_AND_PRIMARY configuration
GlideClusterClient azTestClient =
GlideClusterClient.createClient(
azClusterClientConfig()
.readFrom(ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY)
.clientAZ(az)
.requestTimeout(2000)
.build())
.get();

// Execute GET commands
for (int i = 0; i < nGetCalls; i++) {
azTestClient.get("foo").get();
}

ClusterValue<String> infoResult =
azTestClient.info(new InfoOptions.Section[] {InfoOptions.Section.ALL}, ALL_NODES).get();
Map<String, String> infoData = infoResult.getMultiValue();

// Check that only the primary in the specified AZ handled all GET calls
long matchingEntries =
infoData.values().stream()
.filter(
value ->
value.contains(getCmdstat)
&& value.contains(az)
&& value.contains("role:master"))
.count();
assertEquals(1, matchingEntries, "Exactly one primary in AZ should handle all calls");

// Verify total GET calls
long totalGetCalls =
infoData.values().stream()
.filter(value -> value.contains("cmdstat_get:calls="))
.mapToInt(
value -> {
int startIndex =
value.indexOf("cmdstat_get:calls=") + "cmdstat_get:calls=".length();
int endIndex = value.indexOf(",", startIndex);
return Integer.parseInt(value.substring(startIndex, endIndex));
})
.sum();
assertEquals(nGetCalls, totalGetCalls, "Total GET calls mismatch");

azTestClient.close();
}
}
Loading