Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maintainer: fix remove changefeed and some data race #743

Merged
merged 19 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ jobs:
if: ${{ always() }}
run: |
DIR=$(sudo find /tmp/tidb_cdc_test/ -type d -name 'cdc_data' -exec dirname {} \;)
CASE=$(basename $DIR)
[ -z "$DIR" ] && exit 0
CASE=$(basename $DIR)
mkdir -p ./logs/$CASE
cat $DIR/stdout.log
tail -n 10 $DIR/cdc.log
Expand Down
8 changes: 8 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"os"
"slices"
"strings"

"github.com/pingcap/log"
Expand Down Expand Up @@ -48,6 +49,7 @@ func addNewArchCommandTo(cmd *cobra.Command) {
}

func isNewArchEnabledByConfig(serverConfigFilePath string) bool {

cfg := config.GetDefaultServerConfig()
if len(serverConfigFilePath) > 0 {
// strict decode config file, but ignore debug item
Expand Down Expand Up @@ -80,6 +82,12 @@ func parseConfigFlagFromOSArgs() string {
serverConfigFilePath = os.Args[i+2]
}
}

// If the command is `cdc cli changefeed`, means it's not a server config file.
if slices.Contains(os.Args, "cli") && slices.Contains(os.Args, "changefeed") {
serverConfigFilePath = ""
}

return serverConfigFilePath
}

Expand Down
2 changes: 1 addition & 1 deletion docs/design/2024-12-20-ticdc-flow-control.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ TiCDC processes data in two main parts:

The following diagram illustrates the relationship between the **data puller** and **data sinker**:

![Data Flow](./medias/flow-control-1.png)
![Data Flow](../media/flow-control-1.png)
<!-- The source file for this diagram: docs/design/medias/flow-control-1.puml -->

In this architecture, **EventService** and **EventCollector** facilitate communication between the two parts:
Expand Down
8 changes: 6 additions & 2 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,

if err != nil {
log.Info("failed to send dispatcher request message to event service, try again later",
zap.Stringer("target", target),
zap.String("changefeedID", req.Dispatcher.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", req.Dispatcher.GetId()),
zap.Any("target", target.String()),
zap.Any("request", req),
zap.Error(err))
// Put the request back to the channel for later retry.
c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{
Expand Down Expand Up @@ -570,6 +573,7 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent,
func (d *dispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) {
d.eventServiceInfo.Lock()
defer d.eventServiceInfo.Unlock()

if event.GetType() != commonEvent.TypeReadyEvent {
log.Panic("should not happen")
}
Expand Down Expand Up @@ -658,7 +662,7 @@ func (d *dispatcherStat) unregisterDispatcher(eventCollector *EventCollector) {
ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE,
})
// unregister from remote event service if have
if d.eventServiceInfo.serverID != eventCollector.serverId {
if d.eventServiceInfo.serverID != "" && d.eventServiceInfo.serverID != eventCollector.serverId {
eventCollector.mustSendDispatcherRequest(d.eventServiceInfo.serverID, eventServiceTopic, DispatcherRequest{
Dispatcher: d.target,
ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE,
Expand Down
Loading
Loading