From 3a990da6f86d352753461cc91d3610e5f734b44d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 25 Jun 2021 15:18:09 +0000 Subject: [PATCH] kvcoord: fix rangefeed retries on transport errors `DistSender.RangeFeed()` was meant to retry transport errors after refreshing the range descriptor (invalidating the cached entry). However, due to an incorrect error type check (`*sendError` vs `sendError`), these errors failed the range feed without invalidating the cached range descriptor. This was particularly severe in cases where a large number of nodes had been decommissioned, where some stale range descriptors on some nodes contained only decommissioned nodes. Since change feeds set up range feeds across many nodes and ranges in the cluster, they are likely to encounter these decommissioned nodes and return an error -- and since the descriptor cache wasn't invalidated they would keep erroring until the nodes were restarted such that the caches were flushed (often requiring a full cluster restart). Release note (bug fix): Change feeds now properly invalidate cached range descriptors and retry when encountering decommissioned nodes. --- build/bazelutil/check.sh | 4 + pkg/kv/kvclient/kvcoord/BUILD.bazel | 30 +- .../kvclient/kvcoord/dist_sender_rangefeed.go | 2 +- .../kvcoord/dist_sender_rangefeed_test.go | 154 ++++++++++ pkg/kv/kvclient/kvcoord/mocks_generated.go | 131 ++++++++ pkg/kv/kvclient/kvcoord/transport.go | 2 + pkg/kv/kvclient/rangecache/BUILD.bazel | 28 +- pkg/kv/kvclient/rangecache/mocks_generated.go | 67 +++++ pkg/kv/kvclient/rangecache/range_cache.go | 2 + pkg/kv/kvclient/rangefeed/BUILD.bazel | 34 ++- pkg/kv/kvclient/rangefeed/mocks_generated.go | 65 ++++ pkg/kv/kvclient/rangefeed/rangefeed.go | 2 + .../kvclient/rangefeed/rangefeed_mock_test.go | 92 +++--- pkg/roachpb/BUILD.bazel | 16 + pkg/roachpb/api.go | 2 + pkg/roachpb/mocks_generated.go | 280 ++++++++++++++++++ 16 files changed, 851 insertions(+), 60 deletions(-) create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go create mode 100644 pkg/kv/kvclient/kvcoord/mocks_generated.go create mode 100644 pkg/kv/kvclient/rangecache/mocks_generated.go create mode 100644 pkg/kv/kvclient/rangefeed/mocks_generated.go create mode 100644 pkg/roachpb/mocks_generated.go diff --git a/build/bazelutil/check.sh b/build/bazelutil/check.sh index f1dba3d6cd5f..05377b29cb05 100755 --- a/build/bazelutil/check.sh +++ b/build/bazelutil/check.sh @@ -16,6 +16,10 @@ pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generi pkg/roachpb/batch.go://go:generate go run -tags gen-batch gen/main.go pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated.go -source=cert.go . Cert pkg/cmd/roachtest/prometheus/prometheus.go://go:generate mockgen -package=prometheus -destination=mock_generated.go -source=prometheus.go . cluster +pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangecache -destination=mocks_generated.go . RangeDescriptorDB +pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -package=rangefeed -source rangefeed.go -destination=mocks_generated.go . +pkg/kv/kvclient/kvcoord/transport.go://go:generate mockgen -package=kvcoord -destination=mocks_generated.go . Transport +pkg/roachpb/api.go://go:generate mockgen -package=roachpb -destination=mocks_generated.go . InternalClient,Internal_RangeFeedClient pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs pkg/security/securitytest/securitytest.go://go:generate gofmt -s -w embedded.go pkg/security/securitytest/securitytest.go://go:generate goimports -w embedded.go diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 20003bbc9c09..4ff6897fe0ce 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -1,8 +1,9 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("//build:STRINGER.bzl", "stringer") go_library( - name = "kvcoord", + name = "kvcoord_base", srcs = [ "batch.go", "condensable_span_set.go", @@ -31,7 +32,7 @@ go_library( ":gen-txnstate-stringer", # keep ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord", - visibility = ["//visibility:public"], + visibility = ["//visibility:private"], deps = [ "//pkg/base", "//pkg/gossip", @@ -69,12 +70,36 @@ go_library( ], ) +# keep +go_library( + name = "kvcoord", + srcs = [":mocks_transport"], + embed = [":kvcoord_base"], + visibility = ["//visibility:public"], + deps = [ + "@com_github_golang_mock//gomock", + "@org_golang_google_grpc//metadata", + ], +) + +gomock( + name = "mocks_transport", + out = "mocks_generated.go", + interfaces = [ + "Transport", + ], + library = ":kvcoord_base", + package = "kvcoord", + self_package = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord", +) + go_test( name = "kvcoord_test", size = "medium", srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", "integration_test.go", @@ -146,6 +171,7 @@ go_test( "@com_github_cockroachdb_errors//errutil", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", + "@com_github_golang_mock//gomock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_google_grpc//:go_default_library", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 0d38a96f9cb3..b18a0633f74d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -164,7 +164,7 @@ func (ds *DistSender) partialRangeFeed( // These errors are likely to be unique to the replica that // reported them, so no action is required before the next // retry. - case errors.HasType(err, (*sendError)(nil)), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): + case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): // Evict the descriptor from the cache and reload on next attempt. rangeInfo.token.Evict(ctx) rangeInfo.token = rangecache.EvictionToken{} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go new file mode 100644 index 000000000000..e4b4e3efcd16 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -0,0 +1,154 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(spec.errorCode.String(), func(t *testing.T) { + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. + for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + ctx, nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. + if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + stream := roachpb.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client := roachpb.NewMockInternalClient(ctrl) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil) + transport.EXPECT().Release() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + err := ds.RangeFeed(ctx, roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, hlc.Timestamp{}, false, nil) + require.Error(t, err) + }) + } +} diff --git a/pkg/kv/kvclient/kvcoord/mocks_generated.go b/pkg/kv/kvclient/kvcoord/mocks_generated.go new file mode 100644 index 000000000000..597b69e49f0f --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/mocks_generated.go @@ -0,0 +1,131 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord (interfaces: Transport) + +// Package kvcoord is a generated GoMock package. +package kvcoord + +import ( + context "context" + reflect "reflect" + + roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + gomock "github.com/golang/mock/gomock" +) + +// MockTransport is a mock of Transport interface. +type MockTransport struct { + ctrl *gomock.Controller + recorder *MockTransportMockRecorder +} + +// MockTransportMockRecorder is the mock recorder for MockTransport. +type MockTransportMockRecorder struct { + mock *MockTransport +} + +// NewMockTransport creates a new mock instance. +func NewMockTransport(ctrl *gomock.Controller) *MockTransport { + mock := &MockTransport{ctrl: ctrl} + mock.recorder = &MockTransportMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTransport) EXPECT() *MockTransportMockRecorder { + return m.recorder +} + +// IsExhausted mocks base method. +func (m *MockTransport) IsExhausted() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsExhausted") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsExhausted indicates an expected call of IsExhausted. +func (mr *MockTransportMockRecorder) IsExhausted() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsExhausted", reflect.TypeOf((*MockTransport)(nil).IsExhausted)) +} + +// MoveToFront mocks base method. +func (m *MockTransport) MoveToFront(arg0 roachpb.ReplicaDescriptor) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "MoveToFront", arg0) +} + +// MoveToFront indicates an expected call of MoveToFront. +func (mr *MockTransportMockRecorder) MoveToFront(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveToFront", reflect.TypeOf((*MockTransport)(nil).MoveToFront), arg0) +} + +// NextInternalClient mocks base method. +func (m *MockTransport) NextInternalClient(arg0 context.Context) (context.Context, roachpb.InternalClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextInternalClient", arg0) + ret0, _ := ret[0].(context.Context) + ret1, _ := ret[1].(roachpb.InternalClient) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// NextInternalClient indicates an expected call of NextInternalClient. +func (mr *MockTransportMockRecorder) NextInternalClient(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextInternalClient", reflect.TypeOf((*MockTransport)(nil).NextInternalClient), arg0) +} + +// NextReplica mocks base method. +func (m *MockTransport) NextReplica() roachpb.ReplicaDescriptor { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextReplica") + ret0, _ := ret[0].(roachpb.ReplicaDescriptor) + return ret0 +} + +// NextReplica indicates an expected call of NextReplica. +func (mr *MockTransportMockRecorder) NextReplica() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextReplica", reflect.TypeOf((*MockTransport)(nil).NextReplica)) +} + +// Release mocks base method. +func (m *MockTransport) Release() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Release") +} + +// Release indicates an expected call of Release. +func (mr *MockTransportMockRecorder) Release() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Release", reflect.TypeOf((*MockTransport)(nil).Release)) +} + +// SendNext mocks base method. +func (m *MockTransport) SendNext(arg0 context.Context, arg1 roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendNext", arg0, arg1) + ret0, _ := ret[0].(*roachpb.BatchResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendNext indicates an expected call of SendNext. +func (mr *MockTransportMockRecorder) SendNext(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendNext", reflect.TypeOf((*MockTransport)(nil).SendNext), arg0, arg1) +} + +// SkipReplica mocks base method. +func (m *MockTransport) SkipReplica() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SkipReplica") +} + +// SkipReplica indicates an expected call of SkipReplica. +func (mr *MockTransportMockRecorder) SkipReplica() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SkipReplica", reflect.TypeOf((*MockTransport)(nil).SkipReplica)) +} diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 37aec120ece3..cacb061a6397 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -26,6 +26,8 @@ import ( "github.com/cockroachdb/errors" ) +//go:generate mockgen -package=kvcoord -destination=mocks_generated.go . Transport + // A SendOptions structure describes the algorithm for sending RPCs to one or // more replicas, depending on error conditions and how many successful // responses are required. diff --git a/pkg/kv/kvclient/rangecache/BUILD.bazel b/pkg/kv/kvclient/rangecache/BUILD.bazel index bd5997b1cd20..3fc2ec504e5b 100644 --- a/pkg/kv/kvclient/rangecache/BUILD.bazel +++ b/pkg/kv/kvclient/rangecache/BUILD.bazel @@ -1,10 +1,11 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "rangecache", + name = "rangecache_base", srcs = ["range_cache.go"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache", - visibility = ["//visibility:public"], + visibility = ["//visibility:private"], deps = [ "//pkg/keys", "//pkg/roachpb", @@ -24,6 +25,29 @@ go_library( ], ) +# keep +go_library( + name = "rangecache", + srcs = [":mocks_rangecache"], + embed = [":rangecache_base"], + visibility = ["//visibility:public"], + deps = [ + "@com_github_golang_mock//gomock", + "@org_golang_google_grpc//metadata", + ], +) + +gomock( + name = "mocks_rangecache", + out = "mocks_generated.go", + interfaces = [ + "RangeDescriptorDB", + ], + library = ":rangecache_base", + package = "rangecache", + self_package = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache", +) + go_test( name = "rangecache_test", size = "small", diff --git a/pkg/kv/kvclient/rangecache/mocks_generated.go b/pkg/kv/kvclient/rangecache/mocks_generated.go new file mode 100644 index 000000000000..f37846a175a0 --- /dev/null +++ b/pkg/kv/kvclient/rangecache/mocks_generated.go @@ -0,0 +1,67 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache (interfaces: RangeDescriptorDB) + +// Package rangecache is a generated GoMock package. +package rangecache + +import ( + context "context" + reflect "reflect" + + roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + gomock "github.com/golang/mock/gomock" +) + +// MockRangeDescriptorDB is a mock of RangeDescriptorDB interface. +type MockRangeDescriptorDB struct { + ctrl *gomock.Controller + recorder *MockRangeDescriptorDBMockRecorder +} + +// MockRangeDescriptorDBMockRecorder is the mock recorder for MockRangeDescriptorDB. +type MockRangeDescriptorDBMockRecorder struct { + mock *MockRangeDescriptorDB +} + +// NewMockRangeDescriptorDB creates a new mock instance. +func NewMockRangeDescriptorDB(ctrl *gomock.Controller) *MockRangeDescriptorDB { + mock := &MockRangeDescriptorDB{ctrl: ctrl} + mock.recorder = &MockRangeDescriptorDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRangeDescriptorDB) EXPECT() *MockRangeDescriptorDBMockRecorder { + return m.recorder +} + +// FirstRange mocks base method. +func (m *MockRangeDescriptorDB) FirstRange() (*roachpb.RangeDescriptor, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FirstRange") + ret0, _ := ret[0].(*roachpb.RangeDescriptor) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FirstRange indicates an expected call of FirstRange. +func (mr *MockRangeDescriptorDBMockRecorder) FirstRange() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FirstRange", reflect.TypeOf((*MockRangeDescriptorDB)(nil).FirstRange)) +} + +// RangeLookup mocks base method. +func (m *MockRangeDescriptorDB) RangeLookup(arg0 context.Context, arg1 roachpb.RKey, arg2 bool) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RangeLookup", arg0, arg1, arg2) + ret0, _ := ret[0].([]roachpb.RangeDescriptor) + ret1, _ := ret[1].([]roachpb.RangeDescriptor) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// RangeLookup indicates an expected call of RangeLookup. +func (mr *MockRangeDescriptorDBMockRecorder) RangeLookup(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeLookup", reflect.TypeOf((*MockRangeDescriptorDB)(nil).RangeLookup), arg0, arg1, arg2) +} diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index dfe3787b9068..24d7647cc818 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -35,6 +35,8 @@ import ( "github.com/cockroachdb/logtags" ) +//go:generate mockgen -package=rangecache -destination=mocks_generated.go . RangeDescriptorDB + // rangeCacheKey is the key type used to store and sort values in the // RangeCache. type rangeCacheKey roachpb.RKey diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 3ce86fc60db6..185a2d032f49 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -1,7 +1,8 @@ +load("@bazel_gomock//:gomock.bzl", "gomock") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "rangefeed", + name = "rangefeed_base", srcs = [ "config.go", "db_adapter.go", @@ -9,7 +10,7 @@ go_library( "rangefeed.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", - visibility = ["//visibility:public"], + visibility = ["//visibility:private"], deps = [ "//pkg/base", "//pkg/kv", @@ -26,6 +27,30 @@ go_library( ], ) +# keep +go_library( + name = "rangefeed", + srcs = [ + ":mocks_rangefeed", # keep + ], + embed = [":rangefeed_base"], + visibility = ["//visibility:public"], + deps = [ + "@com_github_golang_mock//gomock", + "@org_golang_google_grpc//metadata", + ], +) + +gomock( + name = "mocks_rangefeed", + out = "mocks_generated.go", + interfaces = [""], # required, yet ignored when using source -- bug? + library = ":rangefeed_base", + package = "rangefeed", + self_package = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", + source = "rangefeed.go", +) + go_test( name = "rangefeed_test", srcs = [ @@ -38,12 +63,11 @@ go_test( embed = [":rangefeed"], deps = [ "//pkg/base", + "//pkg/keys", "//pkg/roachpb", - "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", - "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", "//pkg/util/encoding", @@ -53,8 +77,8 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_golang_mock//gomock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated.go b/pkg/kv/kvclient/rangefeed/mocks_generated.go new file mode 100644 index 000000000000..67fb198fe76c --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/mocks_generated.go @@ -0,0 +1,65 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: rangefeed.go + +// Package rangefeed is a generated GoMock package. +package rangefeed + +import ( + context "context" + reflect "reflect" + + roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" + gomock "github.com/golang/mock/gomock" +) + +// MockkvDB is a mock of kvDB interface. +type MockkvDB struct { + ctrl *gomock.Controller + recorder *MockkvDBMockRecorder +} + +// MockkvDBMockRecorder is the mock recorder for MockkvDB. +type MockkvDBMockRecorder struct { + mock *MockkvDB +} + +// NewMockkvDB creates a new mock instance. +func NewMockkvDB(ctrl *gomock.Controller) *MockkvDB { + mock := &MockkvDB{ctrl: ctrl} + mock.recorder = &MockkvDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockkvDB) EXPECT() *MockkvDBMockRecorder { + return m.recorder +} + +// RangeFeed mocks base method. +func (m *MockkvDB) RangeFeed(ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RangeFeed", ctx, span, startFrom, withDiff, eventC) + ret0, _ := ret[0].(error) + return ret0 +} + +// RangeFeed indicates an expected call of RangeFeed. +func (mr *MockkvDBMockRecorder) RangeFeed(ctx, span, startFrom, withDiff, eventC interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockkvDB)(nil).RangeFeed), ctx, span, startFrom, withDiff, eventC) +} + +// Scan mocks base method. +func (m *MockkvDB) Scan(ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(roachpb.KeyValue)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scan", ctx, span, asOf, rowFn) + ret0, _ := ret[0].(error) + return ret0 +} + +// Scan indicates an expected call of Scan. +func (mr *MockkvDBMockRecorder) Scan(ctx, span, asOf, rowFn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scan", reflect.TypeOf((*MockkvDB)(nil).Scan), ctx, span, asOf, rowFn) +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index b68fef8df514..aff22fe5135e 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -27,6 +27,8 @@ import ( "github.com/cockroachdb/logtags" ) +//go:generate mockgen -package=rangefeed -source rangefeed.go -destination=mocks_generated.go . + // TODO(ajwerner): Expose hooks for metrics. // TODO(ajwerner): Expose access to checkpoints and the frontier. // TODO(ajwerner): Expose better control over how the exponential backoff gets diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index 12a3d2838e51..0a76f19055b9 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -12,27 +12,21 @@ package rangefeed_test import ( "context" - "strings" - "sync/atomic" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + gomock "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) type mockClient struct { @@ -335,50 +329,48 @@ func TestRangeFeedMock(t *testing.T) { }) } -// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a -// rangefeed fails. It observes this indirectly by looking at logs. +// TestBackoffOnRangefeedFailure ensures that the rangefeed is retried on +// failures. func TestBackoffOnRangefeedFailure(t *testing.T) { defer leaktest.AfterTest(t)() - var called int64 - const timesToFail = 3 - rpcKnobs := rpc.ContextTestingKnobs{ - StreamClientInterceptor: func( - target string, class rpc.ConnectionClass, - ) grpc.StreamClientInterceptor { - return func( - ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, - method string, streamer grpc.Streamer, opts ...grpc.CallOption, - ) (stream grpc.ClientStream, err error) { - if strings.Contains(method, "RangeFeed") && - atomic.AddInt64(&called, 1) <= timesToFail { - return nil, errors.Errorf("boom") - } - return streamer(ctx, desc, cc, method, opts...) - } - }, + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + ctrl := gomock.NewController(t) + db := rangefeed.NewMockkvDB(ctrl) + + // Make sure scan failure gets retried. + db.EXPECT().Scan(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(errors.New("scan failed")). + Return(nil) + + // Make sure rangefeed is retried even after 3 failures, then succeed and cancel context + // (which signals the rangefeed to shut down gracefully). + db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Times(3). + Return(errors.New("rangefeed failed")) + db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(context.Context, roachpb.Span, hlc.Timestamp, bool, chan<- *roachpb.RangeFeedEvent) { + cancel() + }). + Return(nil) + + f := rangefeed.NewFactoryWithDB(stopper, db, nil /* knobs */) + r, err := f.RangeFeed(ctx, "foo", + roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + hlc.Timestamp{}, + func(ctx context.Context, value *roachpb.RangeFeedValue) {}, + rangefeed.WithInitialScan(func(ctx context.Context) {}), + rangefeed.WithRetry(retry.Options{InitialBackoff: time.Millisecond}), + ) + require.NoError(t, err) + defer r.Close() + + select { + case <-ctx.Done(): + case <-time.After(10 * time.Second): + require.Fail(t, "timed out waiting for retries") } - ctx := context.Background() - var seen int64 - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - ContextTestingKnobs: rpcKnobs, - }, - RangeFeed: &rangefeed.TestingKnobs{ - OnRangefeedRestart: func() { - atomic.AddInt64(&seen, 1) - }, - }, - }, - }, - }) - defer tc.Stopper().Stop(ctx) - testutils.SucceedsSoon(t, func() error { - if n := atomic.LoadInt64(&seen); n < timesToFail { - return errors.Errorf("seen %d, waiting for %d", n, timesToFail) - } - return nil - }) } diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index c0cad24f84f1..5eb24bb73e3d 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -1,5 +1,6 @@ # gazelle:exclude string_test.go +load("@bazel_gomock//:gomock.bzl", "gomock") load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") @@ -24,6 +25,7 @@ go_library( ":gen-batch-generated", # keep ":gen-errordetailtype-stringer", # keep ":gen-method-stringer", # keep + ":mocks_api", # keep ], embed = [":roachpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/roachpb", @@ -49,7 +51,9 @@ go_library( "@com_github_cockroachdb_errors//errorspb", "@com_github_cockroachdb_errors//extgrpc", "@com_github_cockroachdb_redact//:redact", + "@com_github_golang_mock//gomock", # keep "@io_etcd_go_etcd_raft_v3//raftpb", + "@org_golang_google_grpc//metadata", # keep ], ) @@ -102,6 +106,18 @@ go_library( ], ) +gomock( + name = "mocks_api", + out = "mocks_generated.go", + interfaces = [ + "Internal_RangeFeedClient", + "InternalClient", + ], + library = ":bootstrap", + package = "roachpb", + self_package = "github.com/cockroachdb/cockroach/pkg/roachpb", +) + go_test( name = "roachpb_test", size = "small", diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index d4db73123ebb..af15f28f6602 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/redact" ) +//go:generate mockgen -package=roachpb -destination=mocks_generated.go . InternalClient,Internal_RangeFeedClient + // UserPriority is a custom type for transaction's user priority. type UserPriority float64 diff --git a/pkg/roachpb/mocks_generated.go b/pkg/roachpb/mocks_generated.go new file mode 100644 index 000000000000..d9dea81cb6a6 --- /dev/null +++ b/pkg/roachpb/mocks_generated.go @@ -0,0 +1,280 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) + +// Package roachpb is a generated GoMock package. +package roachpb + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" +) + +// MockInternalClient is a mock of InternalClient interface. +type MockInternalClient struct { + ctrl *gomock.Controller + recorder *MockInternalClientMockRecorder +} + +// MockInternalClientMockRecorder is the mock recorder for MockInternalClient. +type MockInternalClientMockRecorder struct { + mock *MockInternalClient +} + +// NewMockInternalClient creates a new mock instance. +func NewMockInternalClient(ctrl *gomock.Controller) *MockInternalClient { + mock := &MockInternalClient{ctrl: ctrl} + mock.recorder = &MockInternalClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternalClient) EXPECT() *MockInternalClientMockRecorder { + return m.recorder +} + +// Batch mocks base method. +func (m *MockInternalClient) Batch(arg0 context.Context, arg1 *BatchRequest, arg2 ...grpc.CallOption) (*BatchResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Batch", varargs...) + ret0, _ := ret[0].(*BatchResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Batch indicates an expected call of Batch. +func (mr *MockInternalClientMockRecorder) Batch(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Batch", reflect.TypeOf((*MockInternalClient)(nil).Batch), varargs...) +} + +// GossipSubscription mocks base method. +func (m *MockInternalClient) GossipSubscription(arg0 context.Context, arg1 *GossipSubscriptionRequest, arg2 ...grpc.CallOption) (Internal_GossipSubscriptionClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GossipSubscription", varargs...) + ret0, _ := ret[0].(Internal_GossipSubscriptionClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GossipSubscription indicates an expected call of GossipSubscription. +func (mr *MockInternalClientMockRecorder) GossipSubscription(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GossipSubscription", reflect.TypeOf((*MockInternalClient)(nil).GossipSubscription), varargs...) +} + +// Join mocks base method. +func (m *MockInternalClient) Join(arg0 context.Context, arg1 *JoinNodeRequest, arg2 ...grpc.CallOption) (*JoinNodeResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Join", varargs...) + ret0, _ := ret[0].(*JoinNodeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Join indicates an expected call of Join. +func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) +} + +// RangeFeed mocks base method. +func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *RangeFeedRequest, arg2 ...grpc.CallOption) (Internal_RangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RangeFeed", varargs...) + ret0, _ := ret[0].(Internal_RangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RangeFeed indicates an expected call of RangeFeed. +func (mr *MockInternalClientMockRecorder) RangeFeed(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockInternalClient)(nil).RangeFeed), varargs...) +} + +// RangeLookup mocks base method. +func (m *MockInternalClient) RangeLookup(arg0 context.Context, arg1 *RangeLookupRequest, arg2 ...grpc.CallOption) (*RangeLookupResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RangeLookup", varargs...) + ret0, _ := ret[0].(*RangeLookupResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RangeLookup indicates an expected call of RangeLookup. +func (mr *MockInternalClientMockRecorder) RangeLookup(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeLookup", reflect.TypeOf((*MockInternalClient)(nil).RangeLookup), varargs...) +} + +// ResetQuorum mocks base method. +func (m *MockInternalClient) ResetQuorum(arg0 context.Context, arg1 *ResetQuorumRequest, arg2 ...grpc.CallOption) (*ResetQuorumResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ResetQuorum", varargs...) + ret0, _ := ret[0].(*ResetQuorumResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ResetQuorum indicates an expected call of ResetQuorum. +func (mr *MockInternalClientMockRecorder) ResetQuorum(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetQuorum", reflect.TypeOf((*MockInternalClient)(nil).ResetQuorum), varargs...) +} + +// MockInternal_RangeFeedClient is a mock of Internal_RangeFeedClient interface. +type MockInternal_RangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_RangeFeedClientMockRecorder +} + +// MockInternal_RangeFeedClientMockRecorder is the mock recorder for MockInternal_RangeFeedClient. +type MockInternal_RangeFeedClientMockRecorder struct { + mock *MockInternal_RangeFeedClient +} + +// NewMockInternal_RangeFeedClient creates a new mock instance. +func NewMockInternal_RangeFeedClient(ctrl *gomock.Controller) *MockInternal_RangeFeedClient { + mock := &MockInternal_RangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_RangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_RangeFeedClient) EXPECT() *MockInternal_RangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_RangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_RangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_RangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_RangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_RangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockInternal_RangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_RangeFeedClient) Recv() (*RangeFeedEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*RangeFeedEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_RangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_RangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_RangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).RecvMsg), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_RangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_RangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_RangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) +}