From d3da91a21bcadd6c62f2383fcc1d100ec57364d8 Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Mon, 30 Dec 2024 18:30:08 +0800
Subject: [PATCH 01/15] maintainer: remove DS

Signed-off-by: dongmen <414110582@qq.com>
---
 .../eventcollector/event_collector.go      |  2 +-
 maintainer/maintainer.go                   | 25 ++++++----
 maintainer/maintainer_event.go             | 49 -------------------
 maintainer/maintainer_manager.go           | 45 +++--------------
 maintainer/maintainer_test.go              | 27 +++++-----
 pkg/messaging/message_center.go            |  3 +-
 utils/dynstream/parallel_dynamic_stream.go | 13 ++---
 7 files changed, 45 insertions(+), 119 deletions(-)

diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go
index a6e7ed048..bd8161ee8 100644
--- a/downstreamadapter/eventcollector/event_collector.go
+++ b/downstreamadapter/eventcollector/event_collector.go
@@ -312,7 +312,7 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,
 	if err != nil {
 		log.Info("failed to send dispatcher request message to event service, try again later",
-			zap.Stringer("target", target),
+			zap.Any("target", target),
 			zap.Error(err))
 		// Put the request back to the channel for later retry.
 		c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{
diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go
index 33ecdf84b..bd26e8d74 100644
--- a/maintainer/maintainer.go
+++ b/maintainer/maintainer.go
@@ -34,7 +34,6 @@ import (
 	"github.com/pingcap/ticdc/pkg/sink/util"
 	"github.com/pingcap/ticdc/server/watcher"
 	"github.com/pingcap/ticdc/utils/chann"
-	"github.com/pingcap/ticdc/utils/dynstream"
 	"github.com/pingcap/ticdc/utils/threadpool"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/errors"
@@ -66,7 +65,6 @@ type Maintainer struct {

 	eventCh *chann.DrainableChann[*Event]

-	stream        dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler]
 	taskScheduler threadpool.ThreadPool

 	mc messaging.MessageCenter
@@ -127,7 +125,6 @@ func NewMaintainer(cfID common.ChangeFeedID,
 	conf *config.SchedulerConfig,
 	cfg *config.ChangeFeedInfo,
 	selfNode *node.Info,
-	stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler],
 	taskScheduler threadpool.ThreadPool,
 	pdAPI pdutil.PDAPIClient,
 	tsoClient replica.TSOClient,
@@ -148,7 +145,6 @@ func NewMaintainer(cfID common.ChangeFeedID,
 		id:                cfID,
 		selfNode:          selfNode,
 		eventCh:           chann.NewAutoDrainChann[*Event](),
-		stream:            stream,
 		taskScheduler:     taskScheduler,
 		startCheckpointTs: checkpointTs,
 		controller: NewController(cfID, checkpointTs, pdAPI, tsoClient, regionCache, taskScheduler,
@@ -198,7 +194,6 @@ func NewMaintainerForRemove(cfID common.ChangeFeedID,
 	conf *config.SchedulerConfig,
 	selfNode *node.Info,
-	stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler],
 	taskScheduler threadpool.ThreadPool,
 	pdAPI pdutil.PDAPIClient,
 	tsoClient replica.TSOClient,
@@ -209,11 +204,11 @@ func NewMaintainerForRemove(cfID common.ChangeFeedID,
 		SinkURI: "",
 		Config:  config.GetDefaultReplicaConfig(),
 	}
-	m := NewMaintainer(cfID, conf, unused, selfNode, stream, taskScheduler, pdAPI,
+	m := NewMaintainer(cfID, conf, unused, selfNode, taskScheduler, pdAPI,
 		tsoClient, regionCache, 1)
 	m.cascadeRemoving = true
 	// setup period event
-	SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{
+	m.submitScheduledEvent(m.taskScheduler, &Event{
 		changefeedID: m.id,
 		eventType:    EventPeriod,
 	}, time.Now().Add(time.Millisecond*500))
@@ -308,7
+303,7 @@ func (m *Maintainer) initialize() error { } m.sendMessages(m.bootstrapper.HandleNewNodes(newNodes)) // setup period event - SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{ + m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, }, time.Now().Add(time.Millisecond*500)) @@ -734,7 +729,7 @@ func (m *Maintainer) onPeriodTask() { m.handleResendMessage() m.collectMetrics() m.calCheckpointTs() - SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{ + m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, }, time.Now().Add(time.Millisecond*500)) @@ -794,3 +789,15 @@ func (m *Maintainer) runHandleEvents(ctx context.Context) { func (m *Maintainer) MoveTable(tableId int64, targetNode node.ID) error { return m.controller.moveTable(tableId, targetNode) } + +// SubmitScheduledEvent submits a task to controller pool to send a future event +func (m *Maintainer) submitScheduledEvent( + scheduler threadpool.ThreadPool, + event *Event, + scheduleTime time.Time) { + task := func() time.Time { + m.eventCh.In() <- event + return time.Time{} + } + scheduler.SubmitFunc(task, scheduleTime) +} diff --git a/maintainer/maintainer_event.go b/maintainer/maintainer_event.go index 0569f791d..82915d03f 100644 --- a/maintainer/maintainer_event.go +++ b/maintainer/maintainer_event.go @@ -14,12 +14,8 @@ package maintainer import ( - "time" - "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/messaging" - "github.com/pingcap/ticdc/utils/dynstream" - "github.com/pingcap/ticdc/utils/threadpool" ) const ( @@ -37,48 +33,3 @@ type Event struct { eventType int message *messaging.TargetMessage } - -func (e Event) IsBatchable() bool { - return true -} - -// SubmitScheduledEvent submits a task to controller pool to send a future event -func SubmitScheduledEvent( - scheduler threadpool.ThreadPool, - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler], - event *Event, - scheduleTime time.Time) { - task := func() time.Time { - stream.Push(event.changefeedID.Id, event) - return time.Time{} - } - scheduler.SubmitFunc(task, scheduleTime) -} - -// StreamHandler implements the dynstream Handler, no real logic, just forward event to the maintainer -type StreamHandler struct { -} - -func NewStreamHandler() *StreamHandler { - return &StreamHandler{} -} - -func (m *StreamHandler) Path(event *Event) common.GID { - return event.changefeedID.Id -} - -func (m *StreamHandler) Handle(dest *Maintainer, events ...*Event) (await bool) { - if len(events) != 1 { - // TODO: Support batch - panic("unexpected event count") - } - event := events[0] - return dest.HandleEvent(event) -} - -func (m *StreamHandler) GetSize(event *Event) int { return 0 } -func (m *StreamHandler) GetArea(path common.GID, dest *Maintainer) int { return 0 } -func (m *StreamHandler) GetTimestamp(event *Event) dynstream.Timestamp { return 0 } -func (m *StreamHandler) GetType(event *Event) dynstream.EventType { return dynstream.DefaultEventType } -func (m *StreamHandler) IsPaused(event *Event) bool { return false } -func (m *StreamHandler) OnDrop(event *Event) {} diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index e433377bd..b72a90261 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -26,20 +26,13 @@ import ( appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" - 
"github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" - "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) -var ( - metricsDSInputChanLen = metrics.DynamicStreamEventChanSize.WithLabelValues("maintainer-manager") - metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("maintainer-manager") -) - // Manager is the manager of all changefeed maintainer in a ticdc watcher, each ticdc watcher will // start a Manager when the watcher is startup. the Manager should: // 1. handle bootstrap command from coordinator and return all changefeed maintainer status @@ -62,7 +55,6 @@ type Manager struct { msgCh chan *messaging.TargetMessage - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler] taskScheduler threadpool.ThreadPool } @@ -88,8 +80,6 @@ func NewMaintainerManager(selfNode *node.Info, tsoClient: pdClient, regionCache: regionCache, } - m.stream = dynstream.NewDynamicStream(NewStreamHandler()) - m.stream.Start() mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) mc.RegisterHandler(messaging.MaintainerTopic, @@ -159,18 +149,10 @@ func (m *Manager) Run(ctx context.Context) error { cf.Close() log.Info("maintainer removed, remove it from dynamic stream", zap.String("changefeed", cf.id.String())) - if err := m.stream.RemovePath(cf.id.Id); err != nil { - log.Warn("remove path from dynstream failed, will retry later", - zap.String("changefeed", cf.id.String()), - zap.Error(err)) - // try it again later - return true - } m.maintainers.Delete(key) } return true }) - m.updateMetricsOnce() } } } @@ -243,21 +225,19 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) zap.Uint64("checkpointTs", req.CheckpointTs), zap.Any("config", cfConfig)) } - cf := NewMaintainer(cfID, m.conf, cfConfig, m.selfNode, m.stream, m.taskScheduler, + cf := NewMaintainer(cfID, m.conf, cfConfig, m.selfNode, m.taskScheduler, m.pdAPI, m.tsoClient, m.regionCache, req.CheckpointTs) - err = m.stream.AddPath(cfID.Id, cf) if err != nil { log.Warn("add path to dynstream failed, coordinator will retry later", zap.Error(err)) return } m.maintainers.Store(cfID, cf) - m.stream.Push(cfID.Id, &Event{changefeedID: cfID, eventType: EventInit}) } func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heartbeatpb.MaintainerStatus { req := msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest) cfID := common.NewChangefeedIDFromPB(req.GetId()) - _, ok := m.maintainers.Load(cfID) + cf, ok := m.maintainers.Load(cfID) if !ok { if !req.Cascade { log.Warn("ignore remove maintainer request, "+ @@ -271,22 +251,17 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart } // it's cascade remove, we should remove the dispatcher from all node // here we create a maintainer to run the remove the dispatcher logic - cf := NewMaintainerForRemove(cfID, m.conf, m.selfNode, m.stream, m.taskScheduler, m.pdAPI, + cf := NewMaintainerForRemove(cfID, m.conf, m.selfNode, m.taskScheduler, m.pdAPI, m.tsoClient, m.regionCache) - err := m.stream.AddPath(cfID.Id, cf) - if err != nil { - log.Warn("add path to dynstream failed, coordinator will retry later", zap.Error(err)) - return nil - } m.maintainers.Store(cfID, cf) } - log.Info("received remove maintainer request", - zap.String("changefeed", cfID.String())) - m.stream.Push(cfID.Id, &Event{ + cf.(*Maintainer).eventCh.In() <- 
&Event{ changefeedID: cfID, eventType: EventMessage, message: msg, - }) + } + log.Info("received remove maintainer request", + zap.String("changefeed", cfID.String())) return nil } @@ -384,12 +359,6 @@ func (m *Manager) dispatcherMaintainerMessage( return nil } -func (m *Manager) updateMetricsOnce() { - dsMetrics := m.stream.GetMetrics() - metricsDSInputChanLen.Set(float64(dsMetrics.EventChanSize)) - metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) -} - func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) *Maintainer { c, ok := m.maintainers.Load(changefeedID) if !ok { diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 9f38f169d..bb3464aa3 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/server/watcher" - "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/ticdc/utils/threadpool" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -297,19 +296,8 @@ func TestMaintainerSchedule(t *testing.T) { nodeManager := watcher.NewNodeManager(nil, nil) appcontext.SetService(watcher.NodeManagerName, nodeManager) nodeManager.GetAliveNodes()[n.ID] = n - stream := dynstream.NewDynamicStream(NewStreamHandler()) - stream.Start() cfID := common.NewChangeFeedIDWithName("test") mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) - mc.RegisterHandler(messaging.MaintainerManagerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { - stream.Push(cfID.Id, &Event{ - changefeedID: cfID, - eventType: EventMessage, - message: msg, - }) - return nil - }) dispatcherManager := MockDispatcherManager(mc, n.ID) wg := &sync.WaitGroup{} @@ -328,15 +316,24 @@ func TestMaintainerSchedule(t *testing.T) { }, &config.ChangeFeedInfo{ Config: config.GetDefaultReplicaConfig(), - }, n, stream, taskScheduler, nil, tsoClient, nil, 10) - _ = stream.AddPath(cfID.Id, maintainer) + }, n, taskScheduler, nil, tsoClient, nil, 10) + + mc.RegisterHandler(messaging.MaintainerManagerTopic, + func(ctx context.Context, msg *messaging.TargetMessage) error { + maintainer.eventCh.In() <- &Event{ + changefeedID: cfID, + eventType: EventMessage, + message: msg, + } + return nil + }) // send bootstrap message maintainer.sendMessages(maintainer.bootstrapper.HandleNewNodes( []*node.Info{n}, )) // setup period event - SubmitScheduledEvent(maintainer.taskScheduler, maintainer.stream, &Event{ + maintainer.submitScheduledEvent(maintainer.taskScheduler, &Event{ changefeedID: maintainer.id, eventType: EventPeriod, }, time.Now().Add(time.Millisecond*500)) diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go index 64c25295c..fa6b65e7a 100644 --- a/pkg/messaging/message_center.go +++ b/pkg/messaging/message_center.go @@ -8,6 +8,7 @@ import ( "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/config" @@ -219,7 +220,7 @@ func (mc *messageCenter) SendCommand(msg *TargetMessage) error { target, ok := mc.remoteTargets.m[msg.To] mc.remoteTargets.RUnlock() if !ok { - return apperror.AppError{Type: apperror.ErrorTypeTargetNotFound, Reason: fmt.Sprintf("Target %s not found", msg.To)} + return errors.WithStack(apperror.AppError{Type: apperror.ErrorTypeTargetNotFound, Reason: fmt.Sprintf("Target %v not 
found", msg.To)}) } return target.sendCommand(msg) } diff --git a/utils/dynstream/parallel_dynamic_stream.go b/utils/dynstream/parallel_dynamic_stream.go index 41ffbe359..7d648429d 100644 --- a/utils/dynstream/parallel_dynamic_stream.go +++ b/utils/dynstream/parallel_dynamic_stream.go @@ -3,6 +3,7 @@ package dynstream import ( "reflect" "sync" + "sync/atomic" "time" "unsafe" @@ -24,8 +25,8 @@ type parallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D feedbackChan chan Feedback[A, P, D] - _statAddPathCount int - _statRemovePathCount int + _statAddPathCount atomic.Int64 + _statRemovePathCount atomic.Int64 } func newParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option Option) *parallelDynamicStream[A, P, T, D, H] { @@ -136,7 +137,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...Are } pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi, newPath: true} - s._statAddPathCount++ + s._statAddPathCount.Add(1) return nil } @@ -157,7 +158,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) RemovePath(path P) error { pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi} delete(s.pathMap, path) - s._statRemovePathCount++ + s._statRemovePathCount.Add(1) return nil } @@ -173,7 +174,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics { size := ds.getPendingSize() metrics.PendingQueueLen += size } - metrics.AddPath = s._statAddPathCount - metrics.RemovePath = s._statRemovePathCount + metrics.AddPath = int(s._statAddPathCount.Load()) + metrics.RemovePath = int(s._statRemovePathCount.Load()) return metrics } From 845c17c7e659a9115c4f5e378cb6fa40badd7bff Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 10:11:29 +0800 Subject: [PATCH 02/15] maintainer: add some comments Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer_manager.go | 32 +++++++++++++-------------- maintainer/maintainer_manager_test.go | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index b72a90261..82828850d 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -34,10 +34,10 @@ import ( ) // Manager is the manager of all changefeed maintainer in a ticdc watcher, each ticdc watcher will -// start a Manager when the watcher is startup. the Manager should: -// 1. handle bootstrap command from coordinator and return all changefeed maintainer status -// 2. handle dispatcher command from coordinator: add or remove changefeed maintainer -// 3. check maintainer liveness +// start a Manager when the watcher is startup. It responsible for: +// 1. Handle bootstrap command from coordinator and report all changefeed maintainer status. +// 2. Handle other commands from coordinator: like add or remove changefeed maintainer +// 3. Manage maintainers lifetime type Manager struct { mc messaging.MessageCenter conf *config.SchedulerConfig @@ -53,15 +53,14 @@ type Manager struct { tsoClient replica.TSOClient regionCache *tikv.RegionCache + // msgCh is used to cache messages from coordinator msgCh chan *messaging.TargetMessage taskScheduler threadpool.ThreadPool } -// NewMaintainerManager create a changefeed maintainer manager instance, -// 1. manager receives bootstrap command from coordinator -// 2. manager manages maintainer lifetime -// 3. 
manager report maintainer status to coordinator +// NewMaintainerManager create a changefeed maintainer manager instance +// and register message handler to message center func NewMaintainerManager(selfNode *node.Info, conf *config.SchedulerConfig, pdAPI pdutil.PDAPIClient, @@ -131,13 +130,15 @@ func (m *Manager) Name() string { } func (m *Manager) Run(ctx context.Context) error { - ticker := time.NewTicker(time.Millisecond * 500) + reportMaintainerStatusInterval := time.Millisecond * 200 + ticker := time.NewTicker(reportMaintainerStatusInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return ctx.Err() case msg := <-m.msgCh: + log.Info("fizz received message from coordinator", zap.Any("message", msg)) m.handleMessage(msg) case <-ticker.C: //1. try to send heartbeat to coordinator @@ -287,7 +288,7 @@ func (m *Manager) onDispatchMaintainerRequest( } func (m *Manager) sendHeartbeat() { - if m.coordinatorVersion > 0 { + if m.isBootstrap() { response := &heartbeatpb.MaintainerHeartbeat{} m.maintainers.Range(func(key, value interface{}) bool { cfMaintainer := value.(*Maintainer) @@ -311,7 +312,7 @@ func (m *Manager) handleMessage(msg *messaging.TargetMessage) { m.onCoordinatorBootstrapRequest(msg) case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest: - if m.coordinatorVersion > 0 { + if m.isBootstrap() { status := m.onDispatchMaintainerRequest(msg) if status == nil { return @@ -338,11 +339,6 @@ func (m *Manager) dispatcherMaintainerMessage( case <-ctx.Done(): return ctx.Err() default: - // m.stream.Push(changefeed.Id, &Event{ - // changefeedID: changefeed, - // eventType: EventMessage, - // message: msg, - // }) c, ok := m.maintainers.Load(changefeed) if !ok { log.Warn("maintainer is not found", @@ -366,3 +362,7 @@ func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) * } return c.(*Maintainer) } + +func (m *Manager) isBootstrap() bool { + return m.coordinatorVersion > 0 +} diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index b8c3ccb32..9be39e513 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -62,7 +62,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { appcontext.SetService(appcontext.MessageCenter, mc) startDispatcherNode(ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) - //discard maintainer manager messages + // Discard maintainer manager messages, cuz we don't need to handle them in this test mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { return nil }) From 87814515db87aecef663dd866c8540ac6e1b5132 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 12:29:20 +0800 Subject: [PATCH 03/15] maintainer: push event after add cf Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 28 ++++++++++++++++++--------- maintainer/maintainer_manager.go | 9 +++++---- maintainer/maintainer_manager_test.go | 27 ++++++++++++++++++++------ pkg/messaging/target.go | 3 ++- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index bd26e8d74..4bcf427d0 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -44,6 +44,10 @@ import ( "go.uber.org/zap" ) +const ( + periodEventInterval = time.Millisecond * 200 +) + // Maintainer is response for handle changefeed replication tasks. 
Maintainer should: // 1. schedule tables to dispatcher manager // 2. calculate changefeed checkpoint ts @@ -91,7 +95,7 @@ type Maintainer struct { pdEndpoints []string nodeManager *watcher.NodeManager - nodesClosed map[node.ID]struct{} + closedNodes map[node.ID]struct{} statusChanged *atomic.Bool nodeChanged *atomic.Bool @@ -153,7 +157,7 @@ func NewMaintainer(cfID common.ChangeFeedID, state: heartbeatpb.ComponentState_Working, removed: atomic.NewBool(false), nodeManager: nodeManager, - nodesClosed: make(map[node.ID]struct{}), + closedNodes: make(map[node.ID]struct{}), statusChanged: atomic.NewBool(true), nodeChanged: atomic.NewBool(false), cascadeRemoving: false, @@ -178,7 +182,7 @@ func NewMaintainer(cfID common.ChangeFeedID, handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) - log.Info("maintainer is created", zap.String("id", cfID.String()), + log.Info("changefeed maintainer is created", zap.String("id", cfID.String()), zap.Uint64("checkpointTs", checkpointTs), zap.String("ddl dispatcher", tableTriggerEventDispatcherID.String())) metrics.MaintainerGauge.WithLabelValues(cfID.Namespace(), cfID.Name()).Inc() @@ -211,7 +215,7 @@ func NewMaintainerForRemove(cfID common.ChangeFeedID, m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, - }, time.Now().Add(time.Millisecond*500)) + }, time.Now().Add(periodEventInterval)) return m } @@ -306,7 +310,7 @@ func (m *Maintainer) initialize() error { m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, - }, time.Now().Add(time.Millisecond*500)) + }, time.Now().Add(periodEventInterval)) log.Info("changefeed maintainer initialized", zap.String("id", m.id.String()), zap.Duration("duration", time.Since(start))) @@ -598,7 +602,7 @@ func (m *Maintainer) sendPostBootstrapRequest() { func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeatpb.MaintainerCloseResponse) { if response.Success { - m.nodesClosed[from] = struct{}{} + m.closedNodes[from] = struct{}{} } // check if all nodes have sent response m.onRemoveMaintainer(m.cascadeRemoving, m.changefeedRemoved) @@ -635,7 +639,7 @@ func (m *Maintainer) tryCloseChangefeed() bool { func (m *Maintainer) sendMaintainerCloseRequestToAllNode() bool { msgs := make([]*messaging.TargetMessage, 0) for n := range m.nodeManager.GetAliveNodes() { - if _, ok := m.nodesClosed[n]; !ok { + if _, ok := m.closedNodes[n]; !ok { msgs = append(msgs, messaging.NewSingleTargetMessage( n, messaging.DispatcherManagerManagerTopic, @@ -732,7 +736,7 @@ func (m *Maintainer) onPeriodTask() { m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, - }, time.Now().Add(time.Millisecond*500)) + }, time.Now().Add(periodEventInterval)) } func (m *Maintainer) collectMetrics() { @@ -796,8 +800,14 @@ func (m *Maintainer) submitScheduledEvent( event *Event, scheduleTime time.Time) { task := func() time.Time { - m.eventCh.In() <- event + m.pushEvent(event) return time.Time{} } scheduler.SubmitFunc(task, scheduleTime) } + +// pushEvent is used to push event to maintainer's event channel +// event will be handled by maintainer's main loop +func (m *Maintainer) pushEvent(event *Event) { + m.eventCh.In() <- event +} diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 82828850d..7be9081a5 100644 --- 
a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -232,6 +232,7 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) log.Warn("add path to dynstream failed, coordinator will retry later", zap.Error(err)) return } + cf.pushEvent(&Event{changefeedID: cfID, eventType: EventInit}) m.maintainers.Store(cfID, cf) } @@ -256,11 +257,11 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart m.tsoClient, m.regionCache) m.maintainers.Store(cfID, cf) } - cf.(*Maintainer).eventCh.In() <- &Event{ + cf.(*Maintainer).pushEvent(&Event{ changefeedID: cfID, eventType: EventMessage, message: msg, - } + }) log.Info("received remove maintainer request", zap.String("changefeed", cfID.String())) return nil @@ -346,11 +347,11 @@ func (m *Manager) dispatcherMaintainerMessage( return nil } maintainer := c.(*Maintainer) - maintainer.eventCh.In() <- &Event{ + maintainer.pushEvent(&Event{ changefeedID: changefeed, eventType: EventMessage, message: msg, - } + }) } return nil } diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index 9be39e513..dba4ebe7c 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/schemastore" "github.com/pingcap/ticdc/maintainer/replica" @@ -91,6 +92,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { data, err := json.Marshal(cfConfig) require.NoError(t, err) + // Case 1: Add new changefeed cfID := common.NewChangeFeedIDWithName("test") _ = mc.SendCommand(messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, &heartbeatpb.AddMaintainerRequest{ @@ -107,7 +109,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { require.Equal(t, 4, maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - // add 2 new node + log.Info("Pass case 1: Add new changefeed") + + // Case 2: Add new nodes node2 := node.NewInfo("127.0.0.1:8400", "") mc2 := messaging.NewMessageCenter(ctx, node2.ID, 0, config.NewDefaultMessageCenterConfig()) @@ -142,23 +146,29 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { require.Equal(t, 1, maintainer.controller.GetTaskSizeByNodeID(node4.ID)) - // remove 2 nodes + log.Info("Pass case 2: Add new nodes") + + // Case 3: Remove 2 nodes dn3.stop() + log.Info("fizz Stop node 3") dn4.stop() + log.Info("fizz Stop node 4") _, _ = nodeManager.Tick(ctx, &orchestrator.GlobalReactorState{ Captures: map[model.CaptureID]*model.CaptureInfo{ model.CaptureID(selfNode.ID): {ID: model.CaptureID(selfNode.ID), AdvertiseAddr: selfNode.AdvertiseAddr}, model.CaptureID(node2.ID): {ID: model.CaptureID(node2.ID), AdvertiseAddr: node2.AdvertiseAddr}, }}) time.Sleep(5 * time.Second) + log.Info("fizz Sleep 5 seconds done") require.Equal(t, 4, maintainer.controller.replicationDB.GetReplicatingSize()) require.Equal(t, 2, maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) require.Equal(t, 2, maintainer.controller.GetTaskSizeByNodeID(node2.ID)) + log.Info("Pass case 3: Remove 2 nodes") - // remove 2 tables + // Case 4: Remove 2 tables maintainer.controller.RemoveTasksByTableIDs(2, 3) time.Sleep(5 * time.Second) require.Equal(t, 2, @@ -168,7 +178,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { require.Equal(t, 1, maintainer.controller.GetTaskSizeByNodeID(node2.ID)) - // add 2 tables + log.Info("Pass case 4: Remove 2 tables") + + // Case 5: Add 2 
tables maintainer.controller.AddNewTable(commonEvent.Table{ SchemaID: 1, TableID: 5, @@ -185,7 +197,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { require.Equal(t, 2, maintainer.controller.GetTaskSizeByNodeID(node2.ID)) - //close maintainer + log.Info("Pass case 5: Add 2 tables") + + // Case 6: Remove maintainer err = mc.SendCommand(messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, &heartbeatpb.RemoveMaintainerRequest{Id: cfID.ToPB(), Cascade: true})) require.NoError(t, err) @@ -193,7 +207,8 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { require.Equal(t, heartbeatpb.ComponentState_Stopped, maintainer.state) _, ok := manager.maintainers.Load(cfID) require.False(t, ok) - // manager.stream.Close() + log.Info("Pass case 6: Remove maintainer") + cancel() } diff --git a/pkg/messaging/target.go b/pkg/messaging/target.go index ae000b3f7..1bf0eda17 100644 --- a/pkg/messaging/target.go +++ b/pkg/messaging/target.go @@ -179,13 +179,14 @@ func newRemoteMessageTarget( // close stops the grpc stream and the goroutine spawned by remoteMessageTarget. func (s *remoteMessageTarget) close() { - log.Info("Close remote target", zap.Any("messageCenterID", s.messageCenterID), zap.Any("remote", s.targetId), zap.Any("addr", s.targetAddr)) + log.Info("Closing remote target", zap.Any("messageCenterID", s.messageCenterID), zap.Any("remote", s.targetId), zap.Any("addr", s.targetAddr)) if s.conn != nil { s.conn.Close() s.conn = nil } s.cancel() s.wg.Wait() + log.Info("Close remote target done", zap.Any("messageCenterID", s.messageCenterID), zap.Any("remote", s.targetId)) } func (s *remoteMessageTarget) runHandleErr(ctx context.Context) { From 78415dc389b9336db6b904bc2543bbbd51a4c839 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 12:36:48 +0800 Subject: [PATCH 04/15] maintainer: fix panic Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 7be9081a5..2fb30fc98 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -253,7 +253,7 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart } // it's cascade remove, we should remove the dispatcher from all node // here we create a maintainer to run the remove the dispatcher logic - cf := NewMaintainerForRemove(cfID, m.conf, m.selfNode, m.taskScheduler, m.pdAPI, + cf = NewMaintainerForRemove(cfID, m.conf, m.selfNode, m.taskScheduler, m.pdAPI, m.tsoClient, m.regionCache) m.maintainers.Store(cfID, cf) } From b40f717bc70c068eb28dc2ecf5e5c13ef029948c Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 13:30:20 +0800 Subject: [PATCH 05/15] maintainer: remove debug log Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer_manager.go | 1 - maintainer/maintainer_manager_test.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 2fb30fc98..b3c485cb3 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -138,7 +138,6 @@ func (m *Manager) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case msg := <-m.msgCh: - log.Info("fizz received message from coordinator", zap.Any("message", msg)) m.handleMessage(msg) case <-ticker.C: //1. 
try to send heartbeat to coordinator diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index dba4ebe7c..4f4d3da43 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -150,16 +150,13 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { // Case 3: Remove 2 nodes dn3.stop() - log.Info("fizz Stop node 3") dn4.stop() - log.Info("fizz Stop node 4") _, _ = nodeManager.Tick(ctx, &orchestrator.GlobalReactorState{ Captures: map[model.CaptureID]*model.CaptureInfo{ model.CaptureID(selfNode.ID): {ID: model.CaptureID(selfNode.ID), AdvertiseAddr: selfNode.AdvertiseAddr}, model.CaptureID(node2.ID): {ID: model.CaptureID(node2.ID), AdvertiseAddr: node2.AdvertiseAddr}, }}) time.Sleep(5 * time.Second) - log.Info("fizz Sleep 5 seconds done") require.Equal(t, 4, maintainer.controller.replicationDB.GetReplicatingSize()) require.Equal(t, 2, From 4ab7370ad090267595446d3e4127d73dd74da86b Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 13:37:44 +0800 Subject: [PATCH 06/15] tests: add data race checker Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/_utils/check_logs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/tests/integration_tests/_utils/check_logs b/tests/integration_tests/_utils/check_logs index 99aa96013..afbfd9587 100755 --- a/tests/integration_tests/_utils/check_logs +++ b/tests/integration_tests/_utils/check_logs @@ -4,20 +4,6 @@ WORK_DIR=$1 set +e -## check cdc state checker log -if [ ! -f $WORK_DIR/cdc_etcd_check.log ]; then - exit 0 -fi - -grep -q -i test-case-failed $WORK_DIR/cdc_etcd_check.log - -if [ $? -eq 0 ]; then - echo "cdc state checker failed" - exit 1 -else - exit 0 -fi - ## check data race if [ ! -f $WORK_DIR/stdout.log ]; then exit 0 @@ -26,7 +12,7 @@ fi grep -q -i 'DATA RACE' $WORK_DIR/stdout.log if [ $? 
-eq 0 ]; then - echo "found DATA RACE" + echo "found DATA RACE, please check the logs" exit 1 else exit 0 From 29713ed49a8c5125788b72eea5d941cfd1bfa7f4 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 13:58:22 +0800 Subject: [PATCH 07/15] maintainer: adjust codes Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 4bcf427d0..da00901b7 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -95,6 +95,7 @@ type Maintainer struct { pdEndpoints []string nodeManager *watcher.NodeManager + // closedNodes is used to record the nodes that dispatcherManager is closed closedNodes map[node.ID]struct{} statusChanged *atomic.Bool @@ -604,14 +605,12 @@ func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeat if response.Success { m.closedNodes[from] = struct{}{} } - // check if all nodes have sent response - m.onRemoveMaintainer(m.cascadeRemoving, m.changefeedRemoved) } func (m *Maintainer) handleResendMessage() { // resend closing message if m.removing { - m.sendMaintainerCloseRequestToAllNode() + m.trySendMaintainerCloseRequestToAllNode() return } // resend bootstrap message @@ -633,12 +632,15 @@ func (m *Maintainer) tryCloseChangefeed() bool { m.controller.RemoveTasksByTableIDs(m.ddlSpan.Span.TableID) return !m.ddlSpan.IsWorking() } - return m.sendMaintainerCloseRequestToAllNode() + return m.trySendMaintainerCloseRequestToAllNode() } -func (m *Maintainer) sendMaintainerCloseRequestToAllNode() bool { +// trySendMaintainerCloseRequestToAllNode is used to send maintainer close request to all nodes +// if all nodes are closed, return true, otherwise return false. +func (m *Maintainer) trySendMaintainerCloseRequestToAllNode() bool { msgs := make([]*messaging.TargetMessage, 0) for n := range m.nodeManager.GetAliveNodes() { + // Check if the node is already closed. if _, ok := m.closedNodes[n]; !ok { msgs = append(msgs, messaging.NewSingleTargetMessage( n, From 532bb1824608a4842dfa81580c9ceed8589e61e7 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 14:37:09 +0800 Subject: [PATCH 08/15] eventCollector: fix bug Signed-off-by: dongmen <414110582@qq.com> --- downstreamadapter/eventcollector/event_collector.go | 8 ++++++-- pkg/messaging/message_center.go | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index bd8161ee8..186f7c111 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -312,7 +312,10 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string, if err != nil { log.Info("failed to send dispatcher request message to event service, try again later", - zap.Any("target", target), + zap.String("changefeedID", req.Dispatcher.GetChangefeedID().ID().String()), + zap.Stringer("dispatcher", req.Dispatcher.GetId()), + zap.Any("target", target.String()), + zap.Any("request", req), zap.Error(err)) // Put the request back to the channel for later retry. 
c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{ @@ -553,6 +556,7 @@ func (d *DispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent, func (d *DispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) { d.eventServiceInfo.Lock() defer d.eventServiceInfo.Unlock() + if event.GetType() != commonEvent.TypeReadyEvent { log.Panic("should not happen") } @@ -641,7 +645,7 @@ func (d *DispatcherStat) unregisterDispatcher(eventCollector *EventCollector) { ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE, }) // unregister from remote event service if have - if d.eventServiceInfo.serverID != eventCollector.serverId { + if d.eventServiceInfo.serverID != "" && d.eventServiceInfo.serverID != eventCollector.serverId { eventCollector.mustSendDispatcherRequest(d.eventServiceInfo.serverID, eventServiceTopic, DispatcherRequest{ Dispatcher: d.target, ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE, diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go index fa6b65e7a..a4f36f930 100644 --- a/pkg/messaging/message_center.go +++ b/pkg/messaging/message_center.go @@ -220,7 +220,7 @@ func (mc *messageCenter) SendCommand(msg *TargetMessage) error { target, ok := mc.remoteTargets.m[msg.To] mc.remoteTargets.RUnlock() if !ok { - return errors.WithStack(apperror.AppError{Type: apperror.ErrorTypeTargetNotFound, Reason: fmt.Sprintf("Target %v not found", msg.To)}) + return errors.WithStack(apperror.AppError{Type: apperror.ErrorTypeTargetNotFound, Reason: fmt.Sprintf("Target %v not found", msg.To.String())}) } return target.sendCommand(msg) } From bbdf8581c500fc520a460a889e64069b469d1e28 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 15:34:46 +0800 Subject: [PATCH 09/15] fix ut Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 17 +-- maintainer/maintainer_manager_test.go | 160 ++++++++++++++++---------- 2 files changed, 110 insertions(+), 67 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index da00901b7..e14866874 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -75,7 +75,7 @@ type Maintainer struct { watermark *heartbeatpb.Watermark checkpointTsByCapture map[node.ID]heartbeatpb.Watermark - state heartbeatpb.ComponentState + state atomic.Int32 bootstrapper *bootstrap.Bootstrapper[heartbeatpb.MaintainerBootstrapResponse] changefeedSate model.FeedState @@ -155,7 +155,6 @@ func NewMaintainer(cfID common.ChangeFeedID, controller: NewController(cfID, checkpointTs, pdAPI, tsoClient, regionCache, taskScheduler, cfg.Config, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)), mc: mc, - state: heartbeatpb.ComponentState_Working, removed: atomic.NewBool(false), nodeManager: nodeManager, closedNodes: make(map[node.ID]struct{}), @@ -182,6 +181,7 @@ func NewMaintainer(cfID common.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } + m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) log.Info("changefeed maintainer is created", zap.String("id", cfID.String()), zap.Uint64("checkpointTs", checkpointTs), @@ -236,7 +236,7 @@ func (m *Maintainer) HandleEvent(event *Event) bool { } m.handleEventDuration.Observe(duration.Seconds()) }() - if 
m.state == heartbeatpb.ComponentState_Stopped { + if m.state.Load() == int32(heartbeatpb.ComponentState_Stopped) { log.Warn("maintainer is stopped, ignore", zap.String("changefeed", m.id.String())) return false @@ -282,7 +282,7 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { status := &heartbeatpb.MaintainerStatus{ ChangefeedID: m.id.ToPB(), FeedState: string(m.changefeedSate), - State: m.state, + State: heartbeatpb.ComponentState(m.state.Load()), CheckpointTs: m.watermark.CheckpointTs, Err: runningErrors, } @@ -315,7 +315,7 @@ func (m *Maintainer) initialize() error { log.Info("changefeed maintainer initialized", zap.String("id", m.id.String()), zap.Duration("duration", time.Since(start))) - m.state = heartbeatpb.ComponentState_Working + m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.statusChanged.Store(true) return nil } @@ -371,7 +371,7 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) { closed := m.tryCloseChangefeed() if closed { m.removed.Store(true) - m.state = heartbeatpb.ComponentState_Stopped + m.state.Store(int32(heartbeatpb.ComponentState_Stopped)) metrics.MaintainerGauge.WithLabelValues(m.id.Namespace(), m.id.Name()).Dec() } } @@ -479,7 +479,7 @@ func (m *Maintainer) updateMetrics() { lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 m.changefeedResolvedTsLagGauge.Set(lag) - m.changefeedStatusGauge.Set(float64(m.state)) + m.changefeedStatusGauge.Set(float64(m.state.Load())) } // send message to remote @@ -604,6 +604,7 @@ func (m *Maintainer) sendPostBootstrapRequest() { func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeatpb.MaintainerCloseResponse) { if response.Success { m.closedNodes[from] = struct{}{} + m.onRemoveMaintainer(m.cascadeRemoving, m.changefeedRemoved) } } @@ -625,7 +626,7 @@ func (m *Maintainer) handleResendMessage() { } func (m *Maintainer) tryCloseChangefeed() bool { - if m.state != heartbeatpb.ComponentState_Stopped { + if m.state.Load() != int32(heartbeatpb.ComponentState_Stopped) { m.statusChanged.Store(true) } if !m.cascadeRemoving { diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index 4f4d3da43..ba6b54794 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -41,6 +41,7 @@ import ( "google.golang.org/grpc" ) +// This is a integration test for maintainer manager, it may consume a lot of time. 
// scale out/in close, add/remove tables func TestMaintainerSchedulesNodeChanges(t *testing.T) { ctx := context.Background() @@ -61,7 +62,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig()) appcontext.SetService(appcontext.MessageCenter, mc) - startDispatcherNode(ctx, selfNode, mc, nodeManager) + startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // Discard maintainer manager messages, cuz we don't need to handle them in this test mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { @@ -100,12 +101,20 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { Config: data, CheckpointTs: 10, })) - time.Sleep(5 * time.Second) - value, _ := manager.maintainers.Load(cfID) + + value, ok := manager.maintainers.Load(cfID) + if !ok { + require.Eventually(t, func() bool { + value, ok = manager.maintainers.Load(cfID) + return ok + }, 20*time.Second, 200*time.Millisecond) + } + require.True(t, ok) maintainer := value.(*Maintainer) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) require.Equal(t, 4, maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) @@ -121,9 +130,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { node4 := node.NewInfo("127.0.0.1:8600", "") mc4 := messaging.NewMessageCenter(ctx, node4.ID, 0, config.NewDefaultMessageCenterConfig()) - startDispatcherNode(ctx, node2, mc2, nodeManager) - dn3 := startDispatcherNode(ctx, node3, mc3, nodeManager) - dn4 := startDispatcherNode(ctx, node4, mc4, nodeManager) + startDispatcherNode(t, ctx, node2, mc2, nodeManager) + dn3 := startDispatcherNode(t, ctx, node3, mc3, nodeManager) + dn4 := startDispatcherNode(t, ctx, node4, mc4, nodeManager) // notify node changes _, _ = nodeManager.Tick(ctx, &orchestrator.GlobalReactorState{ @@ -135,16 +144,21 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { }}) time.Sleep(5 * time.Second) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node3.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node4.ID)) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node3.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node4.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) log.Info("Pass case 2: Add new nodes") @@ -156,25 +170,30 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { 
model.CaptureID(selfNode.ID): {ID: model.CaptureID(selfNode.ID), AdvertiseAddr: selfNode.AdvertiseAddr}, model.CaptureID(node2.ID): {ID: model.CaptureID(node2.ID), AdvertiseAddr: node2.AdvertiseAddr}, }}) - time.Sleep(5 * time.Second) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) + + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) + log.Info("Pass case 3: Remove 2 nodes") // Case 4: Remove 2 tables maintainer.controller.RemoveTasksByTableIDs(2, 3) - time.Sleep(5 * time.Second) - require.Equal(t, 2, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) - + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 2 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) log.Info("Pass case 4: Remove 2 tables") // Case 5: Add 2 tables @@ -186,13 +205,15 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { SchemaID: 1, TableID: 6, }, 3) - time.Sleep(5 * time.Second) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) log.Info("Pass case 5: Add 2 tables") @@ -200,19 +221,28 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { err = mc.SendCommand(messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, &heartbeatpb.RemoveMaintainerRequest{Id: cfID.ToPB(), Cascade: true})) require.NoError(t, err) - time.Sleep(2 * time.Second) - require.Equal(t, heartbeatpb.ComponentState_Stopped, maintainer.state) - _, ok := manager.maintainers.Load(cfID) + time.Sleep(5 * time.Second) + + require.Eventually(t, func() bool { + return maintainer.state.Load() == int32(heartbeatpb.ComponentState_Stopped) + }, 20*time.Second, 200*time.Millisecond) + + _, ok = manager.maintainers.Load(cfID) + if ok { + require.Eventually(t, func() bool { + _, ok = manager.maintainers.Load(cfID) + return ok == false + }, 20*time.Second, 200*time.Millisecond) + } require.False(t, ok) log.Info("Pass case 6: Remove 
maintainer") - cancel() } func TestMaintainerBootstrapWithTablesReported(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - selfNode := node.NewInfo("127.0.0.1:18300", "") + selfNode := node.NewInfo("127.0.0.1:18301", "") nodeManager := watcher.NewNodeManager(nil, nil) appcontext.SetService(watcher.NodeManagerName, nodeManager) nodeManager.GetAliveNodes()[selfNode.ID] = selfNode @@ -228,7 +258,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig()) appcontext.SetService(appcontext.MessageCenter, mc) - startDispatcherNode(ctx, selfNode, mc, nodeManager) + startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) //discard maintainer manager messages mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { @@ -283,15 +313,24 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { Config: data, CheckpointTs: 10, })) - time.Sleep(5 * time.Second) - value, _ := manager.maintainers.Load(cfID) + value, ok := manager.maintainers.Load(cfID) + if !ok { + require.Eventually(t, func() bool { + value, ok = manager.maintainers.Load(cfID) + return ok + }, 20*time.Second, 200*time.Millisecond) + } + require.True(t, ok) maintainer := value.(*Maintainer) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 4, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Len(t, remotedIds, 2) foundSize := 0 hasDDLDispatcher := false @@ -309,7 +348,6 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { } require.Equal(t, 2, foundSize) require.False(t, hasDDLDispatcher) - // manager.stream.Close() cancel() } @@ -332,7 +370,7 @@ func TestStopNotExistsMaintainer(t *testing.T) { appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig()) appcontext.SetService(appcontext.MessageCenter, mc) - startDispatcherNode(ctx, selfNode, mc, nodeManager) + startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) //discard maintainer manager messages mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { @@ -359,8 +397,14 @@ func TestStopNotExistsMaintainer(t *testing.T) { Cascade: true, Removed: true, })) - time.Sleep(2 * time.Second) + _, ok := manager.maintainers.Load(cfID) + if ok { + require.Eventually(t, func() bool { + _, ok = manager.maintainers.Load(cfID) + return !ok + }, 20*time.Second, 200*time.Millisecond) + } require.False(t, ok) cancel() } @@ -385,7 +429,7 @@ func (d *dispatcherNode) stop() { d.cancel() } -func startDispatcherNode(ctx context.Context, +func startDispatcherNode(t *testing.T, ctx context.Context, node *node.Info, mc messaging.MessageCenter, nodeManager *watcher.NodeManager) *dispatcherNode { nodeManager.RegisterNodeChangeHandler(node.ID, mc.OnNodeChanges) ctx, cancel := 
context.WithCancel(ctx) @@ -396,9 +440,7 @@ func startDispatcherNode(ctx context.Context, mcs := messaging.NewMessageCenterServer(mc) proto.RegisterMessageCenterServer(grpcServer, mcs) lis, err := net.Listen("tcp", node.AdvertiseAddr) - if err != nil { - panic(err) - } + require.NoError(t, err) go func() { _ = grpcServer.Serve(lis) }() From 32171452166a3854df15f730828f2e13a6f66d39 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 15:40:08 +0800 Subject: [PATCH 10/15] fix it Signed-off-by: dongmen <414110582@qq.com> --- .github/workflows/integration_test_mysql.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 9119d3222..552247699 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -75,8 +75,8 @@ jobs: if: ${{ always() }} run: | DIR=$(sudo find /tmp/tidb_cdc_test/ -type d -name 'cdc_data' -exec dirname {} \;) - CASE=$(basename $DIR) [ -z "$DIR" ] && exit 0 + CASE=$(basename $DIR) mkdir -p ./logs/$CASE cat $DIR/stdout.log tail -n 10 $DIR/cdc.log From 5d03a2fd52e999c1043d75b676587df17f903e29 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 15:57:46 +0800 Subject: [PATCH 11/15] fix command error log Signed-off-by: dongmen <414110582@qq.com> --- cmd/main.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/main.go b/cmd/main.go index fbfa1140b..83f887bbd 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,6 +15,7 @@ package main import ( "os" + "slices" "strings" "github.com/pingcap/log" @@ -48,6 +49,7 @@ func addNewArchCommandTo(cmd *cobra.Command) { } func isNewArchEnabledByConfig(serverConfigFilePath string) bool { + cfg := config.GetDefaultServerConfig() if len(serverConfigFilePath) > 0 { // strict decode config file, but ignore debug item @@ -80,6 +82,12 @@ func parseConfigFlagFromOSArgs() string { serverConfigFilePath = os.Args[i+2] } } + + // If the command is `cdc cli changefeed`, means it's not a server config file. 
+ if slices.Contains(os.Args, "cli") && slices.Contains(os.Args, "changefeed") { + serverConfigFilePath = "" + } + return serverConfigFilePath } From cbf13187354cf87e3ef07c37930e227d7b7f2042 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 16:04:06 +0800 Subject: [PATCH 12/15] fix data race Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index e14866874..ad006b06f 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -72,7 +72,7 @@ type Maintainer struct { taskScheduler threadpool.ThreadPool mc messaging.MessageCenter - watermark *heartbeatpb.Watermark + watermark atomic.Pointer[heartbeatpb.Watermark] checkpointTsByCapture map[node.ID]heartbeatpb.Watermark state atomic.Int32 @@ -163,11 +163,7 @@ func NewMaintainer(cfID common.ChangeFeedID, cascadeRemoving: false, config: cfg, - ddlSpan: ddlSpan, - watermark: &heartbeatpb.Watermark{ - CheckpointTs: checkpointTs, - ResolvedTs: checkpointTs, - }, + ddlSpan: ddlSpan, checkpointTsByCapture: make(map[node.ID]heartbeatpb.Watermark), runningErrors: map[node.ID]*heartbeatpb.RunningError{}, @@ -181,6 +177,10 @@ func NewMaintainer(cfID common.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } + m.watermark.Store(&heartbeatpb.Watermark{ + CheckpointTs: checkpointTs, + ResolvedTs: checkpointTs, + }) m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) log.Info("changefeed maintainer is created", zap.String("id", cfID.String()), @@ -265,7 +265,7 @@ func (m *Maintainer) Close() { log.Info("changefeed maintainer closed", zap.String("id", m.id.String()), zap.Bool("removed", m.removed.Load()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs)) + zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs)) } func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { @@ -283,7 +283,7 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { ChangefeedID: m.id.ToPB(), FeedState: string(m.changefeedSate), State: heartbeatpb.ComponentState(m.state.Load()), - CheckpointTs: m.watermark.CheckpointTs, + CheckpointTs: m.watermark.Load().CheckpointTs, Err: runningErrors, } return status @@ -417,8 +417,8 @@ func (m *Maintainer) calCheckpointTs() { if !m.bootstrapped { log.Warn("can not advance checkpointTs since not bootstrapped", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs)) + zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), + zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs)) return } // make sure there is no task running @@ -429,16 +429,16 @@ func (m *Maintainer) calCheckpointTs() { if !m.controller.ScheduleFinished() { log.Warn("can not advance checkpointTs since schedule is not finished", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs), + zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), + zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs), ) return } if m.barrier.ShouldBlockCheckpointTs() { 
log.Warn("can not advance checkpointTs since barrier is blocking", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs), + zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), + zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs), ) return } @@ -454,27 +454,27 @@ func (m *Maintainer) calCheckpointTs() { log.Warn("checkpointTs can not be advanced, since missing capture heartbeat", zap.String("changefeed", m.id.Name()), zap.Any("node", id), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs)) + zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), + zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs)) return } newWatermark.UpdateMin(m.checkpointTsByCapture[id]) } if newWatermark.CheckpointTs != math.MaxUint64 { - m.watermark.CheckpointTs = newWatermark.CheckpointTs + m.watermark.Load().CheckpointTs = newWatermark.CheckpointTs } if newWatermark.ResolvedTs != math.MaxUint64 { - m.watermark.ResolvedTs = newWatermark.ResolvedTs + m.watermark.Load().ResolvedTs = newWatermark.ResolvedTs } } func (m *Maintainer) updateMetrics() { - phyCkpTs := oracle.ExtractPhysical(m.watermark.CheckpointTs) + phyCkpTs := oracle.ExtractPhysical(m.watermark.Load().CheckpointTs) m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs)) lag := float64(oracle.GetPhysical(time.Now())-phyCkpTs) / 1e3 m.changefeedCheckpointTsLagGauge.Set(lag) - phyResolvedTs := oracle.ExtractPhysical(m.watermark.ResolvedTs) + phyResolvedTs := oracle.ExtractPhysical(m.watermark.Load().ResolvedTs) m.changefeedResolvedTsGauge.Set(float64(phyResolvedTs)) lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 m.changefeedResolvedTsLagGauge.Set(lag) From d268c654a28c75d9f5235c5962dca288d8d76f53 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 16:29:18 +0800 Subject: [PATCH 13/15] fix data race 2 Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 64 +++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index ad006b06f..5e2aaa75c 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -72,7 +72,11 @@ type Maintainer struct { taskScheduler threadpool.ThreadPool mc messaging.MessageCenter - watermark atomic.Pointer[heartbeatpb.Watermark] + watermark struct { + mu sync.RWMutex + *heartbeatpb.Watermark + } + checkpointTsByCapture map[node.ID]heartbeatpb.Watermark state atomic.Int32 @@ -177,7 +181,8 @@ func NewMaintainer(cfID common.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } - m.watermark.Store(&heartbeatpb.Watermark{ + + m.setWatermark(heartbeatpb.Watermark{ CheckpointTs: checkpointTs, ResolvedTs: checkpointTs, }) @@ -265,7 +270,7 @@ func (m *Maintainer) Close() { log.Info("changefeed maintainer closed", zap.String("id", m.id.String()), zap.Bool("removed", m.removed.Load()), - zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs)) + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs)) } func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { @@ -279,11 +284,12 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { } clear(m.runningErrors) } + status := &heartbeatpb.MaintainerStatus{ 
ChangefeedID: m.id.ToPB(), FeedState: string(m.changefeedSate), State: heartbeatpb.ComponentState(m.state.Load()), - CheckpointTs: m.watermark.Load().CheckpointTs, + CheckpointTs: m.getWatermark().CheckpointTs, Err: runningErrors, } return status @@ -417,8 +423,8 @@ func (m *Maintainer) calCheckpointTs() { if !m.bootstrapped { log.Warn("can not advance checkpointTs since not bootstrapped", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs)) + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) return } // make sure there is no task running @@ -429,16 +435,16 @@ func (m *Maintainer) calCheckpointTs() { if !m.controller.ScheduleFinished() { log.Warn("can not advance checkpointTs since schedule is not finished", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs), ) return } if m.barrier.ShouldBlockCheckpointTs() { log.Warn("can not advance checkpointTs since barrier is blocking", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs), ) return } @@ -454,27 +460,24 @@ func (m *Maintainer) calCheckpointTs() { log.Warn("checkpointTs can not be advanced, since missing capture heartbeat", zap.String("changefeed", m.id.Name()), zap.Any("node", id), - zap.Uint64("checkpointTs", m.watermark.Load().CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.Load().ResolvedTs)) + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) return } newWatermark.UpdateMin(m.checkpointTsByCapture[id]) } - if newWatermark.CheckpointTs != math.MaxUint64 { - m.watermark.Load().CheckpointTs = newWatermark.CheckpointTs - } - if newWatermark.ResolvedTs != math.MaxUint64 { - m.watermark.Load().ResolvedTs = newWatermark.ResolvedTs - } + + m.setWatermark(*newWatermark) } func (m *Maintainer) updateMetrics() { - phyCkpTs := oracle.ExtractPhysical(m.watermark.Load().CheckpointTs) + watermark := m.getWatermark() + phyCkpTs := oracle.ExtractPhysical(watermark.CheckpointTs) m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs)) lag := float64(oracle.GetPhysical(time.Now())-phyCkpTs) / 1e3 m.changefeedCheckpointTsLagGauge.Set(lag) - phyResolvedTs := oracle.ExtractPhysical(m.watermark.Load().ResolvedTs) + phyResolvedTs := oracle.ExtractPhysical(watermark.ResolvedTs) m.changefeedResolvedTsGauge.Set(float64(phyResolvedTs)) lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 m.changefeedResolvedTsLagGauge.Set(lag) @@ -814,3 +817,24 @@ func (m *Maintainer) submitScheduledEvent( func (m *Maintainer) pushEvent(event *Event) { m.eventCh.In() <- event } + +func (m *Maintainer) getWatermark() heartbeatpb.Watermark { + m.watermark.mu.RLock() + defer m.watermark.mu.RUnlock() + res := heartbeatpb.Watermark{ + CheckpointTs: m.watermark.CheckpointTs, + ResolvedTs: m.watermark.ResolvedTs, + } + return res +} + +func (m *Maintainer) setWatermark(newWatermark heartbeatpb.Watermark) { + m.watermark.mu.Lock() + defer m.watermark.mu.Unlock() + if 
newWatermark.CheckpointTs != math.MaxUint64 { + m.watermark.CheckpointTs = newWatermark.CheckpointTs + } + if newWatermark.ResolvedTs != math.MaxUint64 { + m.watermark.ResolvedTs = newWatermark.ResolvedTs + } +} From 5625f63c7865f56b562f4a87eb944fa57029e76c Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 16:32:42 +0800 Subject: [PATCH 14/15] fix docs Signed-off-by: dongmen <414110582@qq.com> --- docs/design/2024-12-20-ticdc-flow-control.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/2024-12-20-ticdc-flow-control.md b/docs/design/2024-12-20-ticdc-flow-control.md index 0359e4c22..344964107 100644 --- a/docs/design/2024-12-20-ticdc-flow-control.md +++ b/docs/design/2024-12-20-ticdc-flow-control.md @@ -12,7 +12,7 @@ TiCDC processes data in two main parts: The following diagram illustrates the relationship between the **data puller** and **data sinker**: -![Data Flow](./medias/flow-control-1.png) +![Data Flow](../media/flow-control-1.png) In this architecture, **EventService** and **EventCollector** facilitate communication between the two parts: From 91793c051c32e3b02fffb1fafd5c123a8fb59e34 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 31 Dec 2024 17:55:26 +0800 Subject: [PATCH 15/15] fix panic Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 5e2aaa75c..14757b279 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -182,10 +182,10 @@ func NewMaintainer(cfID common.ChangeFeedID, handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } - m.setWatermark(heartbeatpb.Watermark{ + m.watermark.Watermark = &heartbeatpb.Watermark{ CheckpointTs: checkpointTs, ResolvedTs: checkpointTs, - }) + } m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) log.Info("changefeed maintainer is created", zap.String("id", cfID.String()),
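
Taken together, patches 12, 13, and 15 settle on a mutex-guarded watermark rather than an `atomic.Pointer`: swapping the pointer atomically is not enough when callers then write individual fields of the loaded value, which is still a data race, so `getWatermark` hands out a copy and `setWatermark` does the field writes under the lock. The sketch below is a minimal, self-contained illustration of that final pattern, not the real code: the local `Watermark` struct stands in for `heartbeatpb.Watermark`, and the `maintainer` type here is hypothetical. It also shows why the constructor assigns the embedded pointer directly (as in the last hunk above) instead of calling `setWatermark`: routing the initial value through `setWatermark` would write through a still-nil embedded `*Watermark`, which is the likely source of the panic that patch 15 fixes.

```go
// Illustrative sketch only; Watermark stands in for heartbeatpb.Watermark.
package main

import (
	"fmt"
	"math"
	"sync"
)

type Watermark struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

type maintainer struct {
	// Mirror of the field layout used in patch 13: an RWMutex guarding an
	// embedded *Watermark.
	watermark struct {
		mu sync.RWMutex
		*Watermark
	}
}

func newMaintainer(checkpointTs uint64) *maintainer {
	m := &maintainer{}
	// Initialize the embedded pointer directly. Going through setWatermark
	// here would dereference a nil *Watermark and panic.
	m.watermark.Watermark = &Watermark{
		CheckpointTs: checkpointTs,
		ResolvedTs:   checkpointTs,
	}
	return m
}

// getWatermark returns a copy, so callers never touch the shared fields
// outside the lock.
func (m *maintainer) getWatermark() Watermark {
	m.watermark.mu.RLock()
	defer m.watermark.mu.RUnlock()
	return Watermark{
		CheckpointTs: m.watermark.CheckpointTs,
		ResolvedTs:   m.watermark.ResolvedTs,
	}
}

// setWatermark skips fields left at math.MaxUint64, which the checkpoint
// calculation treats as "no information for this field".
func (m *maintainer) setWatermark(w Watermark) {
	m.watermark.mu.Lock()
	defer m.watermark.mu.Unlock()
	if w.CheckpointTs != math.MaxUint64 {
		m.watermark.CheckpointTs = w.CheckpointTs
	}
	if w.ResolvedTs != math.MaxUint64 {
		m.watermark.ResolvedTs = w.ResolvedTs
	}
}

func main() {
	m := newMaintainer(100)
	// Only CheckpointTs advances; ResolvedTs keeps its previous value.
	m.setWatermark(Watermark{CheckpointTs: 105, ResolvedTs: math.MaxUint64})
	fmt.Printf("%+v\n", m.getWatermark()) // {CheckpointTs:105 ResolvedTs:100}
}
```

Returning a value copy from `getWatermark` is what actually closes the race that patch 12 left open: with `atomic.Pointer`, the pointer load was atomic, but the subsequent writes to `CheckpointTs` and `ResolvedTs` on the loaded value still mutated shared memory concurrently with readers.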