Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: fix an infinite loop in the DistSender #51085

Merged
merged 1 commit into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,12 +1944,11 @@ func (ds *DistSender) sendToReplicas(
// and bubble up a SendError, which will cause a cache eviction and a new
// descriptor lookup potentially unnecessarily.
ds.metrics.NextReplicaErrCount.Inc(1)
lastErr := err
if err == nil {
lastErr = br.Error.GoError()
}
for {
lastErr := err
if err == nil {
lastErr = br.Error.GoError()
}

if transport.IsExhausted() {
return nil, noMoreReplicasErr(ambiguousError, lastErr)
}
Expand All @@ -1968,6 +1967,8 @@ func (ds *DistSender) sendToReplicas(
curReplica = transport.NextReplica()
if _, ok := routing.entry.Desc.GetReplicaDescriptorByID(curReplica.ReplicaID); ok {
break
} else {
transport.SkipReplica()
}
}
log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica.String())
Expand Down
175 changes: 175 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
return roachpb.ReplicaDescriptor{}
}

// SkipReplica advances the adapter past its current replica by bumping
// nextReplica. It does nothing once the transport is already exhausted.
func (l *simpleTransportAdapter) SkipReplica() {
	if !l.IsExhausted() {
		l.nextReplica++
	}
}

// MoveToFront is a no-op for this test adapter: the replica argument is
// ignored and the ordering is left untouched.
func (*simpleTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) {
}

Expand Down Expand Up @@ -3819,3 +3826,171 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) {
t.Fatal(pErr)
}
}

// TestSendToReplicasSkipsStaleReplicas tests that DistSender.sendToReplicas()
// deals well with descriptor updates.
//
// The range cache is seeded with a descriptor containing replicas 1 and 2.
// The (single expected) RPC returns a NotLeaseHolderError pointing to replica
// 4 and, in one of the sub-tests, also inserts an updated descriptor
// (replicas 1 and 4) into the cache while the RPC is "in flight". In both
// sub-tests sendToReplicas() is expected to return a sendError rather than
// retry; the sub-tests differ in what the range cache is expected to contain
// afterwards.
func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
	rpcContext := rpc.NewInsecureTestingContext(clock, stopper)

	// Node descriptors for nodes 1-3. Note that there is no descriptor for node
	// 4, the node that the injected NotLeaseHolderError points to.
	ns := &mockNodeStore{
		nodes: []roachpb.NodeDescriptor{
			{
				NodeID:  1,
				Address: util.UnresolvedAddr{},
			},
			{
				NodeID:  2,
				Address: util.UnresolvedAddr{},
			},
			{
				NodeID:  3,
				Address: util.UnresolvedAddr{},
			},
		},
	}
	// desc is the descriptor that the range cache is initially populated with
	// and that sendToReplicas() is called with (through the eviction token).
	var desc = roachpb.RangeDescriptor{
		RangeID:    roachpb.RangeID(1),
		Generation: 1,
		StartKey:   roachpb.RKeyMin,
		EndKey:     roachpb.RKeyMax,
		InternalReplicas: []roachpb.ReplicaDescriptor{
			{NodeID: 1, StoreID: 1, ReplicaID: 1},
			{NodeID: 2, StoreID: 2, ReplicaID: 2},
		},
	}
	// updatedDesc is a newer generation of desc in which replica 2 has been
	// replaced by replica 4.
	var updatedDesc = roachpb.RangeDescriptor{
		RangeID:    roachpb.RangeID(1),
		Generation: 2,
		StartKey:   roachpb.RKeyMin,
		EndKey:     roachpb.RKeyMax,
		InternalReplicas: []roachpb.ReplicaDescriptor{
			{NodeID: 1, StoreID: 1, ReplicaID: 1},
			{NodeID: 4, StoreID: 4, ReplicaID: 4},
		},
	}

	for _, tc := range []struct {
		name string
		// updatedDesc, if not nil, is used to update the range cache in the middle
		// of the first RPC.
		updatedDesc *roachpb.RangeDescriptor
		// expLeaseholder is the leaseholder that the cache is expected to be
		// populated with after the RPC. If 0, the cache is expected to not have an
		// entry corresponding to the descriptor in question - i.e. we expect the
		// descriptor to have been evicted.
		expLeaseholder roachpb.ReplicaID
	}{
		{
			name: "no intervening update",
			// In this test, the NotLeaseHolderError will point to a replica that's
			// not part of the cached descriptor. The cached descriptor is going to be
			// considered stale and evicted.
			updatedDesc:    nil,
			expLeaseholder: 0,
		},
		{
			name: "intervening update",
			// In this test, the NotLeaseHolderError will point to a replica that's
			// part of the cached descriptor (at the time when the DistSender gets the
			// error). Thus, the cache entry will be updated with the lease.
			updatedDesc:    &updatedDesc,
			expLeaseholder: 4,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			st := cluster.MakeTestingClusterSettings()
			getRangeDescCacheSize := func() int64 {
				return 1 << 20
			}
			// Each sub-test gets a fresh range cache, seeded with the original
			// descriptor and a lease on replica 1.
			rc := NewRangeDescriptorCache(st, nil /* db */, getRangeDescCacheSize, stopper)
			rc.Insert(ctx, roachpb.RangeInfo{
				Desc: desc,
				Lease: roachpb.Lease{
					Replica: roachpb.ReplicaDescriptor{
						NodeID: 1, StoreID: 1, ReplicaID: 1,
					},
				},
			})
			// Build the eviction token that sendToReplicas() routes with from the
			// entry that was just inserted.
			ent, err := rc.Lookup(ctx, roachpb.RKeyMin)
			require.NoError(t, err)
			tok := EvictionToken{
				rdc:   rc,
				entry: ent,
			}

			// called tracks whether the mock transport has already served an RPC.
			var called bool
			var transportFn = func(
				_ context.Context,
				opts SendOptions,
				replicas ReplicaSlice,
				ba roachpb.BatchRequest,
			) (*roachpb.BatchResponse, error) {
				// We don't expect more than one RPC because we return a lease pointing
				// to a replica that's not in the descriptor that sendToReplicas() was
				// originally called with. sendToReplicas() doesn't deal with that; it
				// returns a sendError and expects the caller to retry.
				if called {
					return nil, errors.New("unexpected 2nd call")
				}
				called = true
				nlhe := &roachpb.NotLeaseHolderError{
					RangeID: desc.RangeID,
					LeaseHolder: &roachpb.ReplicaDescriptor{
						NodeID:    4,
						StoreID:   4,
						ReplicaID: 4,
					},
					CustomMsg: "injected",
				}
				// Simulate the range cache being updated while the RPC is in flight.
				if tc.updatedDesc != nil {
					rc.Insert(ctx, roachpb.RangeInfo{Desc: *tc.updatedDesc})
				}
				// The error is delivered inside the response (br.Error) rather than
				// as the transport's error return value.
				br := &roachpb.BatchResponse{}
				br.Error = roachpb.NewError(nlhe)
				return br, nil
			}

			cfg := DistSenderConfig{
				AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
				Clock:      clock,
				NodeDescs:  ns,
				RPCContext: rpcContext,
				RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
					[]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
				) {
					// These tests only deal with the low-level sendToReplicas(). Nobody
					// should be reading descriptors from the database, but the DistSender
					// insists on having a non-nil one.
					return nil, nil, errors.New("range desc db unexpectedly used")
				}),
				TestingKnobs: ClientTestingKnobs{
					TransportFactory: adaptSimpleTransport(transportFn),
				},
				Settings: cluster.MakeTestingClusterSettings(),
			}

			ds := NewDistSender(cfg)

			var ba roachpb.BatchRequest
			get := &roachpb.GetRequest{}
			get.Key = roachpb.Key("a")
			ba.Add(get)
			// The call is expected to fail with a sendError that mentions the
			// injected NotLeaseHolderError.
			_, err = ds.sendToReplicas(ctx, ba, tok, false /* withCommit */)
			require.IsType(t, sendError{}, err)
			require.Regexp(t, "NotLeaseHolderError", err)
			// Verify the state of the range cache after the failed send.
			cached := rc.GetCached(desc.StartKey, false /* inverted */)
			if tc.expLeaseholder == 0 {
				// Check that the descriptor was removed from the cache.
				require.Nil(t, cached)
			} else {
				require.NotNil(t, cached)
				require.Equal(t, tc.expLeaseholder, cached.Lease.Replica.ReplicaID)
			}
		})
	}
}
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor {
return f.replicas[f.numSent].ReplicaDescriptor
}

