sink (ticdc): add retry backoff and limit for sinkManager (#9322)
close #9272
asddongmen authored Jul 3, 2023
1 parent ebb2a0f · commit a0f173d
Showing 2 changed files with 74 additions and 3 deletions.
cdc/processor/sinkmanager/manager.go (51 changes: 48 additions & 3 deletions)
@@ -16,6 +16,7 @@ package sinkmanager
 import (
 	"context"
 	"math"
+	"math/rand"
 	"sync"
 	"time"
 
@@ -49,6 +50,8 @@ const (
 	// engine.CleanByTable can be expensive. So it's necessary to reduce useless calls.
 	cleanTableInterval  = 5 * time.Second
 	cleanTableMinEvents = 128
+	maxRetryDuration    = 30 * time.Minute
+	errGCInterval       = 10 * time.Minute
 )
 
 // TableStats of a table sink.
@@ -58,6 +61,13 @@ type TableStats struct {
 	BarrierTs model.Ts
 }
 
+type sinkRetry struct {
+	// To control the error retry.
+	lastInternalError  error
+	firstRetryTime     time.Time
+	lastErrorRetryTime time.Time
+}
+
 // SinkManager is the implementation of SinkManager.
 type SinkManager struct {
 	changefeedID model.ChangeFeedID
@@ -96,7 +106,7 @@ type SinkManager struct {
 	sinkWorkerAvailable chan struct{}
 	// sinkMemQuota is used to control the total memory usage of the table sink.
 	sinkMemQuota *memquota.MemQuota
-
+	sinkRetry sinkRetry
 	// redoWorkers used to pull data from source manager.
 	redoWorkers []*redoWorker
 	// redoTaskChan is used to send tasks to redoWorkers.
@@ -141,6 +151,11 @@ func New(
 		sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum),
 		sinkTaskChan: make(chan *sinkTask),
 		sinkWorkerAvailable: make(chan struct{}, 1),
+		sinkRetry: sinkRetry{
+			lastInternalError:  nil,
+			firstRetryTime:     time.Now(),
+			lastErrorRetryTime: time.Now(),
+		},
 
 		metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
 			WithLabelValues(changefeedID.Namespace, changefeedID.ID),
@@ -164,6 +179,7 @@ func New(
 	}
 
 	m.ready = make(chan struct{})
+
 	return m
 }
 
@@ -264,6 +280,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error)
 			sinkFactoryErrors = make(chan error, 16)
 		}
 
+		// If the error is retryable, we should retry to re-establish the internal resources.
 		if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
 			select {
 			case <-m.managerCtx.Done():
@@ -272,13 +289,41 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error)
 		} else {
 			return errors.Trace(err)
 		}
-		// Use a 5 second backoff when re-establishing internal resources.
-		if err = util.Hang(m.managerCtx, 5*time.Second); err != nil {
+
+		backoff, err := m.getRetryBackoff(err)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		if err = util.Hang(m.managerCtx, backoff); err != nil {
 			return errors.Trace(err)
 		}
 	}
 }
+
+// getRetryBackoff returns the backoff duration for retrying the last error.
+// If the retry time is exhausted, it returns an unretryable changefeed error.
+func (m *SinkManager) getRetryBackoff(err error) (time.Duration, error) {
+	// Reset firstRetryTime when the last error happened long enough ago;
+	// it means the previous retry succeeded and the sink has been running well for a while.
+	if m.sinkRetry.lastInternalError == nil ||
+		time.Since(m.sinkRetry.lastErrorRetryTime) >= errGCInterval {
+		m.sinkRetry.firstRetryTime = time.Now()
+	}
+
+	// Return an unretryable error if the retry time is exhausted.
+	if time.Since(m.sinkRetry.firstRetryTime) >= maxRetryDuration {
+		return 0, cerror.WrapChangefeedUnretryableErr(err)
+	}
+
+	m.sinkRetry.lastInternalError = err
+	m.sinkRetry.lastErrorRetryTime = time.Now()
+
+	// interval is in range [5s, 30s)
+	interval := time.Second * time.Duration(rand.Int63n(25)+5)
+	return interval, nil
+}
 
 func (m *SinkManager) initSinkFactory(errCh chan error) error {
 	m.sinkFactoryMu.Lock()
 	defer m.sinkFactoryMu.Unlock()
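
For readers who want to poke at the retry policy outside of TiCDC, the following standalone sketch (not part of this commit) mirrors what getRetryBackoff does with the constants from the diff above: each failed attempt gets a randomized backoff in [5s, 30s), the failure window is reset once no error has needed a retry for errGCInterval, and retries are abandoned once the window exceeds maxRetryDuration. The retryPolicy type and its nextBackoff method are illustrative names only, not APIs from the repository.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const (
	maxRetryDuration = 30 * time.Minute
	errGCInterval    = 10 * time.Minute
)

// retryPolicy is a hypothetical, trimmed-down analogue of the sinkRetry
// bookkeeping added to SinkManager in this commit.
type retryPolicy struct {
	lastInternalError  error
	firstRetryTime     time.Time
	lastErrorRetryTime time.Time
}

// nextBackoff returns a randomized backoff in [5s, 30s), or a terminal error
// once retries have been failing continuously for longer than maxRetryDuration.
func (r *retryPolicy) nextBackoff(err error) (time.Duration, error) {
	// If there was no previous error, or the last one is old enough,
	// start a fresh failure window.
	if r.lastInternalError == nil || time.Since(r.lastErrorRetryTime) >= errGCInterval {
		r.firstRetryTime = time.Now()
	}
	// Give up once the failure window is exhausted.
	if time.Since(r.firstRetryTime) >= maxRetryDuration {
		return 0, fmt.Errorf("retry duration exhausted: %w", err)
	}
	r.lastInternalError = err
	r.lastErrorRetryTime = time.Now()
	return time.Second * time.Duration(rand.Int63n(25)+5), nil
}

func main() {
	p := &retryPolicy{firstRetryTime: time.Now(), lastErrorRetryTime: time.Now()}

	// Normal case: a fresh error yields a 5s-30s backoff.
	if backoff, err := p.nextBackoff(errors.New("sink error")); err == nil {
		fmt.Println("backoff:", backoff)
	}

	// Exhausted case: pretend the first failure happened 30 minutes ago.
	p.firstRetryTime = time.Now().Add(-maxRetryDuration)
	if _, err := p.nextBackoff(errors.New("sink error")); err != nil {
		fmt.Println("giving up:", err)
	}
}

Running the sketch prints one randomized backoff value, then the give-up error once the failure window is back-dated past 30 minutes.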
cdc/processor/sinkmanager/manager_test.go (26 changes: 26 additions & 0 deletions)
@@ -15,6 +15,7 @@ package sinkmanager
 
 import (
 	"context"
+	"errors"
 	"math"
 	"testing"
 	"time"
@@ -356,3 +357,28 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
 		log.Panic("must get an error instead of a timeout")
 	}
 }
+
+func TestGetRetryBackoff(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	changefeedInfo := getChangefeedInfo()
+	manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
+	defer func() {
+		cancel()
+		manager.Close()
+	}()
+
+	backoff, err := manager.getRetryBackoff(errors.New("test"))
+	require.NoError(t, err)
+	require.Less(t, backoff, 30*time.Second)
+	time.Sleep(500 * time.Millisecond)
+	elapsedTime := time.Since(manager.sinkRetry.firstRetryTime)
+
+	// Back-date lastErrorRetryTime to verify that the error backoff window is reset.
+	manager.sinkRetry.lastErrorRetryTime = time.Unix(0, 0)
+	_, err = manager.getRetryBackoff(errors.New("test"))
+	require.NoError(t, err)
+	require.Less(t, time.Since(manager.sinkRetry.firstRetryTime), elapsedTime)
+}
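
The committed test covers the backoff range and the reset of the failure window; a companion test for the give-up path could look like the sketch below. It is only an illustration, not part of this commit, and it assumes the same test helpers (CreateManagerWithMemEngine, getChangefeedInfo) and the same direct access to the sinkRetry fields used above.

// Hypothetical test, not part of this commit: it back-dates firstRetryTime
// so that getRetryBackoff reports an unretryable error.
func TestGetRetryBackoffExhausted(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	errCh := make(chan error, 16)
	changefeedInfo := getChangefeedInfo()
	manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("2"), changefeedInfo, errCh)
	defer func() {
		cancel()
		manager.Close()
	}()

	// Simulate a changefeed that has already been retrying for maxRetryDuration.
	manager.sinkRetry.lastInternalError = errors.New("previous error")
	manager.sinkRetry.lastErrorRetryTime = time.Now()
	manager.sinkRetry.firstRetryTime = time.Now().Add(-maxRetryDuration)

	_, err := manager.getRetryBackoff(errors.New("test"))
	require.Error(t, err)
}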
