sink(cdc): fix the bug that mq sink can lost callbacks #9852

Merged · 2 commits · Oct 11, 2023
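For context on the bug: in WriteEvents, each transaction's callback is wrapped in a counting "merged" callback that fires only after all of the transaction's rows have been acknowledged. If WriteEvents bails out mid-transaction on a GetPartitionNum error, the rows that were never enqueued can never acknowledge, so the merged callback is lost and the changefeed's progress stalls. The diff below fixes this by cancelling the sink's context with the error as the cause. A minimal sketch of the counting pattern (illustrative, simplified names; not the PR's exact code):

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// mergedCallback returns a per-row callback that invokes the transaction
// callback exactly once, after `total` rows have been acknowledged.
func mergedCallback(txnCallback func(), total uint64) func() {
	acked := atomic.NewUint64(0)
	return func() {
		if acked.Add(1) == total {
			txnCallback()
		}
	}
}

func main() {
	done := func() { fmt.Println("txn acked; checkpoint may advance") }
	rowCallback := mergedCallback(done, 3)

	rowCallback() // row 1 acknowledged
	rowCallback() // row 2 acknowledged
	// If row 3 were never enqueued because WriteEvents returned early,
	// `done` would never fire. Here we simulate the row being delivered:
	rowCallback() // row 3 acknowledged; `done` runs now
}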
15 changes: 12 additions & 3 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -18,6 +18,8 @@ import (
 	"sync"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
@@ -30,6 +32,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/sink/codec"
 	"github.com/pingcap/tiflow/pkg/sink/kafka"
 	"go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
 // Assert EventSink[E event.TableEvent] implementation
@@ -59,7 +62,7 @@ type dmlSink struct {
 	adminClient kafka.ClusterAdminClient
 
 	ctx    context.Context
-	cancel context.CancelFunc
+	cancel context.CancelCauseFunc
 
 	wg   sync.WaitGroup
 	dead chan struct{}
@@ -79,7 +82,7 @@ func newDMLSink(
 	scheme string,
 	errCh chan error,
 ) *dmlSink {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 	statistics := metrics.NewStatistics(ctx, changefeedID, sink.RowSink)
 	worker := newWorker(changefeedID, protocol, producer, encoderGroup, statistics)
 
@@ -144,10 +147,16 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn]) error {
 				continue
 			}
 			callback := mergedCallback(txn.Callback, uint64(len(txn.Event.Rows)))
+
 			for _, row := range txn.Event.Rows {
 				topic := s.alive.eventRouter.GetTopicForRowChange(row)
 				partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic)
+				failpoint.Inject("MQSinkGetPartitionError", func() {
+					log.Info("failpoint MQSinkGetPartitionError injected", zap.String("changefeedID", s.id.ID))
+					err = errors.New("MQSinkGetPartitionError")
+				})
 				if err != nil {
+					s.cancel(err)
 					return errors.Trace(err)
 				}
 				index, key := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
@@ -170,7 +179,7 @@
 // Close closes the sink.
 func (s *dmlSink) Close() {
 	if s.cancel != nil {
-		s.cancel()
+		s.cancel(nil)
 	}
 	s.wg.Wait()
 
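The heart of the fix is the switch from context.WithCancel to context.WithCancelCause: the error path can now record why the sink died (s.cancel(err)), while Close passes nil for a graceful shutdown. A self-contained sketch of that standard-library pattern (Go 1.20+; this shows the stdlib API, not the PR's code):

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())

	// Failure path, as in WriteEvents: record the error as the cause.
	cancel(errors.New("get partition number failed"))

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context.Canceled
	fmt.Println(context.Cause(ctx)) // get partition number failed

	// Graceful path, as in Close: cancel(nil) leaves the cause as
	// context.Canceled, so readers can distinguish shutdown from failure.
}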
29 changes: 29 additions & 0 deletions tests/integration_tests/mq_sink_lost_callback/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+output-dir = "/tmp/tidb_cdc_test/mq_sink_lost_callback/sync_diff/output"
+
+source-instances = ["mysql1"]
+
+target-instance = "tidb0"
+
+target-check-tables = ["mq_sink_lost_callback.t"]
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
57 changes: 57 additions & 0 deletions tests/integration_tests/mq_sink_lost_callback/run.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+CDC_COUNT=3
+DB_COUNT=4
+
+function test_mq_sink_lost_callback() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	run_sql "DROP DATABASE if exists mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "CREATE DATABASE mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "CREATE TABLE mq_sink_lost_callback.t (a int not null primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	cd $WORK_DIR
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/MQSinkGetPartitionError=2*return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
+
+	TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
+	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI"
+
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (1);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (2);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
+	fi
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run() {
+	# test kafka sink only in this case
+	if [ "$SINK_TYPE" != "kafka" ]; then
+		return
+	fi
+
+	test_mq_sink_lost_callback $*
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
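On the failpoint wiring: failpoint.Inject is a no-op marker until the sources are rewritten for failpoints (via failpoint-ctl; tiflow provides a make target for this), and GO_FAILPOINTS then activates a marker by its package path. The term 2*return(true) above fires the failpoint on the first two evaluations only, so two partition lookups fail and later ones succeed, letting the test verify the changefeed recovers. A hedged sketch of the mechanism (hypothetical names, not the PR's code):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// getPartitionNum pretends to look up a topic's partition count.
func getPartitionNum() (int32, error) {
	var err error
	num := int32(4) // pretend the metadata lookup succeeded
	// No-op unless the failpoint rewrite is applied and GO_FAILPOINTS
	// names <this package>/DemoGetPartitionError.
	failpoint.Inject("DemoGetPartitionError", func() {
		err = fmt.Errorf("injected partition error")
	})
	return num, err
}

func main() {
	// With GO_FAILPOINTS='<pkg>/DemoGetPartitionError=2*return(true)',
	// the first two calls return the injected error, the third succeeds.
	for i := 0; i < 3; i++ {
		fmt.Println(getPartitionNum())
	}
}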
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
@@ -14,7 +14,7 @@ mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_sui
 mysql_only_http="http_api http_api_tls api_v2"
 mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"
 
-kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume"
+kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback"
 kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check"
 kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2"
 