Commit: range unavailable
Just a prototype

Epic: none

Release note: None
iskettaneh committed Jan 31, 2025
1 parent 281a25b commit 049c9ab
Showing 4 changed files with 254 additions and 39 deletions.
100 changes: 100 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
@@ -415,6 +416,20 @@ var ProxyBatchRequest = settings.RegisterBoolSetting(
true,
)

var EagerlyBailOnUnavailableRanges = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.eagerly_bail_on_unavailable_ranges.enabled",
"when true, the dist sender will eagerly return an error when it encounters errors that indicates an unavailable range, rather than retrying until the circuit breaker trips",
false,
)

var BackoffUnavailableRangesDuration = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"kv.dist_sender.unavailable_range_backoff_duration.enabled",
"determines how long we will backoff when we encounter an error that indicates an unavailable range. This only works when eagerly_bail_on_unavailable_ranges is set to true",
time.Second*6,
)
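The two new settings are ordinary cluster settings, so a unit test can override them on a testing Settings object before constructing the DistSender, the same way the test changes below override EagerlyBailOnUnavailableRanges. A minimal sketch under that assumption; makeEagerBailSettings is a hypothetical helper, not part of this commit:

// Hypothetical helper: testing settings with the new knobs turned on.
func makeEagerBailSettings(ctx context.Context) *cluster.Settings {
	st := cluster.MakeTestingClusterSettings()
	// Bail out of the retry loop as soon as the range looks unavailable.
	EagerlyBailOnUnavailableRanges.Override(ctx, &st.SV, true)
	// Keep retrying for two seconds before declaring the range unavailable.
	BackoffUnavailableRangesDuration.Override(ctx, &st.SV, 2*time.Second)
	return st
}

The result would then be passed as the Settings field of DistSenderConfig, as the dist_sender_test.go changes further down do with st.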

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
@@ -2210,6 +2225,7 @@ func (ds *DistSender) sendPartialBatch(
prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)

fmt.Printf("!!! IBRAHIM !!! sendPartial reply: %+v, err: %+v\n", reply, err)
if dur := tBegin.Elapsed(); dur > slowDistSenderRangeThreshold && tBegin != 0 {
{
var s redact.StringBuilder
@@ -2490,6 +2506,46 @@ func selectBestError(ambiguousErr, replicaUnavailableErr, lastAttemptErr error)
return lastAttemptErr
}

func (ds *DistSender) isRangeEffectivelyUnavailable(
transport Transport, ba *kvpb.BatchRequest, seenNLHE, seenOtherThanNLHE bool,
) bool {
if !EagerlyBailOnUnavailableRanges.Get(&ds.st.SV) || !transport.IsExhausted() {
// We don't consider the range unavailable based on NotLeaseHolderErrors in
// the following cases:
// 1. The cluster setting is turned off.
// 2. There are still replicas to try.
// 3. We've seen errors other than NotLeaseHolderError (checked below). In
// that case, we return the other error instead. Note that RPC errors are excluded.
return false
}

// If we haven't seen a NotLeaseHolderError, or we've seen errors other than
// NotLeaseHolderError, we shouldn't consider this range unavailable.
if seenOtherThanNLHE || !seenNLHE {
return false
}

// If this batch contains requests that bypass the replica circuit breaker,
// we shouldn't bail on it. The reason is that these requests expect us to
// keep trying, and should eventually succeed.
for _, ru := range ba.Requests {
req := ru.GetInner()
if kvpb.BypassesReplicaCircuitBreaker(req) {
return false
}
}

return true
}

func (ds *DistSender) shouldBackoffRangeUnavailable(ts time.Time, ambiguousErr error) bool {
if ambiguousErr != nil {
//fmt.Printf("!!! IBRAHIM !!! ambiguousErr: %+v\n", ambiguousErr)
return false
}
return timeutil.Now().Before(ts.Add(BackoffUnavailableRangesDuration.Get(&ds.st.SV)))
}
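Taken together with isRangeEffectivelyUnavailable above: ts is captured once before the retry loop in the sendToReplicas changes further down, so with the default of six seconds the sender keeps resetting the transport and re-walking the replicas until that much time has elapsed since the call started (and no ambiguous error has been seen), and only then synthesizes a ReplicaUnavailableError. A tiny standalone illustration of the window check, with made-up values rather than CockroachDB code:

package main

import (
	"fmt"
	"time"
)

func main() {
	const backoffWindow = 6 * time.Second // default of the new setting
	start := time.Now()                   // plays the role of ts in sendToReplicas

	// Inside the window the check is true, so the sender keeps retrying.
	fmt.Println(time.Now().Before(start.Add(backoffWindow))) // true

	// Once the window has elapsed the check flips and the sender bails.
	later := start.Add(7 * time.Second)
	fmt.Println(later.Before(start.Add(backoffWindow))) // false
}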

// slowDistSenderRangeThreshold is a latency threshold for logging slow
// requests to a range, potentially involving RPCs to multiple replicas
// of the range.
@@ -2529,6 +2585,8 @@ func (ds *DistSender) sendToReplicas(
ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
) (*kvpb.BatchResponse, error) {

//fmt.Printf("!!! IBRAHIM !!! fresh call to sendToReplicas\n")

// If this request can be sent to a follower to perform a consistent follower
// read under the closed timestamp, promote its routing policy to NEAREST.
// If we don't know the closed timestamp policy, we ought to optimistically
@@ -2627,12 +2685,51 @@
var ambiguousError, replicaUnavailableError error
var leaseholderUnavailable bool
var br *kvpb.BatchResponse

seenNLHE := false
seenOtherThanNLHE := false

ts := timeutil.Now()
attempts := int64(0)
for first := true; ; first, attempts = false, attempts+1 {
if !first {
ds.metrics.NextReplicaErrCount.Inc(1)
}

if br != nil {
if _, ok := br.Error.GetDetail().(*kvpb.NotLeaseHolderError); ok {
//fmt.Printf("!!! IBRAHIM !!! seen NLHE: %+v\n", br.Error.GetDetail())
seenNLHE = true
} else {
//fmt.Printf("!!! IBRAHIM !!! seen non NLHE: %+v\n", br.Error.GetDetail())
seenOtherThanNLHE = true
}
}

rangeEffectivelyUnavailable := ds.isRangeEffectivelyUnavailable(transport, ba, seenNLHE, seenOtherThanNLHE)
//fmt.Printf("!!! IBRAHIM !!! rangeEffectivelyUnavailable: %+v\n", rangeEffectivelyUnavailable)
if rangeEffectivelyUnavailable {
if ds.shouldBackoffRangeUnavailable(ts, ambiguousError) {
//fmt.Printf("!!! IBRAHIM !!! backing off \n")
// If we think that the range is unavailable, but the issue could be
// transient, we back off before retrying. We reset everything and retry
// all the replicas again.
leaseholderUnavailable = false
seenNLHE = false
seenOtherThanNLHE = false
transport.Reset()
inTransferRetry.Next()
} else {
//fmt.Printf("!!! IBRAHIM !!! not backing off \n")
if replicaUnavailableError == nil {
// If there isn't already a replica unavailable error and we've concluded
// that the range is unavailable, synthesize one so the caller gets a
// ReplicaUnavailableError rather than retrying until the circuit breaker trips.
replicaUnavailableError = kvpb.NewReplicaUnavailableError(errors.New("leaseholder is unavailable"), desc, prevReplica)
//fmt.Printf("!!! IBRAHIM !!! replicaUnavailableError: %+v\n", replicaUnavailableError)
}
}
}
// Advance through the transport's replicas until we find one that's still
// part of routing.entry.Desc. The transport starts up initialized with
// routing's replica info, but routing can be updated as we go through the
@@ -3016,6 +3113,8 @@ func (ds *DistSender) sendToReplicas(
if updatedLeaseholder {
leaseholderUnavailable = false
routeToLeaseholder = true
seenNLHE = false
seenOtherThanNLHE = false
// If we changed the leaseholder, reset the transport to try all the
// replicas in order again. After a leaseholder change, requests to
// followers will be marked as potential proxy requests and point to
@@ -3061,6 +3160,7 @@ )
)
}
}

// Check whether the request was intentionally sent to a follower
// replica to perform a follower read. In such cases, the follower
// may reject the request with a NotLeaseHolderError if it does not
99 changes: 96 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -625,10 +625,13 @@ func TestImmutableBatchArgs(t *testing.T) {
func TestErrorWithCancellationExit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
//retriableErr := kvpb.NewError(newSendError(errors.Errorf("retryable")))
retriableErr := kvpb.NewError(
&kvpb.NotLeaseHolderError{
Replica: testUserRangeDescriptor.InternalReplicas[0],
&kvpb.RangeNotFoundError{
RangeID: testUserRangeDescriptor.RangeID,
StoreID: testUserRangeDescriptor.InternalReplicas[0].StoreID,
})

terminalErr := kvpb.NewErrorf("boom")

tests := []struct {
@@ -704,6 +707,9 @@ func TestErrorWithCancellationExit(t *testing.T) {
return reply, nil
}

st := cluster.MakeTestingClusterSettings()
//BackoffUnavailableRangesDuration.Override(context.Background(), &st.SV, time.Duration(0))

cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
@@ -716,6 +722,91 @@
},
TransportFactory: adaptSimpleTransport(testFn),
RangeDescriptorDB: defaultMockRangeDescriptorDB,
Settings: st,
}

ds := NewDistSender(cfg)
// Start a request that runs through distSender.
put := kvpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))
_, pErr := kv.SendWrapped(ctx, ds, put)
if tc.expectedErr == "" {
require.Nil(t, pErr)
} else {
require.NotNil(t, pErr)
//fmt.Printf("!!! IBRAHIM !!! pERR: %v\n", pErr)
require.True(t, testutils.IsPError(pErr, tc.expectedErr))
}
})
}
}

func TestIbrahim(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

notLeaseholderErr := kvpb.NewError(
&kvpb.NotLeaseHolderError{
Replica: testUserRangeDescriptor3Replicas.InternalReplicas[0],
})

rangeNotFoundErr := kvpb.NewError(
&kvpb.RangeNotFoundError{
RangeID: testUserRangeDescriptor3Replicas.RangeID,
StoreID: testUserRangeDescriptor3Replicas.InternalReplicas[0].StoreID,
})

tests := []struct {
name string
errors []*kvpb.Error
expectedErr string
}{

{
name: "terminal error",
errors: []*kvpb.Error{notLeaseholderErr, rangeNotFoundErr, rangeNotFoundErr},
expectedErr: "boom",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
retryCount := atomic.Int64{}

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)
var testFn simpleSendFn = func(_ context.Context, _ *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
reply := &kvpb.BatchResponse{}

// Set a response so we don't get an out of bounds err in the non-error case.
var union kvpb.ResponseUnion
union.MustSetInner(&kvpb.PutResponse{})
reply.Responses = []kvpb.ResponseUnion{union}

// Count the number of times we are running.
count := retryCount.Add(1)
fmt.Printf("!!! IBRAHIM TEST !!! count: %v\n", count)

// Return the error configured for this attempt.
reply.Error = tc.errors[count-1]
return reply, nil
}

cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: g,
Stopper: stopper,
// Retry very quickly to make this test finish fast.
RPCRetryOptions: &retry.Options{
InitialBackoff: time.Millisecond,
MaxBackoff: time.Millisecond,
},
TransportFactory: adaptSimpleTransport(testFn),
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)
@@ -5472,6 +5563,8 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
// itself.

st := cluster.MakeTestingClusterSettings()
EagerlyBailOnUnavailableRanges.Override(context.Background(), &st.SV, false)

tr := tracing.NewTracer()
getRangeDescCacheSize := func() int64 {
return 1 << 20
@@ -5530,7 +5623,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
return nil, nil, errors.New("range desc db unexpectedly used")
}),
TransportFactory: adaptSimpleTransport(transportFn),
Settings: cluster.MakeTestingClusterSettings(),
Settings: st,
}

ds := NewDistSender(cfg)
88 changes: 56 additions & 32 deletions pkg/kv/kvserver/storeliveness/support_manager.go
@@ -222,39 +222,63 @@ func (sm *SupportManager) startLoop(ctx context.Context) {
idleSupportFromTicker := time.NewTicker(sm.options.IdleSupportFromInterval)
defer idleSupportFromTicker.Stop()

var counter int
for {
// NOTE: only listen to the receive queue's signal if we don't already have
// stores to add, heartbeats to send, or support to check. This prevents a
// constant flow of inbound messages from delaying the other work due to the
// random selection between multiple enabled channels.
var receiveQueueSig <-chan struct{}
if len(heartbeatTicker.C) == 0 &&
len(supportExpiryTicker.C) == 0 &&
len(sm.storesToAdd.sig) == 0 {
receiveQueueSig = sm.receiveQueue.Sig()
}

select {
case <-sm.storesToAdd.sig:
sm.maybeAddStores(ctx)
sm.sendHeartbeats(ctx)

case <-heartbeatTicker.C:
sm.sendHeartbeats(ctx)

case <-supportExpiryTicker.C:
sm.withdrawSupport(ctx)

case <-idleSupportFromTicker.C:
sm.requesterStateHandler.markIdleStores(ctx)

case <-receiveQueueSig:
// Decrementing the queue metrics is done in handleMessages.
msgs := sm.receiveQueue.Drain()
sm.handleMessages(ctx, msgs)

case <-sm.stopper.ShouldQuiesce():
return
switch counter % 7 {
case 0:
select {
case <-sm.storesToAdd.sig:
sm.maybeAddStores(ctx)
sm.sendHeartbeats(ctx)
counter++
default:
counter++
}
case 1:
select {
case <-heartbeatTicker.C:
sm.sendHeartbeats(ctx)
counter++
default:
counter++
}
case 2:
select {
case <-supportExpiryTicker.C:
sm.withdrawSupport(ctx)
counter++
default:
counter++
}
case 3:
select {
case <-idleSupportFromTicker.C:
sm.requesterStateHandler.markIdleStores(ctx)
counter++
default:
counter++
}
case 4:
select {
case <-sm.receiveQueue.Sig():
msgs := sm.receiveQueue.Drain()
sm.handleMessages(ctx, msgs)
counter++
default:
counter++
}
case 5:
select {
case <-sm.stopper.ShouldQuiesce():
return
default:
counter++
}
case 6:
select {
case <-time.After(10 * time.Millisecond):
counter++
}
}
}
}
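For context, the rewritten startLoop trades the original single blocking select for a round-robin of non-blocking polls: each pass through the loop services at most one source via a select with a default branch, then advances the counter, and the last slot waits ten milliseconds so an idle loop does not spin. A stripped-down, standalone sketch of that pattern with hypothetical channels, not part of this commit:

package main

import (
	"fmt"
	"time"
)

func main() {
	a := make(chan struct{}, 1)
	b := make(chan struct{}, 1)
	a <- struct{}{}

	var counter int
	for i := 0; i < 30; i++ {
		switch counter % 3 {
		case 0:
			// Non-blocking poll: handle a if it is ready, otherwise move on.
			select {
			case <-a:
				fmt.Println("handled a")
			default:
			}
		case 1:
			// Same non-blocking poll for b.
			select {
			case <-b:
				fmt.Println("handled b")
			default:
			}
		case 2:
			// Sleep briefly so an idle loop does not busy-spin.
			time.Sleep(10 * time.Millisecond)
		}
		counter++
	}
}

This gives an anti-starvation guarantee similar to the receiveQueueSig guard in the deleted select, but by construction rather than by conditionally disabling one channel.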