Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
restore: Make all download error as retryable (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
kennytm authored May 25, 2020
1 parent b25f71f commit 0b47b8f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 13 deletions.
5 changes: 3 additions & 2 deletions pkg/restore/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
ErrRangeIsEmpty = errors.NewNoStackError("range is empty")
// ErrGRPC indicates any gRPC communication error. This error can be retried.
ErrGRPC = errors.NewNoStackError("gRPC error")
// ErrDownloadFailed indicates a generic, non-retryable download error.
// ErrDownloadFailed indicates a generic download error, expected to be
// retryable.
ErrDownloadFailed = errors.NewNoStackError("download sst failed")
// ErrIngestFailed indicates a generic, retryable ingest error.
ErrIngestFailed = errors.NewNoStackError("ingest sst failed")
Expand Down Expand Up @@ -72,7 +73,7 @@ func newDownloadSSTBackoffer() utils.Backoffer {

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
switch errors.Cause(err) {
case ErrGRPC, ErrEpochNotMatch, ErrIngestFailed:
case ErrGRPC, ErrEpochNotMatch, ErrDownloadFailed, ErrIngestFailed:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case ErrRangeIsEmpty, ErrRewriteRuleNotFound:
Expand Down
44 changes: 41 additions & 3 deletions pkg/restore/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testleak"
"go.uber.org/multierr"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/restore"
Expand All @@ -30,6 +31,25 @@ func (s *testBackofferSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return restore.ErrGRPC
case 1:
return restore.ErrEpochNotMatch
case 2:
return nil
}
return nil
}, backoffer)
c.Assert(counter, Equals, 3)
c.Assert(err, IsNil)
}

func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
Expand All @@ -41,12 +61,19 @@ func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
case 1:
return restore.ErrEpochNotMatch
case 2:
return restore.ErrDownloadFailed
case 3:
return restore.ErrRangeIsEmpty
}
return nil
}, backoffer)
c.Assert(counter, Equals, 3)
c.Assert(err, Equals, restore.ErrRangeIsEmpty)
c.Assert(counter, Equals, 4)
c.Assert(multierr.Errors(err), DeepEquals, []error{
restore.ErrGRPC,
restore.ErrEpochNotMatch,
restore.ErrDownloadFailed,
restore.ErrRangeIsEmpty,
})
}

func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
Expand All @@ -57,5 +84,16 @@ func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
return restore.ErrEpochNotMatch
}, backoffer)
c.Assert(counter, Equals, 10)
c.Assert(err, Equals, restore.ErrEpochNotMatch)
c.Assert(multierr.Errors(err), DeepEquals, []error{
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
restore.ErrEpochNotMatch,
})
}
11 changes: 8 additions & 3 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -228,6 +229,7 @@ func (importer *FileImporter) Import(

log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
regionLoop:
for _, regionInfo := range regionInfos {
info := regionInfo
// Try to download file.
Expand All @@ -242,9 +244,12 @@ func (importer *FileImporter) Import(
return e
}, newDownloadSSTBackoffer())
if errDownload != nil {
if errDownload == ErrRewriteRuleNotFound || errDownload == ErrRangeIsEmpty {
// Skip this region
continue
for _, e := range multierr.Errors(errDownload) {
switch e {
case ErrRewriteRuleNotFound, ErrRangeIsEmpty:
// Skip this region
continue regionLoop
}
}
log.Error("download file failed",
zap.Stringer("file", file),
Expand Down
15 changes: 10 additions & 5 deletions pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package utils
import (
"context"
"time"

"go.uber.org/multierr"
)

// RetryableFunc presents a retryable operation.
Expand All @@ -18,25 +20,28 @@ type Backoffer interface {
Attempt() int
}

// WithRetry retrys a given operation with a backoff policy.
// WithRetry retries a given operation with a backoff policy.
//
// Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a
// multierr containing all errors encountered.
func WithRetry(
ctx context.Context,
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
var lastErr error
var allErrors error
for backoffer.Attempt() > 0 {
err := retryableFunc()
if err != nil {
lastErr = err
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return lastErr
return allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
} else {
return nil
}
}
return lastErr
return allErrors
}

0 comments on commit 0b47b8f

Please sign in to comment.