Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 3, 2023
1 parent 9879d70 commit 1048e69
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 74 deletions.
17 changes: 17 additions & 0 deletions tests/integration_tests/_utils/check_redo_resolved_ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# Check that the resolved-ts persisted in the redo log meta has advanced
# past a given tso. Exits 0 on success, 1 on failure (so it can be retried
# with the harness `ensure` helper).
#
# parameter 1: changefeed id
# parameter 2: check tso
# parameter 3: external storage path
# parameter 4: temporary download path

set -eu

changefeedid=$1
check_tso=$2
storage_path=$3
read_dir=$4

# check resolved ts has been persisted in redo log meta
rts=$(cdc redo meta --storage="$storage_path" --tmp-dir="$read_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}')
# An empty result means `cdc redo meta` failed or printed no resolved-ts;
# report that explicitly instead of letting "" compare as 0 below.
if [[ -z "$rts" ]]; then
	echo "failed to read resolved ts from redo meta in $storage_path"
	exit 1
fi
if [[ "$rts" -le "$check_tso" ]]; then
	echo "global resolved ts $rts not forward to $check_tso"
	exit 1
fi
25 changes: 5 additions & 20 deletions tests/integration_tests/consistent_replicate_gbk/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,6 @@ stop() {

s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket

# check resolved ts has been persisted in redo log meta
function check_resolved_ts() {
export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY
changefeedid=$1
check_tso=$2
read_dir=$3
rts=$(cdc redo meta --storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" --tmp-dir="$read_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}')
if [[ "$rts" -gt "$check_tso" ]]; then
return
fi
echo "global resolved ts $rts not forward to $check_tso"
exit 1
}

export -f check_resolved_ts

function run() {
# we only support eventually consistent replication with MySQL sink
if [ "$SINK_TYPE" == "kafka" ]; then
Expand Down Expand Up @@ -92,15 +75,17 @@ function run() {
# to ensure row changed events have been replicated to TiCDC
sleep 5

storage_path="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
ensure 20 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta
ensure 20 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
cleanup_process $CDC_BINARY

export GO_FAILPOINTS=''
export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY
cdc redo apply --tmp-dir="$WORK_DIR/redo/apply" \
--storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" \
cdc redo apply --tmp-dir="$WORK_DIR/apply" \
--storage="$storage_path" \
--sink-uri="mysql://normal:[email protected]:3306/"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[consistent]
level = "eventual"
storage = "nfs:///tmp/tidb_cdc_test/consistent_replicate_nfs/nfs/redo"
storage = "gcs:///tmp/tidb_cdc_test/consistent_replicate_nfs/nfs/redo"
22 changes: 4 additions & 18 deletions tests/integration_tests/consistent_replicate_nfs/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,6 @@ stop() {
stop_tidb_cluster
}

# check resolved ts has been persisted in redo log meta
function check_resolved_ts() {
changefeedid=$1
check_tso=$2
redo_dir=$3
rts=$(cdc redo meta --storage="nfs:///tmp/tidb_cdc_test/consistent_replicate_nfs/nfs/redo" --tmp-dir="$redo_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}')
if [[ "$rts" -gt "$check_tso" ]]; then
return
fi
echo "global resolved ts $rts not forward to $check_tso"
exit 1
}

export -f check_resolved_ts

function run() {
# we only support eventually consistent replication with MySQL sink
if [ "$SINK_TYPE" == "kafka" ]; then
Expand Down Expand Up @@ -64,13 +49,14 @@ function run() {
# to ensure row changed events have been replicated to TiCDC
sleep 10

nfs_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
storage_path="nfs://$WORK_DIR/nfs/redo"
tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
ensure 50 check_resolved_ts $changefeed_id $current_tso $nfs_download_path
ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
cleanup_process $CDC_BINARY

export GO_FAILPOINTS=''
cdc redo apply --tmp-dir="$nfs_download_path" --storage="nfs://$WORK_DIR/nfs/redo" --sink-uri="mysql://normal:[email protected]:3306/"
cdc redo apply --tmp-dir="$tmp_download_path/apply" --storage="$storage_path" --sink-uri="mysql://normal:[email protected]:3306/"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[consistent]
level = "eventual"
storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file/redo"
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["consistent_replicate_storage_file.usertable*","consistent_replicate_storage_file.t*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Fixture data for the consistent_replicate_storage_file integration test.
use `consistent_replicate_storage_file`;
-- Needed for the EXCHANGE PARTITION statement issued later by run.sh.
set @@global.tidb_enable_exchange_partition=on;

-- t1: range-partitioned table; p3/p4 are added via ALTER after the first inserts.
create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3,p4*/

-- t2: plain table; run.sh later exchanges it with partition p3 of t1.
create table t2 (a int primary key);



84 changes: 84 additions & 0 deletions tests/integration_tests/consistent_replicate_storage_file/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/bash
# Integration test: eventually consistent replication (redo log) backed by
# local file storage. Only runs with a MySQL sink (kafka is skipped in run()).

set -eu

# Directory of this script; the shared test helpers live one level up.
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

# Start from a clean working directory for this test case.
rm -rf "$WORK_DIR"
mkdir -p "$WORK_DIR"

# EXIT-trap handler: dump a downstream row count for post-mortem, then tear
# down the cluster.
stop() {
	# to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase
	# NOTE: echo $(...) collapses the mysql output (header + value) onto one line.
	echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_file.USERTABLE;")
	stop_tidb_cluster
}

# Main test flow: replicate initial data, then hang the MySQL sink with a
# failpoint while upstream keeps writing, and finally verify that applying
# the redo log brings the downstream to a consistent state.
function run() {
	# we only support eventually consistent replication with MySQL sink
	if [ "$SINK_TYPE" == "kafka" ]; then
		return
	fi

	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR
	# The EXCHANGE PARTITION DDL used later needs this switch on the downstream too.
	run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

	SINK_URI="mysql://normal:[email protected]:3306/"
	# The changefeed id is on the second-to-last line of the cli output.
	changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')

	# Load the initial upstream data: ycsb workload plus the partition-table
	# fixtures from prepare.sql.
	run_sql "CREATE DATABASE consistent_replicate_storage_file;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_file
	run_sql "CREATE table consistent_replicate_storage_file.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	# Wait until the initial data has reached the downstream.
	check_table_exists "consistent_replicate_storage_file.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
	check_table_exists "consistent_replicate_storage_file.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
	check_table_exists "consistent_replicate_storage_file.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

	# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
	# Then we can apply redo log to reach an eventual consistent state in downstream.
	cleanup_process $CDC_BINARY
	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sinkv2/eventsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
	# These writes (including DDL: create-like and exchange-partition) land in
	# the redo log only, since the sink is hung.
	run_sql "create table consistent_replicate_storage_file.USERTABLE2 like consistent_replicate_storage_file.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "ALTER TABLE consistent_replicate_storage_file.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "insert into consistent_replicate_storage_file.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "insert into consistent_replicate_storage_file.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "insert into consistent_replicate_storage_file.USERTABLE2 select * from consistent_replicate_storage_file.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

	# to ensure row changed events have been replicated to TiCDC
	sleep 20

	# Verify the redo meta resolved-ts advances past a tso taken after the
	# writes above (retried up to 50 times by `ensure`).
	storage_path="file://$WORK_DIR/redo"
	tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
	current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
	ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
	cleanup_process $CDC_BINARY

	export GO_FAILPOINTS=''

	# This value is generated by:
	# echo -n '123456' | base64
	# MTIzNDU2
	# Use this value here to test redo apply function works well
	# when use base64 encoded password
	ENPASSWORD="MTIzNDU2"

	# Replay the redo log into the downstream; afterwards both sides must match.
	cdc redo apply --tmp-dir="$tmp_download_path/apply" \
		--storage="$storage_path" \
		--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/"
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}

# Ensure cluster teardown and diagnostics on any exit path.
trap stop EXIT
# Use "$@" so arguments are forwarded as-is; $* would word-split and re-glob them.
run "$@"
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ export-fix-sql = true
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/consistent_replicate_s3/sync_diff/output"
output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_s3/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["consistent_replicate_s3.usertable*","consistent_replicate_s3.t*"]
target-check-tables = ["consistent_replicate_storage_s3.usertable*","consistent_replicate_storage_s3.t*"]

[data-sources]
[data-sources.mysql1]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
threadcount=10
recordcount=5000
operationcount=0
workload=core
fieldcount=100

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use `consistent_replicate_s3`;
use `consistent_replicate_storage_s3`;
set @@global.tidb_enable_exchange_partition=on;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,13 @@ stop_minio() {

stop() {
# to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase
echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_s3.USERTABLE;")
echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_s3.USERTABLE;")
stop_minio
stop_tidb_cluster
}

s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket

# check resolved ts has been persisted in redo log meta
function check_resolved_ts() {
export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY
changefeedid=$1
check_tso=$2
read_dir=$3
rts=$(cdc redo meta --storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" --tmp-dir="$read_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}')
if [[ "$rts" -gt "$check_tso" ]]; then
return
fi
echo "global resolved ts $rts not forward to $check_tso"
exit 1
}

export -f check_resolved_ts

function run() {
# we only support eventually consistent replication with MySQL sink
if [ "$SINK_TYPE" == "kafka" ]; then
Expand All @@ -76,13 +59,13 @@ function run() {
SINK_URI="mysql://normal:[email protected]:3306/"
changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')

run_sql "CREATE DATABASE consistent_replicate_s3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_s3
run_sql "CREATE table consistent_replicate_s3.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE DATABASE consistent_replicate_storage_s3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_s3
run_sql "CREATE table consistent_replicate_storage_s3.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists "consistent_replicate_s3.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "consistent_replicate_s3.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_table_exists "consistent_replicate_s3.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_table_exists "consistent_replicate_storage_s3.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "consistent_replicate_storage_s3.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_table_exists "consistent_replicate_storage_s3.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

Expand All @@ -91,17 +74,19 @@ function run() {
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sinkv2/eventsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_s3.USERTABLE2 like consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.USERTABLE2 select * from consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "create table consistent_replicate_storage_s3.USERTABLE2 like consistent_replicate_storage_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_storage_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_storage_s3.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_storage_s3.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_storage_s3.USERTABLE2 select * from consistent_replicate_storage_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# to ensure row changed events have been replicated to TiCDC
sleep 20

storage_path="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/"
tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
ensure 50 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta
ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
cleanup_process $CDC_BINARY

export GO_FAILPOINTS=''
Expand All @@ -115,8 +100,8 @@ function run() {
# when use base64 encoded password
ENPASSWORD="MTIzNDU2"

cdc redo apply --tmp-dir="$WORK_DIR/redo/apply" \
--storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" \
cdc redo apply --tmp-dir="$tmp_download_path/apply" \
--storage="$storage_path" \
--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}
Expand Down

0 comments on commit 1048e69

Please sign in to comment.