diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 7c412f30bd7..a5b7e9e49a2 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	}()
 
 	g, stdCtx := errgroup.WithContext(stdCtx)
+	stdCtx, cancel := context.WithCancel(stdCtx)
+
 	ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
 		CaptureInfo:       c.info,
 		EtcdClient:        c.EtcdClient,
@@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 		SorterSystem:      c.sorterSystem,
 		SortEngineFactory: c.sortEngineFactory,
 	})
-
 	g.Go(func() error {
 		// when campaignOwner returns an error, it means that the owner throws
 		// an unrecoverable serious error (recoverable errors are intercepted in the owner tick)
@@ -351,9 +352,20 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	})
 
 	g.Go(func() error {
+		// Processor manager should be closed as soon as possible to prevent the double-write issue.
+		defer func() {
+			if cancel != nil {
+				// Propagate the cancel signal to the owner and other goroutines.
+				cancel()
+			}
+			if c.processorManager != nil {
+				c.processorManager.AsyncClose()
+			}
+			log.Info("processor manager closed", zap.String("captureID", c.info.ID))
+		}()
 		processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)
-		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)
 
 		globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
 			c.MessageRouter.AddPeer(captureID, addr)
@@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 		}
 		// Campaign to be the owner; it blocks until elected.
 		if err := c.campaign(ctx); err != nil {
-
			rootErr := errors.Cause(err)
 			if rootErr == context.Canceled {
 				return nil
@@ -467,7 +478,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 		}
 		owner := c.newOwner(c.upstreamManager)
 		c.setOwner(owner)
 
-		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+		globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)
 
 		globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
 			c.MessageRouter.AddPeer(captureID, addr)
@@ -485,27 +496,27 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 			}
 		})
 
-		err = c.runEtcdWorker(ownerCtx, owner,
-			orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
-			ownerFlushInterval, util.RoleOwner.String())
+		err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
 		c.owner.AsyncStop()
 		c.setOwner(nil)
 
