From 8aff78bb9b28392f5c4f37c235936d2963cac28d Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Tue, 7 Jul 2020 15:33:29 -0400
Subject: [PATCH] kvclient: fix an infinite loop in the DistSender

This patch fixes some silly code that deals with the situation in which
sendToReplicas() needs to try another replica, but some of the replicas
it started with are known to be stale. The code tried to skip the stale
replicas, but instead of skipping anything it looped endlessly: it kept
re-reading the same replica from the transport without ever advancing
past it.

This should fix the recent test timeouts showing stack traces in
DistSender.sendToReplicas().

Fixes #51061

Release note: None
---
 pkg/kv/kvclient/kvcoord/dist_sender.go      |  11 +-
 pkg/kv/kvclient/kvcoord/dist_sender_test.go | 175 ++++++++++++++++++++
 pkg/kv/kvclient/kvcoord/send_test.go        |   4 +
 pkg/kv/kvclient/kvcoord/transport.go        |  18 ++
 pkg/kv/kvserver/client_test.go              |   7 +
 5 files changed, 210 insertions(+), 5 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index d511e3cc02db..f4f860a8dda0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1944,12 +1944,11 @@ func (ds *DistSender) sendToReplicas(
 		// and bubble up a SendError, which will cause a cache eviction and a new
 		// descriptor lookup potentially unnecessarily.
 		ds.metrics.NextReplicaErrCount.Inc(1)
+		lastErr := err
+		if err == nil {
+			lastErr = br.Error.GoError()
+		}
 		for {
-			lastErr := err
-			if err == nil {
-				lastErr = br.Error.GoError()
-			}
-
 			if transport.IsExhausted() {
 				return nil, noMoreReplicasErr(ambiguousError, lastErr)
 			}
@@ -1968,6 +1967,8 @@ func (ds *DistSender) sendToReplicas(
 			curReplica = transport.NextReplica()
 			if _, ok := routing.entry.Desc.GetReplicaDescriptorByID(curReplica.ReplicaID); ok {
 				break
+			} else {
+				transport.SkipReplica()
 			}
 		}
 		log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica.String())
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index ad8390c9df4c..2239d0144347 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -172,6 +172,13 @@ func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
 	return roachpb.ReplicaDescriptor{}
 }
 
+func (l *simpleTransportAdapter) SkipReplica() {
+	if l.IsExhausted() {
+		return
+	}
+	l.nextReplica++
+}
+
 func (*simpleTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) {
 }
 
@@ -3819,3 +3826,171 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) {
 		t.Fatal(pErr)
 	}
 }
+
+// Test that DistSender.sendToReplicas() deals well with descriptor updates.
+func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
+	rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
+
+	ns := &mockNodeStore{
+		nodes: []roachpb.NodeDescriptor{
+			{
+				NodeID:  1,
+				Address: util.UnresolvedAddr{},
+			},
+			{
+				NodeID:  2,
+				Address: util.UnresolvedAddr{},
+			},
+			{
+				NodeID:  3,
+				Address: util.UnresolvedAddr{},
+			},
+		},
+	}
+	var desc = roachpb.RangeDescriptor{
+		RangeID:    roachpb.RangeID(1),
+		Generation: 1,
+		StartKey:   roachpb.RKeyMin,
+		EndKey:     roachpb.RKeyMax,
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1, ReplicaID: 1},
+			{NodeID: 2, StoreID: 2, ReplicaID: 2},
+		},
+	}
+	var updatedDesc = roachpb.RangeDescriptor{
+		RangeID:    roachpb.RangeID(1),
+		Generation: 2,
+		StartKey:   roachpb.RKeyMin,
+		EndKey:     roachpb.RKeyMax,
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1, ReplicaID: 1},
+			{NodeID: 4, StoreID: 4, ReplicaID: 4},
+		},
+	}
+
+	for _, tc := range []struct {
+		name string
+		// updatedDesc, if not nil, is used to update the range cache in the middle
+		// of the first RPC.
+		updatedDesc *roachpb.RangeDescriptor
+		// expLeaseholder is the leaseholder that the cache is expected to be
+		// populated with after the RPC. If 0, the cache is expected to not have an
+		// entry corresponding to the descriptor in question - i.e. we expect the
+		// descriptor to have been evicted.
+		expLeaseholder roachpb.ReplicaID
+	}{
+		{
+			name: "no intervening update",
+			// In this test, the NotLeaseHolderError will point to a replica that's
+			// not part of the cached descriptor. The cached descriptor is going to be
+			// considered stale and evicted.
+			updatedDesc:    nil,
+			expLeaseholder: 0,
+		},
+		{
+			name: "intervening update",
+			// In this test, the NotLeaseHolderError will point to a replica that's
+			// part of the cached descriptor (at the time when the DistSender gets the
+			// error). Thus, the cache entry will be updated with the lease.
+			updatedDesc:    &updatedDesc,
+			expLeaseholder: 4,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			st := cluster.MakeTestingClusterSettings()
+			getRangeDescCacheSize := func() int64 {
+				return 1 << 20
+			}
+			rc := NewRangeDescriptorCache(st, nil /* db */, getRangeDescCacheSize, stopper)
+			rc.Insert(ctx, roachpb.RangeInfo{
+				Desc: desc,
+				Lease: roachpb.Lease{
+					Replica: roachpb.ReplicaDescriptor{
+						NodeID: 1, StoreID: 1, ReplicaID: 1,
+					},
+				},
+			})
+			ent, err := rc.Lookup(ctx, roachpb.RKeyMin)
+			require.NoError(t, err)
+			tok := EvictionToken{
+				rdc:   rc,
+				entry: ent,
+			}
+
+			var called bool
+			var transportFn = func(
+				_ context.Context,
+				opts SendOptions,
+				replicas ReplicaSlice,
+				ba roachpb.BatchRequest,
+			) (*roachpb.BatchResponse, error) {
+				// We don't expect more than one RPC because we return a lease pointing
+				// to a replica that's not in the descriptor that sendToReplicas() was
+				// originally called with. sendToReplicas() doesn't deal with that; it
+				// returns a sendError and expects the caller to retry.
+				if called {
+					return nil, errors.New("unexpected 2nd call")
+				}
+				called = true
+				nlhe := &roachpb.NotLeaseHolderError{
+					RangeID: desc.RangeID,
+					LeaseHolder: &roachpb.ReplicaDescriptor{
+						NodeID:    4,
+						StoreID:   4,
+						ReplicaID: 4,
+					},
+					CustomMsg: "injected",
+				}
+				if tc.updatedDesc != nil {
+					rc.Insert(ctx, roachpb.RangeInfo{Desc: *tc.updatedDesc})
+				}
+				br := &roachpb.BatchResponse{}
+				br.Error = roachpb.NewError(nlhe)
+				return br, nil
+			}
+
+			cfg := DistSenderConfig{
+				AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
+				Clock:      clock,
+				NodeDescs:  ns,
+				RPCContext: rpcContext,
+				RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
+					[]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
+				) {
+					// These tests only deal with the low-level sendToReplicas(). Nobody
+					// should be reading descriptors from the database, but the DistSender
+					// insists on having a non-nil one.
+					return nil, nil, errors.New("range desc db unexpectedly used")
+				}),
+				TestingKnobs: ClientTestingKnobs{
+					TransportFactory: adaptSimpleTransport(transportFn),
+				},
+				Settings: cluster.MakeTestingClusterSettings(),
+			}
+
+			ds := NewDistSender(cfg)
+
+			var ba roachpb.BatchRequest
+			get := &roachpb.GetRequest{}
+			get.Key = roachpb.Key("a")
+			ba.Add(get)
+			_, err = ds.sendToReplicas(ctx, ba, tok, false /* withCommit */)
+			require.IsType(t, sendError{}, err)
+			require.Regexp(t, "NotLeaseHolderError", err)
+			cached := rc.GetCached(desc.StartKey, false /* inverted */)
+			if tc.expLeaseholder == 0 {
+				// Check that the descriptor was removed from the cache.
+				require.Nil(t, cached)
+			} else {
+				require.NotNil(t, cached)
+				require.Equal(t, tc.expLeaseholder, cached.Lease.Replica.ReplicaID)
+			}
+		})
+	}
+}
diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go
index dbf02693a040..174158ed0e6a 100644
--- a/pkg/kv/kvclient/kvcoord/send_test.go
+++ b/pkg/kv/kvclient/kvcoord/send_test.go
@@ -116,6 +116,10 @@ func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor {
 	return f.replicas[f.numSent].ReplicaDescriptor
 }
 
+func (f *firstNErrorTransport) SkipReplica() {
+	panic("SkipReplica not supported")
+}
+
 func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) {
 }
 
diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go
index ddde5e294ff0..f207c84feeda 100644
--- a/pkg/kv/kvclient/kvcoord/transport.go
+++ b/pkg/kv/kvclient/kvcoord/transport.go
@@ -81,6 +81,10 @@ type Transport interface {
 	// change. Returns a zero value if the transport is exhausted.
 	NextReplica() roachpb.ReplicaDescriptor
 
+	// SkipReplica changes the replica that the next SendNext() call would send
+	// to: the replica that NextReplica() would return is skipped.
+	SkipReplica()
+
 	// MoveToFront locates the specified replica and moves it to the
 	// front of the ordering of replicas to try. If the replica has
 	// already been tried, it will be retried. If the specified replica
@@ -237,6 +241,14 @@ func (gt *grpcTransport) NextReplica() roachpb.ReplicaDescriptor {
 	return gt.orderedClients[gt.clientIndex].replica
 }
 
+// SkipReplica is part of the Transport interface.
+func (gt *grpcTransport) SkipReplica() {
+	if gt.IsExhausted() {
+		return
+	}
+	gt.clientIndex++
+}
+
 func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
 	gt.moveToFrontLocked(replica)
 }
@@ -318,6 +330,7 @@ type senderTransport struct {
 	sender  kv.Sender
 	replica roachpb.ReplicaDescriptor
 
+	// called is set once the RPC to the (one) replica is sent.
 	called bool
 }
 
@@ -377,5 +390,10 @@ func (s *senderTransport) NextReplica() roachpb.ReplicaDescriptor {
 	return s.replica
 }
 
+func (s *senderTransport) SkipReplica() {
+	// Skipping the (only) replica exhausts the transport.
+	s.called = true
+}
+
 func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
 }
diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go
index c7782a779929..4c850d752d29 100644
--- a/pkg/kv/kvserver/client_test.go
+++ b/pkg/kv/kvserver/client_test.go
@@ -673,6 +673,13 @@ func (t *multiTestContextKVTransport) NextReplica() roachpb.ReplicaDescriptor {
 	return t.replicas[t.idx].ReplicaDescriptor
 }
 
+func (t *multiTestContextKVTransport) SkipReplica() {
+	if t.IsExhausted() {
+		return
+	}
+	t.idx++
+}
+
 func (t *multiTestContextKVTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
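
To make the shape of the fix concrete outside the diff, here is a minimal,
self-contained Go sketch. It is not CockroachDB code: the replica and
transport types and the nextViableReplica helper are simplified stand-ins
invented for illustration, mirroring only the IsExhausted()/NextReplica()/
SkipReplica() contract that the patch adds to kvcoord.Transport. The point
it demonstrates is that the retry loop must advance the transport's cursor
when the replica under it is no longer in the (possibly updated) range
descriptor; merely re-reading NextReplica(), as the pre-fix loop did, spins
forever on the first stale replica.

package main

import "fmt"

// replica is a simplified stand-in for roachpb.ReplicaDescriptor.
type replica struct{ id int }

// transport is a simplified stand-in for kvcoord.Transport: a fixed list of
// replicas and a cursor that only ever moves forward.
type transport struct {
	replicas []replica
	next     int
}

// IsExhausted reports whether every replica has been sent to or skipped.
func (t *transport) IsExhausted() bool { return t.next >= len(t.replicas) }

// NextReplica returns the replica that the next send would go to, without
// advancing the cursor. It returns the zero value when exhausted.
func (t *transport) NextReplica() replica {
	if t.IsExhausted() {
		return replica{}
	}
	return t.replicas[t.next]
}

// SkipReplica advances the cursor past the replica that NextReplica would
// return. Without a call to this method, a loop that re-reads NextReplica
// never makes progress.
func (t *transport) SkipReplica() {
	if t.IsExhausted() {
		return
	}
	t.next++
}

// nextViableReplica mirrors the fixed loop in sendToReplicas: it skips
// replicas that are not in the current (possibly updated) descriptor and
// stops when the transport runs out of replicas.
func nextViableReplica(t *transport, desc map[int]bool) (replica, error) {
	for {
		if t.IsExhausted() {
			return replica{}, fmt.Errorf("sending to all replicas failed")
		}
		r := t.NextReplica()
		if desc[r.id] {
			return r, nil
		}
		// The replica is stale: advance past it. This call is what the
		// pre-fix loop was missing.
		t.SkipReplica()
	}
}

func main() {
	// The transport was built from a stale descriptor holding replicas
	// 1, 2 and 3; the updated descriptor only contains replicas 3 and 4.
	tr := &transport{replicas: []replica{{1}, {2}, {3}}}
	updated := map[int]bool{3: true, 4: true}
	r, err := nextViableReplica(tr, updated)
	fmt.Println(r, err) // {3} <nil>: replicas 1 and 2 were skipped
}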