Skip to content

Commit

Permalink
test(dm): add async checkpoint flush integration test (#4538)
Browse files Browse the repository at this point in the history
ref #4159
  • Loading branch information
db-will authored Mar 8, 2022
1 parent 98522d1 commit 3466eaf
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 0 deletions.
9 changes: 9 additions & 0 deletions dm/syncer/checkpoint_flush_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package syncer
import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/filter"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,6 +90,13 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) {
}

err = w.cp.FlushPointsExcept(ctx, task.snapshotInfo.id, task.exceptTables, task.shardMetaSQLs, task.shardMetaArgs)
failpoint.Inject("AsyncCheckpointFlushThrowError", func() {
if isAsyncFlush {
ctx.L().Warn("async checkpoint flush error triggered", zap.String("failpoint", "AsyncCheckpointFlushThrowError"))
err = errors.New("async checkpoint flush throw error")
}
})

if err != nil {
ctx.L().Warn(fmt.Sprintf("%s checkpoint snapshot failed, ignore this error", flushLogMsg), zap.Any("flushCpTask", task), zap.Error(err))
// an async flush error is ignored here, but a sync flush error will be raised
Expand Down
29 changes: 29 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["async_checkpoint_flush.t?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
4 changes: 4 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
auto-compaction-retention = "3s"
40 changes: 40 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["async_checkpoint_flush"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
checkpoint-flush-interval: 10
experimental:
async-checkpoint-flush: true
2 changes: 2 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
9 changes: 9 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
source-id: mysql-replica-01
enable-relay: true
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
5 changes: 5 additions & 0 deletions dm/tests/async_checkpoint_flush/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
drop database if exists `async_checkpoint_flush`;
reset master;
create database `async_checkpoint_flush`;
use `async_checkpoint_flush`;
create table t1 (id int, primary key(`id`));
80 changes: 80 additions & 0 deletions dm/tests/async_checkpoint_flush/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/bin/bash

# Strict mode: abort on any failing command (-e) and on use of an unset variable (-u).
set -eu

# Absolute path of the directory that contains this script.
cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Load the shared DM test helpers and environment.
# NOTE(review): TEST_DIR, TEST_NAME, the *_PORT / MYSQL_* variables and helpers such as
# run_dm_master / check_metric are not defined in this file; presumably they all come
# from test_prepare - confirm.
source $cur/../_utils/test_prepare
# Per-test working directory (master/worker data and the copied source config live here).
WORK_DIR=$TEST_DIR/$TEST_NAME
# Must match the `name` field in conf/dm-task.yaml; used to build the task-state metric label.
TASK_NAME="test"
# NOTE(review): not referenced directly in this script; presumably read by the sourced
# helpers (e.g. check_contains) - confirm before removing.
SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt"

# run_sql_silent executes a single SQL statement on a local server and discards stdout.
#   $1 - SQL text to execute
#   $2 - server port; port 4000 (TiDB) connects as user "test", any other port as "root"
#   $3 - password for the selected user
# stderr is not redirected, so client errors stay visible. The exit status is that of
# the mysql client, which lets `set -e` callers stop on a failed statement.
function run_sql_silent() {
	# Keep everything local so the helper cannot clobber same-named globals
	# provided by the sourced test environment.
	local sql="$1" port="$2" password="$3"
	local tidb_port=4000
	local user="root"

	if [[ "$port" == "$tidb_port" ]]; then
		user="test"
	fi

	mysql -u"$user" -h127.0.0.1 -P"$port" -p"$password" --default-character-set utf8 -E -e "$sql" >/dev/null
}

# insert_data keeps inserting odd ids (3, 5, 7, ...) into async_checkpoint_flush.t1 on
# the MySQL instance listening on $MYSQL_PORT1. The loop has no exit condition: it is
# meant to run in the background and be killed by the caller. Under `set -e` it also
# stops as soon as one insert fails.
function insert_data() {
	local i=1

	while true; do
		run_sql_silent "insert into async_checkpoint_flush.t1 values ($((i * 2 + 1)));" "$MYSQL_PORT1" "$MYSQL_PASSWORD1"
		# Plain assignment instead of ((i++)): an arithmetic command that evaluates
		# to 0 returns a failing status, which is an easy trap under `set -e`.
		i=$((i + 1))
	done
}

# run drives the async checkpoint flush integration test:
#   1. enable the AsyncCheckpointFlushThrowError failpoint (forces async flushes to fail),
#   2. prepare upstream data and start one dm-master and one dm-worker,
#   3. start the task and verify the initial sync with sync-diff,
#   4. generate continuous inserts for 30 seconds in the background,
#   5. verify that async flush failures were logged and ignored, that a sync flush
#      succeeded, and that upstream and downstream data still match.
function run() {
	local inserter_pid

	# Exported before any DM process is started by this function.
	# NOTE(review): presumably the failpoint is picked up by dm-worker at startup - confirm.
	export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/AsyncCheckpointFlushThrowError=return(true)"

	run_sql_file "$cur/data/db1.prepare.sql" "$MYSQL_HOST1" "$MYSQL_PORT1" "$MYSQL_PASSWORD1"
	check_contains 'Query OK, 1 row affected'

	# run dm master
	run_dm_master "$WORK_DIR/master" "$MASTER_PORT" "$cur/conf/dm-master.toml"
	check_rpc_alive "$cur/../bin/check_master_online" "127.0.0.1:$MASTER_PORT"
	check_metric "$MASTER_PORT" 'start_leader_counter' 3 0 2

	# copy config file
	cp "$cur/conf/source1.yaml" "$WORK_DIR/source1.yaml"
	# NOTE(review): this inserts relay-dir before a line matching "relay-binlog-name".
	# If conf/source1.yaml has no such line the command changes nothing and the default
	# relay directory is used - confirm which behaviour is intended.
	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" "$WORK_DIR/source1.yaml"

	# start worker1 and bind source1 to it (this test uses a single source/worker pair)
	run_dm_worker "$WORK_DIR/worker1" "$WORKER1_PORT" "$cur/conf/dm-worker1.toml"
	check_rpc_alive "$cur/../bin/check_worker_online" "127.0.0.1:$WORKER1_PORT"
	dmctl_operate_source create "$WORK_DIR/source1.yaml" "$SOURCE_ID1"

	# check dm-workers metrics unit: relay file index must be 1.
	check_metric "$WORKER1_PORT" "dm_relay_binlog_file" 3 0 2

	# start a task in all mode, and when enter incremental mode, we only execute DML
	dmctl_start_task_standalone "$cur/conf/dm-task.yaml"

	# check task has started state=2 running
	check_metric "$WORKER1_PORT" "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"$TASK_NAME\",worker=\"worker1\"}" 10 1 3

	# check diff
	check_sync_diff "$WORK_DIR" "$cur/conf/diff_config.toml"

	# Generate upstream writes in the background while checkpoints are being flushed.
	insert_data &
	inserter_pid=$!
	echo "PID of insert_data is $inserter_pid"

	sleep 30

	kill "$inserter_pid"
	# Reap the inserter so it is really gone before the final comparison. `wait` reports
	# the signal exit status of the killed job, so it must not trip `set -e`.
	wait "$inserter_pid" 2>/dev/null || true

	# The injected async flush error must have been logged and ignored ...
	check_log_contain_with_retry 'async flush checkpoint snapshot failed, ignore this error' "$WORK_DIR/worker1/log/dm-worker.log"
	# ... while a sync flush must still have succeeded.
	check_log_contain_with_retry 'sync flush checkpoint snapshot successfully' "$WORK_DIR/worker1/log/dm-worker.log"
	check_sync_diff "$WORK_DIR" "$cur/conf/diff_config.toml"

	# Disable the failpoint again.
	export GO_FAILPOINTS=""
}

# Start from a clean state. NOTE(review): cleanup_data / cleanup_process are defined
# outside this file; their exact effect is assumed from their names - confirm.
cleanup_data $TEST_NAME
# Also clean up DM processes left behind in case the previous run failed.
cleanup_process $*
# Execute the test; with `set -e` any failing step aborts the script before the success banner.
run $*
# Stop the DM processes started by this test.
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions dm/tests/others_integration_1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ lightning_mode
slow_relay_writer
sync_collation
s3_dumpling_lighting
async_checkpoint_flush

0 comments on commit 3466eaf

Please sign in to comment.