
Commit

Pipelines: error handling
Signed-off-by: Shoham Elias <[email protected]>
shohamazon committed Mar 4, 2025
1 parent d70368b commit e2d5b5b
Showing 21 changed files with 1,063 additions and 332 deletions.
47 changes: 37 additions & 10 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -44,7 +44,7 @@ use crate::{
use connections_container::{RefreshTaskNotifier, RefreshTaskState, RefreshTaskStatus};
use dashmap::DashMap;
use pipeline_routing::{
- collect_and_send_pending_requests, map_pipeline_to_nodes, process_pipeline_responses,
+ collect_and_send_pending_requests, map_pipeline_to_nodes, process_and_retry_pipeline_responses,
route_for_pipeline, PipelineResponses,
};
use std::{
@@ -292,6 +292,7 @@ where
count,
route: route.into(),
sub_pipeline: false,
+ retry: 0,
},
sender,
})
@@ -614,6 +615,7 @@ enum CmdArg<C> {
count: usize,
route: InternalSingleNodeRouting<C>,
sub_pipeline: bool,
+ retry: u32,
},
ClusterScan {
// struct containing the arguments for the cluster scan command - scan state cursor, match pattern, count and object type.
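The new retry field on CmdArg::Pipeline records how many times the (sub-)pipeline has already been re-dispatched; the request built in the earlier hunk starts at retry: 0. The following is a standalone sketch of that idea only; the struct, cap, and helper names are illustrative and are not the crate's actual types.

// Hypothetical, self-contained illustration of a retry counter carried by a
// pipeline request; not the crate's actual CmdArg::Pipeline.
#[derive(Debug, Clone)]
struct PipelineRequest {
    commands: Vec<String>,
    retry: u32,
}

// Illustrative cap; the real code takes its limits from its retry parameters.
const MAX_PIPELINE_RETRIES: u32 = 3;

// Build the next attempt for a failed request, or give up once the cap is hit.
fn next_attempt(req: &PipelineRequest) -> Option<PipelineRequest> {
    if req.retry >= MAX_PIPELINE_RETRIES {
        return None;
    }
    Some(PipelineRequest {
        commands: req.commands.clone(),
        retry: req.retry + 1,
    })
}

fn main() {
    // A fresh request starts at retry: 0, mirroring the hunk above.
    let first = PipelineRequest { commands: vec!["GET key".into()], retry: 0 };
    let second = next_attempt(&first).expect("a first retry should be allowed");
    assert_eq!(second.retry, 1);
}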
@@ -2190,6 +2192,7 @@ where
.map_err(|err| (OperationTarget::NotFound, err))?;
conn.req_packed_command(&cmd)
.await
+ .and_then(|value| value.extract_error(None, None))
.map(Response::Single)
.map_err(|err| (address.into(), err))
}
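The added extract_error call suggests that a reply can come back as a nominally successful value that embeds a server error, and that such values should be converted into Err before the usual error mapping and retry logic runs. Below is a simplified, self-contained model of that conversion; the Value enum and extract_error function here are stand-ins, not the crate's API.

// Simplified stand-ins; the crate's Value and error types are richer.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Okay,
    Int(i64),
    ServerError(String),
}

// Convert a reply that embeds a server-side error into an Err, so the normal
// error-mapping and retry path sees it; every other reply passes through.
fn extract_error(value: Value) -> Result<Value, String> {
    match value {
        Value::ServerError(msg) => Err(msg),
        other => Ok(other),
    }
}

fn main() {
    assert_eq!(extract_error(Value::Int(1)), Ok(Value::Int(1)));
    let loading = Value::ServerError("LOADING server is loading the dataset in memory".into());
    assert!(extract_error(loading).is_err());
    let _ = Value::Okay; // unused variant, kept only to mirror a richer enum
}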
@@ -2204,8 +2207,15 @@
let (address, mut conn) = conn.await.map_err(|err| (OperationTarget::NotFound, err))?;
conn.req_packed_commands(&pipeline, offset, count)
.await
+ .and_then(|value| {
+ if pipeline.is_atomic() {
+ Value::extract_error_vec(value, Some(ErrorKind::ExtensionError), None)
+ } else {
+ Ok(value)
+ }
+ })
.map(Response::Multiple)
- .map_err(|err| (OperationTarget::Node { address }, err))
+ .map_err(|err| (address.into(), err)) //TODO: check
}

async fn try_request(info: RequestInfo<C>, core: Core<C>) -> OperationResult {
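For multi-command replies, the added and_then applies extract_error_vec only when the pipeline is atomic (a transaction), so a single embedded server error fails the whole reply vector, while a non-atomic pipeline keeps per-command values. A minimal standalone model of that distinction follows; the types are simplified stand-ins, not the crate's.

// Simplified model: for an atomic pipeline (transaction) the whole reply
// vector is rejected if any element is a server error; a non-atomic pipeline
// keeps per-command values so callers can inspect each one separately.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Okay,
    ServerError(String),
}

fn extract_error_vec(values: Vec<Value>) -> Result<Vec<Value>, String> {
    for value in &values {
        if let Value::ServerError(msg) = value {
            return Err(msg.clone());
        }
    }
    Ok(values)
}

fn check_pipeline_reply(values: Vec<Value>, is_atomic: bool) -> Result<Vec<Value>, String> {
    if is_atomic {
        extract_error_vec(values)
    } else {
        Ok(values)
    }
}

fn main() {
    let mixed = vec![Value::Okay, Value::ServerError("WRONGTYPE".into())];
    // An atomic pipeline fails as a whole ...
    assert!(check_pipeline_reply(mixed.clone(), true).is_err());
    // ... while a non-atomic one keeps the per-command values.
    assert!(check_pipeline_reply(mixed, false).is_ok());
}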
@@ -2217,6 +2227,7 @@
count,
route,
sub_pipeline,
+ retry,
} => {
if pipeline.is_atomic() || sub_pipeline {
// If the pipeline is atomic (i.e., a transaction) or has already been split into sub-pipelines (i.e., it is already routed to a specific node), we can send it as is, with no need to split it further.
@@ -2229,7 +2240,7 @@
.await
} else {
// The pipeline is not atomic and has not been split yet, so we need to split it into sub-pipelines and send them separately.
- Self::handle_non_atomic_pipeline_request(pipeline, core).await
+ Self::handle_non_atomic_pipeline_request(pipeline, core, retry).await
}
}
CmdArg::ClusterScan {
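The comments in the two hunks above describe the dispatch decision: an atomic pipeline, or one that is already a per-node sub-pipeline, is sent as-is to a single node, and anything else is split by node first. A toy illustration of that branch follows; the types and names are hypothetical, not the crate's routing machinery.

// Hypothetical types illustrating the branch above; the real routing works on
// slots and connections, not plain strings.
struct Pipeline {
    is_atomic: bool,
    commands: Vec<String>,
}

enum Dispatch {
    // Send the whole pipeline to one node (transaction or existing sub-pipeline).
    SingleNode(Vec<String>),
    // Split the pipeline per node before sending.
    SplitAcrossNodes(Vec<String>),
}

fn dispatch(pipeline: Pipeline, sub_pipeline: bool) -> Dispatch {
    if pipeline.is_atomic || sub_pipeline {
        Dispatch::SingleNode(pipeline.commands)
    } else {
        Dispatch::SplitAcrossNodes(pipeline.commands)
    }
}

fn main() {
    let p = Pipeline { is_atomic: false, commands: vec!["GET a".into(), "GET b".into()] };
    match dispatch(p, false) {
        Dispatch::SplitAcrossNodes(cmds) => assert_eq!(cmds.len(), 2),
        Dispatch::SingleNode(_) => unreachable!("non-atomic top-level pipelines are split"),
    }
}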
@@ -2263,6 +2274,7 @@
async fn handle_non_atomic_pipeline_request(
pipeline: Arc<crate::Pipeline>,
core: Core<C>,
+ retry: u32,
) -> OperationResult {
// Distribute pipeline commands across cluster nodes based on routing information.
// Returns:
@@ -2284,10 +2296,18 @@
// - A vector of results for each sub-pipeline execution.
// - A vector of (address, indices) pairs indicating where each response should be placed.
let (responses, addresses_and_indices) =
- collect_and_send_pending_requests(pipelines_by_node, core.clone()).await;
+ collect_and_send_pending_requests(pipelines_by_node, core.clone(), retry).await;

// Process the responses and update the pipeline_responses
- process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?;
+ process_and_retry_pipeline_responses(
+ &mut pipeline_responses,
+ responses,
+ addresses_and_indices,
+ &pipeline,
+ core,
+ retry,
+ )
+ .await?;

// Process response policies after all tasks are complete and aggregate the relevant commands.
Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies)
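The comments above describe the bookkeeping: each sub-pipeline returns its replies in node order together with (address, indices) pairs saying where each reply belongs in the original pipeline. Below is a small, self-contained sketch of that reassembly step; the shapes and names are illustrative only, and the real process_and_retry_pipeline_responses additionally retries retriable errors.

// Illustrative reassembly: each sub-pipeline ran on one node and returned its
// replies in that node's order; `indices` records which slot in the original
// pipeline each reply belongs to.
fn place_responses(
    pipeline_len: usize,
    per_node: Vec<(String, Vec<usize>, Vec<String>)>, // (address, indices, replies)
) -> Vec<Option<String>> {
    let mut ordered: Vec<Option<String>> = vec![None; pipeline_len];
    for (_address, indices, replies) in per_node {
        for (index, reply) in indices.into_iter().zip(replies) {
            ordered[index] = Some(reply);
        }
    }
    ordered
}

fn main() {
    // A three-command pipeline split across two nodes.
    let per_node = vec![
        ("node-a:6379".to_string(), vec![0, 2], vec!["OK".to_string(), "OK".to_string()]),
        ("node-b:6379".to_string(), vec![1], vec!["PONG".to_string()]),
    ];
    let ordered = place_responses(3, per_node);
    assert_eq!(
        ordered,
        vec![Some("OK".to_string()), Some("PONG".to_string()), Some("OK".to_string())]
    );
}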
@@ -2658,14 +2678,23 @@ where
}
}
}

- async fn handle_loading_error(
+ async fn handle_loading_error_and_retry(
core: Core<C>,
info: RequestInfo<C>,
address: String,
retry: u32,
retry_params: RetryParams,
) -> OperationResult {
+ Self::handle_loading_error(core.clone(), address, retry, retry_params).await;
+ Self::try_request(info, core).await
+ }

+ async fn handle_loading_error(
+ core: Core<C>,
+ address: String,
+ retry: u32,
+ retry_params: RetryParams,
+ ) {
let is_primary = core
.conn_lock
.read()
@@ -2684,8 +2713,6 @@
let sleep_duration = retry_params.wait_time_for_retry(retry);
boxed_sleep(sleep_duration).await;
}

- Self::try_request(info, core).await
}

fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll<PollFlushAction> {
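The refactor above splits the old function in two: handle_loading_error now only handles the back-off (and, for primaries, reconnect) around a node that reports it is loading, while handle_loading_error_and_retry composes that wait with a retry of the original request, presumably so the pipeline path can reuse the wait step on its own. Below is a toy, synchronous sketch of that separation of concerns; the back-off policy and names are made up and do not reflect the crate's RetryParams.

use std::time::Duration;

// Made-up back-off policy standing in for a wait_time_for_retry-style helper.
fn wait_time_for_retry(retry: u32) -> Duration {
    Duration::from_millis(50u64 << retry.min(4))
}

// Only waits; in the real code this step also decides whether the loading
// node is a primary and may trigger a reconnect.
fn handle_loading_error(retry: u32) {
    std::thread::sleep(wait_time_for_retry(retry));
}

// Composes the wait with a retry of the original request.
fn handle_loading_error_and_retry<T>(retry: u32, request: impl Fn() -> T) -> T {
    handle_loading_error(retry);
    request()
}

fn main() {
    let reply = handle_loading_error_and_retry(0, || "PONG");
    assert_eq!(reply, "PONG");
}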
@@ -2736,7 +2763,7 @@
}
Next::RetryBusyLoadingError { request, address } => {
// TODO - do we also want to try and reconnect to replica if it is loading?
- let future = Self::handle_loading_error(
+ let future = Self::handle_loading_error_and_retry(
self.inner.clone(),
request.info.clone(),
address,