diff --git a/build/bazelutil/check.sh b/build/bazelutil/check.sh index f1dba3d6cd5f..05377b29cb05 100755 --- a/build/bazelutil/check.sh +++ b/build/bazelutil/check.sh @@ -16,6 +16,10 @@ pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generi pkg/roachpb/batch.go://go:generate go run -tags gen-batch gen/main.go pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated.go -source=cert.go . Cert pkg/cmd/roachtest/prometheus/prometheus.go://go:generate mockgen -package=prometheus -destination=mock_generated.go -source=prometheus.go . cluster +pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangecache -destination=mocks_generated.go . RangeDescriptorDB +pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -package=rangefeed -source rangefeed.go -destination=mocks_generated.go . +pkg/kv/kvclient/kvcoord/transport.go://go:generate mockgen -package=kvcoord -destination=mocks_generated.go . Transport +pkg/roachpb/api.go://go:generate mockgen -package=roachpb -destination=mocks_generated.go . InternalClient,Internal_RangeFeedClient pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs pkg/security/securitytest/securitytest.go://go:generate gofmt -s -w embedded.go pkg/security/securitytest/securitytest.go://go:generate goimports -w embedded.go diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 20003bbc9c09..43e4ecf3ac5d 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -1,3 +1,4 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("//build:STRINGER.bzl", "stringer") @@ -69,12 +70,21 @@ go_library( ], ) +gomock( + name = "mocks_transport", + out = "mocks_generated_test.go", + interfaces = ["Transport"], + library = ":kvcoord", + package = "kvcoord_test", +) + go_test( name = "kvcoord_test", size = "medium", srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", "integration_test.go", @@ -95,6 +105,7 @@ go_test( "txn_interceptor_seq_num_allocator_test.go", "txn_interceptor_span_refresher_test.go", "txn_test.go", + "mocks_generated_test.go", # keep ], data = glob(["testdata/**"]), embed = [":kvcoord"], @@ -146,6 +157,7 @@ go_test( "@com_github_cockroachdb_errors//errutil", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", + "@com_github_golang_mock//gomock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_google_grpc//:go_default_library", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 0d38a96f9cb3..b18a0633f74d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -164,7 +164,7 @@ func (ds *DistSender) partialRangeFeed( // These errors are likely to be unique to the replica that // reported them, so no action is required before the next // retry. - case errors.HasType(err, (*sendError)(nil)), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): + case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): // Evict the descriptor from the cache and reload on next attempt. rangeInfo.token.Evict(ctx) rangeInfo.token = rangecache.EvictionToken{} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go new file mode 100644 index 000000000000..760f38abeb9e --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -0,0 +1,155 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord_test + +import ( + "context" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(spec.errorCode.String(), func(t *testing.T) { + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + g := kvcoord.TestingMakeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + kvcoord.TestingNewNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. + for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + ctx, nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. + if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + stream := roachpb.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client := roachpb.NewMockInternalClient(ctrl) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil) + transport.EXPECT().Release() + } + + ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: kvcoord.ClientTestingKnobs{ + TransportFactory: func(kvcoord.SendOptions, *nodedialer.Dialer, kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.RangeDescriptorCache().Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + err := ds.RangeFeed(ctx, roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, hlc.Timestamp{}, false, nil) + require.Error(t, err) + }) + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 2277bcc9caec..edd0d21b799e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -200,12 +200,14 @@ func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) func (l *simpleTransportAdapter) Release() {} -func makeGossip(t *testing.T, stopper *stop.Stopper, rpcContext *rpc.Context) *gossip.Gossip { +func TestingMakeGossip( + t *testing.T, stopper *stop.Stopper, rpcContext *rpc.Context, +) *gossip.Gossip { server := rpc.NewServer(rpcContext) const nodeID = 1 g := gossip.NewTest(nodeID, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) - if err := g.SetNodeDescriptor(newNodeDesc(nodeID)); err != nil { + if err := g.SetNodeDescriptor(TestingNewNodeDesc(nodeID)); err != nil { t.Fatal(err) } if err := g.AddInfo(gossip.KeySentinel, nil, time.Hour); err != nil { @@ -215,7 +217,7 @@ func makeGossip(t *testing.T, stopper *stop.Stopper, rpcContext *rpc.Context) *g return g } -func newNodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor { +func TestingNewNodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor { return &roachpb.NodeDescriptor{ NodeID: nodeID, Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("invalid.invalid:%d", nodeID)), @@ -234,7 +236,7 @@ func TestSendRPCOrder(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) rangeID := roachpb.RangeID(99) nodeTiers := map[int32][]roachpb.Tier{ @@ -535,7 +537,7 @@ func TestImmutableBatchArgs(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) var testFn simpleSendFn = func( _ context.Context, args roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { @@ -645,11 +647,11 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() { require.NoError(t, g.AddInfoProto( gossip.MakeNodeIDKey(n.NodeID), - newNodeDesc(n.NodeID), + TestingNewNodeDesc(n.NodeID), gossip.NodeDescriptorTTL, )) } @@ -724,12 +726,12 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) repls := testUserRangeDescriptor3Replicas.InternalReplicas for _, n := range repls { if err := g.AddInfoProto( gossip.MakeNodeIDKey(n.NodeID), - newNodeDesc(n.NodeID), + TestingNewNodeDesc(n.NodeID), gossip.NodeDescriptorTTL, ); err != nil { t.Fatal(err) @@ -844,12 +846,12 @@ func TestNoBackoffOnNotLeaseHolderErrorFromFollowerRead(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) repls := testUserRangeDescriptor3Replicas.InternalReplicas for _, n := range repls { if err := g.AddInfoProto( gossip.MakeNodeIDKey(n.NodeID), - newNodeDesc(n.NodeID), + TestingNewNodeDesc(n.NodeID), gossip.NodeDescriptorTTL, ); err != nil { t.Fatal(err) @@ -905,11 +907,11 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() { require.NoError(t, g.AddInfoProto( gossip.MakeNodeIDKey(n.NodeID), - newNodeDesc(n.NodeID), + TestingNewNodeDesc(n.NodeID), gossip.NodeDescriptorTTL, )) } @@ -998,11 +1000,11 @@ func TestDistSenderRetryOnTransportErrors(t *testing.T) { t.Run(fmt.Sprintf("retry_after_%v", spec.errorCode), func(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() { require.NoError(t, g.AddInfoProto( gossip.MakeNodeIDKey(n.NodeID), - newNodeDesc(n.NodeID), + TestingNewNodeDesc(n.NodeID), gossip.NodeDescriptorTTL, )) } @@ -1085,10 +1087,10 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) if err := g.AddInfoProto( gossip.MakeNodeIDKey(roachpb.NodeID(2)), - newNodeDesc(2), + TestingNewNodeDesc(2), gossip.NodeDescriptorTTL, ); err != nil { t.Fatal(err) @@ -1195,7 +1197,7 @@ func TestRetryOnDescriptorLookupError(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) errs := []error{ errors.New("boom"), @@ -1247,7 +1249,7 @@ func TestEvictOnFirstRangeGossip(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) sender := func( _ context.Context, ba roachpb.BatchRequest, @@ -1398,7 +1400,7 @@ func TestEvictCacheOnError(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) leaseHolder := roachpb.ReplicaDescriptor{ NodeID: 99, StoreID: 999, @@ -1469,11 +1471,11 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) // Gossip the two nodes referred to in testUserRangeDescriptor3Replicas. for i := 2; i <= 3; i++ { - nd := newNodeDesc(roachpb.NodeID(i)) + nd := TestingNewNodeDesc(roachpb.NodeID(i)) if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(i)), nd, time.Hour); err != nil { t.Fatal(err) } @@ -1531,7 +1533,7 @@ func TestRetryOnWrongReplicaError(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) if err := g.AddInfoProto(gossip.KeyFirstRangeDescriptor, &testMetaRangeDescriptor, time.Hour); err != nil { t.Fatal(err) } @@ -1627,7 +1629,7 @@ func TestRetryOnWrongReplicaErrorWithSuggestion(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) if err := g.AddInfoProto(gossip.KeyFirstRangeDescriptor, &testMetaRangeDescriptor, time.Hour); err != nil { t.Fatal(err) } @@ -1779,8 +1781,8 @@ func TestSendRPCRetry(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + g := TestingMakeGossip(t, stopper, rpcContext) + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } @@ -1850,8 +1852,8 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + g := TestingMakeGossip(t, stopper, rpcContext) + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } @@ -1994,8 +1996,8 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + g := TestingMakeGossip(t, stopper, rpcContext) + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } @@ -2089,7 +2091,7 @@ func TestGetNodeDescriptor(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -2099,7 +2101,7 @@ func TestGetNodeDescriptor(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), }) g.NodeID.Reset(5) - if err := g.SetNodeDescriptor(newNodeDesc(5)); err != nil { + if err := g.SetNodeDescriptor(TestingNewNodeDesc(5)); err != nil { t.Fatal(err) } testutils.SucceedsSoon(t, func() error { @@ -2119,7 +2121,7 @@ func TestMultiRangeGapReverse(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) var descs []roachpb.RangeDescriptor splits := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c"), roachpb.Key("d")} @@ -2213,7 +2215,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) // Assume we have two ranges, [a-b) and [b-KeyMax). merged := false // The stale first range descriptor which is unaware of the merge. @@ -2314,7 +2316,7 @@ func TestRangeLookupOptionOnReverseScan(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -2353,7 +2355,7 @@ func TestClockUpdateOnResponse(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -2412,8 +2414,8 @@ func TestTruncateWithSpanAndDescriptor(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + g := TestingMakeGossip(t, stopper, rpcContext) + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } nd := &roachpb.NodeDescriptor{ @@ -2537,8 +2539,8 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + g := TestingMakeGossip(t, stopper, rpcContext) + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } nd := &roachpb.NodeDescriptor{ @@ -2662,7 +2664,7 @@ func TestMultiRangeWithEndTxn(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) testCases := []struct { put1, put2, et roachpb.Key parCommit bool @@ -2749,7 +2751,7 @@ func TestMultiRangeWithEndTxn(t *testing.T) { }, } - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } nd := &roachpb.NodeDescriptor{ @@ -2859,7 +2861,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) keyA, keyB := roachpb.Key("a"), roachpb.Key("ab") put1 := roachpb.NewPut(keyA, roachpb.MakeValueFromString("val1")) @@ -2975,7 +2977,7 @@ func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) key := roachpb.Key("a") txn := roachpb.MakeTransaction( @@ -3116,7 +3118,7 @@ func TestCountRanges(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) // Create a slice of fake descriptors. const numDescriptors = 9 const firstKeyBoundary = 'a' @@ -3220,9 +3222,9 @@ func TestGatewayNodeID(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) const expNodeID = 42 - nd := newNodeDesc(expNodeID) + nd := TestingNewNodeDesc(expNodeID) g.NodeID.Reset(nd.NodeID) if err := g.SetNodeDescriptor(nd); err != nil { t.Fatal(err) @@ -3270,9 +3272,9 @@ func TestMultipleErrorsMerged(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } nd := &roachpb.NodeDescriptor{ @@ -3479,9 +3481,9 @@ func TestErrorIndexAlignment(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) - if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + if err := g.SetNodeDescriptor(TestingNewNodeDesc(1)); err != nil { t.Fatal(err) } nd := &roachpb.NodeDescriptor{ @@ -3632,12 +3634,12 @@ func TestCanSendToFollower(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) repls := testUserRangeDescriptor3Replicas.InternalReplicas for _, n := range repls { if err := g.AddInfoProto( gossip.MakeNodeIDKey(n.NodeID), - newNodeDesc(n.NodeID), + TestingNewNodeDesc(n.NodeID), gossip.NodeDescriptorTTL, ); err != nil { t.Fatal(err) @@ -3781,7 +3783,7 @@ func TestEvictMetaRange(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) if err := g.AddInfoProto(gossip.KeyFirstRangeDescriptor, &testMeta1RangeDescriptor, time.Hour); err != nil { t.Fatal(err) } @@ -3972,7 +3974,7 @@ func TestConnectionClass(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -4041,7 +4043,7 @@ func TestEvictionTokenCoalesce(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) if err := g.AddInfoProto(gossip.KeyFirstRangeDescriptor, &testMetaRangeDescriptor, time.Hour); err != nil { t.Fatal(err) } @@ -4190,7 +4192,7 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) // First request will be sent to an unsplit descriptor. var initialDesc = roachpb.RangeDescriptor{ diff --git a/pkg/kv/kvclient/kvcoord/mocks_generated_test.go b/pkg/kv/kvclient/kvcoord/mocks_generated_test.go new file mode 100755 index 000000000000..091014a1f2b4 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/mocks_generated_test.go @@ -0,0 +1,135 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord (interfaces: Transport) + +// Package kvcoord_test is a generated GoMock package. +package kvcoord_test + +import ( + context "context" + reflect "reflect" + + roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + gomock "github.com/golang/mock/gomock" +) + +// MockTransport is a mock of Transport interface. +type MockTransport struct { + ctrl *gomock.Controller + recorder *MockTransportMockRecorder +} + +// MockTransportMockRecorder is the mock recorder for MockTransport. +type MockTransportMockRecorder struct { + mock *MockTransport +} + +// NewMockTransport creates a new mock instance. +func NewMockTransport(ctrl *gomock.Controller) *MockTransport { + mock := &MockTransport{ctrl: ctrl} + mock.recorder = &MockTransportMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTransport) EXPECT() *MockTransportMockRecorder { + return m.recorder +} + +// IsExhausted mocks base method. +func (m *MockTransport) IsExhausted() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsExhausted") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsExhausted indicates an expected call of IsExhausted. +func (mr *MockTransportMockRecorder) IsExhausted() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsExhausted", reflect.TypeOf((*MockTransport)(nil).IsExhausted)) +} + +// MoveToFront mocks base method. +func (m *MockTransport) MoveToFront(arg0 roachpb.ReplicaDescriptor) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "MoveToFront", arg0) +} + +// MoveToFront indicates an expected call of MoveToFront. +func (mr *MockTransportMockRecorder) MoveToFront(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveToFront", reflect.TypeOf((*MockTransport)(nil).MoveToFront), arg0) +} + +// NextInternalClient mocks base method. +func (m *MockTransport) NextInternalClient( + arg0 context.Context, +) (context.Context, roachpb.InternalClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextInternalClient", arg0) + ret0, _ := ret[0].(context.Context) + ret1, _ := ret[1].(roachpb.InternalClient) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// NextInternalClient indicates an expected call of NextInternalClient. +func (mr *MockTransportMockRecorder) NextInternalClient(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextInternalClient", reflect.TypeOf((*MockTransport)(nil).NextInternalClient), arg0) +} + +// NextReplica mocks base method. +func (m *MockTransport) NextReplica() roachpb.ReplicaDescriptor { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextReplica") + ret0, _ := ret[0].(roachpb.ReplicaDescriptor) + return ret0 +} + +// NextReplica indicates an expected call of NextReplica. +func (mr *MockTransportMockRecorder) NextReplica() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextReplica", reflect.TypeOf((*MockTransport)(nil).NextReplica)) +} + +// Release mocks base method. +func (m *MockTransport) Release() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Release") +} + +// Release indicates an expected call of Release. +func (mr *MockTransportMockRecorder) Release() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Release", reflect.TypeOf((*MockTransport)(nil).Release)) +} + +// SendNext mocks base method. +func (m *MockTransport) SendNext( + arg0 context.Context, arg1 roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendNext", arg0, arg1) + ret0, _ := ret[0].(*roachpb.BatchResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendNext indicates an expected call of SendNext. +func (mr *MockTransportMockRecorder) SendNext(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendNext", reflect.TypeOf((*MockTransport)(nil).SendNext), arg0, arg1) +} + +// SkipReplica mocks base method. +func (m *MockTransport) SkipReplica() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SkipReplica") +} + +// SkipReplica indicates an expected call of SkipReplica. +func (mr *MockTransportMockRecorder) SkipReplica() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SkipReplica", reflect.TypeOf((*MockTransport)(nil).SkipReplica)) +} diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index d90e6815f152..effe43714527 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -59,7 +59,7 @@ func TestRangeIterForward(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -96,7 +96,7 @@ func TestRangeIterSeekForward(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -136,7 +136,7 @@ func TestRangeIterReverse(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, @@ -173,7 +173,7 @@ func TestRangeIterSeekReverse(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 46bbc64fdf9f..0f7ac5157308 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -299,7 +299,7 @@ func sendBatch( ) (*roachpb.BatchResponse, error) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - g := makeGossip(t, stopper, rpcContext) + g := TestingMakeGossip(t, stopper, rpcContext) desc := new(roachpb.RangeDescriptor) desc.StartKey = roachpb.RKeyMin diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 37aec120ece3..d31cbca28e78 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -26,6 +26,8 @@ import ( "github.com/cockroachdb/errors" ) +//go:generate mockgen -package=kvcoord_test -destination=mocks_generated_test.go . Transport + // A SendOptions structure describes the algorithm for sending RPCs to one or // more replicas, depending on error conditions and how many successful // responses are required. diff --git a/pkg/kv/kvclient/rangecache/BUILD.bazel b/pkg/kv/kvclient/rangecache/BUILD.bazel index bd5997b1cd20..3fc2ec504e5b 100644 --- a/pkg/kv/kvclient/rangecache/BUILD.bazel +++ b/pkg/kv/kvclient/rangecache/BUILD.bazel @@ -1,10 +1,11 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "rangecache", + name = "rangecache_base", srcs = ["range_cache.go"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache", - visibility = ["//visibility:public"], + visibility = ["//visibility:private"], deps = [ "//pkg/keys", "//pkg/roachpb", @@ -24,6 +25,29 @@ go_library( ], ) +# keep +go_library( + name = "rangecache", + srcs = [":mocks_rangecache"], + embed = [":rangecache_base"], + visibility = ["//visibility:public"], + deps = [ + "@com_github_golang_mock//gomock", + "@org_golang_google_grpc//metadata", + ], +) + +gomock( + name = "mocks_rangecache", + out = "mocks_generated.go", + interfaces = [ + "RangeDescriptorDB", + ], + library = ":rangecache_base", + package = "rangecache", + self_package = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache", +) + go_test( name = "rangecache_test", size = "small", diff --git a/pkg/kv/kvclient/rangecache/mocks_generated.go b/pkg/kv/kvclient/rangecache/mocks_generated.go new file mode 100644 index 000000000000..f37846a175a0 --- /dev/null +++ b/pkg/kv/kvclient/rangecache/mocks_generated.go @@ -0,0 +1,67 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache (interfaces: RangeDescriptorDB) + +// Package rangecache is a generated GoMock package. +package rangecache + +import ( + context "context" + reflect "reflect" + + roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + gomock "github.com/golang/mock/gomock" +) + +// MockRangeDescriptorDB is a mock of RangeDescriptorDB interface. +type MockRangeDescriptorDB struct { + ctrl *gomock.Controller + recorder *MockRangeDescriptorDBMockRecorder +} + +// MockRangeDescriptorDBMockRecorder is the mock recorder for MockRangeDescriptorDB. +type MockRangeDescriptorDBMockRecorder struct { + mock *MockRangeDescriptorDB +} + +// NewMockRangeDescriptorDB creates a new mock instance. +func NewMockRangeDescriptorDB(ctrl *gomock.Controller) *MockRangeDescriptorDB { + mock := &MockRangeDescriptorDB{ctrl: ctrl} + mock.recorder = &MockRangeDescriptorDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRangeDescriptorDB) EXPECT() *MockRangeDescriptorDBMockRecorder { + return m.recorder +} + +// FirstRange mocks base method. +func (m *MockRangeDescriptorDB) FirstRange() (*roachpb.RangeDescriptor, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FirstRange") + ret0, _ := ret[0].(*roachpb.RangeDescriptor) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FirstRange indicates an expected call of FirstRange. +func (mr *MockRangeDescriptorDBMockRecorder) FirstRange() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FirstRange", reflect.TypeOf((*MockRangeDescriptorDB)(nil).FirstRange)) +} + +// RangeLookup mocks base method. +func (m *MockRangeDescriptorDB) RangeLookup(arg0 context.Context, arg1 roachpb.RKey, arg2 bool) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RangeLookup", arg0, arg1, arg2) + ret0, _ := ret[0].([]roachpb.RangeDescriptor) + ret1, _ := ret[1].([]roachpb.RangeDescriptor) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// RangeLookup indicates an expected call of RangeLookup. +func (mr *MockRangeDescriptorDBMockRecorder) RangeLookup(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeLookup", reflect.TypeOf((*MockRangeDescriptorDB)(nil).RangeLookup), arg0, arg1, arg2) +} diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 2930131a1cc2..24d7647cc818 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -35,6 +35,8 @@ import ( "github.com/cockroachdb/logtags" ) +//go:generate mockgen -package=rangecache -destination=mocks_generated.go . RangeDescriptorDB + // rangeCacheKey is the key type used to store and sort values in the // RangeCache. type rangeCacheKey roachpb.RKey @@ -1324,7 +1326,7 @@ func (e *CacheEntry) evictLeaseholder( // IsRangeLookupErrorRetryable returns whether the provided range lookup error // can be retried or whether it should be propagated immediately. func IsRangeLookupErrorRetryable(err error) bool { - // For now, all errors are retryable except transport errors where request was - // rejected. - return !grpcutil.IsConnectionRejected(err) + // Auth errors are not retryable. These imply that the local node has been + // decommissioned or is otherwise not part of the cluster. + return !grpcutil.IsAuthError(err) } diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 3ce86fc60db6..185a2d032f49 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -1,7 +1,8 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "rangefeed", + name = "rangefeed_base", srcs = [ "config.go", "db_adapter.go", @@ -9,7 +10,7 @@ go_library( "rangefeed.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", - visibility = ["//visibility:public"], + visibility = ["//visibility:private"], deps = [ "//pkg/base", "//pkg/kv", @@ -26,6 +27,30 @@ go_library( ], ) +# keep +go_library( + name = "rangefeed", + srcs = [ + ":mocks_rangefeed", # keep + ], + embed = [":rangefeed_base"], + visibility = ["//visibility:public"], + deps = [ + "@com_github_golang_mock//gomock", + "@org_golang_google_grpc//metadata", + ], +) + +gomock( + name = "mocks_rangefeed", + out = "mocks_generated.go", + interfaces = [""], # required, yet ignored when using source -- bug? + library = ":rangefeed_base", + package = "rangefeed", + self_package = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", + source = "rangefeed.go", +) + go_test( name = "rangefeed_test", srcs = [ @@ -38,12 +63,11 @@ go_test( embed = [":rangefeed"], deps = [ "//pkg/base", + "//pkg/keys", "//pkg/roachpb", - "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", - "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", "//pkg/util/encoding", @@ -53,8 +77,8 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_golang_mock//gomock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated.go b/pkg/kv/kvclient/rangefeed/mocks_generated.go new file mode 100644 index 000000000000..67fb198fe76c --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/mocks_generated.go @@ -0,0 +1,65 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: rangefeed.go + +// Package rangefeed is a generated GoMock package. +package rangefeed + +import ( + context "context" + reflect "reflect" + + roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" + gomock "github.com/golang/mock/gomock" +) + +// MockkvDB is a mock of kvDB interface. +type MockkvDB struct { + ctrl *gomock.Controller + recorder *MockkvDBMockRecorder +} + +// MockkvDBMockRecorder is the mock recorder for MockkvDB. +type MockkvDBMockRecorder struct { + mock *MockkvDB +} + +// NewMockkvDB creates a new mock instance. +func NewMockkvDB(ctrl *gomock.Controller) *MockkvDB { + mock := &MockkvDB{ctrl: ctrl} + mock.recorder = &MockkvDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockkvDB) EXPECT() *MockkvDBMockRecorder { + return m.recorder +} + +// RangeFeed mocks base method. +func (m *MockkvDB) RangeFeed(ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RangeFeed", ctx, span, startFrom, withDiff, eventC) + ret0, _ := ret[0].(error) + return ret0 +} + +// RangeFeed indicates an expected call of RangeFeed. +func (mr *MockkvDBMockRecorder) RangeFeed(ctx, span, startFrom, withDiff, eventC interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockkvDB)(nil).RangeFeed), ctx, span, startFrom, withDiff, eventC) +} + +// Scan mocks base method. +func (m *MockkvDB) Scan(ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(roachpb.KeyValue)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scan", ctx, span, asOf, rowFn) + ret0, _ := ret[0].(error) + return ret0 +} + +// Scan indicates an expected call of Scan. +func (mr *MockkvDBMockRecorder) Scan(ctx, span, asOf, rowFn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scan", reflect.TypeOf((*MockkvDB)(nil).Scan), ctx, span, asOf, rowFn) +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index b68fef8df514..aff22fe5135e 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -27,6 +27,8 @@ import ( "github.com/cockroachdb/logtags" ) +//go:generate mockgen -package=rangefeed -source rangefeed.go -destination=mocks_generated.go . + // TODO(ajwerner): Expose hooks for metrics. // TODO(ajwerner): Expose access to checkpoints and the frontier. // TODO(ajwerner): Expose better control over how the exponential backoff gets diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index 12a3d2838e51..a8654a9af878 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -12,27 +12,21 @@ package rangefeed_test import ( "context" - "strings" - "sync/atomic" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + gomock "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) type mockClient struct { @@ -335,50 +329,47 @@ func TestRangeFeedMock(t *testing.T) { }) } -// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a -// rangefeed fails. It observes this indirectly by looking at logs. +// TestBackoffOnRangefeedFailure ensures that the rangefeed is retried on +// failures. func TestBackoffOnRangefeedFailure(t *testing.T) { defer leaktest.AfterTest(t)() - var called int64 - const timesToFail = 3 - rpcKnobs := rpc.ContextTestingKnobs{ - StreamClientInterceptor: func( - target string, class rpc.ConnectionClass, - ) grpc.StreamClientInterceptor { - return func( - ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, - method string, streamer grpc.Streamer, opts ...grpc.CallOption, - ) (stream grpc.ClientStream, err error) { - if strings.Contains(method, "RangeFeed") && - atomic.AddInt64(&called, 1) <= timesToFail { - return nil, errors.Errorf("boom") - } - return streamer(ctx, desc, cc, method, opts...) - } - }, + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + ctrl := gomock.NewController(t) + db := rangefeed.NewMockkvDB(ctrl) + + // Make sure scan failure gets retried. + db.EXPECT().Scan(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("scan failed")) + db.EXPECT().Scan(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + // Make sure rangefeed is retried even after 3 failures, then succeed and cancel context + // (which signals the rangefeed to shut down gracefully). + db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Times(3). + Return(errors.New("rangefeed failed")) + db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(context.Context, roachpb.Span, hlc.Timestamp, bool, chan<- *roachpb.RangeFeedEvent) { + cancel() + }). + Return(nil) + + f := rangefeed.NewFactoryWithDB(stopper, db, nil /* knobs */) + r, err := f.RangeFeed(ctx, "foo", + roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + hlc.Timestamp{}, + func(ctx context.Context, value *roachpb.RangeFeedValue) {}, + rangefeed.WithInitialScan(func(ctx context.Context) {}), + rangefeed.WithRetry(retry.Options{InitialBackoff: time.Millisecond}), + ) + require.NoError(t, err) + defer r.Close() + + select { + case <-ctx.Done(): + case <-time.After(10 * time.Second): + require.Fail(t, "timed out waiting for retries") } - ctx := context.Background() - var seen int64 - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - ContextTestingKnobs: rpcKnobs, - }, - RangeFeed: &rangefeed.TestingKnobs{ - OnRangefeedRestart: func() { - atomic.AddInt64(&seen, 1) - }, - }, - }, - }, - }) - defer tc.Stopper().Stop(ctx) - testutils.SucceedsSoon(t, func() error { - if n := atomic.LoadInt64(&seen); n < timesToFail { - return errors.Errorf("seen %d, waiting for %d", n, timesToFail) - } - return nil - }) } diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index c0cad24f84f1..5eb24bb73e3d 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -1,5 +1,6 @@ # gazelle:exclude string_test.go +load("@bazel_gomock//:gomock.bzl", "gomock") load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") @@ -24,6 +25,7 @@ go_library( ":gen-batch-generated", # keep ":gen-errordetailtype-stringer", # keep ":gen-method-stringer", # keep + ":mocks_api", # keep ], embed = [":roachpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/roachpb", @@ -49,7 +51,9 @@ go_library( "@com_github_cockroachdb_errors//errorspb", "@com_github_cockroachdb_errors//extgrpc", "@com_github_cockroachdb_redact//:redact", + "@com_github_golang_mock//gomock", # keep "@io_etcd_go_etcd_raft_v3//raftpb", + "@org_golang_google_grpc//metadata", # keep ], ) @@ -102,6 +106,18 @@ go_library( ], ) +gomock( + name = "mocks_api", + out = "mocks_generated.go", + interfaces = [ + "Internal_RangeFeedClient", + "InternalClient", + ], + library = ":bootstrap", + package = "roachpb", + self_package = "github.com/cockroachdb/cockroach/pkg/roachpb", +) + go_test( name = "roachpb_test", size = "small", diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index d4db73123ebb..af15f28f6602 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/redact" ) +//go:generate mockgen -package=roachpb -destination=mocks_generated.go . InternalClient,Internal_RangeFeedClient + // UserPriority is a custom type for transaction's user priority. type UserPriority float64 diff --git a/pkg/roachpb/mocks_generated.go b/pkg/roachpb/mocks_generated.go new file mode 100644 index 000000000000..d9dea81cb6a6 --- /dev/null +++ b/pkg/roachpb/mocks_generated.go @@ -0,0 +1,280 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) + +// Package roachpb is a generated GoMock package. +package roachpb + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" +) + +// MockInternalClient is a mock of InternalClient interface. +type MockInternalClient struct { + ctrl *gomock.Controller + recorder *MockInternalClientMockRecorder +} + +// MockInternalClientMockRecorder is the mock recorder for MockInternalClient. +type MockInternalClientMockRecorder struct { + mock *MockInternalClient +} + +// NewMockInternalClient creates a new mock instance. +func NewMockInternalClient(ctrl *gomock.Controller) *MockInternalClient { + mock := &MockInternalClient{ctrl: ctrl} + mock.recorder = &MockInternalClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternalClient) EXPECT() *MockInternalClientMockRecorder { + return m.recorder +} + +// Batch mocks base method. +func (m *MockInternalClient) Batch(arg0 context.Context, arg1 *BatchRequest, arg2 ...grpc.CallOption) (*BatchResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Batch", varargs...) + ret0, _ := ret[0].(*BatchResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Batch indicates an expected call of Batch. +func (mr *MockInternalClientMockRecorder) Batch(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Batch", reflect.TypeOf((*MockInternalClient)(nil).Batch), varargs...) +} + +// GossipSubscription mocks base method. +func (m *MockInternalClient) GossipSubscription(arg0 context.Context, arg1 *GossipSubscriptionRequest, arg2 ...grpc.CallOption) (Internal_GossipSubscriptionClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GossipSubscription", varargs...) + ret0, _ := ret[0].(Internal_GossipSubscriptionClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GossipSubscription indicates an expected call of GossipSubscription. +func (mr *MockInternalClientMockRecorder) GossipSubscription(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GossipSubscription", reflect.TypeOf((*MockInternalClient)(nil).GossipSubscription), varargs...) +} + +// Join mocks base method. +func (m *MockInternalClient) Join(arg0 context.Context, arg1 *JoinNodeRequest, arg2 ...grpc.CallOption) (*JoinNodeResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Join", varargs...) + ret0, _ := ret[0].(*JoinNodeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Join indicates an expected call of Join. +func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) +} + +// RangeFeed mocks base method. +func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *RangeFeedRequest, arg2 ...grpc.CallOption) (Internal_RangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RangeFeed", varargs...) + ret0, _ := ret[0].(Internal_RangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RangeFeed indicates an expected call of RangeFeed. +func (mr *MockInternalClientMockRecorder) RangeFeed(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockInternalClient)(nil).RangeFeed), varargs...) +} + +// RangeLookup mocks base method. +func (m *MockInternalClient) RangeLookup(arg0 context.Context, arg1 *RangeLookupRequest, arg2 ...grpc.CallOption) (*RangeLookupResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RangeLookup", varargs...) + ret0, _ := ret[0].(*RangeLookupResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RangeLookup indicates an expected call of RangeLookup. +func (mr *MockInternalClientMockRecorder) RangeLookup(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeLookup", reflect.TypeOf((*MockInternalClient)(nil).RangeLookup), varargs...) +} + +// ResetQuorum mocks base method. +func (m *MockInternalClient) ResetQuorum(arg0 context.Context, arg1 *ResetQuorumRequest, arg2 ...grpc.CallOption) (*ResetQuorumResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ResetQuorum", varargs...) + ret0, _ := ret[0].(*ResetQuorumResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ResetQuorum indicates an expected call of ResetQuorum. +func (mr *MockInternalClientMockRecorder) ResetQuorum(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetQuorum", reflect.TypeOf((*MockInternalClient)(nil).ResetQuorum), varargs...) +} + +// MockInternal_RangeFeedClient is a mock of Internal_RangeFeedClient interface. +type MockInternal_RangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_RangeFeedClientMockRecorder +} + +// MockInternal_RangeFeedClientMockRecorder is the mock recorder for MockInternal_RangeFeedClient. +type MockInternal_RangeFeedClientMockRecorder struct { + mock *MockInternal_RangeFeedClient +} + +// NewMockInternal_RangeFeedClient creates a new mock instance. +func NewMockInternal_RangeFeedClient(ctrl *gomock.Controller) *MockInternal_RangeFeedClient { + mock := &MockInternal_RangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_RangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_RangeFeedClient) EXPECT() *MockInternal_RangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_RangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_RangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_RangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_RangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_RangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockInternal_RangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_RangeFeedClient) Recv() (*RangeFeedEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*RangeFeedEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_RangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_RangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_RangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).RecvMsg), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_RangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_RangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_RangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) +}