sink(cdc): fix the bug that mq sink can lose callbacks (#9852)
close #9855
hicqu authored Oct 11, 2023
1 parent ac5544c commit 215162d
Showing 4 changed files with 99 additions and 4 deletions.
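In WriteEvents, a transaction's callback is merged into a single mergedCallback shared by all of its rows. Previously, if topicManager.GetPartitionNum failed partway through a batch, WriteEvents returned early and the callbacks of rows that were never handed to the producer were dropped, so acknowledgement-driven progress (such as the changefeed checkpoint) could stall forever. The fix switches the sink to context.WithCancelCause and cancels the sink context with the offending error, so the failure tears the sink down and surfaces its cause instead of silently losing callbacks. A new integration test reproduces the scenario with an injected MQSinkGetPartitionError failpoint.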
15 changes: 12 additions & 3 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -18,6 +18,8 @@ import (
 	"sync"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
@@ -30,6 +32,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/sink/codec"
 	"github.com/pingcap/tiflow/pkg/sink/kafka"
 	"go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
 // Assert EventSink[E event.TableEvent] implementation
@@ -59,7 +62,7 @@ type dmlSink struct {
 	adminClient kafka.ClusterAdminClient
 
 	ctx    context.Context
-	cancel context.CancelFunc
+	cancel context.CancelCauseFunc
 
 	wg   sync.WaitGroup
 	dead chan struct{}
@@ -79,7 +82,7 @@ func newDMLSink(
 	scheme string,
 	errCh chan error,
 ) *dmlSink {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 	statistics := metrics.NewStatistics(ctx, changefeedID, sink.RowSink)
 	worker := newWorker(changefeedID, protocol, producer, encoderGroup, statistics)
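For background, context.WithCancelCause (Go 1.20+) behaves like context.WithCancel except that the returned CancelCauseFunc takes an error recording why cancellation happened; context.Cause(ctx) retrieves it later, and cancel(nil) sets the cause to context.Canceled, which is why the Close hunk further down can pass nil on a normal shutdown. A minimal standalone sketch (the error name is borrowed from this PR; everything else is illustrative):

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// WithCancelCause is like WithCancel, but the cancel function takes an
	// error explaining why the context was canceled.
	ctx, cancel := context.WithCancelCause(context.Background())

	partitionErr := errors.New("MQSinkGetPartitionError")
	cancel(partitionErr) // the first call wins; later calls, even cancel(nil), are no-ops

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context canceled
	fmt.Println(context.Cause(ctx)) // MQSinkGetPartitionError
}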

Expand Down Expand Up @@ -144,10 +147,16 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
continue
}
callback := mergedCallback(txn.Callback, uint64(len(txn.Event.Rows)))

for _, row := range txn.Event.Rows {
topic := s.alive.eventRouter.GetTopicForRowChange(row)
partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic)
failpoint.Inject("MQSinkGetPartitionError", func() {
log.Info("failpoint MQSinkGetPartitionError injected", zap.String("changefeedID", s.id.ID))
err = errors.New("MQSinkGetPartitionError")
})
if err != nil {
s.cancel(err)
return errors.Trace(err)
}
index, key := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
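failpoint.Inject here is a marker function: building with failpoint-ctl rewrites it into a failpoint.Eval call whose body runs only while the named failpoint is enabled, for example via the GO_FAILPOINTS environment variable that the new test sets. A minimal sketch of the runtime API under that assumption (the failpoint path is illustrative):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Enable a failpoint programmatically; GO_FAILPOINTS does the same from
	// the environment. The term "2*return(true)" used in run.sh would make
	// the failpoint fire only on its first two evaluations.
	if err := failpoint.Enable("example/MQSinkGetPartitionError", "return(true)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable("example/MQSinkGetPartitionError")

	// Eval returns a nil error only while the failpoint is active.
	if val, err := failpoint.Eval("example/MQSinkGetPartitionError"); err == nil {
		fmt.Println("failpoint fired:", val) // failpoint fired: true
	}
}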
@@ -170,7 +179,7 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 // Close closes the sink.
 func (s *dmlSink) Close() {
 	if s.cancel != nil {
-		s.cancel()
+		s.cancel(nil)
 	}
 	s.wg.Wait()
29 changes: 29 additions & 0 deletions tests/integration_tests/mq_sink_lost_callback/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/mq_sink_lost_callback/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["mq_sink_lost_callback.t"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
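For context, this sync-diff-inspector config backs the check_sync_diff call in run.sh below: it compares mq_sink_lost_callback.t between the upstream TiDB (mysql1 on port 4000, where the test inserts rows) and the downstream TiDB (tidb0 on port 3306, presumably the database the Kafka consumer writes into), and exports fix SQL under output-dir when the two sides diverge.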
57 changes: 57 additions & 0 deletions tests/integration_tests/mq_sink_lost_callback/run.sh
@@ -0,0 +1,57 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4

function test_mq_sink_lost_callback() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

run_sql "DROP DATABASE if exists mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE DATABASE mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE TABLE mq_sink_lost_callback.t (a int not null primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

cd $WORK_DIR
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/MQSinkGetPartitionError=2*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"

TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
run_cdc_cli changefeed create --sink-uri="$SINK_URI"

run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (1);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (2);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
fi
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
stop_tidb_cluster
}

function run() {
# test kafka sink only in this case
if [ "$SINK_TYPE" != "kafka" ]; then
return
fi

test_mq_sink_lost_callback $*
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
@@ -14,7 +14,7 @@ mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_sui
 mysql_only_http="http_api http_api_tls api_v2"
 mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"
 
-kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume"
+kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback"
 kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check"
 kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2"
