diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go
index b75cd0eefed..d59527a6e14 100644
--- a/cdc/sink/dmlsink/mq/mq_dml_sink.go
+++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -18,6 +18,8 @@ import (
 	"sync"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
@@ -30,6 +32,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/sink/codec"
 	"github.com/pingcap/tiflow/pkg/sink/kafka"
 	"go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
 // Assert EventSink[E event.TableEvent] implementation
@@ -59,7 +62,7 @@ type dmlSink struct {
 	adminClient kafka.ClusterAdminClient
 
 	ctx    context.Context
-	cancel context.CancelFunc
+	cancel context.CancelCauseFunc
 
 	wg   sync.WaitGroup
 	dead chan struct{}
@@ -79,7 +82,7 @@ func newDMLSink(
 	scheme string,
 	errCh chan error,
 ) *dmlSink {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 	statistics := metrics.NewStatistics(ctx, changefeedID, sink.RowSink)
 	worker := newWorker(changefeedID, protocol, producer, encoderGroup, statistics)
 
@@ -144,10 +147,16 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 			continue
 		}
 		callback := mergedCallback(txn.Callback, uint64(len(txn.Event.Rows)))
+
 		for _, row := range txn.Event.Rows {
 			topic := s.alive.eventRouter.GetTopicForRowChange(row)
 			partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic)
+			failpoint.Inject("MQSinkGetPartitionError", func() {
+				log.Info("failpoint MQSinkGetPartitionError injected", zap.String("changefeedID", s.id.ID))
+				err = errors.New("MQSinkGetPartitionError")
+			})
 			if err != nil {
+				s.cancel(err)
 				return errors.Trace(err)
 			}
 			index, key := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
@@ -170,7 +179,7 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 // Close closes the sink.
 func (s *dmlSink) Close() {
 	if s.cancel != nil {
-		s.cancel()
+		s.cancel(nil)
 	}
 	s.wg.Wait()
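
Aside (not part of the patch): a minimal, self-contained Go sketch of the stdlib context.WithCancelCause behavior the hunks above rely on. Cancelling with a concrete error records it as the cancellation cause, which is what WriteEvents now does with the partition-lookup error, while Close cancels with nil and keeps plain context.Canceled semantics. All names below are illustrative.

// Sketch only: shows how cancel(err) vs cancel(nil) is observed via context.Cause.
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// Failure path: cancel with a concrete error, as s.cancel(err) does in WriteEvents.
	ctx1, cancel1 := context.WithCancelCause(context.Background())
	cancel1(errors.New("MQSinkGetPartitionError"))
	fmt.Println(ctx1.Err())          // context.Canceled
	fmt.Println(context.Cause(ctx1)) // MQSinkGetPartitionError

	// Shutdown path: cancel with nil, as s.cancel(nil) does in Close.
	ctx2, cancel2 := context.WithCancelCause(context.Background())
	cancel2(nil)
	fmt.Println(context.Cause(ctx2)) // context.Canceled, i.e. no separate cause recorded
}
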
diff --git a/tests/integration_tests/mq_sink_lost_callback/conf/diff_config.toml b/tests/integration_tests/mq_sink_lost_callback/conf/diff_config.toml
new file mode 100644
index 00000000000..6b546ecd404
--- /dev/null
+++ b/tests/integration_tests/mq_sink_lost_callback/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/tidb_cdc_test/mq_sink_lost_callback/sync_diff/output"
+
+    source-instances = ["mysql1"]
+
+    target-instance = "tidb0"
+
+    target-check-tables = ["mq_sink_lost_callback.t"]
+
+[data-sources]
+[data-sources.mysql1]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+
+[data-sources.tidb0]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/integration_tests/mq_sink_lost_callback/run.sh b/tests/integration_tests/mq_sink_lost_callback/run.sh
new file mode 100755
index 00000000000..6ec69545eb6
--- /dev/null
+++ b/tests/integration_tests/mq_sink_lost_callback/run.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+CDC_COUNT=3
+DB_COUNT=4
+
+function test_mq_sink_lost_callback() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	run_sql "DROP DATABASE if exists mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "CREATE DATABASE mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "CREATE TABLE mq_sink_lost_callback.t (a int not null primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	cd $WORK_DIR
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/MQSinkGetPartitionError=2*return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
+
+	TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
+	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI"
+
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (1);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (2);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
+	fi
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run() {
+	# test kafka sink only in this case
+	if [ "$SINK_TYPE" != "kafka" ]; then
+		return
+	fi
+
+	test_mq_sink_lost_callback $*
+}
+
+trap stop_tidb_cluster EXIT
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
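
Aside (not part of the patch): the failpoint exported through GO_FAILPOINTS above is addressed by the package import path plus the name passed to failpoint.Inject, and the term "2*return(true)" is meant to fire it only for the first two evaluations, so the changefeed hits the injected GetPartitionNum error twice and must still replicate all five rows for check_sync_diff to pass. Below is a hedged sketch of the same activation using the failpoint runtime API directly; in the sink itself the failpoint.Inject marker only takes effect after a failpoint code rewrite (e.g. failpoint-ctl), so this is illustrative rather than a test substitute.

// Sketch only: enable/evaluate a failpoint term programmatically, mirroring the
// GO_FAILPOINTS export in run.sh.
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	fp := "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/MQSinkGetPartitionError"

	// Programmatic equivalent of: export GO_FAILPOINTS='<fp>=2*return(true)'.
	if err := failpoint.Enable(fp, "2*return(true)"); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()

	// Expect the failpoint to trigger only for the first two evaluations.
	for i := 0; i < 3; i++ {
		val, err := failpoint.Eval(fp)
		fmt.Println(i, val, err)
	}
}
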
>>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 00c758c38cf..21fb8d504ab 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -14,7 +14,7 @@ mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_sui mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" -kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume" +kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback" kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2"