Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dnm] kvcoord: re-work bazel+gomock implementation #66975

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generi
pkg/roachpb/batch.go://go:generate go run -tags gen-batch gen/main.go
pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated.go -source=cert.go . Cert
pkg/cmd/roachtest/prometheus/prometheus.go://go:generate mockgen -package=prometheus -destination=mock_generated.go -source=prometheus.go . cluster
pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangecache -destination=mocks_generated.go . RangeDescriptorDB
pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -package=rangefeed -source rangefeed.go -destination=mocks_generated.go .
pkg/kv/kvclient/kvcoord/transport.go://go:generate mockgen -package=kvcoord -destination=mocks_generated.go . Transport
pkg/roachpb/api.go://go:generate mockgen -package=roachpb -destination=mocks_generated.go . InternalClient,Internal_RangeFeedClient
pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs
pkg/security/securitytest/securitytest.go://go:generate gofmt -s -w embedded.go
pkg/security/securitytest/securitytest.go://go:generate goimports -w embedded.go
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
load("@bazel_gomock//:gomock.bzl", "gomock")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

Expand Down Expand Up @@ -69,12 +70,21 @@ go_library(
],
)

gomock(
name = "mocks_transport",
out = "mocks_generated_test.go",
interfaces = ["Transport"],
library = ":kvcoord",
package = "kvcoord_test",
)

go_test(
name = "kvcoord_test",
size = "medium",
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
"integration_test.go",
Expand All @@ -95,6 +105,7 @@ go_test(
"txn_interceptor_seq_num_allocator_test.go",
"txn_interceptor_span_refresher_test.go",
"txn_test.go",
"mocks_generated_test.go", # keep
],
data = glob(["testdata/**"]),
embed = [":kvcoord"],
Expand Down Expand Up @@ -146,6 +157,7 @@ go_test(
"@com_github_cockroachdb_errors//errutil",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_mock//gomock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (ds *DistSender) partialRangeFeed(
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
case errors.HasType(err, (*sendError)(nil)), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
rangeInfo.token.Evict(ctx)
rangeInfo.token = rangecache.EvictionToken{}
Expand Down
155 changes: 155 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord_test

import (
"context"
"io"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
gomock "github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// Tests that the range feed handles transport errors appropriately. In
// particular, that when encountering other decommissioned nodes it will refresh
// its range descriptor and retry, but if this node is decommissioned it will
// bail out. Regression test for:
// https://github.com/cockroachdb/cockroach/issues/66636
func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, spec := range []struct {
errorCode codes.Code
expectRetry bool
}{
{codes.FailedPrecondition, true}, // target node is decommissioned; retry
{codes.PermissionDenied, false}, // this node is decommissioned; abort
{codes.Unauthenticated, false}, // this node is not part of cluster; abort
} {
t.Run(spec.errorCode.String(), func(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := kvcoord.TestingMakeGossip(t, stopper, rpcContext)

desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
for _, repl := range desc.InternalReplicas {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(repl.NodeID),
kvcoord.TestingNewNodeDesc(repl.NodeID),
gossip.NodeDescriptorTTL,
))
}

ctrl := gomock.NewController(t)
transport := NewMockTransport(ctrl)
rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl)

// We start off with a cached lease on r1.
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[0],
Sequence: 1,
}

// All nodes return the specified error code. We expect the range feed to
// keep trying all replicas in sequence regardless of error.
for _, repl := range desc.InternalReplicas {
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(repl)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(
ctx, nil, grpcstatus.Error(spec.errorCode, ""))
}
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// Once all replicas have failed, it should try to refresh the lease using
// the range cache. We let this succeed once.
rangeDB.EXPECT().FirstRange().Return(&desc, nil)

// It then tries the replicas again. This time we just report the
// transport as exhausted immediately.
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// This invalidates the cache yet again. This time we error.
rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, ""))

// If we expect a range lookup retry, allow the retry to succeed by
// returning a range descriptor and a client that immediately
// cancels the context and closes the range feed stream.
if spec.expectRetry {
rangeDB.EXPECT().FirstRange().Return(&desc, nil)
stream := roachpb.NewMockInternal_RangeFeedClient(ctrl)
stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF)
client := roachpb.NewMockInternalClient(ctrl)
client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil)
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0])
transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil)
transport.EXPECT().Release()
}

ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
RPCContext: rpcContext,
TestingKnobs: kvcoord.ClientTestingKnobs{
TransportFactory: func(kvcoord.SendOptions, *nodedialer.Dialer, kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return transport, nil
},
},
RangeDescriptorDB: rangeDB,
NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)),
Settings: cluster.MakeTestingClusterSettings(),
})
ds.RangeDescriptorCache().Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

err := ds.RangeFeed(ctx, roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, hlc.Timestamp{}, false, nil)
require.Error(t, err)
})
}
}
Loading