diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 87158608f41..ee06b127197 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -17,7 +17,6 @@ import ( "encoding/json" "math" "regexp" - "sort" "time" "github.com/pingcap/errors" @@ -73,19 +72,6 @@ func (s FeedState) ToInt() int { return -1 } -const ( - // errorHistoryGCInterval represents how long we keep error record in changefeed info - errorHistoryGCInterval = time.Minute * 10 - - // errorHistoryCheckInterval represents time window for failure check - errorHistoryCheckInterval = time.Minute * 2 - - // ErrorHistoryThreshold represents failure upper limit in time window. - // Before a changefeed is initialized, check the the failure count of this - // changefeed, if it is less than ErrorHistoryThreshold, then initialize it. - ErrorHistoryThreshold = 3 -) - // ChangeFeedInfo describes the detail of a ChangeFeed type ChangeFeedInfo struct { SinkURI string `json:"sink-uri"` @@ -103,10 +89,9 @@ type ChangeFeedInfo struct { // but can be fetched for backward compatibility SortDir string `json:"sort-dir"` - Config *config.ReplicaConfig `json:"config"` - State FeedState `json:"state"` - ErrorHis []int64 `json:"history"` - Error *RunningError `json:"error"` + Config *config.ReplicaConfig `json:"config"` + State FeedState `json:"state"` + Error *RunningError `json:"error"` SyncPointEnabled bool `json:"sync-point-enabled"` SyncPointInterval time.Duration `json:"sync-point-interval"` @@ -283,28 +268,6 @@ func (info *ChangeFeedInfo) fixState() { } } -// CheckErrorHistory checks error history of a changefeed -// if having error record older than GC interval, set needSave to true. -// if error counts reach threshold, set canInit to false. -func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) { - i := sort.Search(len(info.ErrorHis), func(i int) bool { - ts := info.ErrorHis[i] - return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval - }) - info.ErrorHis = info.ErrorHis[i:] - - if i > 0 { - needSave = true - } - - i = sort.Search(len(info.ErrorHis), func(i int) bool { - ts := info.ErrorHis[i] - return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval - }) - canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold - return -} - // HasFastFailError returns true if the error in changefeed is fast-fail func (info *ChangeFeedInfo) HasFastFailError() bool { if info.Error == nil { @@ -312,27 +275,3 @@ func (info *ChangeFeedInfo) HasFastFailError() bool { } return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) } - -// findActiveErrors finds all errors occurring within errorHistoryCheckInterval -func (info *ChangeFeedInfo) findActiveErrors() []int64 { - i := sort.Search(len(info.ErrorHis), func(i int) bool { - ts := info.ErrorHis[i] - // ts is a errors occurrence time, here to find all errors occurring within errorHistoryCheckInterval - return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval - }) - return info.ErrorHis[i:] -} - -// ErrorsReachedThreshold checks error history of a changefeed -// returns true if error counts reach threshold -func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool { - return len(info.findActiveErrors()) >= ErrorHistoryThreshold -} - -// CleanUpOutdatedErrorHistory cleans up the outdated error history -// return true if the ErrorHis changed -func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool { - lastLenOfErrorHis := len(info.ErrorHis) - info.ErrorHis = 
info.findActiveErrors() - return lastLenOfErrorHis != len(info.ErrorHis) -} diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 43ea14afcae..b9d69130fab 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -218,33 +218,6 @@ type changefeedSuite struct{} var _ = check.Suite(&changefeedSuite{}) -func (s *changefeedSuite) TestCheckErrorHistory(c *check.C) { - defer testleak.AfterTest(c)() - now := time.Now() - info := &ChangeFeedInfo{ - ErrorHis: []int64{}, - } - for i := 0; i < 5; i++ { - tm := now.Add(-errorHistoryGCInterval) - info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6) - time.Sleep(time.Millisecond) - } - for i := 0; i < ErrorHistoryThreshold-1; i++ { - info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6) - time.Sleep(time.Millisecond) - } - time.Sleep(time.Millisecond) - needSave, canInit := info.CheckErrorHistory() - c.Assert(needSave, check.IsTrue) - c.Assert(canInit, check.IsTrue) - c.Assert(info.ErrorHis, check.HasLen, ErrorHistoryThreshold-1) - - info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6) - needSave, canInit = info.CheckErrorHistory() - c.Assert(needSave, check.IsFalse) - c.Assert(canInit, check.IsFalse) -} - func (s *changefeedSuite) TestChangefeedInfoStringer(c *check.C) { defer testleak.AfterTest(c)() info := &ChangeFeedInfo{ diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 31f633506c9..161320d019a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -75,7 +75,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { id: id, scheduler: newScheduler(), barriers: newBarriers(), - feedStateManager: new(feedStateManager), + feedStateManager: newFeedStateManager(), gcManager: gcManager, errCh: make(chan error, defaultErrChSize), @@ -112,7 +112,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor } else { code = string(cerror.ErrOwnerUnknown.RFCCode()) } - c.feedStateManager.HandleError(&model.RunningError{ + c.feedStateManager.handleError(&model.RunningError{ Addr: util.CaptureAddrFromCtx(ctx), Code: code, Message: err.Error(), diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 1d8319359a7..fb62ac49fb3 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -16,6 +16,7 @@ package owner import ( "time" + "github.com/cenkalti/backoff/v4" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -23,13 +24,95 @@ import ( "go.uber.org/zap" ) +const ( + // When errors occur and we need to back off, we start an exponential backoff + // with an interval from 10s to 30min (10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, 1800s). + // And the backoff will be stopped after 72 min (about 9 tries) because if we do another 30min backoff, + // the total duration (72+30=102min) would exceed the MaxElapsedTime (90min). + // To avoid a thundering herd, a random factor is also added. + defaultBackoffInitInterval = 10 * time.Second + defaultBackoffMaxInterval = 30 * time.Minute + defaultBackoffMaxElapsedTime = 90 * time.Minute + defaultBackoffRandomizationFactor = 0.1 + defaultBackoffMultiplier = 2.0 + + // If all states recorded in the window are 'normal', it can be assumed that the changefeed + // is running steadily. Then, if we enter a state other than normal at the next tick, + // the backoff must be reset.
+ defaultStateWindowSize = 512 ) + // feedStateManager manages the ReactorState of a changefeed // when a error or a admin job occurs, the feedStateManager is responsible for controlling the ReactorState type feedStateManager struct { state *model.ChangefeedReactorState shouldBeRunning bool - adminJobQueue []*model.AdminJob + adminJobQueue []*model.AdminJob + stateHistory [defaultStateWindowSize]model.FeedState + lastErrorTime time.Time // time of last error for a changefeed + backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state + errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed +} + +// newFeedStateManager creates a feedStateManager and initializes the exponential backoff +func newFeedStateManager() *feedStateManager { + f := new(feedStateManager) + + f.errBackoff = backoff.NewExponentialBackOff() + f.errBackoff.InitialInterval = defaultBackoffInitInterval + f.errBackoff.MaxInterval = defaultBackoffMaxInterval + f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime + f.errBackoff.Multiplier = defaultBackoffMultiplier + f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor + + f.resetErrBackoff() + f.lastErrorTime = time.Unix(0, 0) + + return f +} + +// newFeedStateManager4Test creates a feedStateManager for tests +func newFeedStateManager4Test() *feedStateManager { + f := new(feedStateManager) + + f.errBackoff = backoff.NewExponentialBackOff() + f.errBackoff.InitialInterval = 200 * time.Millisecond + f.errBackoff.MaxInterval = 1600 * time.Millisecond + f.errBackoff.MaxElapsedTime = 6 * time.Second + f.errBackoff.Multiplier = 2.0 + f.errBackoff.RandomizationFactor = 0 + + f.resetErrBackoff() + f.lastErrorTime = time.Unix(0, 0) + + return f +} + +// resetErrBackoff resets the backoff-related fields +func (m *feedStateManager) resetErrBackoff() { + m.errBackoff.Reset() + m.backoffInterval = m.errBackoff.NextBackOff() +} + +// isChangefeedStable checks whether any state other than 'normal' appears in the sliding window. +func (m *feedStateManager) isChangefeedStable() bool { + for _, val := range m.stateHistory { + if val != model.StateNormal { + return false + } + } + + return true +} + +// shiftStateWindow shifts the sliding window +func (m *feedStateManager) shiftStateWindow(state model.FeedState) { + for i := 0; i < defaultStateWindowSize-1; i++ { + m.stateHistory[i] = m.stateHistory[i+1] + } + + m.stateHistory[defaultStateWindowSize-1] = state } func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) { @@ -53,7 +136,7 @@ func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) { return } errs := m.errorsReportedByProcessors() - m.HandleError(errs...) + m.handleError(errs...) } func (m *feedStateManager) ShouldRunning() bool { @@ -132,16 +215,18 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { return } m.shouldBeRunning = true + // when the changefeed is manually resumed, we must reset the backoff + m.resetErrBackoff() + // The lastErrorTime also needs to be cleared before a fresh run.
+ m.lastErrorTime = time.Unix(0, 0) jobsPending = true m.patchState(model.StateNormal) - // remove error history to make sure the changefeed can running in next tick m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { if info == nil { return nil, false, nil } - if info.Error != nil || len(info.ErrorHis) != 0 { + if info.Error != nil { info.Error = nil - info.ErrorHis = nil return info, true, nil } return info, false, nil @@ -263,33 +348,73 @@ func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError { return result } -func (m *feedStateManager) HandleError(errs ...*model.RunningError) { +func (m *feedStateManager) handleError(errs ...*model.RunningError) { + // if there is a fast-fail error in errs, we can fail the changefeed immediately + // and there is no need to patch other errors to the changefeed info + for _, err := range errs { + if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) { + m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + if info == nil { + return nil, false, nil } + info.Error = err + return info, true, nil + }) + m.shouldBeRunning = false + m.patchState(model.StateFailed) + return + } + } + m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { if info == nil { return nil, false, nil } for _, err := range errs { info.Error = err - info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6) } - needSave := info.CleanUpOutdatedErrorHistory() - return info, needSave || len(errs) > 0, nil + return info, len(errs) > 0, nil }) - var err *model.RunningError + + // If we enter an abnormal state ('error', 'failed') for this changefeed now + // but haven't seen abnormal states in a sliding window (512 ticks), + // it can be assumed that this changefeed has encountered a sudden change from a stable condition. + // So we can reset the exponential backoff and re-backoff from the InitialInterval. + // TODO: this detection policy should be covered by a unit test. if len(errs) > 0 { - err = errs[len(errs)-1] + m.lastErrorTime = time.Now() + if m.isChangefeedStable() { + m.resetErrBackoff() + } + } else { + if m.state.Info.State == model.StateNormal { + m.lastErrorTime = time.Unix(0, 0) + } } - // if one of the error stored by changefeed state(error in the last tick) or the error specified by this function(error in the this tick) - // is a fast-fail error, the changefeed should be failed - if m.state.Info.HasFastFailError() || (err != nil && cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code))) { - m.shouldBeRunning = false - m.patchState(model.StateFailed) + m.shiftStateWindow(m.state.Info.State) + + if m.lastErrorTime == time.Unix(0, 0) { return } - // if the number of errors has reached the error threshold, stop the changefeed - if m.state.Info.ErrorsReachedThreshold() { + + if time.Since(m.lastErrorTime) < m.backoffInterval { m.shouldBeRunning = false m.patchState(model.StateError) - return + } else { + oldBackoffInterval := m.backoffInterval + m.backoffInterval = m.errBackoff.NextBackOff() + m.lastErrorTime = time.Unix(0, 0) + + // if the duration since backoff start exceeds MaxElapsedTime, + // we set the state of the changefeed to "failed" and don't let it run again unless it is manually resumed.
+ if m.backoffInterval == backoff.Stop { + log.Warn("changefeed will not be restarted because it has been failing for a long time period", + zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime)) + m.shouldBeRunning = false + m.patchState(model.StateFailed) + } else { + log.Info("changefeed restart backoff interval is changed", zap.String("changefeed", m.state.ID), + zap.Duration("oldInterval", oldBackoffInterval), zap.Duration("newInterval", m.backoffInterval)) + } } } diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 4a9ec487745..19fb303fc94 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -14,6 +14,8 @@ package owner import ( + "time" + "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -30,7 +32,7 @@ type feedStateManagerSuite struct { func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) - manager := new(feedStateManager) + manager := newFeedStateManager4Test() state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -102,7 +104,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) - manager := new(feedStateManager) + manager := newFeedStateManager4Test() state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -130,7 +132,7 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) - manager := new(feedStateManager) + manager := newFeedStateManager4Test() state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -173,7 +175,7 @@ func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) { func (s *feedStateManagerSuite) TestHandleError(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) - manager := new(feedStateManager) + manager := newFeedStateManager4Test() state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -187,25 +189,22 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { state.PatchTaskStatus(ctx.GlobalVars().CaptureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { return &model.TaskStatus{}, true, nil }) - state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { - return &model.TaskPosition{Error: &model.RunningError{ - Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, - Code: "[CDC:ErrEtcdSessionDone]", - Message: "fake error for test", - }}, true, nil - }) + state.PatchTaskWorkload(ctx.GlobalVars().CaptureInfo.ID, func(workload model.TaskWorkload) (model.TaskWorkload, 
bool, error) { return model.TaskWorkload{}, true, nil }) tester.MustApplyPatches() manager.Tick(state) tester.MustApplyPatches() - c.Assert(manager.ShouldRunning(), check.IsTrue) - // error reported by processor in task position should be cleaned - c.Assert(state.TaskPositions[ctx.GlobalVars().CaptureInfo.ID].Error, check.IsNil) - // throw error more than history threshold to turn feed state into error - for i := 0; i < model.ErrorHistoryThreshold; i++ { + // the backoff will be stopped after 4600ms because 4600ms + 1600ms > 6000ms. + intervals := []time.Duration{200, 400, 800, 1600, 1600} + for i, d := range intervals { + intervals[i] = d * time.Millisecond + } + + for _, d := range intervals { + c.Assert(manager.ShouldRunning(), check.IsTrue) state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Error: &model.RunningError{ Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, @@ -216,17 +215,35 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { tester.MustApplyPatches() manager.Tick(state) tester.MustApplyPatches() + c.Assert(manager.ShouldRunning(), check.IsFalse) + time.Sleep(d) + manager.Tick(state) + tester.MustApplyPatches() } + c.Assert(manager.ShouldRunning(), check.IsFalse) - c.Assert(state.Info.State, check.Equals, model.StateError) + c.Assert(state.Info.State, check.Equals, model.StateFailed) c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop) c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop) + + // admin resume must retry changefeed immediately. + manager.PushAdminJob(&model.AdminJob{ + CfID: ctx.ChangefeedVars().ID, + Type: model.AdminResume, + Opts: &model.AdminJobOption{ForceRemove: false}, + }) + manager.Tick(state) + tester.MustApplyPatches() + c.Assert(manager.ShouldRunning(), check.IsTrue) + c.Assert(state.Info.State, check.Equals, model.StateNormal) + c.Assert(state.Info.AdminJobType, check.Equals, model.AdminNone) + c.Assert(state.Status.AdminJobType, check.Equals, model.AdminNone) } func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) - manager := new(feedStateManager) + manager := newFeedStateManager4Test() state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, map[string]string{ "/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`, diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/codec/schema_registry.go index bed095edf48..51e8a96ac70 100644 --- a/cdc/sink/codec/schema_registry.go +++ b/cdc/sink/codec/schema_registry.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/linkedin/goavro/v2" "github.com/pingcap/errors" "github.com/pingcap/log" diff --git a/go.mod b/go.mod index 3353be73e2a..385b5518ded 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/apache/pulsar-client-go v0.1.1 github.com/benbjohnson/clock v1.3.0 github.com/bradleyjkemp/grpc-tools v0.2.5 - github.com/cenkalti/backoff v2.2.1+incompatible + github.com/cenkalti/backoff/v4 v4.0.2 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/coreos/go-semver v0.3.0 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/go.sum b/go.sum index 
f1f665d98eb..c1a0164fce1 100644 --- a/go.sum +++ b/go.sum @@ -89,8 +89,7 @@ github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwP github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= github.com/carlmjohnson/flagext v0.21.0 h1:/c4uK3ie786Z7caXLcIMvePNSSiH3bQVGDvmGLMme60= github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= diff --git a/integration/framework/dsl.go b/integration/framework/dsl.go index 547e18bd625..fd71b4cebba 100644 --- a/integration/framework/dsl.go +++ b/integration/framework/dsl.go @@ -17,7 +17,7 @@ import ( "context" "time" - backoff2 "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" @@ -90,8 +90,8 @@ func (b *basicAwaitable) Wait() Checkable { } defer cancel() - backoff := backoff2.NewExponentialBackOff() - backoff.MaxInterval = waitMaxPollInterval + expBackoff := backoff.NewExponentialBackOff() + expBackoff.MaxInterval = waitMaxPollInterval for { select { case <-ctx.Done(): @@ -109,8 +109,8 @@ func (b *basicAwaitable) Wait() Checkable { return b } - interval := backoff.NextBackOff() - if interval == backoff2.Stop { + interval := expBackoff.NextBackOff() + if interval == backoff.Stop { return &errorCheckableAndAwaitable{errors.New("Maximum retry interval reached")} } log.Debug("Wait(): pollable returned false, backing off", zap.Duration("interval", interval)) diff --git a/pkg/config/cyclic.go b/pkg/config/cyclic.go index bba476912a4..bff55c546b9 100644 --- a/pkg/config/cyclic.go +++ b/pkg/config/cyclic.go @@ -33,7 +33,7 @@ func (c *CyclicConfig) IsEnabled() bool { return c != nil && c.Enable } -// Marshal returns the json marshal format of a ReplicationConfig +// Marshal returns the json marshal format of a CyclicConfig func (c *CyclicConfig) Marshal() (string, error) { cfg, err := json.Marshal(c) if err != nil { @@ -42,7 +42,7 @@ func (c *CyclicConfig) Marshal() (string, error) { return string(cfg), nil } -// Unmarshal unmarshals into *ReplicationConfig from json marshal byte slice +// Unmarshal unmarshals into *CyclicConfig from json marshal byte slice func (c *CyclicConfig) Unmarshal(data []byte) error { return json.Unmarshal(data, c) } diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index bd8bc41a34a..089c83a7043 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -18,7 +18,7 @@ import ( "math" "time" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/pingcap/errors" ) diff --git a/tests/integration_tests/kafka_sink_error_resume/run.sh b/tests/integration_tests/kafka_sink_error_resume/run.sh index 1020d5cd233..7a8c5ccbb3e 100755 --- a/tests/integration_tests/kafka_sink_error_resume/run.sh +++ b/tests/integration_tests/kafka_sink_error_resume/run.sh @@ -39,7 +39,9 @@ function run() { TOPIC_NAME="ticdc-kafka-sink-error-resume-test-$RANDOM" 
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=4*return(true)' + # Return an failpoint error to fail a kafka changefeed. + # Note we return one error for the failpoint, if owner retry changefeed frequently, it may break the test. + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"