// SkipReplica is not supported by this test transport; calling it panics.
func (*firstNErrorTransport) SkipReplica() {
	panic("SkipReplica not supported")
}

// MoveToFront is a no-op for this test transport: the replica argument is
// ignored.
func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) {
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type Transport interface {
// change. Returns a zero value if the transport is exhausted.
NextReplica() roachpb.ReplicaDescriptor

// SkipReplica changes the replica that the next SendNext() call would send to
// - the replica that NextReplica() would return is skipped.
SkipReplica()

// MoveToFront locates the specified replica and moves it to the
// front of the ordering of replicas to try. If the replica has
// already been tried, it will be retried. If the specified replica
Expand Down Expand Up @@ -237,6 +241,14 @@ func (gt *grpcTransport) NextReplica() roachpb.ReplicaDescriptor {
return gt.orderedClients[gt.clientIndex].replica
}

// SkipReplica is part of the Transport interface. It moves clientIndex past
// the replica that NextReplica() would currently return, and does nothing if
// the transport is already exhausted.
func (gt *grpcTransport) SkipReplica() {
	if !gt.IsExhausted() {
		gt.clientIndex++
	}
}

// MoveToFront is part of the Transport interface. It delegates to
// moveToFrontLocked.
//
// NOTE(review): the "Locked" suffix on the helper suggests that a lock is
// expected to be held, but none is acquired here - confirm against the
// helper's definition.
func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
	gt.moveToFrontLocked(replica)
}
Expand Down Expand Up @@ -318,6 +330,7 @@ type senderTransport struct {
sender kv.Sender
replica roachpb.ReplicaDescriptor

// called is set once the RPC to the (one) replica is sent.
called bool
}

Expand Down Expand Up @@ -377,5 +390,10 @@ func (s *senderTransport) NextReplica() roachpb.ReplicaDescriptor {
return s.replica
}

// SkipReplica marks the transport as called without sending anything.
func (s *senderTransport) SkipReplica() {
	// Skipping the (only) replica makes the transport be exhausted.
	s.called = true
}

// MoveToFront is a no-op: the replica argument is ignored.
func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,13 @@ func (t *multiTestContextKVTransport) NextReplica() roachpb.ReplicaDescriptor {
return t.replicas[t.idx].ReplicaDescriptor
}

// SkipReplica advances idx past the replica that NextReplica() would
// currently return. It does nothing once the transport is exhausted.
func (t *multiTestContextKVTransport) SkipReplica() {
	if !t.IsExhausted() {
		t.idx++
	}
}

func (t *multiTestContextKVTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down