Skip to content

Commit

Permalink
revert some unncessary change in processor.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 25, 2022
1 parent b4c3ce0 commit 7443202
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
}

func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) error {
p.changefeed = state
// the processor could be in `processorClosing`, should not continue the processor
if p.runningStatus == processorClosing {
if err := p.handleErrorCh(ctx); err != nil {
Expand All @@ -374,9 +373,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
zap.Error(err))
return errors.Trace(err)
}
// since the sink is close in an asynchronous way,
// we have to check whether the sink is fully closed or not.
// no matter fully closed or not, just skip the tick.
// since the sink is close in an asynchronous way, we have to check
// whether the sink is fully closed or not, and skip the tick.
select {
case <-p.sinkClosedCh:
p.runningStatus = processorClosed
Expand All @@ -389,8 +387,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
return nil
}

if !p.checkChangefeedRunnable() {
log.Info("changefeed not runnable", zap.String("changefeed", p.changefeed.ID))
p.changefeed = state
if !p.checkChangefeedNormal() {
return cerror.ErrAdminStopProcessor.GenWithStackByArgs()
}
// we should skip this tick after create a task position
Expand Down Expand Up @@ -446,8 +444,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
return nil
}

// checkChangefeedRunnable checks if the changefeed is runnable.
func (p *processor) checkChangefeedRunnable() bool {
// checkChangefeedNormal checks if the changefeed is runnable.
func (p *processor) checkChangefeedNormal() bool {
// check the state in this tick, make sure that the admin job type of the changefeed is not stopped
if p.changefeed.Info.AdminJobType.IsStopState() || p.changefeed.Status.AdminJobType.IsStopState() {
return false
Expand Down

0 comments on commit 7443202

Please sign in to comment.