owner(ticdc): Add backoff mechanism into changefeed restart logic #4262

Merged

Commits (36)
3345c28  add backoff machanism to changefeed restart (zhaoxinyu, Jan 10, 2022)
fa87da8  modify the backoff reset logic (zhaoxinyu, Jan 10, 2022)
af79247  add some logs to changefeed restart backoff logic (zhaoxinyu, Jan 10, 2022)
367d1fb  fix a lint problem (zhaoxinyu, Jan 10, 2022)
7be8e03  fix a lint problem and a failed ut (zhaoxinyu, Jan 10, 2022)
7549ec1  restrict the backoff reset logic (zhaoxinyu, Jan 10, 2022)
8ed0681  use camel case for zap log (zhaoxinyu, Jan 11, 2022)
633182e  address some comments (zhaoxinyu, Jan 11, 2022)
fd3bf8b  modify code comments (zhaoxinyu, Jan 11, 2022)
4718305  modify code comments (zhaoxinyu, Jan 11, 2022)
f1ed02f  add some comments and tune the default backoff params (zhaoxinyu, Jan 11, 2022)
32b65d2  modify some code comments (zhaoxinyu, Jan 11, 2022)
4a6f556  address comments (zhaoxinyu, Jan 11, 2022)
6a916a4  Merge branch 'master' into add_backoff_for_restarting_changefeed (ti-chi-bot, Jan 11, 2022)
3501b55  Merge branch 'master' into add_backoff_for_restarting_changefeed (ti-chi-bot, Jan 11, 2022)
c124c71  fix a failed unit test (zhaoxinyu, Jan 11, 2022)
176ee91  Merge branch 'add_backoff_for_restarting_changefeed' of github.com:zh… (zhaoxinyu, Jan 11, 2022)
918112e  modify unit test (zhaoxinyu, Jan 11, 2022)
916b78d  update the backoff initial interval (zhaoxinyu, Jan 12, 2022)
c55acd0  Merge branch 'master' into add_backoff_for_restarting_changefeed (ti-chi-bot, Jan 12, 2022)
08fbaaa  refine the error detection policy (zhaoxinyu, Jan 12, 2022)
2f3b9a8  Merge branch 'add_backoff_for_restarting_changefeed' of github.com:zh… (zhaoxinyu, Jan 12, 2022)
58cc47d  remove the dead comment (zhaoxinyu, Jan 12, 2022)
7a8b6fb  Merge branch 'master' into add_backoff_for_restarting_changefeed (overvenus, Jan 12, 2022)
6f94159  refine the changefeed stability detection policy (zhaoxinyu, Jan 12, 2022)
5e3665c  tune the changefeed stability checking logic (zhaoxinyu, Jan 12, 2022)
f1f24ed  modify code comment (zhaoxinyu, Jan 12, 2022)
d084e7e  Merge branch 'master' into add_backoff_for_restarting_changefeed (lonng, Jan 14, 2022)
c5887b5  tests: try fix kafka_sink_error_resume (overvenus, Jan 14, 2022)
a2244ee  Merge branch 'master' into add_backoff_for_restarting_changefeed (overvenus, Jan 14, 2022)
ef7c20e  modify lastErrorTime updating logic (zhaoxinyu, Jan 14, 2022)
df4b609  fix a lint problem (zhaoxinyu, Jan 14, 2022)
a731daf  modify a kafka integration test case (zhaoxinyu, Jan 14, 2022)
9107be7  tests: test resume after backoff (overvenus, Jan 14, 2022)
42a00b7  Merge branch 'add_backoff_for_restarting_changefeed' of github.com:zh… (overvenus, Jan 14, 2022)
f9bf410  Merge branch 'master' into add_backoff_for_restarting_changefeed (ti-chi-bot, Jan 14, 2022)
51 changes: 14 additions & 37 deletions cdc/model/changefeed.go
@@ -94,16 +94,14 @@ func (s FeedState) IsNeeded(need string) bool {
}

const (
// errorHistoryGCInterval represents how long we keep error record in changefeed info
errorHistoryGCInterval = time.Minute * 10

// errorHistoryCheckInterval represents time window for failure check
errorHistoryCheckInterval = time.Minute * 2

// ErrorHistoryThreshold represents failure upper limit in time window.
// Before a changefeed is initialized, check the the failure count of this
// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
ErrorHistoryThreshold = 3
ErrorHistoryThreshold = 1

// deltaBackoffInterval is added to the time window to safely ensure that
// no errors occurred in that window
deltaBackoffInterval = 30 * time.Second
)

// ChangeFeedInfo describes the detail of a ChangeFeed
@@ -266,6 +264,7 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
if info.Config.Consistent == nil {
info.Config.Consistent = defaultConfig.Consistent
}

return nil
}

@@ -374,28 +373,6 @@ func (info *ChangeFeedInfo) fixSinkProtocol() {
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval
})
info.ErrorHis = info.ErrorHis[i:]

if i > 0 {
needSave = true
}

i = sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold
return
}

// HasFastFailError returns true if the error in changefeed is fast-fail
func (info *ChangeFeedInfo) HasFastFailError() bool {
if info.Error == nil {
@@ -404,26 +381,26 @@ func (info *ChangeFeedInfo) HasFastFailError() bool {
return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

// findActiveErrors finds all errors occurring within errorHistoryCheckInterval
func (info *ChangeFeedInfo) findActiveErrors() []int64 {
// findActiveErrors finds all errors occurring within backoffInterval
func (info *ChangeFeedInfo) findActiveErrors(backoffInterval time.Duration) []int64 {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
// ts is a errors occurrence time, here to find all errors occurring within errorHistoryCheckInterval
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
// ts is an error occurrence time; here we find all errors occurring within a time window
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < backoffInterval+deltaBackoffInterval
})
return info.ErrorHis[i:]
}

// ErrorsReachedThreshold checks error history of a changefeed
// returns true if error counts reach threshold
func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool {
return len(info.findActiveErrors()) >= ErrorHistoryThreshold
func (info *ChangeFeedInfo) ErrorsReachedThreshold(backoffInterval time.Duration) bool {
return len(info.findActiveErrors(backoffInterval)) >= ErrorHistoryThreshold
}

// CleanUpOutdatedErrorHistory cleans up the outdated error history
// return true if the ErrorHis changed
func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool {
func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory(backoffInterval time.Duration) bool {
lastLenOfErrorHis := len(info.ErrorHis)
info.ErrorHis = info.findActiveErrors()
info.ErrorHis = info.findActiveErrors(backoffInterval)
return lastLenOfErrorHis != len(info.ErrorHis)
}
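
The changefeed.go changes above replace the fixed errorHistoryCheckInterval window with a window derived from the current backoff interval plus a 30s safety margin. Below is a minimal standalone sketch of that sliding-window check; the function names (activeErrors, errorsReachedThreshold) and the example timestamps are illustrative only, while the constant values mirror the diff.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Sketch of the error-window logic from cdc/model/changefeed.go in this PR.
// errorHis holds error timestamps in milliseconds, oldest first; only errors
// inside backoffInterval+deltaBackoffInterval count toward the threshold.
const (
	errorHistoryThreshold = 1                // mirrors ErrorHistoryThreshold
	deltaBackoffInterval  = 30 * time.Second // mirrors deltaBackoffInterval
)

// activeErrors returns the suffix of errorHis that falls inside the window.
func activeErrors(errorHis []int64, backoffInterval time.Duration) []int64 {
	window := backoffInterval + deltaBackoffInterval
	i := sort.Search(len(errorHis), func(i int) bool {
		ts := errorHis[i]
		return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < window
	})
	return errorHis[i:]
}

func errorsReachedThreshold(errorHis []int64, backoffInterval time.Duration) bool {
	return len(activeErrors(errorHis, backoffInterval)) >= errorHistoryThreshold
}

func main() {
	now := time.Now().UnixNano() / 1e6
	his := []int64{now - 3_600_000, now - 5_000} // one error an hour ago, one 5s ago
	// With a 10s backoff interval the window is 40s: the hour-old error is
	// ignored, the recent one counts, and the threshold (1) is reached.
	fmt.Println(errorsReachedThreshold(his, 10*time.Second)) // true
}
```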
28 changes: 0 additions & 28 deletions cdc/model/changefeed_test.go
@@ -641,34 +641,6 @@ func TestChangeFeedInfoClone(t *testing.T) {
require.True(t, info.Config.EnableOldValue)
}

func TestCheckErrorHistory(t *testing.T) {
t.Parallel()

now := time.Now()
info := &ChangeFeedInfo{
ErrorHis: []int64{},
}
for i := 0; i < 5; i++ {
tm := now.Add(-errorHistoryGCInterval)
info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
for i := 0; i < ErrorHistoryThreshold-1; i++ {
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
needSave, canInit := info.CheckErrorHistory()
require.True(t, needSave)
require.True(t, canInit)
require.Equal(t, ErrorHistoryThreshold-1, len(info.ErrorHis))

info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
needSave, canInit = info.CheckErrorHistory()
require.False(t, needSave)
require.False(t, canInit)
}

func TestChangefeedInfoStringer(t *testing.T) {
t.Parallel()

2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
@@ -84,7 +84,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
feedStateManager: new(feedStateManager),
feedStateManager: newFeedStateManager(),
gcManager: gcManager,

errCh: make(chan error, defaultErrChSize),
82 changes: 75 additions & 7 deletions cdc/owner/feed_state_manager.go
@@ -16,6 +16,7 @@ package owner
import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
@@ -24,6 +25,18 @@ import (
"go.uber.org/zap"
)

const (
// When errors occurred and we need to do backoff, we start an exponential backoff
// with an interval from 10s to 30min (10s, 20s, 40s, ... 30min, 30min...).
// And when the duration (now - backoff start) exceeds 1 hour,
// the backoff will be stopped. To avoid a thundering herd, a random factor is added to the backoff interval.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 1 * time.Hour
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0
)

// feedStateManager manages the ReactorState of a changefeed
// when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
@@ -34,7 +47,33 @@ type feedStateManager struct {
// shouldBeRemoved = false means the changefeed is paused
shouldBeRemoved bool

adminJobQueue []*model.AdminJob
adminJobQueue []*model.AdminJob
lastErrorTime time.Time
backoffInterval time.Duration
errBackoff *backoff.ExponentialBackOff
}

// newFeedStateManager creates feedStateManager and initialize the exponential backoff
func newFeedStateManager() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = defaultBackoffInitInterval
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

func (m *feedStateManager) resetErrBackoff() {
m.errBackoff.Reset()

m.backoffInterval = m.errBackoff.NextBackOff()
}

func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) {
@@ -145,9 +184,14 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
return
}
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrBackoff()
// The lastErrorTime also needs to be cleared before a fresh run.
m.lastErrorTime = time.Unix(0, 0)
jobsPending = true
m.patchState(model.StateNormal)
// remove error history to make sure the changefeed can running in next tick
// to make sure the changefeed can run in the next tick,
// we remove the error history
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
@@ -287,7 +331,7 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
}
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
info.CleanUpOutdatedErrorHistory()
info.CleanUpOutdatedErrorHistory(m.backoffInterval)
return info, true, nil
})
m.shouldBeRunning = false
@@ -304,14 +348,38 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
}
changed := info.CleanUpOutdatedErrorHistory()
changed := info.CleanUpOutdatedErrorHistory(m.backoffInterval)
return info, changed || len(errs) > 0, nil
})

// if the number of errors has reached the error threshold, stop the changefeed
if m.state.Info.ErrorsReachedThreshold() {
if len(errs) > 0 {
m.lastErrorTime = time.Now()
if !m.state.Info.ErrorsReachedThreshold(m.backoffInterval) {
m.resetErrBackoff()
}
}

if m.lastErrorTime == time.Unix(0, 0) {
return
}

if time.Since(m.lastErrorTime) < m.backoffInterval {
m.shouldBeRunning = false
m.patchState(model.StateError)
return
} else {
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// if the duration since backoff start exceeds MaxElapsedTime,
// we set the state of changefeed to "failed" and don't let it run again unless it is manually resumed.
if m.backoffInterval == backoff.Stop {
log.Warn("changefeed will not be restarted because it has been failing for a long time period",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime))
m.shouldBeRunning = false
m.patchState(model.StateFailed)
} else {
log.Info("changefeed restart backoff interval is changed", zap.String("changefeedID", m.state.ID),
zap.Duration("backoffInterval", m.backoffInterval))
}
}
}
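
With newFeedStateManager in place, the restart policy is driven entirely by cenkalti/backoff/v4. The snippet below is a self-contained sketch, under the defaults shown above, of how the interval sequence grows and how the backoff.Stop sentinel corresponds to the changefeed being marked failed; it is a demo, not code from this PR.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Exponential backoff configured with the defaults from this PR:
	// start at 10s, double each time up to 30min, give up once 1h has
	// elapsed, and add 10% jitter so restarts do not form a thundering herd.
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 10 * time.Second
	b.MaxInterval = 30 * time.Minute
	b.MaxElapsedTime = 1 * time.Hour
	b.Multiplier = 2.0
	b.RandomizationFactor = 0.1
	b.Reset()

	// Each failed restart asks for the next interval. backoff.Stop means
	// MaxElapsedTime was exceeded; the PR maps that case to StateFailed so
	// the changefeed is not restarted again without a manual resume.
	for i := 0; i < 12; i++ {
		d := b.NextBackOff()
		if d == backoff.Stop {
			fmt.Println("backoff exhausted: changefeed would be marked failed")
			return
		}
		fmt.Printf("attempt %d: wait %v before the next restart\n", i+1, d)
	}
}
```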
10 changes: 5 additions & 5 deletions cdc/owner/feed_state_manager_test.go
@@ -29,7 +29,7 @@ type feedStateManagerSuite struct{}
func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
manager := newFeedStateManager()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -104,7 +104,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
manager := newFeedStateManager()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -132,7 +132,7 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) {
func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
manager := newFeedStateManager()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -175,7 +175,7 @@ func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) {
func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
manager := newFeedStateManager()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -229,7 +229,7 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
manager := newFeedStateManager()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, map[string]string{
"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
2 changes: 1 addition & 1 deletion cdc/sink/codec/schema_registry.go
@@ -25,7 +25,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/linkedin/goavro/v2"
"github.com/pingcap/errors"
"github.com/pingcap/log"
2 changes: 1 addition & 1 deletion go.mod
@@ -11,7 +11,7 @@ require (
github.com/aws/aws-sdk-go v1.35.3
github.com/benbjohnson/clock v1.1.0
github.com/bradleyjkemp/grpc-tools v0.2.5
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.0.2
github.com/chaos-mesh/go-sqlsmith v0.0.0-20211025024535-03ae33408684
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5
3 changes: 1 addition & 2 deletions go.sum
@@ -137,8 +137,7 @@ github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwP
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
github.com/carlmjohnson/flagext v0.21.0 h1:/c4uK3ie786Z7caXLcIMvePNSSiH3bQVGDvmGLMme60=
github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs=
github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
4 changes: 2 additions & 2 deletions pkg/config/cyclic.go
@@ -33,7 +33,7 @@ func (c *CyclicConfig) IsEnabled() bool {
return c != nil && c.Enable
}

// Marshal returns the json marshal format of a ReplicationConfig
// Marshal returns the json marshal format of a CyclicConfig
func (c *CyclicConfig) Marshal() (string, error) {
cfg, err := json.Marshal(c)
if err != nil {
@@ -42,7 +42,7 @@ func (c *CyclicConfig) Marshal() (string, error) {
return string(cfg), nil
}

// Unmarshal unmarshals into *ReplicationConfig from json marshal byte slice
// Unmarshal unmarshals into *CyclicConfig from json marshal byte slice
func (c *CyclicConfig) Unmarshal(data []byte) error {
return json.Unmarshal(data, c)
}
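
The comment fixes above only correct the referenced type. As a quick illustration of the API they document, here is a hedged round-trip sketch; it assumes nothing about CyclicConfig beyond the exported Enable flag read by IsEnabled.

```go
package main

import (
	"fmt"
	"log"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Round-trip a CyclicConfig through the Marshal/Unmarshal helpers.
	// Only the Enable flag is set; other fields keep their zero values.
	cfg := &config.CyclicConfig{Enable: true}
	s, err := cfg.Marshal()
	if err != nil {
		log.Fatal(err)
	}
	restored := new(config.CyclicConfig)
	if err := restored.Unmarshal([]byte(s)); err != nil {
		log.Fatal(err)
	}
	fmt.Println(restored.IsEnabled()) // true
}
```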
10 changes: 5 additions & 5 deletions tests/mq_protocol_tests/framework/dsl.go
@@ -17,7 +17,7 @@ import (
"context"
"time"

backoff2 "github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
@@ -90,8 +90,8 @@ func (b *basicAwaitable) Wait() Checkable {
}
defer cancel()

backoff := backoff2.NewExponentialBackOff()
backoff.MaxInterval = waitMaxPollInterval
expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxInterval = waitMaxPollInterval
for {
select {
case <-ctx.Done():
@@ -109,8 +109,8 @@ func (b *basicAwaitable) Wait() Checkable {
return b
}

interval := backoff.NextBackOff()
if interval == backoff2.Stop {
interval := expBackoff.NextBackOff()
if interval == backoff.Stop {
return &errorCheckableAndAwaitable{errors.New("Maximum retry interval reached")}
}
log.Debug("Wait(): pollable returned false, backing off", zap.Duration("interval", interval))
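
The same NextBackOff / backoff.Stop pattern drives the Wait() polling loop above. A simplified, self-contained sketch of that loop follows; pollUntil and the 5-second cap (standing in for waitMaxPollInterval) are illustrative names, not part of the test framework.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// pollUntil keeps calling poll with an exponentially growing pause until poll
// succeeds, the context is cancelled, or the backoff policy gives up.
func pollUntil(ctx context.Context, poll func() (bool, error)) error {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxInterval = 5 * time.Second // stand-in for waitMaxPollInterval
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		ok, err := poll()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		interval := expBackoff.NextBackOff()
		if interval == backoff.Stop {
			return errors.New("maximum retry interval reached")
		}
		time.Sleep(interval)
	}
}

func main() {
	attempts := 0
	err := pollUntil(context.Background(), func() (bool, error) {
		attempts++
		return attempts >= 3, nil // succeed on the third poll
	})
	fmt.Println(err, attempts) // <nil> 3
}
```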