Skip to content

Commit

Permalink
restore: replace ErrGRPC by recognizing raw gRPC errors (pingcap#344)
Browse files Browse the repository at this point in the history
Co-authored-by: kennytm <[email protected]>
  • Loading branch information
sre-bot and kennytm authored Jun 10, 2020
1 parent 6f9cfba commit b03f422
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
20 changes: 13 additions & 7 deletions pkg/restore/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/utils"
)
Expand All @@ -25,8 +27,6 @@ var (
// ErrRangeIsEmpty is the error raised when download failed with "range is
// empty". This error cannot be retried.
ErrRangeIsEmpty = errors.NewNoStackError("range is empty")
// ErrGRPC indicates any gRPC communication error. This error can be retried.
ErrGRPC = errors.NewNoStackError("gRPC error")
// ErrDownloadFailed indicates a generic download error, expected to be
// retryable.
ErrDownloadFailed = errors.NewNoStackError("download sst failed")
Expand Down Expand Up @@ -73,18 +73,24 @@ func newDownloadSSTBackoffer() utils.Backoffer {

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
switch errors.Cause(err) {
case ErrGRPC, ErrEpochNotMatch, ErrDownloadFailed, ErrIngestFailed:
case ErrEpochNotMatch, ErrDownloadFailed, ErrIngestFailed:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case ErrRangeIsEmpty, ErrRewriteRuleNotFound:
// Excepted error, finish the operation
bo.delayTime = 0
bo.attempt = 0
default:
// Unexcepted error
bo.delayTime = 0
bo.attempt = 0
log.Warn("unexcepted error, stop to retry", zap.Error(err))
switch status.Code(err) {
case codes.Unavailable, codes.Aborted:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
// Unexcepted error
bo.delayTime = 0
bo.attempt = 0
log.Warn("unexcepted error, stop to retry", zap.Error(err))
}
}
if bo.delayTime > bo.maxDelayTime {
return bo.maxDelayTime
Expand Down
23 changes: 20 additions & 3 deletions pkg/restore/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testleak"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/restore"
Expand Down Expand Up @@ -38,7 +40,7 @@ func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {
defer func() { counter++ }()
switch counter {
case 0:
return restore.ErrGRPC
return status.Error(codes.Unavailable, "transport is closing")
case 1:
return restore.ErrEpochNotMatch
case 2:
Expand All @@ -53,11 +55,12 @@ func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {
func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return restore.ErrGRPC
return gRPCError
case 1:
return restore.ErrEpochNotMatch
case 2:
Expand All @@ -69,13 +72,27 @@ func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
}, backoffer)
c.Assert(counter, Equals, 4)
c.Assert(multierr.Errors(err), DeepEquals, []error{
restore.ErrGRPC,
gRPCError,
restore.ErrEpochNotMatch,
restore.ErrDownloadFailed,
restore.ErrRangeIsEmpty,
})
}

func (s *testBackofferSuite) TestBackoffWithFatalRawGRPCError(c *C) {
var counter int
canceledError := status.Error(codes.Canceled, "context canceled")
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return canceledError
}, backoffer)
c.Assert(counter, Equals, 1)
c.Assert(multierr.Errors(err), DeepEquals, []error{
canceledError,
})
}

func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
Expand Down
4 changes: 2 additions & 2 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (importer *FileImporter) downloadSST(
for _, peer := range regionInfo.Region.GetPeers() {
resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req)
if err != nil {
return nil, errors.Annotatef(ErrGRPC, "%s", err)
return nil, errors.Trace(err)
}
if resp.GetError() != nil {
return nil, errors.Annotate(ErrDownloadFailed, resp.GetError().GetMessage())
Expand Down Expand Up @@ -439,7 +439,7 @@ func (importer *FileImporter) downloadRawKVSST(
for _, peer := range regionInfo.Region.GetPeers() {
resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req)
if err != nil {
return nil, errors.Annotatef(ErrGRPC, "%s", err)
return nil, errors.Trace(err)
}
if resp.GetError() != nil {
return nil, errors.Annotate(ErrDownloadFailed, resp.GetError().GetMessage())
Expand Down

0 comments on commit b03f422

Please sign in to comment.