Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvcoord: eagerly bail on Leaseholder not found in sendToReplicas #140186

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 112 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,23 @@ var ProxyBatchRequest = settings.RegisterBoolSetting(
true,
)

var EagerlyBailOnLeaseholderNotFound = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.eagerly_bail_on_leaseholder_not_found.enabled",
"when true, the dist sender will eagerly return an error when it encounters errors that "+
"indicates an unavailable range, rather than retrying until the circuit breaker trips",
true,
)

var LeaseholderNotFoundBackoffThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"kv.dist_sender.leaseholder_not_found_backoff.threshold",
"determines how long we will keep backing off when we can't find the range leaseholder before "+
"returning that the range in unavailable. This only works when "+
"eagerly_bail_on_leaseholder_not_found is set to true",
time.Second*6,
)

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
Expand Down Expand Up @@ -2208,7 +2225,7 @@ func (ds *DistSender) sendPartialBatch(
}

prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit, tBegin.Elapsed())

if dur := tBegin.Elapsed(); dur > slowDistSenderRangeThreshold && tBegin != 0 {
{
Expand Down Expand Up @@ -2490,6 +2507,42 @@ func selectBestError(ambiguousErr, replicaUnavailableErr, lastAttemptErr error)
return lastAttemptErr
}

// requestBypassesCircuitBreaker determines whether the batch request bypasses
// the circuit breaker or not.
func requestBypassesCircuitBreaker(ba *kvpb.BatchRequest) bool {
// If the batch contains any operation that bypasses the circuit breaker,
// return true.
for _, ru := range ba.Requests {
req := ru.GetInner()
if kvpb.BypassesReplicaCircuitBreaker(req) {
return true
}
}
return false
}

// isRangeUnavailableBasedOnNLHE determines whether the range is considered
// unavailable based on the NLHEs seen during the execution of the batch.
func (ds *DistSender) isRangeUnavailableBasedOnNLHE(
transport Transport, seenNLHE, seenOtherThanNLHE bool,
) bool {
// We consider the range as unavailable based on NotLeaseholder errors if all
// the following conditions are met:
// 1. The cluster setting is turned on.
// 2. There are no more replicas to try.
// 3. We have only seen RPC and NLHEs.
return EagerlyBailOnLeaseholderNotFound.Get(&ds.st.SV) &&
transport.IsExhausted() &&
!seenOtherThanNLHE &&
seenNLHE
}

// shouldBackoffRangeUnavailableBasedOnNLHE determines whether the DistSender
// should backoff and retry the replicas
func (ds *DistSender) shouldBackoffRangeUnavailableBasedOnNLHE(duration time.Duration) bool {
return duration < LeaseholderNotFoundBackoffThreshold.Get(&ds.st.SV)
}

// slowDistSenderRangeThreshold is a latency threshold for logging slow
// requests to a range, potentially involving RPCs to multiple replicas
// of the range.
Expand All @@ -2510,8 +2563,14 @@ const slowDistSenderReplicaThreshold = 10 * time.Second
// AmbiguousResultError. Of those two, the latter has to be passed back to the
// client, while the former should be handled by retrying with an updated range
// descriptor. This method handles other errors returned from replicas
// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
// internally by retrying (NotLeaseHolderError[1], RangeNotFoundError), and
// falls back to a sendError when it runs out of replicas to try.
//
// [1] If all replicas return RPC errors and NotLeaseHolderErrors, it typically
// indicates that the range is unavailable. In that case, and if the cluster
// setting eagerly_bail_on_leaseholder_not_found.enabled is set to true,
// sendToReplicas could return a ReplicaUnavailable error if the request started
// longer than leaseholder_not_found_backoff.threshold.
//
// routing dictates what replicas will be tried (but not necessarily their
// order).
Expand All @@ -2526,7 +2585,11 @@ const slowDistSenderReplicaThreshold = 10 * time.Second
// that do not definitively rule out the possibility that the batch could have
// succeeded are transformed into AmbiguousResultErrors.
func (ds *DistSender) sendToReplicas(
ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
ctx context.Context,
ba *kvpb.BatchRequest,
routing rangecache.EvictionToken,
withCommit bool,
curDuration time.Duration,
) (*kvpb.BatchResponse, error) {

// If this request can be sent to a follower to perform a consistent follower
Expand Down Expand Up @@ -2627,12 +2690,55 @@ func (ds *DistSender) sendToReplicas(
var ambiguousError, replicaUnavailableError error
var leaseholderUnavailable bool
var br *kvpb.BatchResponse

// seenNLHE is set to true if any of the replicas return NotLeaseholderError.
// Note that RPC errors aren't considered.
seenNLHE := false

// seenOtherThanNLHE is set to true if any of the replicas return an error
// other than NotLeaseholderError. Note that RPC errors aren't considered.
seenOtherThanNLHE := false

attempts := int64(0)
for first := true; ; first, attempts = false, attempts+1 {
if !first {
ds.metrics.NextReplicaErrCount.Inc(1)
}

// If the last replica returned a non-RPC error, check what it was and set
// seenNLHE, seenOtherThanNLHE accordingly.
if br != nil {
if _, ok := br.Error.GetDetail().(*kvpb.NotLeaseHolderError); ok {
seenNLHE = true
} else {
seenOtherThanNLHE = true
}
}

rangeConsideredUnavailable :=
ds.isRangeUnavailableBasedOnNLHE(transport, seenNLHE, seenOtherThanNLHE)
if rangeConsideredUnavailable && ambiguousError == nil && !requestBypassesCircuitBreaker(ba) {
// If the range is considered unavailable based on the NLHEs that we saw,
// there is no ambiguousError that we need to return, and the request
// doesn't bypass the circuit breaker, we might return a
// ReplicaUnavailable error to avoid the circuit breaker looping for this
// request for a long time. However, we first need to check if we should
// backoff before returning the error.
if !ds.shouldBackoffRangeUnavailableBasedOnNLHE(curDuration) {
if replicaUnavailableError == nil {
var replDesc roachpb.ReplicaDescriptor
if routing.Leaseholder() != nil {
// If we know the leaseholder, include it in the error.
replDesc = *routing.Leaseholder()
} else {
replDesc = prevReplica
}
replicaUnavailableError = kvpb.NewReplicaUnavailableError(
errors.New("Leaseholder is unavailable"), desc, replDesc)
}
}
}

// Advance through the transport's replicas until we find one that's still
// part of routing.entry.Desc. The transport starts up initialized with
// routing's replica info, but routing can be updated as we go through the
Expand Down Expand Up @@ -3016,6 +3122,8 @@ func (ds *DistSender) sendToReplicas(
if updatedLeaseholder {
leaseholderUnavailable = false
routeToLeaseholder = true
seenNLHE = false
seenOtherThanNLHE = false
// If we changed the leaseholder, reset the transport to try all the
// replicas in order again. After a leaseholder change, requests to
// followers will be marked as potential proxy requests and point to
Expand Down
Loading