Skip to content

Commit

Permalink
kvcoord: fix rangefeed retries on transport errors
Browse files Browse the repository at this point in the history
`DistSender.RangeFeed()` was meant to retry transport errors after
refreshing the range descriptor (invalidating the cached entry).
However, due to an incorrect error type check (`*sendError` vs
`sendError`), these errors failed the range feed without invalidating
the cached range descriptor.

This was particularly severe in cases where a large number of nodes had
been decommissioned, where some stale range descriptors on some nodes
contained only decommissioned nodes. Since change feeds set up range
feeds across many nodes and ranges in the cluster, they are likely to
encounter these decommissioned nodes and return an error -- and since
the descriptor cache wasn't invalidated they would keep erroring until
the nodes were restarted such that the caches were flushed (often
requiring a full cluster restart).

Release note (bug fix): Change feeds now properly invalidate cached
range descriptors and retry when encountering decommissioned nodes.
  • Loading branch information
erikgrinaker committed Jun 28, 2021
1 parent b396b8a commit 3a990da
Show file tree
Hide file tree
Showing 16 changed files with 851 additions and 60 deletions.
4 changes: 4 additions & 0 deletions build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generi
pkg/roachpb/batch.go://go:generate go run -tags gen-batch gen/main.go
pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated.go -source=cert.go . Cert
pkg/cmd/roachtest/prometheus/prometheus.go://go:generate mockgen -package=prometheus -destination=mock_generated.go -source=prometheus.go . cluster
pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangecache -destination=mocks_generated.go . RangeDescriptorDB
pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -package=rangefeed -source rangefeed.go -destination=mocks_generated.go .
pkg/kv/kvclient/kvcoord/transport.go://go:generate mockgen -package=kvcoord -destination=mocks_generated.go . Transport
pkg/roachpb/api.go://go:generate mockgen -package=roachpb -destination=mocks_generated.go . InternalClient,Internal_RangeFeedClient
pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs
pkg/security/securitytest/securitytest.go://go:generate gofmt -s -w embedded.go
pkg/security/securitytest/securitytest.go://go:generate goimports -w embedded.go
Expand Down
30 changes: 28 additions & 2 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
load("@bazel_gomock//:gomock.bzl", "gomock")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
name = "kvcoord",
name = "kvcoord_base",
srcs = [
"batch.go",
"condensable_span_set.go",
Expand Down Expand Up @@ -31,7 +32,7 @@ go_library(
":gen-txnstate-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord",
visibility = ["//visibility:public"],
visibility = ["//visibility:private"],
deps = [
"//pkg/base",
"//pkg/gossip",
Expand Down Expand Up @@ -69,12 +70,36 @@ go_library(
],
)

# keep
go_library(
name = "kvcoord",
srcs = [":mocks_transport"],
embed = [":kvcoord_base"],
visibility = ["//visibility:public"],
deps = [
"@com_github_golang_mock//gomock",
"@org_golang_google_grpc//metadata",
],
)

gomock(
name = "mocks_transport",
out = "mocks_generated.go",
interfaces = [
"Transport",
],
library = ":kvcoord_base",
package = "kvcoord",
self_package = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord",
)

go_test(
name = "kvcoord_test",
size = "medium",
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
"integration_test.go",
Expand Down Expand Up @@ -146,6 +171,7 @@ go_test(
"@com_github_cockroachdb_errors//errutil",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_mock//gomock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (ds *DistSender) partialRangeFeed(
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
case errors.HasType(err, (*sendError)(nil)), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
rangeInfo.token.Evict(ctx)
rangeInfo.token = rangecache.EvictionToken{}
Expand Down
154 changes: 154 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"context"
"io"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
gomock "github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// Tests that the range feed handles transport errors appropriately. In
// particular, that when encountering other decommissioned nodes it will refresh
// its range descriptor and retry, but if this node is decommissioned it will
// bail out. Regression test for:
// https://github.com/cockroachdb/cockroach/issues/66636
func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, spec := range []struct {
errorCode codes.Code
expectRetry bool
}{
{codes.FailedPrecondition, true}, // target node is decommissioned; retry
{codes.PermissionDenied, false}, // this node is decommissioned; abort
{codes.Unauthenticated, false}, // this node is not part of cluster; abort
} {
t.Run(spec.errorCode.String(), func(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)

desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
for _, repl := range desc.InternalReplicas {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(repl.NodeID),
newNodeDesc(repl.NodeID),
gossip.NodeDescriptorTTL,
))
}

ctrl := gomock.NewController(t)
transport := NewMockTransport(ctrl)
rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl)

// We start off with a cached lease on r1.
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[0],
Sequence: 1,
}

// All nodes return the specified error code. We expect the range feed to
// keep trying all replicas in sequence regardless of error.
for _, repl := range desc.InternalReplicas {
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(repl)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(
ctx, nil, grpcstatus.Error(spec.errorCode, ""))
}
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// Once all replicas have failed, it should try to refresh the lease using
// the range cache. We let this succeed once.
rangeDB.EXPECT().FirstRange().Return(&desc, nil)

// It then tries the replicas again. This time we just report the
// transport as exhausted immediately.
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// This invalidates the cache yet again. This time we error.
rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, ""))

// If we expect a range lookup retry, allow the retry to succeed by
// returning a range descriptor and a client that immediately
// cancels the context and closes the range feed stream.
if spec.expectRetry {
rangeDB.EXPECT().FirstRange().Return(&desc, nil)
stream := roachpb.NewMockInternal_RangeFeedClient(ctrl)
stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF)
client := roachpb.NewMockInternalClient(ctrl)
client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil)
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0])
transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil)
transport.EXPECT().Release()
}

ds := NewDistSender(DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) {
return transport, nil
},
},
RangeDescriptorDB: rangeDB,
NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)),
Settings: cluster.MakeTestingClusterSettings(),
})
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

err := ds.RangeFeed(ctx, roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, hlc.Timestamp{}, false, nil)
require.Error(t, err)
})
}
}
131 changes: 131 additions & 0 deletions pkg/kv/kvclient/kvcoord/mocks_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/cockroachdb/errors"
)

//go:generate mockgen -package=kvcoord -destination=mocks_generated.go . Transport

// A SendOptions structure describes the algorithm for sending RPCs to one or
// more replicas, depending on error conditions and how many successful
// responses are required.
Expand Down
Loading

0 comments on commit 3a990da

Please sign in to comment.