Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#54084
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
RidRisR authored and ti-chi-bot committed Jun 19, 2024
1 parent b75a314 commit 53be550
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 3 deletions.
23 changes: 23 additions & 0 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func TestPaginateScanLeader(t *testing.T) {
assertRegions(t, collectedRegions, "", "aay", "bba")
}

<<<<<<< HEAD:br/pkg/restore/import_retry_test.go
func TestImportKVFiles(t *testing.T) {
var (
importer = restore.FileImporter{}
Expand Down Expand Up @@ -596,4 +597,26 @@ func TestFilterFilesByRegion(t *testing.T) {
require.Equal(t, err, c.err)
require.Equal(t, subfile, c.subfiles)
}
=======
func TestRetryRecognizeErrCode(t *testing.T) {
waitTime := 1 * time.Millisecond
maxWaitTime := 16 * time.Millisecond
ctx := context.Background()
inner := 0
outer := 0
utils.WithRetry(ctx, func() error {
e := utils.WithRetry(ctx, func() error {
inner++
e := status.Error(codes.Unavailable, "the connection to TiKV has been cut by a neko, meow :3")
if e != nil {
return errors.Trace(e)
}
return nil
}, utils.NewBackoffer(10, waitTime, maxWaitTime, utils.NewErrorContext("download sst", 3)))
outer++
return errors.Trace(e)
}, utils.NewBackoffer(10, waitTime, maxWaitTime, utils.NewErrorContext("import sst", 3)))
require.Equal(t, 10, outer)
require.Equal(t, 100, inner)
>>>>>>> c12bf3fa024 (br: fix backoffer can't handle multierrs (#54084)):br/pkg/restore/log_client/import_retry_test.go
}
121 changes: 121 additions & 0 deletions br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "log_client",
srcs = [
"client.go",
"import.go",
"import_retry.go",
"log_file_manager.go",
"log_file_map.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/restore/log_client",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/checkpoint",
"//br/pkg/checksum",
"//br/pkg/conn",
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/logutil",
"//br/pkg/metautil",
"//br/pkg/restore",
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/internal/import_client",
"//br/pkg/restore/internal/log_split",
"//br/pkg/restore/internal/rawkv",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
"//br/pkg/restore/utils",
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//br/pkg/version",
"//pkg/ddl/util",
"//pkg/domain",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser/model",
"//pkg/util",
"//pkg/util/codec",
"//pkg/util/redact",
"//pkg/util/table-filter",
"@com_github_fatih_color//:color",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

go_test(
name = "log_client_test",
timeout = "short",
srcs = [
"client_test.go",
"export_test.go",
"import_retry_test.go",
"import_test.go",
"log_file_manager_test.go",
"log_file_map_test.go",
"main_test.go",
],
embed = [":log_client"],
flaky = True,
shard_count = 39,
deps = [
"//br/pkg/errors",
"//br/pkg/gluetidb",
"//br/pkg/mock",
"//br/pkg/restore/internal/import_client",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//br/pkg/utiltest",
"//pkg/kv",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
"//pkg/util/codec",
"//pkg/util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
9 changes: 6 additions & 3 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -169,12 +170,14 @@ func NewBackupSSTBackoffer() Backoffer {

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
// we don't care storeID here.
res := bo.errContext.HandleErrorMsg(err.Error(), 0)
errs := multierr.Errors(err)
lastErr := errs[len(errs)-1]
res := bo.errContext.HandleErrorMsg(lastErr.Error(), 0)
if res.Strategy == RetryStrategy {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
e := errors.Cause(err)
e := errors.Cause(lastErr)
switch e { // nolint:errorlint
case berrors.ErrKVEpochNotMatch, berrors.ErrKVDownloadFailed, berrors.ErrKVIngestFailed, berrors.ErrPDLeaderNotFound:
bo.delayTime = 2 * bo.delayTime
Expand All @@ -189,7 +192,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case codes.Canceled:
if isGRPCCancel(err) {
if isGRPCCancel(lastErr) {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
Expand Down

0 comments on commit 53be550

Please sign in to comment.