pulsar(ticdc): run_pulsar_consumer support more args (#10670)
ref #10653
sdojjy authored Feb 28, 2024
1 parent 285353c commit 1d3ea30
Showing 54 changed files with 55 additions and 61 deletions.
10 changes: 2 additions & 8 deletions tests/integration_tests/_utils/run_pulsar_consumer
@@ -1,14 +1,8 @@
 #!/bin/bash
 
-# parameter 1: work directory
-# parameter 2: changefeed_sink_uri
-# parameter 3: log suffix
-
 set -e
 
-workdir=$1
-changefeed_sink_uri=$2
-log_suffix=$3
+workdir=$OUT_DIR/$TEST_NAME
 pwd=$pwd
 
 echo "[$(date)] <<<<<< START Pulsar consumer in $TEST_NAME case >>>>>>"
@@ -19,6 +13,6 @@ cd $workdir
 downstream_uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false"
 
 # output debug log to allow us to check the consumer's behavior when it encounters errors
-cdc_pulsar_consumer --log-file $workdir/cdc_pulsar_consumer$log_suffix.log --log-level debug --upstream-uri $changefeed_sink_uri --downstream-uri ${downstream_uri} >>$workdir/cdc_pulsar_consumer_stdout$log_suffix.log 2>&1 &
+cdc_pulsar_consumer --log-file $workdir/cdc_pulsar_consumer$log_suffix.log --log-level debug --downstream-uri ${downstream_uri} "$@" >>$workdir/cdc_pulsar_consumer_stdout$log_suffix.log 2>&1 &
 
 cd $pwd
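In short, the helper switches from fixed positional parameters to verbatim flag forwarding: the work directory is now derived from $OUT_DIR/$TEST_NAME, and whatever a caller passes to run_pulsar_consumer reaches cdc_pulsar_consumer through "$@". (Since $log_suffix is no longer assigned, it expands to the empty string, so the logs default to cdc_pulsar_consumer.log.) A minimal sketch of a call site before and after, using only flags that appear in this diff:

# Before: fixed positional arguments, parsed by the helper itself.
#   run_pulsar_consumer <workdir> <changefeed_sink_uri> [log_suffix]
run_pulsar_consumer $WORK_DIR $SINK_URI

# After: the caller names the flags; anything else the cdc_pulsar_consumer
# binary understands can be appended and is forwarded untouched via "$@".
run_pulsar_consumer --upstream-uri $SINK_URI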
2 changes: 1 addition & 1 deletion tests/integration_tests/autorandom/run.sh
@@ -28,7 +28,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR $SINK_URI ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/batch_add_table/run.sh
@@ -31,7 +31,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/big_txn/run.sh
@@ -36,7 +36,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/canal_json_basic/run.sh
@@ -43,7 +43,7 @@ function run() {
 fi
 
 if [ "$SINK_TYPE" == "pulsar" ]; then
-run_pulsar_consumer $WORK_DIR $SINK_URI
+run_pulsar_consumer --upstream-uri $SINK_URI
 fi
 
 run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -43,7 +43,7 @@ function run() {
 fi
 
 if [ "$SINK_TYPE" == "pulsar" ]; then
-run_pulsar_consumer $WORK_DIR $SINK_URI
+run_pulsar_consumer --upstream-uri $SINK_URI
 fi
 
 run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -39,7 +39,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | head -n 1)
2 changes: 1 addition & 1 deletion tests/integration_tests/cdc/run.sh
@@ -29,7 +29,7 @@ function prepare() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 }
2 changes: 1 addition & 1 deletion tests/integration_tests/cdc_server_tips/run.sh
@@ -56,7 +56,7 @@ function try_to_run_cdc() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 echo 'Succeed to create a changefeed, no usage tips should be printed'
 }
2 changes: 1 addition & 1 deletion tests/integration_tests/changefeed_auto_stop/run.sh
@@ -39,7 +39,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 ensure $MAX_RETRIES check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "null" ""
2 changes: 1 addition & 1 deletion tests/integration_tests/changefeed_error/run.sh
@@ -54,7 +54,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" ""
2 changes: 1 addition & 1 deletion tests/integration_tests/changefeed_finish/run.sh
@@ -32,7 +32,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "CREATE DATABASE changefeed_finish;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/changefeed_pause_resume/run.sh
@@ -28,7 +28,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "CREATE DATABASE changefeed_pause_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/changefeed_reconstruct/run.sh
@@ -52,7 +52,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "CREATE DATABASE changefeed_reconstruct;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/charset_gbk/run.sh
@@ -32,7 +32,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/cli/run.sh
@@ -48,7 +48,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 # Make sure changefeed is created.
2 changes: 1 addition & 1 deletion tests/integration_tests/clustered_index/run.sh
@@ -31,7 +31,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # TiDB global variables cache 2 seconds at most
2 changes: 1 addition & 1 deletion tests/integration_tests/common_1/run.sh
@@ -56,7 +56,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/ddl_attributes/run.sh
@@ -33,7 +33,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/ddl_manager/run.sh
@@ -34,7 +34,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -72,7 +72,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/start.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/ddl_puller_lag/run.sh
@@ -34,7 +34,7 @@ function prepare() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 }
2 changes: 1 addition & 1 deletion tests/integration_tests/ddl_sequence/run.sh
@@ -31,7 +31,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/default_value/run.sh
@@ -32,7 +32,7 @@ function prepare() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 }
2 changes: 1 addition & 1 deletion tests/integration_tests/drop_many_tables/run.sh
@@ -31,7 +31,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/event_filter/run.sh
@@ -30,7 +30,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/force_replicate_table/run.sh
@@ -70,7 +70,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/foreign_key/run.sh
@@ -31,7 +31,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/gc_safepoint/run.sh
@@ -82,7 +82,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster | grep -oE "id\":\s[0-9]+" | grep -oE "[0-9]+")
2 changes: 1 addition & 1 deletion tests/integration_tests/generate_column/run.sh
@@ -41,7 +41,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/kv_client_stream_reconnect/run.sh
@@ -32,7 +32,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "CREATE DATABASE kv_client_stream_reconnect;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/many_pk_or_uk/run.sh
@@ -32,7 +32,7 @@ function prepare() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 }
2 changes: 1 addition & 1 deletion tests/integration_tests/move_table/run.sh
@@ -33,7 +33,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301
2 changes: 1 addition & 1 deletion tests/integration_tests/multi_capture/run.sh
@@ -44,7 +44,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 # check tables are created and data is synchronized
2 changes: 1 addition & 1 deletion tests/integration_tests/multi_rocks/run.sh
@@ -32,7 +32,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "create database multi_rocks;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/multi_source/run.sh
@@ -32,7 +32,7 @@ function prepare() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 }
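Note that multi_source is the outlier in this batch: its pulsar branch previously only rebuilt SINK_URI and never started a consumer, so this change also makes it launch one via the shared helper, bringing it in line with the other tests.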
2 changes: 1 addition & 1 deletion tests/integration_tests/new_ci_collation/run.sh
@@ -42,7 +42,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql_file $CUR/data/test1.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/owner_resign/run.sh
@@ -33,7 +33,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "CREATE database owner_resign;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/partition_table/run.sh
@@ -32,7 +32,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 # sync_diff can't check non-exist table, so we check expected tables are created in downstream first
2 changes: 1 addition & 1 deletion tests/integration_tests/processor_err_chan/run.sh
@@ -36,7 +36,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 retry_time=10
2 changes: 1 addition & 1 deletion tests/integration_tests/processor_etcd_worker_delay/run.sh
@@ -37,7 +37,7 @@ function run() {
 case $SINK_TYPE in
 kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
+pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
 esac
 
 run_sql "CREATE DATABASE processor_delay;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}