Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add AZAffinityReplicasAndPrimary ReadFrom strategy support #3071

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#### Changes

* Python: Add support to AzAffinityReplicasAndPrimary read strategy ([#3071](https://github.com/valkey-io/valkey-glide/pull/3071))
* Core: Add support to AzAffinityReplicasAndPrimary read strategy ([#2792](https://github.com/valkey-io/valkey-glide/pull/2792))
* Go: Add `HScan` command ([#2917](https://github.com/valkey-io/valkey-glide/pull/2917))
* Java, Node, Python: Add transaction commands for JSON module ([#2862](https://github.com/valkey-io/valkey-glide/pull/2862))
Expand Down
13 changes: 13 additions & 0 deletions python/python/glide/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class ReadFrom(Enum):
Spread the read requests between replicas in the same client's AZ (Aviliablity zone) in a round robin manner,
falling back to other replicas or the primary if needed
"""
AZ_AFFINITY_REPLICAS_AND_PRIMARY = ProtobufReadFrom.AZAffinityReplicasAndPrimary
"""
Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
"""


class ProtocolVersion(Enum):
Expand Down Expand Up @@ -194,6 +199,7 @@ def __init__(
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
If ReadFrom strategy is AZAffinityReplicasAndPrimary, this setting ensures that readonly commands are directed to nodes (first replicas then primary) within the specified AZ if they exist.
advanced_config (Optional[AdvancedBaseClientConfiguration]): Advanced configuration settings for the client.
"""
self.addresses = addresses
Expand All @@ -212,6 +218,11 @@ def __init__(
"client_az must be set when read_from is set to AZ_AFFINITY"
)

if read_from == ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY and not client_az:
raise ValueError(
"client_az must be set when read_from is set to AZ_AFFINITY_REPLICAS_AND_PRIMARY"
)

def _create_a_protobuf_conn_request(
self, cluster_mode: bool = False
) -> ConnectionRequest:
Expand Down Expand Up @@ -302,6 +313,7 @@ class GlideClientConfiguration(BaseClientConfiguration):
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
If ReadFrom strategy is AZAffinityReplicasAndPrimary, this setting ensures that readonly commands are directed to nodes (first replicas then primary) within the specified AZ if they exist.
advanced_config (Optional[AdvancedGlideClientConfiguration]): Advanced configuration settings for the client, see `AdvancedGlideClientConfiguration`.
"""

Expand Down Expand Up @@ -458,6 +470,7 @@ class GlideClusterClientConfiguration(BaseClientConfiguration):
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
If ReadFrom strategy is AZAffinityReplicasAndPrimary, this setting ensures that readonly commands are directed to nodes (first replicas then primary) within the specified AZ if they exist.
advanced_config (Optional[AdvancedGlideClusterClientConfiguration]) : Advanced configuration settings for the client, see `AdvancedGlideClusterClientConfiguration`.


Expand Down
15 changes: 15 additions & 0 deletions python/python/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ def test_convert_config_with_azaffinity_to_protobuf():
assert request.client_az == az


def test_convert_config_with_azaffinity_replicas_and_primary_to_protobuf():
az = "us-east-1a"
config = BaseClientConfiguration(
[NodeAddress("127.0.0.1")],
use_tls=True,
read_from=ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY,
client_az=az,
)
request = config._create_a_protobuf_conn_request()
assert isinstance(request, ConnectionRequest)
assert request.tls_mode is TlsMode.SecureTls
assert request.read_from == ProtobufReadFrom.AZAffinityReplicasAndPrimary
assert request.client_az == az


def test_connection_timeout_in_protobuf_request():
connection_timeout = 5000 # in milliseconds
config = GlideClientConfiguration(
Expand Down
104 changes: 104 additions & 0 deletions python/python/tests/test_read_from_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,107 @@ async def test_az_affinity_requires_client_az(
read_from=ReadFrom.AZ_AFFINITY,
request_timeout=2000,
)

@pytest.mark.skip_if_version_below("8.0.0")
@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_az_affinity_replicas_and_primary_routes_to_primary(
self,
request,
cluster_mode: bool,
protocol: ProtocolVersion,
):
"""Test that the client with AZ_AFFINITY_REPLICAS_AND_PRIMARY routes to the primary in the same AZ"""
az = "us-east-1a"
other_az = "us-east-1b"
GET_CALLS = 4

client_for_config_set = None
client_for_testing_az = None

try:
client_for_config_set = await create_client(
request,
cluster_mode,
protocol=protocol,
request_timeout=2000,
)

# Reset stats and set all nodes to other_az
await client_for_config_set.config_resetstat()
await client_for_config_set.custom_command(
["CONFIG", "SET", "availability-zone", other_az],
AllNodes(),
)

# Set primary for slot 12182 to az
await client_for_config_set.custom_command(
["CONFIG", "SET", "availability-zone", az],
route=SlotIdRoute(SlotType.PRIMARY, 12182),
)

# Verify primary AZ
primary_az = await client_for_config_set.custom_command(
["CONFIG", "GET", "availability-zone"],
route=SlotIdRoute(SlotType.PRIMARY, 12182),
)
assert (
primary_az[b"availability-zone"].decode() == az
), f"Primary for slot 12182 is not in the expected AZ {az}"

# Create test client AFTER configuration
client_for_testing_az = await create_client(
request,
cluster_mode,
protocol=protocol,
read_from=ReadFrom.AZ_AFFINITY_REPLICAS_AND_PRIMARY,
client_az=az,
request_timeout=2000,
)

# Perform GET operations
for i in range(GET_CALLS):
await client_for_testing_az.get("foo")

# Collect info from all nodes
info_result = await client_for_testing_az.info(
[
InfoSection.SERVER,
InfoSection.REPLICATION,
InfoSection.COMMAND_STATS,
],
AllNodes(),
)

matching_entries_count = 0
total_get_calls = 0
node_info_list = []

for node, node_info in info_result.items():
info_str = node_info.decode()
is_primary = "role:master" in info_str
az_match = re.search(r"availability_zone:(\S+)", info_str)
node_az = az_match.group(1) if az_match else ""
get_calls_match = re.search(r"cmdstat_get:calls=(\d+)", info_str)
get_calls = int(get_calls_match.group(1)) if get_calls_match else 0

total_get_calls += get_calls
node_info_list.append((node_az, is_primary, get_calls))

if is_primary and node_az == az and get_calls == GET_CALLS:
matching_entries_count += 1
elif node_az != az and get_calls > 0:
pytest.fail(f"GET calls found on node not in AZ {az}")

assert (
matching_entries_count == 1
), f"Exactly one primary in AZ should handle all calls. Matching entries: {matching_entries_count}, Total GET calls: {total_get_calls}"
assert (
total_get_calls == GET_CALLS
), f"Total GET calls mismatch, expected {GET_CALLS}, got {total_get_calls}"

finally:
if client_for_testing_az:
await client_for_testing_az.close()
if client_for_config_set:
await client_for_config_set.close()
Loading