|
| 1 | +use scylla_cql::frame::response::error::DbError; |
| 2 | +use tracing::{error, warn}; |
| 3 | + |
1 | 4 | use crate::client::caching_session::CachingSession;
|
2 | 5 | use crate::client::execution_profile::ExecutionProfile;
|
3 | 6 | use crate::client::session::Session;
|
4 | 7 | use crate::client::session_builder::{GenericSessionBuilder, SessionBuilderKind};
|
5 | 8 | use crate::cluster::ClusterState;
|
6 | 9 | use crate::cluster::NodeRef;
|
7 |
| -use crate::errors::ExecutionError; |
| 10 | +use crate::errors::{ExecutionError, RequestAttemptError}; |
8 | 11 | use crate::network::Connection;
|
9 | 12 | use crate::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
|
| 13 | +use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession}; |
10 | 14 | use crate::query::Query;
|
11 | 15 | use crate::routing::Shard;
|
12 | 16 | use std::sync::Arc;
|
@@ -142,12 +146,54 @@ impl LoadBalancingPolicy for SchemaQueriesLBP {
|
142 | 146 | }
|
143 | 147 | }
|
144 | 148 |
|
| 149 | +#[derive(Debug, Default)] |
| 150 | +struct SchemaQueriesRetrySession { |
| 151 | + count: usize, |
| 152 | +} |
| 153 | + |
| 154 | +impl RetrySession for SchemaQueriesRetrySession { |
| 155 | + fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision { |
| 156 | + match request_info.error { |
| 157 | + RequestAttemptError::DbError(DbError::ServerError, s) |
| 158 | + if s == "Failed to apply group 0 change due to concurrent modification" => |
| 159 | + { |
| 160 | + self.count += 1; |
| 161 | + // Give up if there are many failures. |
| 162 | + // In this case we really should do something about it in the |
| 163 | + // core, because it is absurd for DDL queries to fail this often. |
| 164 | + if self.count >= 10 { |
| 165 | + error!("Received TENTH(!) group 0 concurrent modification error during DDL. Please fix Scylla Core."); |
| 166 | + RetryDecision::DontRetry |
| 167 | + } else { |
| 168 | + warn!("Received group 0 concurrent modification error during DDL. Performing retry #{}.", self.count); |
| 169 | + RetryDecision::RetrySameNode(None) |
| 170 | + } |
| 171 | + } |
| 172 | + _ => RetryDecision::DontRetry, |
| 173 | + } |
| 174 | + } |
| 175 | + |
| 176 | + fn reset(&mut self) { |
| 177 | + *self = Default::default() |
| 178 | + } |
| 179 | +} |
| 180 | + |
| 181 | +#[derive(Debug)] |
| 182 | +struct SchemaQueriesRetryPolicy; |
| 183 | + |
| 184 | +impl RetryPolicy for SchemaQueriesRetryPolicy { |
| 185 | + fn new_session(&self) -> Box<dyn RetrySession> { |
| 186 | + Box::new(SchemaQueriesRetrySession::default()) |
| 187 | + } |
| 188 | +} |
| 189 | + |
145 | 190 | fn apply_ddl_lbp(query: &mut Query) {
|
146 | 191 | let policy = query
|
147 | 192 | .get_execution_profile_handle()
|
148 | 193 | .map(|profile| profile.pointee_to_builder())
|
149 | 194 | .unwrap_or(ExecutionProfile::builder())
|
150 | 195 | .load_balancing_policy(Arc::new(SchemaQueriesLBP))
|
| 196 | + .retry_policy(Arc::new(SchemaQueriesRetryPolicy)) |
151 | 197 | .build();
|
152 | 198 | query.set_execution_profile_handle(Some(policy.into_handle()));
|
153 | 199 | }
|
|
0 commit comments