From 9ed7bcb68289bb26f89e69a3189d91826ae44f6a Mon Sep 17 00:00:00 2001
From: Muhammad Awawdi
Date: Tue, 4 Feb 2025 10:05:00 +0000
Subject: [PATCH] Core: Implement AzAffinityReplicasAndPrimary Read Strategy

Signed-off-by: Muhammad Awawdi
---
 CHANGELOG.md                                        |   1 +
 glide-core/redis-rs/redis/src/aio/mod.rs            |   2 +-
 glide-core/redis-rs/redis/src/client.rs             |   2 +-
 .../cluster_async/connections_container.rs          | 192 +++++++++++++++++-
 .../src/cluster_async/connections_logic.rs          |   1 +
 glide-core/redis-rs/redis/src/cluster_async/mod.rs  |   1 +
 glide-core/redis-rs/redis/src/cluster_client.rs     |   2 +
 glide-core/redis-rs/redis/src/cluster_slotmap.rs    |   4 +
 .../redis/tests/test_cluster_async.rs               | 166 ++++++++++++++-
 glide-core/src/client/mod.rs                        |   5 +
 glide-core/src/client/standalone_client.rs          |  72 +++++++
 glide-core/src/client/types.rs                      |  15 ++
 .../src/protobuf/connection_request.proto           |   1 +
 glide-core/tests/test_standalone_client.rs          |  13 ++
 node/src/BaseClient.ts                              |  15 +-
 15 files changed, 469 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5301843e06..474e9a2dac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 #### Changes
 
+* Core: Add support for the AzAffinityReplicasAndPrimary read strategy ([#2792](https://github.com/valkey-io/valkey-glide/pull/2792))
 * Go: Add `HScan` command ([#2917](https://github.com/valkey-io/valkey-glide/pull/2917))
 * Java, Node, Python: Add transaction commands for JSON module ([#2862](https://github.com/valkey-io/valkey-glide/pull/2862))
 * Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
diff --git a/glide-core/redis-rs/redis/src/aio/mod.rs b/glide-core/redis-rs/redis/src/aio/mod.rs
index 077046feba..17ec7b58b0 100644
--- a/glide-core/redis-rs/redis/src/aio/mod.rs
+++ b/glide-core/redis-rs/redis/src/aio/mod.rs
@@ -146,7 +146,7 @@ where
 async fn setup_connection(
     connection_info: &RedisConnectionInfo,
     con: &mut C,
-    // This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
+    // This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity or AZAffinityReplicasAndPrimary.
     // An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
     discover_az: bool,
 ) -> RedisResult<()>
diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs
index 2b97671110..06764cd0f7 100644
--- a/glide-core/redis-rs/redis/src/client.rs
+++ b/glide-core/redis-rs/redis/src/client.rs
@@ -86,7 +86,7 @@ pub struct GlideConnectionOptions {
     #[cfg(feature = "aio")]
     /// Passive disconnect notifier
     pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
-    /// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
+    /// If ReadFromReplica strategy is set to AZAffinity or AZAffinityReplicasAndPrimary, this parameter will be set to 'true'.
     /// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
     pub discover_az: bool,
     /// Connection timeout duration.
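Reviewer note (not part of the patch): the `discover_az` flag above makes connection setup issue one extra `INFO` call so each connection can cache the availability zone of its node. A minimal sketch of that parsing step, assuming a plain-text INFO reply; the helper name `parse_availability_zone` is illustrative and not defined by this patch:

```rust
/// Illustrative helper: pull the availability zone out of a raw INFO reply.
/// The patch performs an equivalent lookup during `setup_connection` when
/// `discover_az` is true, storing the result on the connection.
fn parse_availability_zone(info_reply: &str) -> Option<String> {
    info_reply
        .lines()
        // Valkey 8 reports the zone as `availability_zone:<az>` in its INFO output.
        .find_map(|line| line.strip_prefix("availability_zone:"))
        .map(|az| az.trim().to_string())
        .filter(|az| !az.is_empty())
}

fn main() {
    let reply = "# Server\r\nredis_version:8.0.0\r\navailability_zone:use-1a\r\n";
    assert_eq!(parse_availability_zone(reply).as_deref(), Some("use-1a"));
}
```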
diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
index 955d24d9e9..3ab073002b 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
@@ -253,17 +253,36 @@ where
         &self,
         slot_map_value: &SlotMapValue,
         client_az: String,
+    ) -> Option<ConnectionAndAddress<Connection>> {
+        self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, false)
+    }
+
+    /// Returns the node's connection in the same availability zone as `client_az`,
+    /// checking replicas first, then the primary, and falling back to any available node.
+    pub(crate) fn round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
+        &self,
+        slot_map_value: &SlotMapValue,
+        client_az: String,
+    ) -> Option<ConnectionAndAddress<Connection>> {
+        self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, true)
+    }
+
+    fn get_connection_by_az_affinity_strategy(
+        &self,
+        slot_map_value: &SlotMapValue,
+        client_az: String,
+        check_primary: bool, // Strategy flag
     ) -> Option<ConnectionAndAddress<Connection>> {
         let addrs = &slot_map_value.addrs;
         let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
         let mut retries = 0usize;
+        // Step 1: Try to find a replica in the same AZ
         loop {
             retries = retries.saturating_add(1);
             // Looped through all replicas; no connected replica found in the same availability zone.
             if retries > addrs.replicas().len() {
-                // Attempt a fallback to any available replica or primary if needed.
-                return self.round_robin_read_from_replica(slot_map_value);
+                break;
             }
 
             // Calculate index based on initial index and check count.
@@ -286,6 +305,20 @@ where
                 }
             }
         }
+
+        // Step 2: Check if the primary is in the same AZ
+        if check_primary {
+            if let Some((address, connection_details)) =
+                self.connection_details_for_address(addrs.primary().as_str())
+            {
+                if self.az_for_address(&address) == Some(client_az) {
+                    return Some((address, connection_details.conn));
+                }
+            }
+        }
+
+        // Step 3: Fall back to any available replica using round-robin, or to the primary if needed
+        self.round_robin_read_from_replica(slot_map_value)
     }
 
     fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
@@ -311,6 +344,11 @@ where
                     slot_map_value,
                     az.to_string(),
                 ),
+                ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az) => self
+                    .round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
+                        slot_map_value,
+                        az.to_string(),
+                    ),
             },
             // when the user strategy per command is replica_preferred
             SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
@@ -319,6 +357,11 @@ where
                     slot_map_value,
                     az.to_string(),
                 ),
+                ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az) => self
+                    .round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
+                        slot_map_value,
+                        az.to_string(),
+                    ),
                 _ => self.round_robin_read_from_replica(slot_map_value),
             },
         }
@@ -510,6 +553,7 @@ mod tests {
 
     fn create_container_with_az_strategy(
         use_management_connections: bool,
+        strategy: Option<ReadFromReplicaStrategy>,
     ) -> ConnectionsContainer<usize> {
         let slot_map = SlotMap::new(
             vec![
         connection_map.insert(
             "replica3-3".into(),
             create_cluster_node(33, use_management_connections, Some("use-1a".to_string())),
         );
-        connection_map.insert(
-            "replica3-3".into(),
-            create_cluster_node(33, use_management_connections, Some("use-1a".to_string())),
-        );
 
         ConnectionsContainer {
             slot_map,
             connection_map,
-            read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
+            read_from_replica_strategy: strategy
+                .unwrap_or(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
             topology_hash: 0,
         }
     }
@@ -801,7 +842,10 @@ mod tests {
 
     #[test]
     fn get_connection_for_az_affinity_route() {
-        let container = create_container_with_az_strategy(false);
+        let container = create_container_with_az_strategy(
+            false,
+            Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
+        );
 
         // slot number does not exist
         assert!(container
@@ -864,7 +908,10 @@
 
     #[test]
     fn get_connection_for_az_affinity_route_round_robin() {
-        let container = create_container_with_az_strategy(false);
+        let container = create_container_with_az_strategy(
+            false,
+            Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
+        );
 
         let mut addresses = vec![
             container
@@ -888,6 +935,133 @@
         assert_eq!(addresses, vec![31, 31, 33, 33]);
     }
 
+    #[test]
+    fn get_connection_for_az_affinity_replicas_and_primary_route() {
+        // Create a container with the AZAffinityReplicasAndPrimary strategy
+        let container: ConnectionsContainer<usize> = create_container_with_az_strategy(
+            false,
+            Some(ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
+                "use-1a".to_string(),
+            )),
+        );
+
+        // Modify the AZ of primary1
+        container
+            .connection_map
+            .get_mut("primary1")
+            .unwrap()
+            .user_connection
+            .az = Some("use-1b".to_string());
+
+        // Modify the AZ of primary2
+        container
+            .connection_map
+            .get_mut("primary2")
+            .unwrap()
+            .user_connection
+            .az = Some("use-1c".to_string());
+
+        // Modify the AZ of primary3
+        container
+            .connection_map
+            .get_mut("primary3")
+            .unwrap()
+            .user_connection
+            .az = Some("use-1b".to_string());
+
+        // Modify the AZ of replica2-1
+        container
+            .connection_map
+            .get_mut("replica2-1")
+            .unwrap()
+            .user_connection
+            .az = Some("use-1c".to_string());
+
+        // Slot number does not exist (slot 1001 wasn't assigned to any primary)
+        assert!(container
+            .connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
+            .is_none());
+
+        // Test getting a replica in the client's AZ for slot 2001
+        assert!(one_of(
+            container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
+            &[31, 33],
+        ));
+
+        // Remove one replica in the client's AZ
+        remove_nodes(&container, &["replica3-3"]);
+
+        // Should still get the remaining replica in the client's AZ
+        assert_eq!(
+            31,
+            container
+                .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
+                .unwrap()
+                .1
+        );
+
+        // Remove all replicas in the client's AZ
+        remove_nodes(&container, &["replica3-1"]);
+
+        // Test falling back to a replica in a different AZ
+        assert_eq!(
+            32,
+            container
+                .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
+                .unwrap()
+                .1
+        );
+
+        // Set the primary to be in the client's AZ
+        container
+            .connection_map
+            .get_mut("primary3")
+            .unwrap()
+            .user_connection
+            .az = Some("use-1a".to_string());
+
+        // Remove the last replica
+        remove_nodes(&container, &["replica3-2"]);
+
+        // Should now fall back to the primary in the client's AZ
+        assert_eq!(
+            3,
+            container
+                .connection_for_route(&Route::new(2001, SlotAddr::Master))
+                .unwrap()
+                .1
+        );
+
+        // Move the primary out of the client's AZ
+        container
+            .connection_map
+            .get_mut("primary3")
+            .unwrap()
+            .user_connection
+            .az = Some("use-1b".to_string());
+
+        // Test falling back to a replica under a different primary
+        assert_eq!(
+            21,
+            container
+                .connection_for_route(&Route::new(1002, SlotAddr::ReplicaRequired))
+                .unwrap()
+                .1
+        );
+
+        // Remove all replicas
+        remove_nodes(&container, &["replica2-1"]);
+
+        // Test falling back to available primaries with their respective slots
+        assert!(one_of(
+            container.connection_for_route(&Route::new(1002, SlotAddr::Master)),
+            &[2],
+        ));
+        assert!(one_of(
+            container.connection_for_route(&Route::new(500, SlotAddr::Master)),
+            &[1],
+        ));
+    }
+
     #[test]
     fn get_connection_by_address() {
         let container = create_container();
diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
index e5af8d1e50..9fbd699bc1 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
@@ -182,6 +182,7 @@ where
     let discover_az = matches!(
         params.read_from_replicas,
         crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
+            | crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
     );
 
     match create_connection::<C>(
diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
index 3d61efce29..0555c59472 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -1088,6 +1088,7 @@ where
         let discover_az = matches!(
             cluster_params.read_from_replicas,
             crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
+                | crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
         );
 
         let glide_connection_options = GlideConnectionOptions {
diff --git a/glide-core/redis-rs/redis/src/cluster_client.rs b/glide-core/redis-rs/redis/src/cluster_client.rs
index c4dc0103dc..d0a14b17d7 100644
--- a/glide-core/redis-rs/redis/src/cluster_client.rs
+++ b/glide-core/redis-rs/redis/src/cluster_client.rs
@@ -392,6 +392,8 @@ impl ClusterClientBuilder {
     /// The parameter `read_strategy` can be one of:
     /// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
     /// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
+    /// `ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(availability_zone)` - attempt to access nodes in the same availability zone,
+    /// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
     /// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
     /// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
     ///
diff --git a/glide-core/redis-rs/redis/src/cluster_slotmap.rs b/glide-core/redis-rs/redis/src/cluster_slotmap.rs
index f2e43b4449..f519995f50 100644
--- a/glide-core/redis-rs/redis/src/cluster_slotmap.rs
+++ b/glide-core/redis-rs/redis/src/cluster_slotmap.rs
@@ -32,6 +32,9 @@ pub enum ReadFromReplicaStrategy {
     /// Spread the read requests between replicas in the same client's availability zone in a round robin manner,
     /// falling back to other replicas or the primary if needed.
     AZAffinity(String),
+    /// Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
+    /// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
+    AZAffinityReplicasAndPrimary(String),
 }
 
 #[derive(Debug, Default)]
@@ -60,6 +63,7 @@ fn get_address_from_slot(
             addrs.replicas()[index].clone()
         }
         ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
+        ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_az) => todo!(), // Drop sync client
     }
 }
diff --git a/glide-core/redis-rs/redis/tests/test_cluster_async.rs b/glide-core/redis-rs/redis/tests/test_cluster_async.rs
index c69a9f933f..4888bccde5 100644
--- a/glide-core/redis-rs/redis/tests/test_cluster_async.rs
+++ b/glide-core/redis-rs/redis/tests/test_cluster_async.rs
@@ -208,6 +208,20 @@ mod cluster_async {
 
     #[tokio::test]
     async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_half_replicas() {
+        test_az_affinity_helper(StrategyVariant::AZAffinity).await;
+    }
+
+    #[tokio::test]
+    async fn test_routing_by_slot_to_replica_with_az_affinity_replicas_and_primary_strategy_to_half_replicas(
+    ) {
+        test_az_affinity_helper(StrategyVariant::AZAffinityReplicasAndPrimary).await;
+    }
+
+    enum StrategyVariant {
+        AZAffinity,
+        AZAffinityReplicasAndPrimary,
+    }
+
+    async fn test_az_affinity_helper(strategy_variant: StrategyVariant) {
         // Skip test if version is less than Valkey 8.0
         if crate::engine_version_less_than("8.0").await {
             return;
@@ -243,11 +257,18 @@
             .await
             .unwrap();
         }
-
+        let strategy = match strategy_variant {
+            StrategyVariant::AZAffinity => {
+                redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(az.clone())
+            }
+            StrategyVariant::AZAffinityReplicasAndPrimary => {
+                redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
+                    az.clone(),
+                )
+            }
+        };
         let mut client = ClusterClient::builder(cluster_addresses.clone())
-            .read_from(redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(
-                az.clone(),
-            ))
+            .read_from(strategy)
             .build()
             .unwrap()
             .get_async_connection(None)
             .await
             .unwrap();
@@ -303,7 +324,16 @@
     }
 
     #[tokio::test]
-    async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_replicas() {
+    async fn test_az_affinity_strategy_to_all_replicas() {
+        test_all_replicas_helper(StrategyVariant::AZAffinity).await;
+    }
+
+    #[tokio::test]
+    async fn test_az_affinity_replicas_and_primary_to_all_replicas() {
+        test_all_replicas_helper(StrategyVariant::AZAffinityReplicasAndPrimary).await;
+    }
+
+    async fn test_all_replicas_helper(strategy_variant: StrategyVariant) {
         // Skip test if version is less than Valkey 8.0
         if crate::engine_version_less_than("8.0").await {
             return;
@@ -334,10 +364,19 @@
             .await
             .unwrap();
 
+        // Strategy-specific client configuration
+        let strategy = match strategy_variant {
+            StrategyVariant::AZAffinity => {
+                redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(az.clone())
+            }
+            StrategyVariant::AZAffinityReplicasAndPrimary => {
+                redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
+                    az.clone(),
+                )
+            }
+        };
         let mut client = ClusterClient::builder(cluster_addresses.clone())
-            .read_from(redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(
-                az.clone(),
-            ))
+            .read_from(strategy)
             .build()
             .unwrap()
             .get_async_connection(None)
             .await
             .unwrap();
@@ -392,6 +431,117 @@
         );
     }
 
+    #[tokio::test]
+    async fn test_az_affinity_replicas_and_primary_prefers_local_primary() {
+        // Skip test if version is less than Valkey 8.0
+        if crate::engine_version_less_than("8.0").await {
+            return;
+        }
+
+        let replica_num: u16 = 4;
+        let primaries_num: u16 = 3;
+        let primary_in_same_az: u16 = 1;
+
+        let cluster =
+            TestClusterContext::new((replica_num * primaries_num) + primaries_num, replica_num);
+        let client_az = "us-east-1a".to_string();
+        let other_az = "us-east-1b".to_string();
+
+        let mut connection = cluster.async_connection(None).await;
+        let cluster_addresses: Vec<_> = cluster
+            .cluster
+            .servers
+            .iter()
+            .map(|server| server.connection_info())
+            .collect();
+
+        // Set the AZ for all nodes to a different AZ initially
+        let mut cmd = redis::cmd("CONFIG");
+        cmd.arg(&["SET", "availability-zone", &other_az.clone()]);
+
+        connection
+            .route_command(
+                &cmd,
+                RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)),
+            )
+            .await
+            .unwrap();
+
+        // Set the client's AZ for one primary (the last one)
+        let mut cmd = redis::cmd("CONFIG");
+        cmd.arg(&["SET", "availability-zone", &client_az]);
+        connection
+            .route_command(
+                &cmd,
+                RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
+                    12182, // This should target the third primary
+                    SlotAddr::Master,
+                ))),
+            )
+            .await
+            .unwrap();
+
+        let mut client = ClusterClient::builder(cluster_addresses.clone())
+            .read_from(
+                redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
+                    client_az.clone(),
+                ),
+            )
+            .build()
+            .unwrap()
+            .get_async_connection(None)
+            .await
+            .unwrap();
+
+        // Perform read operations
+        let n = 100;
+        for _ in 0..n {
+            let mut cmd = redis::cmd("GET");
+            cmd.arg("foo"); // This key should hash to the third primary's slot
+            let _res: RedisResult<Value> = cmd.query_async(&mut client).await;
+        }
+
+        // Gather INFO
+        let mut cmd = redis::cmd("INFO");
+        cmd.arg("ALL");
+        let info = connection
+            .route_command(
+                &cmd,
+                RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)),
+            )
+            .await
+            .unwrap();
+
+        let info_result: HashMap<String, String> =
+            redis::from_owned_redis_value::<HashMap<String, String>>(info).unwrap();
+        let get_cmdstat = "cmdstat_get:calls=".to_string();
+        let n_get_cmdstat = format!("cmdstat_get:calls={}", n);
+        let client_az2 = format!("availability-zone:{}", client_az);
+        let mut matching_entries_count: usize = 0;
+
+        for value in info_result.values() {
+            if value.contains(&get_cmdstat) {
+                if value.contains(&client_az) && value.contains(&n_get_cmdstat) {
+                    matching_entries_count += 1;
+                } else {
+                    panic!(
+                        "Invalid entry found: {}. Expected cmdstat_get:calls={} and availability_zone={}",
+                        value, n, client_az2
+                    );
+                }
+            }
+        }
+
+        assert_eq!(
+            (matching_entries_count.try_into() as Result<u16, _>).unwrap(),
+            primary_in_same_az,
+            "Test failed: expected exactly '{}' entries with '{}' and '{}', found {}",
+            primary_in_same_az,
+            get_cmdstat,
+            client_az,
+            matching_entries_count
+        );
+    }
+
     #[test]
     #[serial_test::serial]
     fn test_async_cluster_basic_eval() {
diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs
index 005a38a9ca..1f890b20bd 100644
--- a/glide-core/src/client/mod.rs
+++ b/glide-core/src/client/mod.rs
@@ -579,6 +579,9 @@ async fn create_cluster_client(
     let read_from_strategy = request.read_from.unwrap_or_default();
     builder = builder.read_from(match read_from_strategy {
         ReadFrom::AZAffinity(az) => ReadFromReplicaStrategy::AZAffinity(az),
+        ReadFrom::AZAffinityReplicasAndPrimary(az) => {
+            ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az)
+        }
         ReadFrom::PreferReplica => ReadFromReplicaStrategy::RoundRobin,
         ReadFrom::Primary => ReadFromReplicaStrategy::AlwaysFromPrimary,
     });
@@ -733,6 +736,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
                 ReadFrom::Primary => "Only primary",
                 ReadFrom::PreferReplica => "Prefer replica",
                 ReadFrom::AZAffinity(_) => "Prefer replica in user's availability zone",
+                ReadFrom::AZAffinityReplicasAndPrimary(_) =>
+                    "Prefer replica and primary in user's availability zone",
             }
         )
     })
diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs
index c2c541c763..f4945335b2 100644
--- a/glide-core/src/client/standalone_client.rs
+++ b/glide-core/src/client/standalone_client.rs
@@ -31,6 +31,10 @@ enum ReadFrom {
         client_az: String,
         last_read_replica_index: Arc<AtomicUsize>,
     },
+    AZAffinityReplicasAndPrimary {
+        client_az: String,
+        last_read_replica_index: Arc<AtomicUsize>,
+    },
 }
 
 #[derive(Debug)]
@@ -130,6 +134,7 @@ impl StandaloneClient {
         let discover_az = matches!(
             connection_request.read_from,
             Some(ClientReadFrom::AZAffinity(_))
+                | Some(ClientReadFrom::AZAffinityReplicasAndPrimary(_))
         );
 
         let connection_timeout = to_duration(
@@ -306,6 +311,57 @@ impl StandaloneClient {
         }
     }
 
+    async fn round_robin_read_from_replica_az_awareness_replicas_and_primary(
+        &self,
+        latest_read_replica_index: &Arc<AtomicUsize>,
+        client_az: String,
+    ) -> &ReconnectingConnection {
+        let initial_index = latest_read_replica_index.load(Ordering::Relaxed);
+        let mut retries = 0usize;
+
+        // Step 1: Try to find a replica in the same AZ
+        loop {
+            retries = retries.saturating_add(1);
+            // Looped through all replicas; no connected replica found in the same AZ.
+            if retries >= self.inner.nodes.len() {
+                break;
+            }
+
+            // Calculate index based on initial index and check count.
+            let index = (initial_index + retries) % self.inner.nodes.len();
+            let replica = &self.inner.nodes[index];
+
+            // Attempt to get a connection and retrieve the replica's AZ.
+            if let Ok(connection) = replica.get_connection().await {
+                if let Some(replica_az) = connection.get_az().as_deref() {
+                    if replica_az == client_az {
+                        // Update `latest_used_replica` with the index of this replica.
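+                        // (Reviewer note, illustrative: a failed `compare_exchange_weak`,
+                        // whether a spurious failure or a concurrent update by another
+                        // task, is deliberately ignored below. The index only steers
+                        // round-robin fairness, so a lost update merely skews the
+                        // rotation rather than affecting correctness.)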
+                        let _ = latest_read_replica_index.compare_exchange_weak(
+                            initial_index,
+                            index,
+                            Ordering::Relaxed,
+                            Ordering::Relaxed,
+                        );
+                        return replica;
+                    }
+                }
+            }
+        }
+
+        // Step 2: Check if the primary is in the same AZ
+        let primary = self.get_primary_connection();
+        if let Ok(connection) = primary.get_connection().await {
+            if let Some(primary_az) = connection.get_az().as_deref() {
+                if primary_az == client_az {
+                    return primary;
+                }
+            }
+        }
+
+        // Step 3: Fall back to any available replica using round-robin
+        self.round_robin_read_from_replica(latest_read_replica_index)
+    }
+
     async fn get_connection(&self, readonly: bool) -> &ReconnectingConnection {
         if self.inner.nodes.len() == 1 || !readonly {
             return self.get_primary_connection();
@@ -326,6 +382,16 @@ impl StandaloneClient {
                 )
                 .await
             }
+            ReadFrom::AZAffinityReplicasAndPrimary {
+                client_az,
+                last_read_replica_index,
+            } => {
+                self.round_robin_read_from_replica_az_awareness_replicas_and_primary(
+                    last_read_replica_index,
+                    client_az.to_string(),
+                )
+                .await
+            }
         }
     }
 
@@ -608,6 +674,12 @@ fn get_read_from(read_from: Option<super::ReadFrom>) -> ReadFrom {
             client_az: az,
             last_read_replica_index: Default::default(),
         },
+        Some(super::ReadFrom::AZAffinityReplicasAndPrimary(az)) => {
+            ReadFrom::AZAffinityReplicasAndPrimary {
+                client_az: az,
+                last_read_replica_index: Default::default(),
+            }
+        }
         None => ReadFrom::Primary,
     }
 }
diff --git a/glide-core/src/client/types.rs b/glide-core/src/client/types.rs
index e2314a1ab6..0435eac7f1 100644
--- a/glide-core/src/client/types.rs
+++ b/glide-core/src/client/types.rs
@@ -58,6 +58,7 @@ pub enum ReadFrom {
     Primary,
     PreferReplica,
     AZAffinity(String),
+    AZAffinityReplicasAndPrimary(String),
 }
 
 #[derive(PartialEq, Eq, Clone, Copy, Default)]
@@ -113,6 +114,20 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
                     ReadFrom::PreferReplica
                 }
             }
+            protobuf::ReadFrom::AZAffinityReplicasAndPrimary => {
+                if let Some(client_az) = chars_to_string_option(&value.client_az) {
+                    ReadFrom::AZAffinityReplicasAndPrimary(client_az)
+                } else {
+                    log_warn(
+                        "types",
+                        format!(
+                            "Failed to convert availability zone string: '{:?}'. Falling back to `ReadFrom::PreferReplica`",
+                            value.client_az
+                        ),
+                    );
+                    ReadFrom::PreferReplica
+                }
+            },
         });
 
         let client_name = chars_to_string_option(&value.client_name);
diff --git a/glide-core/src/protobuf/connection_request.proto b/glide-core/src/protobuf/connection_request.proto
index 8e33b39da3..3cb2077dba 100644
--- a/glide-core/src/protobuf/connection_request.proto
+++ b/glide-core/src/protobuf/connection_request.proto
@@ -11,6 +11,7 @@ enum ReadFrom {
     PreferReplica = 1;
     LowestLatency = 2;
     AZAffinity = 3;
+    AZAffinityReplicasAndPrimary = 4;
 }
 
 enum TlsMode {
diff --git a/glide-core/tests/test_standalone_client.rs b/glide-core/tests/test_standalone_client.rs
index 77363b5c18..546cfb9ba9 100644
--- a/glide-core/tests/test_standalone_client.rs
+++ b/glide-core/tests/test_standalone_client.rs
@@ -272,6 +272,7 @@ mod standalone_client_tests {
         });
     }
 
+    // TODO - The current test falls back to PreferReplica when run; the AZ still needs to be integrated here as well.
     #[rstest]
     #[serial_test::serial]
     #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
             ..Default::default()
         });
     }
+
+    // TODO - Needs changes in the struct and in create_primary_mock
+    #[rstest]
+    #[serial_test::serial]
+    #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
+    fn test_read_from_replica_az_affinity_replicas_and_primary() {
+        test_read_from_replica(ReadFromReplicaTestConfig {
+            read_from: ReadFrom::AZAffinityReplicasAndPrimary,
+            expected_primary_reads: 0,
+            expected_replica_reads: vec![1, 1, 1],
+            ..Default::default()
+        });
+    }
 
     #[rstest]
     #[serial_test::serial]
diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts
index 025d0218bf..88595d15cc 100644
--- a/node/src/BaseClient.ts
+++ b/node/src/BaseClient.ts
@@ -501,7 +501,10 @@ export type ReadFrom =
     | "preferReplica"
     /** Spread the requests between replicas in the same client's availability zone in a round robin manner.
         If no replica is available, route the requests to the primary.*/
-    | "AZAffinity";
+    | "AZAffinity"
+    /** Spread the read requests among all nodes within the client's Availability Zone (AZ) in a round robin manner,
+        prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.*/
+    | "AZAffinityReplicasAndPrimary";
 
 /**
  * Configuration settings for creating a client. Shared settings for standalone and cluster clients.
@@ -531,11 +534,11 @@
  *
  * ### Read Strategy
  *
- * - Use `readFrom` to specify the client's read strategy (e.g., primary, preferReplica, AZAffinity).
+ * - Use `readFrom` to specify the client's read strategy (e.g., primary, preferReplica, AZAffinity, AZAffinityReplicasAndPrimary).
 *
 * ### Availability Zone
 *
- * - Use `clientAz` to specify the client's availability zone, which can influence read operations when using `readFrom: 'AZAffinity'`.
+ * - Use `clientAz` to specify the client's availability zone, which can influence read operations when using `readFrom: 'AZAffinity'` or `readFrom: 'AZAffinityReplicasAndPrimary'`.
 *
 * ### Decoder Settings
 *
@@ -637,13 +640,15 @@ export interface BaseClientConfiguration {
     inflightRequestsLimit?: number;
     /**
      * Availability Zone of the client.
-     * If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if they exist.
+     * If ReadFrom strategy is AZAffinity or AZAffinityReplicasAndPrimary, this setting ensures that readonly commands are directed to nodes within the specified AZ if they exist.
      *
      * @example
      * ```typescript
      * // Example configuration for setting client availability zone and read strategy
      * configuration.clientAz = 'us-east-1a'; // Sets the client's availability zone
      * configuration.readFrom = 'AZAffinity'; // Directs read operations to nodes within the same AZ
+     * // Or:
+     * configuration.readFrom = 'AZAffinityReplicasAndPrimary'; // Directs read operations to any node (primary or replica) within the same AZ
      * ```
      */
     clientAz?: string;
@@ -6069,6 +6074,8 @@ export class BaseClient {
             primary: connection_request.ReadFrom.Primary,
             preferReplica: connection_request.ReadFrom.PreferReplica,
             AZAffinity: connection_request.ReadFrom.AZAffinity,
+            AZAffinityReplicasAndPrimary:
+                connection_request.ReadFrom.AZAffinityReplicasAndPrimary,
         };
 
     /**
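Reviewer note (not part of the patch): end to end, a Rust-core consumer opts into the new strategy through the cluster builder exactly as the tests above do. A minimal usage sketch, assuming the glide fork's `get_async_connection(None)` signature used in those tests; the seed address and AZ name are placeholders:

```rust
use redis::cluster::ClusterClient;
use redis::cluster_slotmap::ReadFromReplicaStrategy;

async fn connect_with_az_affinity() -> redis::RedisResult<()> {
    // Any reachable node works as a seed address (placeholder shown here).
    let nodes = vec!["redis://127.0.0.1:7000"];

    let mut connection = ClusterClient::builder(nodes)
        // Fallback chain implemented by this patch: same-AZ replicas ->
        // same-AZ primary -> any replica -> primary.
        .read_from(ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
            "us-east-1a".to_string(),
        ))
        .build()?
        .get_async_connection(None)
        .await?;

    // Read-only commands are now routed according to the strategy above.
    let _value: Option<String> = redis::cmd("GET")
        .arg("foo")
        .query_async(&mut connection)
        .await?;
    Ok(())
}
```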