owner(ticdc): do not resign owner when ErrNotOwner is encountered (#9396) #9408

Merged
53 changes: 30 additions & 23 deletions cdc/capture/capture.go
@@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	}()

 	g, stdCtx := errgroup.WithContext(stdCtx)
+	stdCtx, cancel := context.WithCancel(stdCtx)
+
 	ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
 		CaptureInfo:      c.info,
 		EtcdClient:       c.EtcdClient,
@@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 		SorterSystem:      c.sorterSystem,
 		SortEngineFactory: c.sortEngineFactory,
 	})
-
 	g.Go(func() error {
 		// when the campaignOwner returns an error, it means that the owner throws
 		// an unrecoverable serious errors (recoverable errors are intercepted in the owner tick)
@@ -351,9 +352,20 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	})

 	g.Go(func() error {
+		// Processor manager should be closed as soon as possible to prevent double write issue.
+		defer func() {
+			if cancel != nil {
+				// Propagate the cancel signal to the owner and other goroutines.
+				cancel()
+			}
+			if c.processorManager != nil {
+				c.processorManager.AsyncClose()
+			}
+			log.Info("processor manager closed", zap.String("captureID", c.info.ID))
+		}()
 		processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

-		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

 		globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
 			c.MessageRouter.AddPeer(captureID, addr)
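
The two hunks above change shutdown ordering in run(): a capture-scoped cancel now wraps the errgroup context, and the processor-manager goroutine cancels it in a defer, so the owner goroutine cannot outlive the processor manager (which is what the "double write" comment is about). A minimal, self-contained sketch of that pattern, with illustrative names rather than tiflow code:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, gctx := errgroup.WithContext(context.Background())
	ctx, cancel := context.WithCancel(gctx)

	// Stands in for the campaignOwner goroutine: it just waits for cancellation.
	g.Go(func() error {
		<-ctx.Done()
		fmt.Println("owner goroutine stopped")
		return ctx.Err()
	})

	// Stands in for the processor-manager goroutine.
	g.Go(func() error {
		defer func() {
			cancel() // propagate the cancel signal to the owner goroutine
			fmt.Println("processor manager closed")
		}()
		time.Sleep(100 * time.Millisecond) // the processor loop exits first
		return errors.New("processor exited")
	})

	fmt.Println("run returned:", g.Wait())
}
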
@@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 	}
 	// Campaign to be the owner, it blocks until it been elected.
 	if err := c.campaign(ctx); err != nil {
-
 		rootErr := errors.Cause(err)
 		if rootErr == context.Canceled {
 			return nil
Expand Down Expand Up @@ -467,7 +478,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.upstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
@@ -485,27 +496,27 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 		}
 	})

-	err = c.runEtcdWorker(ownerCtx, owner,
-		orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
-		ownerFlushInterval, util.RoleOwner.String())
+	err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
 	c.owner.AsyncStop()
 	c.setOwner(nil)

-	// if owner exits, resign the owner key,
-	// use a new context to prevent the context from being cancelled.
-	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	if resignErr := c.resign(resignCtx); resignErr != nil {
-		if errors.Cause(resignErr) != context.DeadlineExceeded {
-			log.Info("owner resign failed", zap.String("captureID", c.info.ID),
-				zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
-			cancel()
-			return errors.Trace(resignErr)
-		}
-
-		log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
-			zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
-		cancel()
-	}
-	cancel()
+	if !cerror.ErrNotOwner.Equal(err) {
+		// if owner exits, resign the owner key,
+		// use a new context to prevent the context from being cancelled.
+		resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		if resignErr := c.resign(resignCtx); resignErr != nil {
+			if errors.Cause(resignErr) != context.DeadlineExceeded {
+				log.Info("owner resign failed", zap.String("captureID", c.info.ID),
+					zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
+				cancel()
+				return errors.Trace(resignErr)
+			}

+			log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
+				zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
+			cancel()
+		}
+		cancel()
+	}

 	log.Info("owner resigned successfully",
 		zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev))
@@ -622,10 +633,6 @@ func (c *captureImpl) AsyncClose() {

 	c.captureMu.Lock()
 	defer c.captureMu.Unlock()
-	if c.processorManager != nil {
-		c.processorManager.AsyncClose()
-	}
-	log.Info("processor manager closed", zap.String("captureID", c.info.ID))

 	c.grpcService.Reset(nil)
 	if c.MessageRouter != nil {
4 changes: 2 additions & 2 deletions cdc/capture/election.go
@@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
 	}
 }

-func (e *electionImpl) campaign(ctx context.Context, key string) error {
+func (e *electionImpl) campaign(ctx context.Context, val string) error {
 	failpoint.Inject("capture-campaign-compacted-error", func() {
 		failpoint.Return(errors.Trace(mvcc.ErrCompacted))
 	})
-	return e.election.Campaign(ctx, key)
+	return e.election.Campaign(ctx, val)
 }

 func (e *electionImpl) resign(ctx context.Context) error {
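
The rename from key to val is cosmetic but corrects a misleading name: the second argument of concurrency.Election.Campaign is the value announced under the leadership key, not a key. A minimal usage sketch of the upstream etcd API (the endpoint, TTL, prefix, and value are illustrative):

package main

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		panic(err)
	}
	defer sess.Close()

	e := concurrency.NewElection(sess, "/ticdc/owner")
	// Blocks until elected; "capture-info" is the value stored under the
	// election key, typically identifying this candidate.
	if err := e.Campaign(context.Background(), "capture-info"); err != nil {
		panic(err)
	}
}
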
8 changes: 0 additions & 8 deletions cdc/owner/owner.go
@@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
 			changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
 				Set(float64(cf.state.Info.State.ToInt()))
 		}
-
-		// The InfoProvider is a proxy object returning information
-		// from the scheduler.
-		infoProvider := cf.GetInfoProvider()
-		if infoProvider == nil {
-			// The scheduler has not been initialized yet.
-			continue
-		}
 	}
 }
6 changes: 3 additions & 3 deletions cdc/owner/owner_test.go
@@ -120,7 +120,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
 	o := owner.(*ownerImpl)
 	o.upstreamManager = upstream.NewManager4Test(pdClient)

-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)

 	// set captures
@@ -430,7 +430,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
 	ctx := cdcContext.NewBackendContext4Test(true)
 	ctx, cancel := cdcContext.WithCancel(ctx)
 	defer cancel()
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)

 	// no changefeed, the gc safe point should be max uint64
@@ -667,7 +667,7 @@ WorkLoop:
 }

 func TestCalculateGCSafepointTs(t *testing.T) {
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	expectMinTsMap := make(map[uint64]uint64)
 	expectForceUpdateMap := make(map[uint64]interface{})
 	o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
1 change: 1 addition & 0 deletions cdc/processor/manager.go
@@ -227,6 +227,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
 		for changefeedID := range m.processors {
 			m.closeProcessor(changefeedID, ctx)
 		}
+		log.Info("All processors are closed in processor manager")
 		// FIXME: we should drain command queue and signal callers an error.
 		return cerrors.ErrReactorFinished
 	case commandTpWriteDebugInfo:
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
@@ -71,7 +71,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
 			checkpointTs: replicaInfo.StartTs,
 		}, nil
 	}, &s.liveness)
-	s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
 	require.Nil(t, err)
 	s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
3 changes: 2 additions & 1 deletion pkg/orchestrator/etcd_worker.go
@@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 		if err != nil {
 			// This error means owner is resigned by itself,
 			// and we should exit etcd worker and campaign owner again.
-			return nil
+			return err
 		}
 	}

@@ -512,6 +512,7 @@ func (worker *EtcdWorker) applyUpdates() error {
 			return errors.Trace(err)
 		}
 	}
+	worker.state.UpdatePendingChange()

 	worker.pendingUpdates = worker.pendingUpdates[:0]
 	return nil
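
Two behavioral changes land in the etcd worker: Run now propagates the owner's exit error to the caller (so campaignOwner above can distinguish ErrNotOwner from a voluntary resignation), and applyUpdates calls the new UpdatePendingChange hook after applying a batch of updates. A condensed sketch of where the hook sits, with simplified types (the real EtcdWorker operates on util.EtcdKey values, not strings):

package main

import "fmt"

// reactorState mirrors the shape of orchestrator.ReactorState for this sketch.
type reactorState interface {
	Update(key string, value []byte) error
	UpdatePendingChange()
}

type printState struct{}

func (printState) Update(key string, value []byte) error {
	fmt.Printf("applied %s=%s\n", key, value)
	return nil
}

func (printState) UpdatePendingChange() {
	fmt.Println("flushing time-based pending changes")
}

func applyUpdates(state reactorState, updates map[string][]byte) error {
	for k, v := range updates {
		if err := state.Update(k, v); err != nil {
			return err
		}
	}
	// Runs on every apply cycle, so time-deferred work (such as the delayed
	// capture removal below) can make progress without waiting for a new
	// etcd event.
	state.UpdatePendingChange()
	return nil
}

func main() {
	_ = applyUpdates(printState{}, map[string][]byte{"/capture/1": []byte("info")})
}
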
3 changes: 3 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
@@ -43,6 +43,9 @@ type bankReactorState struct {

 const bankTestPrefix = "/ticdc/test/bank/"

+func (b *bankReactorState) UpdatePendingChange() {
+}
+
 func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	require.True(b.t, strings.HasPrefix(key.String(), bankTestPrefix))
 	indexStr := key.String()[len(bankTestPrefix):]
9 changes: 9 additions & 0 deletions pkg/orchestrator/etcd_worker_test.go
@@ -147,6 +147,9 @@ func (s *simpleReactorState) SetSum(sum int) {
 	s.patches = append(s.patches, patch)
 }

+func (s *simpleReactorState) UpdatePendingChange() {
+}
+
 func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
 	if len(subMatches) != 2 {
@@ -283,6 +286,9 @@ type intReactorState struct {
 	lastVal int
 }

+func (s *intReactorState) UpdatePendingChange() {
+}
+
 func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	var err error
 	s.val, err = strconv.Atoi(string(value))
@@ -372,6 +378,9 @@ type commonReactorState struct {
 	pendingPatches []DataPatch
 }

+func (s *commonReactorState) UpdatePendingChange() {
+}
+
 func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	s.state[key.String()] = string(value)
 	return nil
3 changes: 3 additions & 0 deletions pkg/orchestrator/interfaces.go
@@ -35,6 +35,9 @@ type ReactorState interface {
 	// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
 	Update(key util.EtcdKey, value []byte, isInit bool) error

+	// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
+	UpdatePendingChange()
+
 	// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
 	// that a Reactor wants to apply to Etcd.
 	// a slice of DataPatch will be committed as one ETCD txn
57 changes: 45 additions & 12 deletions pkg/orchestrator/reactor_state.go
@@ -16,6 +16,7 @@ package orchestrator
 import (
 	"encoding/json"
 	"reflect"
+	"time"

 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -26,6 +27,8 @@ import (
 	"go.uber.org/zap"
 )

+const defaultCaptureRemoveTTL = 5
+
 // GlobalReactorState represents a global state which stores all key-value pairs in ETCD
 type GlobalReactorState struct {
 	ClusterID string
@@ -39,16 +42,44 @@ type GlobalReactorState struct {
 	// to be called when captures are added and removed.
 	onCaptureAdded   func(captureID model.CaptureID, addr string)
 	onCaptureRemoved func(captureID model.CaptureID)
+
+	captureRemoveTTL int
+	toRemoveCaptures map[model.CaptureID]time.Time
 }

-// NewGlobalState creates a new global state
-func NewGlobalState(clusterID string) *GlobalReactorState {
+// NewGlobalState creates a new global state.
+func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState {
+	captureRemoveTTL := captureSessionTTL / 2
+	if captureRemoveTTL < defaultCaptureRemoveTTL {
+		captureRemoveTTL = defaultCaptureRemoveTTL
+	}
 	return &GlobalReactorState{
-		ClusterID:   clusterID,
-		Owner:       map[string]struct{}{},
-		Captures:    make(map[model.CaptureID]*model.CaptureInfo),
-		Upstreams:   make(map[model.UpstreamID]*model.UpstreamInfo),
-		Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
+		ClusterID:        clusterID,
+		Owner:            map[string]struct{}{},
+		Captures:         make(map[model.CaptureID]*model.CaptureInfo),
+		Upstreams:        make(map[model.UpstreamID]*model.UpstreamInfo),
+		Changefeeds:      make(map[model.ChangeFeedID]*ChangefeedReactorState),
+		captureRemoveTTL: captureRemoveTTL,
+		toRemoveCaptures: make(map[model.CaptureID]time.Time),
 	}
 }

+// NewGlobalStateForTest creates a new global state for test.
+func NewGlobalStateForTest(clusterID string) *GlobalReactorState {
+	return NewGlobalState(clusterID, 0)
+}
+
+// UpdatePendingChange implements the ReactorState interface
+func (s *GlobalReactorState) UpdatePendingChange() {
+	for c, t := range s.toRemoveCaptures {
+		if time.Since(t) >= time.Duration(s.captureRemoveTTL)*time.Second {
+			log.Info("remote capture offline", zap.Any("info", s.Captures[c]))
+			delete(s.Captures, c)
+			if s.onCaptureRemoved != nil {
+				s.onCaptureRemoved(c)
+			}
+			delete(s.toRemoveCaptures, c)
+		}
+	}
+}
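
The remove TTL is derived from the capture session TTL: half of it, floored at defaultCaptureRemoveTTL (5 seconds), so NewGlobalState(id, 0) in tests still gets a 5-second grace period. A hypothetical test pinning that arithmetic down (not part of this PR; it would have to live in package orchestrator because captureRemoveTTL is unexported):

package orchestrator

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestCaptureRemoveTTLFloor(t *testing.T) {
	cases := []struct{ sessionTTL, want int }{
		{0, 5},   // test callers pass 0; the floor applies
		{8, 5},   // 8/2 = 4 is below the floor
		{10, 5},  // 10/2 = 5, exactly the floor
		{30, 15}, // 30/2 = 15
	}
	for _, c := range cases {
		s := NewGlobalState("cluster-test", c.sessionTTL)
		require.Equal(t, c.want, s.captureRemoveTTL)
	}
}
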

@@ -59,6 +90,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+
 	switch k.Tp {
 	case etcd.CDCKeyTypeOwner:
 		if value != nil {
@@ -69,11 +101,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
 		return nil
 	case etcd.CDCKeyTypeCapture:
 		if value == nil {
-			log.Info("remote capture offline", zap.Any("info", s.Captures[k.CaptureID]))
-			delete(s.Captures, k.CaptureID)
-			if s.onCaptureRemoved != nil {
-				s.onCaptureRemoved(k.CaptureID)
-			}
+			log.Info("remote capture offline detected", zap.Any("info", s.Captures[k.CaptureID]))
+			s.toRemoveCaptures[k.CaptureID] = time.Now()
 			return nil
 		}
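
Capture removal is now two-phase: Update only records when a capture's key disappeared, and UpdatePendingChange (earlier in this file) completes the removal once the grace period has elapsed, so a brief etcd session flap no longer tears the capture down immediately. A self-contained sketch of that behavior with simplified types (not tiflow code):

package main

import (
	"fmt"
	"time"
)

type state struct {
	captures  map[string]bool
	toRemove  map[string]time.Time
	removeTTL time.Duration
}

// keyDeleted corresponds to Update seeing a nil value: removal is only scheduled.
func (s *state) keyDeleted(id string) { s.toRemove[id] = time.Now() }

// updatePendingChange drops captures whose grace period has fully elapsed.
func (s *state) updatePendingChange() {
	for id, at := range s.toRemove {
		if time.Since(at) >= s.removeTTL {
			delete(s.captures, id)
			delete(s.toRemove, id)
			fmt.Println("capture removed:", id)
		}
	}
}

func main() {
	s := &state{
		captures:  map[string]bool{"capture-1": true},
		toRemove:  map[string]time.Time{},
		removeTTL: 50 * time.Millisecond,
	}
	s.keyDeleted("capture-1")
	s.updatePendingChange() // too early: the capture survives the flap
	fmt.Println("present after flap:", s.captures["capture-1"])

	time.Sleep(60 * time.Millisecond)
	s.updatePendingChange() // grace period elapsed: the capture is dropped
	fmt.Println("present after TTL:", s.captures["capture-1"])
}
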

Expand Down Expand Up @@ -174,6 +203,10 @@ func NewChangefeedReactorState(clusterID string,
}
}

// UpdatePendingChange implements the ReactorState interface
func (s *ChangefeedReactorState) UpdatePendingChange() {
}

// Update implements the ReactorState interface
func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
k := new(etcd.CDCKey)
Expand Down
Loading