Skip to content

Commit

Permalink
Java: Add AzAffinityReplicasAndPrimary Read Strategy (#3083)
Browse files Browse the repository at this point in the history
* Java: Add AzAffinityReplicasAndPrimary Read Strategy
---------
Signed-off-by: Muhammad Awawdi <[email protected]>
  • Loading branch information
Muhammad-awawdi-amazon authored Feb 5, 2025
1 parent 557c2e9 commit 55c9712
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#### Changes


* Java: Add support to AzAffinityReplicasAndPrimary read strategy ([#3083](https://github.com/valkey-io/valkey-glide/pull/3083))
* Python: Add support to AzAffinityReplicasAndPrimary read strategy ([#3071](https://github.com/valkey-io/valkey-glide/pull/3071))
* Node: Add support to AzAffinityReplicasAndPrimary read strategy ([#3063](https://github.com/valkey-io/valkey-glide/pull/3063))
* Core: Add support to AzAffinityReplicasAndPrimary read strategy ([#2986](https://github.com/valkey-io/valkey-glide/pull/2986))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ public abstract class BaseClientConfiguration {
private final Integer inflightRequestsLimit;

/**
* Availability Zone of the client. If ReadFrom strategy is AZAffinity, this setting ensures that
* readonly commands are directed to replicas within the specified AZ if exits.
* Availability Zone of the client. If ReadFrom strategy is AZAffinity or
* AZAffinityReplicasAndPrimary, this setting ensures that readonly commands are directed to nodes
* within the specified AZ if exits.
*/
private final String clientAZ;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@ public enum ReadFrom {
* round-robin manner, falling back to other replicas or the primary if needed.
*/
AZ_AFFINITY,
/**
* Spread the read requests among nodes within the client's Availability Zone (AZ) in a round
* robin manner, prioritizing local replicas, then the local primary, and falling back to any
* replica or the primary if needed.
*/
AZ_AFFINITY_REPLICAS_AND_PRIMARY,
}
10 changes: 10 additions & 0 deletions java/client/src/main/java/glide/managers/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderBaseConfiguration
connectionRequestBuilder.setClientAz(configuration.getClientAZ());
}

if (configuration.getReadFrom() == ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY) {
if (configuration.getClientAZ() == null) {
throw new ConfigurationError(
"`clientAZ` must be set when read_from is set to `AZ_AFFINITY_REPLICAS_AND_PRIMARY`");
}
connectionRequestBuilder.setClientAz(configuration.getClientAZ());
}

if (configuration.getProtocol() != null) {
connectionRequestBuilder.setProtocolValue(configuration.getProtocol().ordinal());
}
Expand Down Expand Up @@ -256,6 +264,8 @@ private ConnectionRequestOuterClass.ReadFrom mapReadFromEnum(ReadFrom readFrom)
return ConnectionRequestOuterClass.ReadFrom.PreferReplica;
case AZ_AFFINITY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinity;
case AZ_AFFINITY_REPLICAS_AND_PRIMARY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinityReplicasAndPrimary;
default:
return ConnectionRequestOuterClass.ReadFrom.Primary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,23 @@ public void connection_on_resp_pointer_throws_ClosingException() {
@SneakyThrows
@Test
public void test_convert_config_with_azaffinity_to_protobuf() {
testConvertConfigWithAzAffinity(ReadFrom.AZ_AFFINITY);
}

@SneakyThrows
@Test
public void test_convert_config_with_azaffinity_replicas_and_primary_to_protobuf() {
testConvertConfigWithAzAffinity(ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY);
}

private void testConvertConfigWithAzAffinity(ReadFrom readFrom) throws Exception {
// setup
String az = "us-east-1a";
GlideClientConfiguration config =
GlideClientConfiguration.builder()
.address(NodeAddress.builder().host(DEFAULT_HOST).port(DEFAULT_PORT).build())
.useTLS(true)
.readFrom(ReadFrom.AZ_AFFINITY)
.readFrom(readFrom)
.clientAZ(az)
.build();

Expand All @@ -294,7 +304,7 @@ public void test_convert_config_with_azaffinity_to_protobuf() {
.setPort(DEFAULT_PORT)
.build())
.setTlsMode(TlsMode.SecureTls)
.setReadFrom(ConnectionRequestOuterClass.ReadFrom.AZAffinity)
.setReadFrom(mapReadFrom(readFrom))
.setClientAz(az)
.build();

Expand All @@ -314,16 +324,38 @@ public void test_convert_config_with_azaffinity_to_protobuf() {
@SneakyThrows
@Test
public void test_az_affinity_without_client_az_throws_ConfigurationError() {
testAzAffinityWithoutClientAzThrowsConfigurationError(ReadFrom.AZ_AFFINITY);
}

@SneakyThrows
@Test
public void test_az_affinity_replicas_and_primary_without_client_az_throws_ConfigurationError() {
testAzAffinityWithoutClientAzThrowsConfigurationError(
ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY);
}

private void testAzAffinityWithoutClientAzThrowsConfigurationError(ReadFrom readFrom) {
// setup
String az = "us-east-1a";
GlideClientConfiguration config =
GlideClientConfiguration.builder()
.address(NodeAddress.builder().host(DEFAULT_HOST).port(DEFAULT_PORT).build())
.useTLS(true)
.readFrom(ReadFrom.AZ_AFFINITY)
.readFrom(readFrom)
.build();

// verify
assertThrows(ConfigurationError.class, () -> connectionManager.connectToValkey(config));
}

private ConnectionRequestOuterClass.ReadFrom mapReadFrom(ReadFrom readFrom) {
switch (readFrom) {
case AZ_AFFINITY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinity;
case AZ_AFFINITY_REPLICAS_AND_PRIMARY:
return ConnectionRequestOuterClass.ReadFrom.AZAffinityReplicasAndPrimary;
default:
throw new IllegalArgumentException("Unsupported ReadFrom value: " + readFrom);
}
}
}
86 changes: 86 additions & 0 deletions java/integTest/src/test/java/glide/ConnectionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,90 @@ public void test_connection_timeout(boolean clusterMode) {
}
}
}

@SneakyThrows
@Test
public void test_az_affinity_replicas_and_primary_routes_to_primary() {
assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("8.0.0"), "Skip for versions below 8");

String az = "us-east-1a";
String otherAz = "us-east-1b";
int nGetCalls = 4;
String getCmdstat = String.format("cmdstat_get:calls=%d", nGetCalls);

// Create client for setting the configs
GlideClusterClient configSetClient =
GlideClusterClient.createClient(azClusterClientConfig().requestTimeout(2000).build()).get();

// Reset stats and set all nodes to other_az
assertEquals(configSetClient.configResetStat().get(), OK);
configSetClient.configSet(Map.of("availability-zone", otherAz), ALL_NODES).get();

// Set primary for slot 12182 to az
configSetClient
.configSet(
Map.of("availability-zone", az),
new RequestRoutingConfiguration.SlotIdRoute(12182, PRIMARY))
.get();

// Verify primary AZ
ClusterValue<Map<String, String>> primaryAzResult =
configSetClient
.configGet(
new String[] {"availability-zone"},
new RequestRoutingConfiguration.SlotIdRoute(12182, PRIMARY))
.get();
assertEquals(
az,
primaryAzResult.getSingleValue().get("availability-zone"),
"Primary for slot 12182 is not in the expected AZ " + az);

configSetClient.close();

// Create test client with AZ_AFFINITY_REPLICAS_AND_PRIMARY configuration
GlideClusterClient azTestClient =
GlideClusterClient.createClient(
azClusterClientConfig()
.readFrom(ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY)
.clientAZ(az)
.requestTimeout(2000)
.build())
.get();

// Execute GET commands
for (int i = 0; i < nGetCalls; i++) {
azTestClient.get("foo").get();
}

ClusterValue<String> infoResult =
azTestClient.info(new InfoOptions.Section[] {InfoOptions.Section.ALL}, ALL_NODES).get();
Map<String, String> infoData = infoResult.getMultiValue();

// Check that only the primary in the specified AZ handled all GET calls
long matchingEntries =
infoData.values().stream()
.filter(
value ->
value.contains(getCmdstat)
&& value.contains(az)
&& value.contains("role:master"))
.count();
assertEquals(1, matchingEntries, "Exactly one primary in AZ should handle all calls");

// Verify total GET calls
long totalGetCalls =
infoData.values().stream()
.filter(value -> value.contains("cmdstat_get:calls="))
.mapToInt(
value -> {
int startIndex =
value.indexOf("cmdstat_get:calls=") + "cmdstat_get:calls=".length();
int endIndex = value.indexOf(",", startIndex);
return Integer.parseInt(value.substring(startIndex, endIndex));
})
.sum();
assertEquals(nGetCalls, totalGetCalls, "Total GET calls mismatch");

azTestClient.close();
}
}

0 comments on commit 55c9712

Please sign in to comment.