-		// if owner exits, resign the owner key,
-		// use a new context to prevent the context from being cancelled.
-		resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		if resignErr := c.resign(resignCtx); resignErr != nil {
-			if errors.Cause(resignErr) != context.DeadlineExceeded {
-				log.Info("owner resign failed", zap.String("captureID", c.info.ID),
+		if !cerror.ErrNotOwner.Equal(err) {
+			// if owner exits, resign the owner key,
+			// use a new context to prevent the context from being cancelled.
+			resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			if resignErr := c.resign(resignCtx); resignErr != nil {
+				if errors.Cause(resignErr) != context.DeadlineExceeded {
+					log.Info("owner resign failed", zap.String("captureID", c.info.ID),
+						zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
+					cancel()
+					return errors.Trace(resignErr)
+				}
+
+				log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
 					zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
-				cancel()
-				return errors.Trace(resignErr)
 			}
-
-			log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
-				zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
+			cancel()
 		}
-		cancel()
 
 		log.Info("owner resigned successfully", zap.String("captureID", c.info.ID),
 			zap.Int64("ownerRev", ownerRev))
@@ -622,10 +633,6 @@ func (c *captureImpl) AsyncClose() {
 	c.captureMu.Lock()
 	defer c.captureMu.Unlock()
 
-	if c.processorManager != nil {
-		c.processorManager.AsyncClose()
-	}
-	log.Info("processor manager closed", zap.String("captureID", c.info.ID))
 	c.grpcService.Reset(nil)
 
 	if c.MessageRouter != nil {
diff --git a/cdc/capture/election.go b/cdc/capture/election.go
index 9012d78e596..6388b1f0696 100644
--- a/cdc/capture/election.go
+++ b/cdc/capture/election.go
@@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
 	}
 }
 
-func (e *electionImpl) campaign(ctx context.Context, key string) error {
+func (e *electionImpl) campaign(ctx context.Context, val string) error {
 	failpoint.Inject("capture-campaign-compacted-error", func() {
 		failpoint.Return(errors.Trace(mvcc.ErrCompacted))
 	})
-	return e.election.Campaign(ctx, key)
+	return e.election.Campaign(ctx, val)
 }
 
 func (e *electionImpl) resign(ctx context.Context) error {
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index 9b4756d551c..96d80ba934b 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
 			changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
 				Set(float64(cf.state.Info.State.ToInt()))
 		}
-
-		// The InfoProvider is a proxy object returning information
-		// from the scheduler.
-		infoProvider := cf.GetInfoProvider()
-		if infoProvider == nil {
-			// The scheduler has not been initialized yet.
-			continue
-		}
 	}
 }
diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index 3803928185e..a8f5a4d9e11 100644
--- a/cdc/owner/owner_test.go
+++ b/cdc/owner/owner_test.go
@@ -120,7 +120,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
 	o := owner.(*ownerImpl)
 	o.upstreamManager = upstream.NewManager4Test(pdClient)
 
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)
 
 	// set captures
@@ -430,7 +430,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
 	ctx := cdcContext.NewBackendContext4Test(true)
 	ctx, cancel := cdcContext.WithCancel(ctx)
 	defer cancel()
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)
 
 	// no changefeed, the gc safe point should be max uint64
@@ -667,7 +667,7 @@ WorkLoop:
 	}
 
 func TestCalculateGCSafepointTs(t *testing.T) {
-	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	expectMinTsMap := make(map[uint64]uint64)
 	expectForceUpdateMap := make(map[uint64]interface{})
 	o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go
index 08416785177..9d13e3c5cc8 100644
--- a/cdc/processor/manager.go
+++ b/cdc/processor/manager.go
@@ -227,6 +227,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
 		for changefeedID := range m.processors {
 			m.closeProcessor(changefeedID, ctx)
 		}
+		log.Info("All processors are closed in processor manager")
 		// FIXME: we should drain command queue and signal callers an error.
 		return cerrors.ErrReactorFinished
 	case commandTpWriteDebugInfo:
diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go
index 6da2ed12439..4835dbd43ff 100644
--- a/cdc/processor/manager_test.go
+++ b/cdc/processor/manager_test.go
@@ -71,7 +71,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
 			checkpointTs: replicaInfo.StartTs,
 		}, nil
 	}, &s.liveness)
-	s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+	s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
 	captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
 	require.Nil(t, err)
 	s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index f71f425ac1c..0a1e9e67115 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 			if err != nil {
 				// This error means the owner has resigned by itself,
 				// and we should exit the etcd worker and campaign to be the owner again.
-				return nil
+				return err
 			}
 		}
@@ -512,6 +512,7 @@ func (worker *EtcdWorker) applyUpdates() error {
 			return errors.Trace(err)
 		}
 	}
+	worker.state.UpdatePendingChange()
 
 	worker.pendingUpdates = worker.pendingUpdates[:0]
 	return nil
diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go
index 6448521974d..81438e7f02d 100644
--- a/pkg/orchestrator/etcd_worker_bank_test.go
+++ b/pkg/orchestrator/etcd_worker_bank_test.go
@@ -43,6 +43,9 @@ type bankReactorState struct {
 
 const bankTestPrefix = "/ticdc/test/bank/"
 
+func (b *bankReactorState) UpdatePendingChange() {
+}
+
 func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	require.True(b.t, strings.HasPrefix(key.String(), bankTestPrefix))
 	indexStr := key.String()[len(bankTestPrefix):]
diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go
index e4038c487f7..95c49907b00 100644
--- a/pkg/orchestrator/etcd_worker_test.go
+++ b/pkg/orchestrator/etcd_worker_test.go
@@ -147,6 +147,9 @@ func (s *simpleReactorState) SetSum(sum int) {
 	s.patches = append(s.patches, patch)
 }
 
+func (s *simpleReactorState) UpdatePendingChange() {
+}
+
 func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
 	if len(subMatches) != 2 {
@@ -283,6 +286,9 @@ type intReactorState struct {
 	lastVal int
 }
 
+func (s *intReactorState) UpdatePendingChange() {
+}
+
 func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	var err error
 	s.val, err = strconv.Atoi(string(value))
@@ -372,6 +378,9 @@ type commonReactorState struct {
 	pendingPatches []DataPatch
 }
 
+func (s *commonReactorState) UpdatePendingChange() {
+}
+
 func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
 	s.state[key.String()] = string(value)
 	return nil
diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go
index 20452cd6f26..7bb6d7cf3e0 100644
--- a/pkg/orchestrator/interfaces.go
+++ b/pkg/orchestrator/interfaces.go
@@ -35,6 +35,9 @@ type ReactorState interface {
 	// Update is called by EtcdWorker to notify the Reactor of the latest change to the Etcd state.
 	Update(key util.EtcdKey, value []byte, isInit bool) error
 
+	// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
+	UpdatePendingChange()
+
 	// GetPatches is called by EtcdWorker, and should return many slices of data patches that represent the changes
 	// that a Reactor wants to apply to Etcd.
 	// a slice of DataPatch will be committed as one ETCD txn
diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go
index 18fb88c353c..17300c59ecb 100644
--- a/pkg/orchestrator/reactor_state.go
+++ b/pkg/orchestrator/reactor_state.go
@@ -16,6 +16,7 @@ package orchestrator
 import (
 	"encoding/json"
 	"reflect"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -26,6 +27,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const defaultCaptureRemoveTTL = 5
+
 // GlobalReactorState represents a global state which stores all key-value pairs in ETCD
 type GlobalReactorState struct {
 	ClusterID   string
@@ -39,16 +42,44 @@ type GlobalReactorState struct {
 	// to be called when captures are added and removed.
 	onCaptureAdded   func(captureID model.CaptureID, addr string)
 	onCaptureRemoved func(captureID model.CaptureID)
+
+	captureRemoveTTL int
+	toRemoveCaptures map[model.CaptureID]time.Time
 }
 
-// NewGlobalState creates a new global state
-func NewGlobalState(clusterID string) *GlobalReactorState {
+// NewGlobalState creates a new global state.
+func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState {
+	captureRemoveTTL := captureSessionTTL / 2
+	if captureRemoveTTL < defaultCaptureRemoveTTL {
+		captureRemoveTTL = defaultCaptureRemoveTTL
+	}
 	return &GlobalReactorState{
-		ClusterID:   clusterID,
-		Owner:       map[string]struct{}{},
-		Captures:    make(map[model.CaptureID]*model.CaptureInfo),
-		Upstreams:   make(map[model.UpstreamID]*model.UpstreamInfo),
-		Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
+		ClusterID:        clusterID,
+		Owner:            map[string]struct{}{},
+		Captures:         make(map[model.CaptureID]*model.CaptureInfo),
+		Upstreams:        make(map[model.UpstreamID]*model.UpstreamInfo),
+		Changefeeds:      make(map[model.ChangeFeedID]*ChangefeedReactorState),
+		captureRemoveTTL: captureRemoveTTL,
+		toRemoveCaptures: make(map[model.CaptureID]time.Time),
+	}
+}
+
+// NewGlobalStateForTest creates a new global state for test.
+func NewGlobalStateForTest(clusterID string) *GlobalReactorState {
+	return NewGlobalState(clusterID, 0)
+}
+
+// UpdatePendingChange implements the ReactorState interface
+func (s *GlobalReactorState) UpdatePendingChange() {
+	for c, t := range s.toRemoveCaptures {
+		if time.Since(t) >= time.Duration(s.captureRemoveTTL)*time.Second {
+			log.Info("remote capture offline", zap.Any("info", s.Captures[c]))
+			delete(s.Captures, c)
+			if s.onCaptureRemoved != nil {
+				s.onCaptureRemoved(c)
+			}
+			delete(s.toRemoveCaptures, c)
+		}
 	}
 }
 
@@ -59,6 +90,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
 	if err != nil {
 		return errors.Trace(err)
 	}
+
 	switch k.Tp {
 	case etcd.CDCKeyTypeOwner:
 		if value != nil {
@@ -69,11 +101,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
 		return nil
 	case etcd.CDCKeyTypeCapture:
 		if value == nil {
-			log.Info("remote capture offline", zap.Any("info", s.Captures[k.CaptureID]))
-			delete(s.Captures, k.CaptureID)
-			if s.onCaptureRemoved != nil {
-				s.onCaptureRemoved(k.CaptureID)
-			}
+			log.Info("remote capture offline detected", zap.Any("info", s.Captures[k.CaptureID]))
+			s.toRemoveCaptures[k.CaptureID] = time.Now()
 			return nil
 		}
 
@@ -174,6 +203,10 @@ func NewChangefeedReactorState(clusterID string,
 	}
 }
 
+// UpdatePendingChange implements the ReactorState interface
+func (s *ChangefeedReactorState) UpdatePendingChange() {
+}
+
 // Update implements the ReactorState interface
 func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
 	k := new(etcd.CDCKey)
diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go
index 0eddd6e023e..da217d9066a 100644
--- a/pkg/orchestrator/reactor_state_test.go
+++ b/pkg/orchestrator/reactor_state_test.go
@@ -427,10 +427,13 @@ func TestPatchTaskPosition(t *testing.T) {
 }
 
 func TestGlobalStateUpdate(t *testing.T) {
+	t.Parallel()
+
 	testCases := []struct {
 		updateKey   []string
 		updateValue []string
 		expected    GlobalReactorState
+		timeout     int
 	}{
 		{ // common case
 			updateKey: []string{
@@ -512,13 +515,14 @@ func TestGlobalStateUpdate(t *testing.T) {
 				`55551111`,
 				`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`,
 				`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
-"admin-job-type":0}`, + "admin-job-type":0}`, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713, -"admin-job-type":0}`, + "admin-job-type":0}`, ``, ``, ``, }, + timeout: 6, expected: GlobalReactorState{ ClusterID: etcd.DefaultCDCClusterID, Owner: map[string]struct{}{"22317526c4fc9a38": {}}, @@ -540,7 +544,7 @@ func TestGlobalStateUpdate(t *testing.T) { }, } for _, tc := range testCases { - state := NewGlobalState(etcd.DefaultCDCClusterID) + state := NewGlobalState(etcd.DefaultCDCClusterID, 10) for i, k := range tc.updateKey { value := []byte(tc.updateValue[i]) if len(value) == 0 { @@ -549,13 +553,17 @@ func TestGlobalStateUpdate(t *testing.T) { err := state.Update(util.NewEtcdKey(k), value, false) require.Nil(t, err) } + time.Sleep(time.Duration(tc.timeout) * time.Second) + state.UpdatePendingChange() require.True(t, cmp.Equal(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})), cmp.Diff(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{}))) } } func TestCaptureChangeHooks(t *testing.T) { - state := NewGlobalState(etcd.DefaultCDCClusterID) + t.Parallel() + + state := NewGlobalState(etcd.DefaultCDCClusterID, 10) var callCount int state.onCaptureAdded = func(captureID model.CaptureID, addr string) { @@ -579,13 +587,18 @@ func TestCaptureChangeHooks(t *testing.T) { etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"), captureInfoBytes, false) require.Nil(t, err) - require.Equal(t, callCount, 1) + require.Eventually(t, func() bool { + return callCount == 1 + }, time.Second*3, 10*time.Millisecond) err = state.Update(util.NewEtcdKey( etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"), nil /* delete */, false) require.Nil(t, err) - require.Equal(t, callCount, 2) + require.Eventually(t, func() bool { + state.UpdatePendingChange() + return callCount == 2 + }, time.Second*10, 10*time.Millisecond) } func TestCheckChangefeedNormal(t *testing.T) {