diff --git a/tests/integration_tests/_utils/check_redo_resolved_ts b/tests/integration_tests/_utils/check_redo_resolved_ts new file mode 100755 index 00000000000..800644d58b6 --- /dev/null +++ b/tests/integration_tests/_utils/check_redo_resolved_ts @@ -0,0 +1,17 @@ +#!/bin/bash +# parameter 1: changefeed id +# parameter 2: check tso +# parameter 3: external storage path +# parameter 4: temporary download path + +changefeedid=$1 +check_tso=$2 +storage_path=$3 +read_dir=$4 + +# check resolved ts has been persisted in redo log meta +rts=$(cdc redo meta --storage="$storage_path" --tmp-dir="$read_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}') +if [[ "$rts" -le "$check_tso" ]]; then + echo "global resolved ts $rts not forward to $check_tso" + exit 1 +fi diff --git a/tests/integration_tests/consistent_replicate_gbk/run.sh b/tests/integration_tests/consistent_replicate_gbk/run.sh index d50b0a09dd2..ec2bb63e829 100644 --- a/tests/integration_tests/consistent_replicate_gbk/run.sh +++ b/tests/integration_tests/consistent_replicate_gbk/run.sh @@ -41,23 +41,6 @@ stop() { s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket -# check resolved ts has been persisted in redo log meta -function check_resolved_ts() { - export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY - export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY - changefeedid=$1 - check_tso=$2 - read_dir=$3 - rts=$(cdc redo meta --storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" --tmp-dir="$read_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}') - if [[ "$rts" -gt "$check_tso" ]]; then - return - fi - echo "global resolved ts $rts not forward to $check_tso" - exit 1 -} - -export -f check_resolved_ts - function run() { # we only support eventually consistent replication with MySQL sink if [ "$SINK_TYPE" == "kafka" ]; then @@ -92,15 +75,17 @@ function run() { # to ensure row changed events have been 
replicated to TiCDC sleep 5 + storage_path="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" + tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 20 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta + ensure 20 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta cleanup_process $CDC_BINARY export GO_FAILPOINTS='' export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY - cdc redo apply --tmp-dir="$WORK_DIR/redo/apply" \ - --storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" \ + cdc redo apply --tmp-dir="$tmp_download_path/apply" \ + --storage="$storage_path" \ --sink-uri="mysql://normal:123456@127.0.0.1:3306/" check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml } diff --git a/tests/integration_tests/consistent_replicate_nfs/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_nfs/conf/changefeed.toml index a215b9d772b..a2141a9a4bb 100644 --- a/tests/integration_tests/consistent_replicate_nfs/conf/changefeed.toml +++ b/tests/integration_tests/consistent_replicate_nfs/conf/changefeed.toml @@ -1,3 +1,3 @@ [consistent] level = "eventual" -storage = "nfs:///tmp/tidb_cdc_test/consistent_replicate_nfs/nfs/redo" +storage = "nfs:///tmp/tidb_cdc_test/consistent_replicate_nfs/nfs/redo" diff --git a/tests/integration_tests/consistent_replicate_nfs/run.sh b/tests/integration_tests/consistent_replicate_nfs/run.sh index 56d398329a2..39284dc32e8 100644 --- a/tests/integration_tests/consistent_replicate_nfs/run.sh +++ b/tests/integration_tests/consistent_replicate_nfs/run.sh @@ -14,21 +14,6 @@ stop() { stop_tidb_cluster } -# check resolved ts has been persisted in redo log meta -function check_resolved_ts() { - changefeedid=$1 - check_tso=$2 - redo_dir=$3 - rts=$(cdc redo meta --storage="nfs:///tmp/tidb_cdc_test/consistent_replicate_nfs/nfs/redo"
--tmp-dir="$redo_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}') - if [[ "$rts" -gt "$check_tso" ]]; then - return - fi - echo "global resolved ts $rts not forward to $check_tso" - exit 1 -} - -export -f check_resolved_ts - function run() { # we only support eventually consistent replication with MySQL sink if [ "$SINK_TYPE" == "kafka" ]; then @@ -64,13 +49,14 @@ function run() { # to ensure row changed events have been replicated to TiCDC sleep 10 - nfs_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id + storage_path="nfs://$WORK_DIR/nfs/redo" + tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 50 check_resolved_ts $changefeed_id $current_tso $nfs_download_path + ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta cleanup_process $CDC_BINARY export GO_FAILPOINTS='' - cdc redo apply --tmp-dir="$nfs_download_path" --storage="nfs://$WORK_DIR/nfs/redo" --sink-uri="mysql://normal:123456@127.0.0.1:3306/" + cdc redo apply --tmp-dir="$tmp_download_path/apply" --storage="$storage_path" --sink-uri="mysql://normal:123456@127.0.0.1:3306/" check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml } diff --git a/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml new file mode 100644 index 00000000000..dadf83cf9fd --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml @@ -0,0 +1,3 @@ +[consistent] +level = "eventual" +storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file/redo" diff --git a/tests/integration_tests/consistent_replicate_storage_file/conf/diff_config.toml b/tests/integration_tests/consistent_replicate_storage_file/conf/diff_config.toml new file mode 100644 index 00000000000..ab502d7ea53 --- /dev/null +++ 
b/tests/integration_tests/consistent_replicate_storage_file/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["consistent_replicate_storage_file.usertable*","consistent_replicate_storage_file.t*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/consistent_replicate_s3/conf/workload b/tests/integration_tests/consistent_replicate_storage_file/conf/workload similarity index 100% rename from tests/integration_tests/consistent_replicate_s3/conf/workload rename to tests/integration_tests/consistent_replicate_storage_file/conf/workload diff --git a/tests/integration_tests/consistent_replicate_storage_file/data/prepare.sql b/tests/integration_tests/consistent_replicate_storage_file/data/prepare.sql new file mode 100644 index 00000000000..a1b9a336ebd --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file/data/prepare.sql @@ -0,0 +1,13 @@ +use `consistent_replicate_storage_file`; +set @@global.tidb_enable_exchange_partition=on; + +create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21)); +insert into t1 values (1),(2),(3),(4),(5),(6); +insert into t1 values (7),(8),(9); +insert into t1 values (11),(12),(20); +alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40)); +insert into t1 values (25),(29),(35); /*these values in p3,p4*/ + +create table t2 (a int primary key); + + diff --git 
a/tests/integration_tests/consistent_replicate_storage_file/run.sh b/tests/integration_tests/consistent_replicate_storage_file/run.sh new file mode 100644 index 00000000000..e8b8ed84f92 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file/run.sh @@ -0,0 +1,84 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +rm -rf "$WORK_DIR" +mkdir -p "$WORK_DIR" + +stop() { + # to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase + echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_file.USERTABLE;") + stop_tidb_cluster +} + +function run() { + # we only support eventually consistent replication with MySQL sink + if [ "$SINK_TYPE" == "kafka" ]; then + return + fi + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="mysql://normal:123456@127.0.0.1:3306/" + changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') + + run_sql "CREATE DATABASE consistent_replicate_storage_file;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_file + run_sql "CREATE table consistent_replicate_storage_file.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "consistent_replicate_storage_file.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "consistent_replicate_storage_file.check1" ${DOWN_TIDB_HOST} 
${DOWN_TIDB_PORT} 120 + check_table_exists "consistent_replicate_storage_file.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + # Inject the failpoint to prevent sink execution, but the global resolved can be moved forward. + # Then we can apply redo log to reach an eventual consistent state in downstream. + cleanup_process $CDC_BINARY + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sinkv2/eventsink/txn/mysql/MySQLSinkHangLongTime=return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_sql "create table consistent_replicate_storage_file.USERTABLE2 like consistent_replicate_storage_file.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "ALTER TABLE consistent_replicate_storage_file.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into consistent_replicate_storage_file.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into consistent_replicate_storage_file.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into consistent_replicate_storage_file.USERTABLE2 select * from consistent_replicate_storage_file.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # to ensure row changed events have been replicated to TiCDC + sleep 20 + + storage_path="file://$WORK_DIR/redo" + tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id + current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta + cleanup_process $CDC_BINARY + + export GO_FAILPOINTS='' + + # This value is generated by: + # echo -n '123456' | base64 + # MTIzNDU2 + # Use this value here to test redo apply function works well + # when use base64 encoded password + ENPASSWORD="MTIzNDU2" + + cdc redo apply --tmp-dir="$tmp_download_path/apply" \ + --storage="$storage_path" \ + 
--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/" + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml +} + +trap stop EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/consistent_replicate_s3/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml similarity index 100% rename from tests/integration_tests/consistent_replicate_s3/conf/changefeed.toml rename to tests/integration_tests/consistent_replicate_storage_s3/conf/changefeed.toml diff --git a/tests/integration_tests/consistent_replicate_s3/conf/diff_config.toml b/tests/integration_tests/consistent_replicate_storage_s3/conf/diff_config.toml similarity index 65% rename from tests/integration_tests/consistent_replicate_s3/conf/diff_config.toml rename to tests/integration_tests/consistent_replicate_storage_s3/conf/diff_config.toml index e9fc089d25a..c6f6f934957 100644 --- a/tests/integration_tests/consistent_replicate_s3/conf/diff_config.toml +++ b/tests/integration_tests/consistent_replicate_storage_s3/conf/diff_config.toml @@ -7,13 +7,13 @@ export-fix-sql = true check-struct-only = false [task] - output-dir = "/tmp/tidb_cdc_test/consistent_replicate_s3/sync_diff/output" + output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_s3/sync_diff/output" source-instances = ["mysql1"] target-instance = "tidb0" - target-check-tables = ["consistent_replicate_s3.usertable*","consistent_replicate_s3.t*"] + target-check-tables = ["consistent_replicate_storage_s3.usertable*","consistent_replicate_storage_s3.t*"] [data-sources] [data-sources.mysql1] diff --git a/tests/integration_tests/consistent_replicate_storage_s3/conf/workload b/tests/integration_tests/consistent_replicate_storage_s3/conf/workload new file mode 100644 index 00000000000..7a3fd48df6f --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_s3/conf/workload @@ -0,0 +1,14 @@ +threadcount=10 
+recordcount=5000 +operationcount=0 +workload=core +fieldcount=100 + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/consistent_replicate_s3/data/prepare.sql b/tests/integration_tests/consistent_replicate_storage_s3/data/prepare.sql similarity index 93% rename from tests/integration_tests/consistent_replicate_s3/data/prepare.sql rename to tests/integration_tests/consistent_replicate_storage_s3/data/prepare.sql index 67ecd05966d..4d0acf98cff 100644 --- a/tests/integration_tests/consistent_replicate_s3/data/prepare.sql +++ b/tests/integration_tests/consistent_replicate_storage_s3/data/prepare.sql @@ -1,4 +1,4 @@ -use `consistent_replicate_s3`; +use `consistent_replicate_storage_s3`; set @@global.tidb_enable_exchange_partition=on; create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21)); diff --git a/tests/integration_tests/consistent_replicate_s3/run.sh b/tests/integration_tests/consistent_replicate_storage_s3/run.sh similarity index 60% rename from tests/integration_tests/consistent_replicate_s3/run.sh rename to tests/integration_tests/consistent_replicate_storage_s3/run.sh index f94f07bdb80..dfca5cb5f69 100644 --- a/tests/integration_tests/consistent_replicate_s3/run.sh +++ b/tests/integration_tests/consistent_replicate_storage_s3/run.sh @@ -36,30 +36,13 @@ stop_minio() { stop() { # to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase - echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_s3.USERTABLE;") + echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_s3.USERTABLE;") stop_minio stop_tidb_cluster } s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY 
--host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket -# check resolved ts has been persisted in redo log meta -function check_resolved_ts() { - export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY - export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY - changefeedid=$1 - check_tso=$2 - read_dir=$3 - rts=$(cdc redo meta --storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" --tmp-dir="$read_dir" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}') - if [[ "$rts" -gt "$check_tso" ]]; then - return - fi - echo "global resolved ts $rts not forward to $check_tso" - exit 1 -} - -export -f check_resolved_ts - function run() { # we only support eventually consistent replication with MySQL sink if [ "$SINK_TYPE" == "kafka" ]; then @@ -76,13 +59,13 @@ function run() { SINK_URI="mysql://normal:123456@127.0.0.1:3306/" changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - run_sql "CREATE DATABASE consistent_replicate_s3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_s3 - run_sql "CREATE table consistent_replicate_s3.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE DATABASE consistent_replicate_storage_s3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_s3 + run_sql "CREATE table consistent_replicate_storage_s3.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "consistent_replicate_s3.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists "consistent_replicate_s3.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 - check_table_exists 
"consistent_replicate_s3.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 + check_table_exists "consistent_replicate_storage_s3.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "consistent_replicate_storage_s3.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 + check_table_exists "consistent_replicate_storage_s3.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml @@ -91,17 +74,19 @@ function run() { cleanup_process $CDC_BINARY export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sinkv2/eventsink/txn/mysql/MySQLSinkHangLongTime=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - run_sql "create table consistent_replicate_s3.USERTABLE2 like consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "ALTER TABLE consistent_replicate_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into consistent_replicate_s3.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into consistent_replicate_s3.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into consistent_replicate_s3.USERTABLE2 select * from consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table consistent_replicate_storage_s3.USERTABLE2 like consistent_replicate_storage_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "ALTER TABLE consistent_replicate_storage_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into consistent_replicate_storage_s3.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into consistent_replicate_storage_s3.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into consistent_replicate_storage_s3.USERTABLE2 select * from consistent_replicate_storage_s3.USERTABLE" ${UP_TIDB_HOST} 
${UP_TIDB_PORT} # to ensure row changed events have been replicated to TiCDC sleep 20 + storage_path="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" + tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 50 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta + ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta cleanup_process $CDC_BINARY export GO_FAILPOINTS='' @@ -115,8 +100,8 @@ function run() { # when use base64 encoded password ENPASSWORD="MTIzNDU2" - cdc redo apply --tmp-dir="$WORK_DIR/redo/apply" \ - --storage="s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:24927/" \ + cdc redo apply --tmp-dir="$tmp_download_path/apply" \ + --storage="$storage_path" \ --sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/" check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml }