Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Add AzAffinityReplicasAndPrimary Read Strategy #2986

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#### Changes

* Core: Add support to AzAffinityReplicasAndPrimary read strategy ([#2792](https://github.com/valkey-io/valkey-glide/pull/2792))
* Go: Add `HScan` command ([#2917](https://github.com/valkey-io/valkey-glide/pull/2917))
* Java, Node, Python: Add transaction commands for JSON module ([#2862](https://github.com/valkey-io/valkey-glide/pull/2862))
* Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity or AZAffinityReplicasAndPrimary.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// If ReadFromReplica strategy is set to AZAffinity or AZAffinityReplicasAndPrimary, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
Expand Down
192 changes: 183 additions & 9 deletions glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,36 @@ where
&self,
slot_map_value: &SlotMapValue,
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, false)
}

/// Returns the node's connection in the same availability zone as `client_az`,
/// checking replicas first, then primary, and falling back to any available node.
pub(crate) fn round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
&self,
slot_map_value: &SlotMapValue,
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, true)
}

fn get_connection_by_az_affinity_strategy(
&self,
slot_map_value: &SlotMapValue,
client_az: String,
check_primary: bool, // Strategy flag
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
let mut retries = 0usize;

// Step 1: Try to find a replica in the same AZ
loop {
retries = retries.saturating_add(1);
// Looped through all replicas; no connected replica found in the same availability zone.
if retries > addrs.replicas().len() {
// Attempt a fallback to any available replica or primary if needed.
return self.round_robin_read_from_replica(slot_map_value);
break;
}

// Calculate index based on initial index and check count.
Expand All @@ -286,6 +305,20 @@ where
}
}
}

// Step 2: Check if primary is in the same AZ
if check_primary {
if let Some((address, connection_details)) =
self.connection_details_for_address(addrs.primary().as_str())
{
if self.az_for_address(&address) == Some(client_az) {
return Some((address, connection_details.conn));
}
}
}

// Step 3: Fall back to any available replica using round-robin or primary if needed
self.round_robin_read_from_replica(slot_map_value)
}

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
Expand All @@ -311,6 +344,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az) => self
.round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
slot_map_value,
az.to_string(),
),
},
// when the user strategy per command is replica_preffered
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
Expand All @@ -319,6 +357,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az) => self
.round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
slot_map_value,
az.to_string(),
),
_ => self.round_robin_read_from_replica(slot_map_value),
},
}
Expand Down Expand Up @@ -510,6 +553,7 @@ mod tests {

fn create_container_with_az_strategy(
use_management_connections: bool,
strategy: Option<ReadFromReplicaStrategy>,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
vec![
Expand Down Expand Up @@ -562,15 +606,12 @@ mod tests {
"replica3-3".into(),
create_cluster_node(33, use_management_connections, Some("use-1a".to_string())),
);
connection_map.insert(
"replica3-3".into(),
create_cluster_node(33, use_management_connections, Some("use-1a".to_string())),
);

ConnectionsContainer {
slot_map,
connection_map,
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
read_from_replica_strategy: strategy
.unwrap_or(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
topology_hash: 0,
}
}
Expand Down Expand Up @@ -801,7 +842,10 @@ mod tests {

#[test]
fn get_connection_for_az_affinity_route() {
let container = create_container_with_az_strategy(false);
let container = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
);

// slot number is not exits
assert!(container
Expand Down Expand Up @@ -864,7 +908,10 @@ mod tests {

#[test]
fn get_connection_for_az_affinity_route_round_robin() {
let container = create_container_with_az_strategy(false);
let container = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
);

let mut addresses = vec![
container
Expand All @@ -888,6 +935,133 @@ mod tests {
assert_eq!(addresses, vec![31, 31, 33, 33]);
}

#[test]
fn get_connection_for_az_affinity_replicas_and_primary_route() {
// Create a container with AZAffinityReplicasAndPrimary strategy
let container: ConnectionsContainer<usize> = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
"use-1a".to_string(),
)),
);
// Modify the AZ of primary1
container
.connection_map
.get_mut("primary1")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());

// Modify the AZ of primary2
container
.connection_map
.get_mut("primary2")
.unwrap()
.user_connection
.az = Some("use-1c".to_string());

// Modify the AZ of primary3
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());

// Modify the AZ of replica2-1
container
.connection_map
.get_mut("replica2-1")
.unwrap()
.user_connection
.az = Some("use-1c".to_string());

// Slot number does not exist (slot 1001 wasn't assigned to any primary)
assert!(container
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
.is_none());

// Test getting replica in client's AZ for slot 2001
assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
&[31, 33],
));

// Remove one replica in the client's AZ
remove_nodes(&container, &["replica3-3"]);

// Should still get the remaining replica in the client's AZ
assert_eq!(
31,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);

// Remove all replicas in the client's AZ
remove_nodes(&container, &["replica3-1"]);

// Test falling back to replica in different AZ
assert_eq!(
32,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);

// Set the primary to be in the client's AZ
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1a".to_string());

// Remove the last replica
remove_nodes(&container, &["replica3-2"]);

// Should now fall back to the primary in the client's AZ
assert_eq!(
3,
container
.connection_for_route(&Route::new(2001, SlotAddr::Master))
.unwrap()
.1
);

// Move the primary out of the client's AZ
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());

// Test falling back to replica under different primary
assert_eq!(
21,
container
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaRequired))
.unwrap()
.1
);

// Remove all replicas
remove_nodes(&container, &["replica2-1"]);

// Test falling back to available primaries with their respective slots
assert!(one_of(
container.connection_for_route(&Route::new(1002, SlotAddr::Master)),
&[2],
));
assert!(one_of(
container.connection_for_route(&Route::new(500, SlotAddr::Master)),
&[1],
));
}

#[test]
fn get_connection_by_address() {
let container = create_container();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ where
let discover_az = matches!(
params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
);

match create_connection::<C>(
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ where
let discover_az = matches!(
cluster_params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
);

let glide_connection_options = GlideConnectionOptions {
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ impl ClusterClientBuilder {
/// The parameter `read_strategy` can be one of:
/// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
/// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(availability_zone)` - attempt to access nodes in the same availability zone.
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
/// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
///
Expand Down
4 changes: 4 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum ReadFromReplicaStrategy {
/// Spread the read requests between replicas in the same client's Aviliablity zone in a round robin manner,
/// falling back to other replicas or the primary if needed.
AZAffinity(String),
/// Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
AZAffinityReplicasAndPrimary(String),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -60,6 +63,7 @@ fn get_address_from_slot(
addrs.replicas()[index].clone()
}
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_az) => todo!(), // Drop sync client
}
}

Expand Down
Loading
Loading