From 7443202f970b25f59f082d687bf262935b83d32a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 25 Jan 2022 13:22:23 +0800 Subject: [PATCH] revert some unncessary change in processor. --- cdc/processor/processor.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 924aebb74c0..516dea3e402 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -364,7 +364,6 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR } func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) error { - p.changefeed = state // the processor could be in `processorClosing`, should not continue the processor if p.runningStatus == processorClosing { if err := p.handleErrorCh(ctx); err != nil { @@ -374,9 +373,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR zap.Error(err)) return errors.Trace(err) } - // since the sink is close in an asynchronous way, - // we have to check whether the sink is fully closed or not. - // no matter fully closed or not, just skip the tick. + // since the sink is close in an asynchronous way, we have to check + // whether the sink is fully closed or not, and skip the tick. select { case <-p.sinkClosedCh: p.runningStatus = processorClosed @@ -389,8 +387,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR return nil } - if !p.checkChangefeedRunnable() { - log.Info("changefeed not runnable", zap.String("changefeed", p.changefeed.ID)) + p.changefeed = state + if !p.checkChangefeedNormal() { return cerror.ErrAdminStopProcessor.GenWithStackByArgs() } // we should skip this tick after create a task position @@ -446,8 +444,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR return nil } -// checkChangefeedRunnable checks if the changefeed is runnable. -func (p *processor) checkChangefeedRunnable() bool { +// checkChangefeedNormal checks if the changefeed is runnable. +func (p *processor) checkChangefeedNormal() bool { // check the state in this tick, make sure that the admin job type of the changefeed is not stopped if p.changefeed.Info.AdminJobType.IsStopState() || p.changefeed.Status.AdminJobType.IsStopState() { return false