diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index d511e3cc02db..f4f860a8dda0 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1944,12 +1944,11 @@ func (ds *DistSender) sendToReplicas( // and bubble up a SendError, which will cause a cache eviction and a new // descriptor lookup potentially unnecessarily. ds.metrics.NextReplicaErrCount.Inc(1) + lastErr := err + if err == nil { + lastErr = br.Error.GoError() + } for { - lastErr := err - if err == nil { - lastErr = br.Error.GoError() - } - if transport.IsExhausted() { return nil, noMoreReplicasErr(ambiguousError, lastErr) } @@ -1968,6 +1967,8 @@ func (ds *DistSender) sendToReplicas( curReplica = transport.NextReplica() if _, ok := routing.entry.Desc.GetReplicaDescriptorByID(curReplica.ReplicaID); ok { break + } else { + transport.SkipReplica() } } log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica.String()) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index ad8390c9df4c..2239d0144347 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -172,6 +172,13 @@ func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor { return roachpb.ReplicaDescriptor{} } +func (l *simpleTransportAdapter) SkipReplica() { + if l.IsExhausted() { + return + } + l.nextReplica++ +} + func (*simpleTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) { } @@ -3819,3 +3826,171 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) { t.Fatal(pErr) } } + +// Test that DistSender.sendToReplicas() deals well with descriptor updates. 
+func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + + ns := &mockNodeStore{ + nodes: []roachpb.NodeDescriptor{ + { + NodeID: 1, + Address: util.UnresolvedAddr{}, + }, + { + NodeID: 2, + Address: util.UnresolvedAddr{}, + }, + { + NodeID: 3, + Address: util.UnresolvedAddr{}, + }, + }, + } + var desc = roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(1), + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + var updatedDesc = roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(1), + Generation: 2, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 4, StoreID: 4, ReplicaID: 4}, + }, + } + + for _, tc := range []struct { + name string + // updatedDesc, if not nil, is used to update the range cache in the middle + // of the first RPC. + updatedDesc *roachpb.RangeDescriptor + // expLeaseholder is the leaseholder that the cache is expected to be + // populated with after the RPC. If 0, the cache is expected to not have an + // entry corresponding to the descriptor in question - i.e. we expect the + // descriptor to have been evicted. + expLeaseholder roachpb.ReplicaID + }{ + { + name: "no intervening update", + // In this test, the NotLeaseHolderError will point to a replica that's + // not part of the cached descriptor. The cached descriptor is going to be + // considered stale and evicted. 
+ updatedDesc: nil, + expLeaseholder: 0, + }, + { + name: "intervening update", + // In this test, the NotLeaseHolderError will point to a replica that's + // part of the cached descriptor (at the time when the DistSender gets the + // error). Thus, the cache entry will be updated with the lease. + updatedDesc: &updatedDesc, + expLeaseholder: 4, + }, + } { + t.Run(tc.name, func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + getRangeDescCacheSize := func() int64 { + return 1 << 20 + } + rc := NewRangeDescriptorCache(st, nil /* db */, getRangeDescCacheSize, stopper) + rc.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: roachpb.Lease{ + Replica: roachpb.ReplicaDescriptor{ + NodeID: 1, StoreID: 1, ReplicaID: 1, + }, + }, + }) + ent, err := rc.Lookup(ctx, roachpb.RKeyMin) + require.NoError(t, err) + tok := EvictionToken{ + rdc: rc, + entry: ent, + } + + var called bool + var transportFn = func( + _ context.Context, + opts SendOptions, + replicas ReplicaSlice, + ba roachpb.BatchRequest, + ) (*roachpb.BatchResponse, error) { + // We don't expect more than one RPC because we return a lease pointing + // to a replica that's not in the descriptor that sendToReplicas() was + // originally called with. sendToReplicas() doesn't deal with that; it + // returns a sendError and expects the caller to retry. 
+ if called { + return nil, errors.New("unexpected 2nd call") + } + called = true + nlhe := &roachpb.NotLeaseHolderError{ + RangeID: desc.RangeID, + LeaseHolder: &roachpb.ReplicaDescriptor{ + NodeID: 4, + StoreID: 4, + ReplicaID: 4, + }, + CustomMsg: "injected", + } + if tc.updatedDesc != nil { + rc.Insert(ctx, roachpb.RangeInfo{Desc: *tc.updatedDesc}) + } + br := &roachpb.BatchResponse{} + br.Error = roachpb.NewError(nlhe) + return br, nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( + []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, + ) { + // These tests only deal with the low-level sendToReplicas(). Nobody + // should be reading descriptors from the database, but the DistSender + // insists on having a non-nil one. + return nil, nil, errors.New("range desc db unexpectedly used") + }), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + ds := NewDistSender(cfg) + + var ba roachpb.BatchRequest + get := &roachpb.GetRequest{} + get.Key = roachpb.Key("a") + ba.Add(get) + _, err = ds.sendToReplicas(ctx, ba, tok, false /* withCommit */) + require.IsType(t, sendError{}, err) + require.Regexp(t, "NotLeaseHolderError", err) + cached := rc.GetCached(desc.StartKey, false /* inverted */) + if tc.expLeaseholder == 0 { + // Check that the descriptor was removed from the cache. 
+ require.Nil(t, cached) + } else { + require.NotNil(t, cached) + require.Equal(t, tc.expLeaseholder, cached.Lease.Replica.ReplicaID) + } + }) + } +} diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index dbf02693a040..174158ed0e6a 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -116,6 +116,10 @@ func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { return f.replicas[f.numSent].ReplicaDescriptor } +func (f *firstNErrorTransport) SkipReplica() { + panic("SkipReplica not supported") +} + func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) { } diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index ddde5e294ff0..f207c84feeda 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -81,6 +81,10 @@ type Transport interface { // change. Returns a zero value if the transport is exhausted. NextReplica() roachpb.ReplicaDescriptor + // SkipReplica changes the replica that the next SendNext() call would send to + // - the replica that NextReplica() would return is skipped. + SkipReplica() + // MoveToFront locates the specified replica and moves it to the // front of the ordering of replicas to try. If the replica has // already been tried, it will be retried. If the specified replica @@ -237,6 +241,14 @@ func (gt *grpcTransport) NextReplica() roachpb.ReplicaDescriptor { return gt.orderedClients[gt.clientIndex].replica } +// SkipReplica is part of the Transport interface. +func (gt *grpcTransport) SkipReplica() { + if gt.IsExhausted() { + return + } + gt.clientIndex++ +} + func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { gt.moveToFrontLocked(replica) } @@ -318,6 +330,7 @@ type senderTransport struct { sender kv.Sender replica roachpb.ReplicaDescriptor + // called is set once the RPC to the (one) replica is sent. 
called bool } @@ -377,5 +390,10 @@ func (s *senderTransport) NextReplica() roachpb.ReplicaDescriptor { return s.replica } +func (s *senderTransport) SkipReplica() { + // Skipping the (only) replica makes the transport be exhausted. + s.called = true +} + func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { } diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index c7782a779929..4c850d752d29 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -673,6 +673,13 @@ func (t *multiTestContextKVTransport) NextReplica() roachpb.ReplicaDescriptor { return t.replicas[t.idx].ReplicaDescriptor } +func (t *multiTestContextKVTransport) SkipReplica() { + if t.IsExhausted() { + return + } + t.idx++ +} + func (t *multiTestContextKVTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { t.mu.Lock() defer t.mu.Unlock()